001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq;
018    
019    import java.io.IOException;
020    import java.io.InputStream;
021    import java.io.OutputStream;
022    import java.net.URI;
023    import java.net.URISyntaxException;
024    import java.util.HashMap;
025    import java.util.Iterator;
026    import java.util.Map;
027    import java.util.concurrent.ConcurrentHashMap;
028    import java.util.concurrent.CopyOnWriteArrayList;
029    import java.util.concurrent.CountDownLatch;
030    import java.util.concurrent.LinkedBlockingQueue;
031    import java.util.concurrent.ThreadFactory;
032    import java.util.concurrent.ThreadPoolExecutor;
033    import java.util.concurrent.TimeUnit;
034    import java.util.concurrent.atomic.AtomicBoolean;
035    import java.util.concurrent.atomic.AtomicInteger;
036    import javax.jms.Connection;
037    import javax.jms.ConnectionConsumer;
038    import javax.jms.ConnectionMetaData;
039    import javax.jms.DeliveryMode;
040    import javax.jms.Destination;
041    import javax.jms.ExceptionListener;
042    import javax.jms.IllegalStateException;
043    import javax.jms.InvalidDestinationException;
044    import javax.jms.JMSException;
045    import javax.jms.Queue;
046    import javax.jms.QueueConnection;
047    import javax.jms.QueueSession;
048    import javax.jms.ServerSessionPool;
049    import javax.jms.Session;
050    import javax.jms.Topic;
051    import javax.jms.TopicConnection;
052    import javax.jms.TopicSession;
053    import javax.jms.XAConnection;
054    import org.apache.activemq.advisory.DestinationSource;
055    import org.apache.activemq.blob.BlobTransferPolicy;
056    import org.apache.activemq.command.ActiveMQDestination;
057    import org.apache.activemq.command.ActiveMQMessage;
058    import org.apache.activemq.command.ActiveMQTempDestination;
059    import org.apache.activemq.command.ActiveMQTempQueue;
060    import org.apache.activemq.command.ActiveMQTempTopic;
061    import org.apache.activemq.command.BrokerInfo;
062    import org.apache.activemq.command.Command;
063    import org.apache.activemq.command.CommandTypes;
064    import org.apache.activemq.command.ConnectionControl;
065    import org.apache.activemq.command.ConnectionError;
066    import org.apache.activemq.command.ConnectionId;
067    import org.apache.activemq.command.ConnectionInfo;
068    import org.apache.activemq.command.ConsumerControl;
069    import org.apache.activemq.command.ConsumerId;
070    import org.apache.activemq.command.ConsumerInfo;
071    import org.apache.activemq.command.ControlCommand;
072    import org.apache.activemq.command.DestinationInfo;
073    import org.apache.activemq.command.ExceptionResponse;
074    import org.apache.activemq.command.Message;
075    import org.apache.activemq.command.MessageDispatch;
076    import org.apache.activemq.command.MessageId;
077    import org.apache.activemq.command.ProducerAck;
078    import org.apache.activemq.command.ProducerId;
079    import org.apache.activemq.command.RemoveInfo;
080    import org.apache.activemq.command.RemoveSubscriptionInfo;
081    import org.apache.activemq.command.Response;
082    import org.apache.activemq.command.SessionId;
083    import org.apache.activemq.command.ShutdownInfo;
084    import org.apache.activemq.command.WireFormatInfo;
085    import org.apache.activemq.management.JMSConnectionStatsImpl;
086    import org.apache.activemq.management.JMSStatsImpl;
087    import org.apache.activemq.management.StatsCapable;
088    import org.apache.activemq.management.StatsImpl;
089    import org.apache.activemq.state.CommandVisitorAdapter;
090    import org.apache.activemq.thread.Scheduler;
091    import org.apache.activemq.thread.TaskRunnerFactory;
092    import org.apache.activemq.transport.Transport;
093    import org.apache.activemq.transport.TransportListener;
094    import org.apache.activemq.transport.failover.FailoverTransport;
095    import org.apache.activemq.util.IdGenerator;
096    import org.apache.activemq.util.IntrospectionSupport;
097    import org.apache.activemq.util.JMSExceptionSupport;
098    import org.apache.activemq.util.LongSequenceGenerator;
099    import org.apache.activemq.util.ServiceSupport;
100    import org.slf4j.Logger;
101    import org.slf4j.LoggerFactory;
102    
103    public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, StreamConnection, TransportListener, EnhancedConnection {
104    
105        public static final String DEFAULT_USER = ActiveMQConnectionFactory.DEFAULT_USER;
106        public static final String DEFAULT_PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
107        public static final String DEFAULT_BROKER_URL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
108    
109        private static final Logger LOG = LoggerFactory.getLogger(ActiveMQConnection.class);
110        private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
111    
112        public final ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination> activeTempDestinations = new ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination>();
113    
114        protected boolean dispatchAsync=true;
115        protected boolean alwaysSessionAsync = true;
116    
117        private TaskRunnerFactory sessionTaskRunner;
118        private final ThreadPoolExecutor executor;
119    
120        // Connection state variables
121        private final ConnectionInfo info;
122        private ExceptionListener exceptionListener;
123        private ClientInternalExceptionListener clientInternalExceptionListener;
124        private boolean clientIDSet;
125        private boolean isConnectionInfoSentToBroker;
126        private boolean userSpecifiedClientID;
127    
128        // Configuration options variables
129        private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
130        private BlobTransferPolicy blobTransferPolicy;
131        private RedeliveryPolicy redeliveryPolicy;
132        private MessageTransformer transformer;
133    
134        private boolean disableTimeStampsByDefault;
135        private boolean optimizedMessageDispatch = true;
136        private boolean copyMessageOnSend = true;
137        private boolean useCompression;
138        private boolean objectMessageSerializationDefered;
139        private boolean useAsyncSend;
140        private boolean optimizeAcknowledge;
141        private boolean nestedMapAndListEnabled = true;
142        private boolean useRetroactiveConsumer;
143        private boolean exclusiveConsumer;
144        private boolean alwaysSyncSend;
145        private int closeTimeout = 15000;
146        private boolean watchTopicAdvisories = true;
147        private long warnAboutUnstartedConnectionTimeout = 500L;
148        private int sendTimeout =0;
149        private boolean sendAcksAsync=true;
150        private boolean checkForDuplicates = true;
151    
152        private final Transport transport;
153        private final IdGenerator clientIdGenerator;
154        private final JMSStatsImpl factoryStats;
155        private final JMSConnectionStatsImpl stats;
156    
157        private final AtomicBoolean started = new AtomicBoolean(false);
158        private final AtomicBoolean closing = new AtomicBoolean(false);
159        private final AtomicBoolean closed = new AtomicBoolean(false);
160        private final AtomicBoolean transportFailed = new AtomicBoolean(false);
161        private final CopyOnWriteArrayList<ActiveMQSession> sessions = new CopyOnWriteArrayList<ActiveMQSession>();
162        private final CopyOnWriteArrayList<ActiveMQConnectionConsumer> connectionConsumers = new CopyOnWriteArrayList<ActiveMQConnectionConsumer>();
163        private final CopyOnWriteArrayList<ActiveMQInputStream> inputStreams = new CopyOnWriteArrayList<ActiveMQInputStream>();
164        private final CopyOnWriteArrayList<ActiveMQOutputStream> outputStreams = new CopyOnWriteArrayList<ActiveMQOutputStream>();
165        private final CopyOnWriteArrayList<TransportListener> transportListeners = new CopyOnWriteArrayList<TransportListener>();
166    
167        // Maps ConsumerIds to ActiveMQConsumer objects
168        private final ConcurrentHashMap<ConsumerId, ActiveMQDispatcher> dispatchers = new ConcurrentHashMap<ConsumerId, ActiveMQDispatcher>();
169        private final ConcurrentHashMap<ProducerId, ActiveMQMessageProducer> producers = new ConcurrentHashMap<ProducerId, ActiveMQMessageProducer>();
170        private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator();
171        private final SessionId connectionSessionId;
172        private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
173        private final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator();
174        private final LongSequenceGenerator tempDestinationIdGenerator = new LongSequenceGenerator();
175        private final LongSequenceGenerator localTransactionIdGenerator = new LongSequenceGenerator();
176    
177        private AdvisoryConsumer advisoryConsumer;
178        private final CountDownLatch brokerInfoReceived = new CountDownLatch(1);
179        private BrokerInfo brokerInfo;
180        private IOException firstFailureError;
181        private int producerWindowSize = ActiveMQConnectionFactory.DEFAULT_PRODUCER_WINDOW_SIZE;
182    
183        // Assume that protocol is the latest. Change to the actual protocol
184        // version when a WireFormatInfo is received.
185        private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
186        private final long timeCreated;
187        private final ConnectionAudit connectionAudit = new ConnectionAudit();
188        private DestinationSource destinationSource;
189        private final Object ensureConnectionInfoSentMutex = new Object();
190        private boolean useDedicatedTaskRunner;
191        protected volatile CountDownLatch transportInterruptionProcessingComplete;
192        private long consumerFailoverRedeliveryWaitPeriod;
193        private final Scheduler scheduler;
194        private boolean messagePrioritySupported=true;
195    
196        /**
197         * Construct an <code>ActiveMQConnection</code>
198         * 
199         * @param transport
200         * @param factoryStats
201         * @throws Exception
202         */
203        protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, JMSStatsImpl factoryStats) throws Exception {
204    
205            this.transport = transport;
206            this.clientIdGenerator = clientIdGenerator;
207            this.factoryStats = factoryStats;
208    
209            // Configure a single threaded executor who's core thread can timeout if
210            // idle
211            executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
212                public Thread newThread(Runnable r) {
213                    Thread thread = new Thread(r, "ActiveMQ Connection Executor: " + transport);
214                    thread.setDaemon(true);
215                    return thread;
216                }
217            });
218            // asyncConnectionThread.allowCoreThreadTimeOut(true);
219            String uniqueId = CONNECTION_ID_GENERATOR.generateId();
220            this.info = new ConnectionInfo(new ConnectionId(uniqueId));
221            this.info.setManageable(true);
222            this.info.setFaultTolerant(transport.isFaultTolerant());
223            this.connectionSessionId = new SessionId(info.getConnectionId(), -1);
224    
225            this.transport.setTransportListener(this);
226    
227            this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection);
228            this.factoryStats.addConnection(this);
229            this.timeCreated = System.currentTimeMillis();
230            this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant());
231            this.scheduler = new Scheduler("ActiveMQConnection["+uniqueId+"] Scheduler");
232            this.scheduler.start();
233        }
234    
235        protected void setUserName(String userName) {
236            this.info.setUserName(userName);
237        }
238    
239        protected void setPassword(String password) {
240            this.info.setPassword(password);
241        }
242    
243        /**
244         * A static helper method to create a new connection
245         * 
246         * @return an ActiveMQConnection
247         * @throws JMSException
248         */
249        public static ActiveMQConnection makeConnection() throws JMSException {
250            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
251            return (ActiveMQConnection)factory.createConnection();
252        }
253    
254        /**
255         * A static helper method to create a new connection
256         * 
257         * @param uri
258         * @return and ActiveMQConnection
259         * @throws JMSException
260         */
261        public static ActiveMQConnection makeConnection(String uri) throws JMSException, URISyntaxException {
262            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
263            return (ActiveMQConnection)factory.createConnection();
264        }
265    
266        /**
267         * A static helper method to create a new connection
268         * 
269         * @param user
270         * @param password
271         * @param uri
272         * @return an ActiveMQConnection
273         * @throws JMSException
274         */
275        public static ActiveMQConnection makeConnection(String user, String password, String uri) throws JMSException, URISyntaxException {
276            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user, password, new URI(uri));
277            return (ActiveMQConnection)factory.createConnection();
278        }
279    
280        /**
281         * @return a number unique for this connection
282         */
283        public JMSConnectionStatsImpl getConnectionStats() {
284            return stats;
285        }
286    
287        /**
288         * Creates a <CODE>Session</CODE> object.
289         * 
290         * @param transacted indicates whether the session is transacted
291         * @param acknowledgeMode indicates whether the consumer or the client will
292         *                acknowledge any messages it receives; ignored if the
293         *                session is transacted. Legal values are
294         *                <code>Session.AUTO_ACKNOWLEDGE</code>,
295         *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
296         *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
297         * @return a newly created session
298         * @throws JMSException if the <CODE>Connection</CODE> object fails to
299         *                 create a session due to some internal error or lack of
300         *                 support for the specific transaction and acknowledgement
301         *                 mode.
302         * @see Session#AUTO_ACKNOWLEDGE
303         * @see Session#CLIENT_ACKNOWLEDGE
304         * @see Session#DUPS_OK_ACKNOWLEDGE
305         * @since 1.1
306         */
307        public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
308            checkClosedOrFailed();
309            ensureConnectionInfoSent();
310            if(!transacted) {
311                if (acknowledgeMode==Session.SESSION_TRANSACTED) {
312                    throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session");
313                } else if (acknowledgeMode < Session.SESSION_TRANSACTED || acknowledgeMode > ActiveMQSession.MAX_ACK_CONSTANT) {
314                    throw new JMSException("invalid acknowledgeMode: " + acknowledgeMode + ". Valid values are Session.AUTO_ACKNOWLEDGE (1), " +
315                            "Session.CLIENT_ACKNOWLEDGE (2), Session.DUPS_OK_ACKNOWLEDGE (3), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions Session.SESSION_TRANSACTED (0)");
316                }
317            }
318            return new ActiveMQSession(this, getNextSessionId(), transacted ? Session.SESSION_TRANSACTED : (acknowledgeMode == Session.SESSION_TRANSACTED
319                ? Session.AUTO_ACKNOWLEDGE : acknowledgeMode), isDispatchAsync(), isAlwaysSessionAsync());
320        }
321    
322        /**
323         * @return sessionId
324         */
325        protected SessionId getNextSessionId() {
326            return new SessionId(info.getConnectionId(), sessionIdGenerator.getNextSequenceId());
327        }
328    
329        /**
330         * Gets the client identifier for this connection.
331         * <P>
332         * This value is specific to the JMS provider. It is either preconfigured by
333         * an administrator in a <CODE> ConnectionFactory</CODE> object or assigned
334         * dynamically by the application by calling the <code>setClientID</code>
335         * method.
336         * 
337         * @return the unique client identifier
338         * @throws JMSException if the JMS provider fails to return the client ID
339         *                 for this connection due to some internal error.
340         */
341        public String getClientID() throws JMSException {
342            checkClosedOrFailed();
343            return this.info.getClientId();
344        }
345    
346        /**
347         * Sets the client identifier for this connection.
348         * <P>
349         * The preferred way to assign a JMS client's client identifier is for it to
350         * be configured in a client-specific <CODE>ConnectionFactory</CODE>
351         * object and transparently assigned to the <CODE>Connection</CODE> object
352         * it creates.
353         * <P>
354         * Alternatively, a client can set a connection's client identifier using a
355         * provider-specific value. The facility to set a connection's client
356         * identifier explicitly is not a mechanism for overriding the identifier
357         * that has been administratively configured. It is provided for the case
358         * where no administratively specified identifier exists. If one does exist,
359         * an attempt to change it by setting it must throw an
360         * <CODE>IllegalStateException</CODE>. If a client sets the client
361         * identifier explicitly, it must do so immediately after it creates the
362         * connection and before any other action on the connection is taken. After
363         * this point, setting the client identifier is a programming error that
364         * should throw an <CODE>IllegalStateException</CODE>.
365         * <P>
366         * The purpose of the client identifier is to associate a connection and its
367         * objects with a state maintained on behalf of the client by a provider.
368         * The only such state identified by the JMS API is that required to support
369         * durable subscriptions.
370         * <P>
371         * If another connection with the same <code>clientID</code> is already
372         * running when this method is called, the JMS provider should detect the
373         * duplicate ID and throw an <CODE>InvalidClientIDException</CODE>.
374         * 
375         * @param newClientID the unique client identifier
376         * @throws JMSException if the JMS provider fails to set the client ID for
377         *                 this connection due to some internal error.
378         * @throws javax.jms.InvalidClientIDException if the JMS client specifies an
379         *                 invalid or duplicate client ID.
380         * @throws javax.jms.IllegalStateException if the JMS client attempts to set
381         *                 a connection's client ID at the wrong time or when it has
382         *                 been administratively configured.
383         */
384        public void setClientID(String newClientID) throws JMSException {
385            checkClosedOrFailed();
386    
387            if (this.clientIDSet) {
388                throw new IllegalStateException("The clientID has already been set");
389            }
390    
391            if (this.isConnectionInfoSentToBroker) {
392                throw new IllegalStateException("Setting clientID on a used Connection is not allowed");
393            }
394    
395            this.info.setClientId(newClientID);
396            this.userSpecifiedClientID = true;
397            ensureConnectionInfoSent();
398        }
399    
400        /**
401         * Sets the default client id that the connection will use if explicitly not
402         * set with the setClientId() call.
403         */
404        public void setDefaultClientID(String clientID) throws JMSException {
405            this.info.setClientId(clientID);
406            this.userSpecifiedClientID = true;
407        }
408    
409        /**
410         * Gets the metadata for this connection.
411         * 
412         * @return the connection metadata
413         * @throws JMSException if the JMS provider fails to get the connection
414         *                 metadata for this connection.
415         * @see javax.jms.ConnectionMetaData
416         */
417        public ConnectionMetaData getMetaData() throws JMSException {
418            checkClosedOrFailed();
419            return ActiveMQConnectionMetaData.INSTANCE;
420        }
421    
422        /**
423         * Gets the <CODE>ExceptionListener</CODE> object for this connection. Not
424         * every <CODE>Connection</CODE> has an <CODE>ExceptionListener</CODE>
425         * associated with it.
426         * 
427         * @return the <CODE>ExceptionListener</CODE> for this connection, or
428         *         null, if no <CODE>ExceptionListener</CODE> is associated with
429         *         this connection.
430         * @throws JMSException if the JMS provider fails to get the
431         *                 <CODE>ExceptionListener</CODE> for this connection.
432         * @see javax.jms.Connection#setExceptionListener(ExceptionListener)
433         */
434        public ExceptionListener getExceptionListener() throws JMSException {
435            checkClosedOrFailed();
436            return this.exceptionListener;
437        }
438    
439        /**
440         * Sets an exception listener for this connection.
441         * <P>
442         * If a JMS provider detects a serious problem with a connection, it informs
443         * the connection's <CODE> ExceptionListener</CODE>, if one has been
444         * registered. It does this by calling the listener's <CODE>onException
445         * </CODE>
446         * method, passing it a <CODE>JMSException</CODE> object describing the
447         * problem.
448         * <P>
449         * An exception listener allows a client to be notified of a problem
450         * asynchronously. Some connections only consume messages, so they would
451         * have no other way to learn their connection has failed.
452         * <P>
453         * A connection serializes execution of its <CODE>ExceptionListener</CODE>.
454         * <P>
455         * A JMS provider should attempt to resolve connection problems itself
456         * before it notifies the client of them.
457         * 
458         * @param listener the exception listener
459         * @throws JMSException if the JMS provider fails to set the exception
460         *                 listener for this connection.
461         */
462        public void setExceptionListener(ExceptionListener listener) throws JMSException {
463            checkClosedOrFailed();
464            this.exceptionListener = listener;
465        }
466    
467        /**
468         * Gets the <code>ClientInternalExceptionListener</code> object for this connection.
469         * Not every <CODE>ActiveMQConnectionn</CODE> has a <CODE>ClientInternalExceptionListener</CODE>
470         * associated with it.
471         * 
472         * @return the listener or <code>null</code> if no listener is registered with the connection.
473         */
474        public ClientInternalExceptionListener getClientInternalExceptionListener()
475        {
476            return clientInternalExceptionListener;
477        }
478    
479        /**
480         * Sets a client internal exception listener for this connection.
481         * The connection will notify the listener, if one has been registered, of exceptions thrown by container components
482         * (e.g. an EJB container in case of Message Driven Beans) during asynchronous processing of a message.
483         * It does this by calling the listener's <code>onException()</code> method passing it a <code>Throwable</code>
484         * describing the problem.
485         * 
486         * @param listener the exception listener
487         */
488        public void setClientInternalExceptionListener(ClientInternalExceptionListener listener)
489        {
490            this.clientInternalExceptionListener = listener;
491        }
492        
493        /**
494         * Starts (or restarts) a connection's delivery of incoming messages. A call
495         * to <CODE>start</CODE> on a connection that has already been started is
496         * ignored.
497         * 
498         * @throws JMSException if the JMS provider fails to start message delivery
499         *                 due to some internal error.
500         * @see javax.jms.Connection#stop()
501         */
502        public void start() throws JMSException {
503            checkClosedOrFailed();
504            ensureConnectionInfoSent();
505            if (started.compareAndSet(false, true)) {
506                for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
507                    ActiveMQSession session = i.next();
508                    session.start();
509                }
510            }
511        }
512    
513        /**
514         * Temporarily stops a connection's delivery of incoming messages. Delivery
515         * can be restarted using the connection's <CODE>start</CODE> method. When
516         * the connection is stopped, delivery to all the connection's message
517         * consumers is inhibited: synchronous receives block, and messages are not
518         * delivered to message listeners.
519         * <P>
520         * This call blocks until receives and/or message listeners in progress have
521         * completed.
522         * <P>
523         * Stopping a connection has no effect on its ability to send messages. A
524         * call to <CODE>stop</CODE> on a connection that has already been stopped
525         * is ignored.
526         * <P>
527         * A call to <CODE>stop</CODE> must not return until delivery of messages
528         * has paused. This means that a client can rely on the fact that none of
529         * its message listeners will be called and that all threads of control
530         * waiting for <CODE>receive</CODE> calls to return will not return with a
531         * message until the connection is restarted. The receive timers for a
532         * stopped connection continue to advance, so receives may time out while
533         * the connection is stopped.
534         * <P>
535         * If message listeners are running when <CODE>stop</CODE> is invoked, the
536         * <CODE>stop</CODE> call must wait until all of them have returned before
537         * it may return. While these message listeners are completing, they must
538         * have the full services of the connection available to them.
539         * 
540         * @throws JMSException if the JMS provider fails to stop message delivery
541         *                 due to some internal error.
542         * @see javax.jms.Connection#start()
543         */
544        public void stop() throws JMSException {
545            checkClosedOrFailed();
546            if (started.compareAndSet(true, false)) {
547                synchronized(sessions) {
548                    for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
549                        ActiveMQSession s = i.next();
550                        s.stop();
551                    }
552                }
553            }
554        }
555    
556        /**
557         * Closes the connection.
558         * <P>
559         * Since a provider typically allocates significant resources outside the
560         * JVM on behalf of a connection, clients should close these resources when
561         * they are not needed. Relying on garbage collection to eventually reclaim
562         * these resources may not be timely enough.
563         * <P>
564         * There is no need to close the sessions, producers, and consumers of a
565         * closed connection.
566         * <P>
567         * Closing a connection causes all temporary destinations to be deleted.
568         * <P>
569         * When this method is invoked, it should not return until message
570         * processing has been shut down in an orderly fashion. This means that all
571         * message listeners that may have been running have returned, and that all
572         * pending receives have returned. A close terminates all pending message
573         * receives on the connection's sessions' consumers. The receives may return
574         * with a message or with null, depending on whether there was a message
575         * available at the time of the close. If one or more of the connection's
576         * sessions' message listeners is processing a message at the time when
577         * connection <CODE>close</CODE> is invoked, all the facilities of the
578         * connection and its sessions must remain available to those listeners
579         * until they return control to the JMS provider.
580         * <P>
581         * Closing a connection causes any of its sessions' transactions in progress
582         * to be rolled back. In the case where a session's work is coordinated by
583         * an external transaction manager, a session's <CODE>commit</CODE> and
584         * <CODE> rollback</CODE> methods are not used and the result of a closed
585         * session's work is determined later by the transaction manager. Closing a
586         * connection does NOT force an acknowledgment of client-acknowledged
587         * sessions.
588         * <P>
589         * Invoking the <CODE>acknowledge</CODE> method of a received message from
590         * a closed connection's session must throw an
591         * <CODE>IllegalStateException</CODE>. Closing a closed connection must
592         * NOT throw an exception.
593         * 
594         * @throws JMSException if the JMS provider fails to close the connection
595         *                 due to some internal error. For example, a failure to
596         *                 release resources or to close a socket connection can
597         *                 cause this exception to be thrown.
598         */
599        public void close() throws JMSException {
600            try {
601                // If we were running, lets stop first.
602                if (!closed.get() && !transportFailed.get()) {
603                    stop();
604                }
605    
606                synchronized (this) {
607                    if (!closed.get()) {
608                        closing.set(true);
609    
610                        if (destinationSource != null) {
611                            destinationSource.stop();
612                            destinationSource = null;
613                        }
614                        if (advisoryConsumer != null) {
615                            advisoryConsumer.dispose();
616                            advisoryConsumer = null;
617                        }
618                        if (this.scheduler != null) {
619                            try {
620                                this.scheduler.stop();
621                            } catch (Exception e) {
622                                JMSException ex =  JMSExceptionSupport.create(e);
623                                throw ex;
624                            }
625                        }
626    
627                        long lastDeliveredSequenceId = 0;
628                        for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
629                            ActiveMQSession s = i.next();
630                            s.dispose();
631                            lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, s.getLastDeliveredSequenceId());
632                        }
633                        for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
634                            ActiveMQConnectionConsumer c = i.next();
635                            c.dispose();
636                        }
637                        for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) {
638                            ActiveMQInputStream c = i.next();
639                            c.dispose();
640                        }
641                        for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) {
642                            ActiveMQOutputStream c = i.next();
643                            c.dispose();
644                        }
645    
646                        // As TemporaryQueue and TemporaryTopic instances are bound
647                        // to a connection we should just delete them after the connection
648                        // is closed to free up memory
649                        for (Iterator<ActiveMQTempDestination> i = this.activeTempDestinations.values().iterator(); i.hasNext();) {
650                            ActiveMQTempDestination c = i.next();
651                            c.delete();
652                        }
653                        
654                        if (isConnectionInfoSentToBroker) {
655                            // If we announced ourselfs to the broker.. Try to let
656                            // the broker
657                            // know that the connection is being shutdown.
658                            RemoveInfo removeCommand = info.createRemoveCommand();
659                            removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
660                            doSyncSendPacket(info.createRemoveCommand(), closeTimeout);
661                            doAsyncSendPacket(new ShutdownInfo());
662                        }
663    
664                        ServiceSupport.dispose(this.transport);
665    
666                        started.set(false);
667    
668                        // TODO if we move the TaskRunnerFactory to the connection
669                        // factory
670                        // then we may need to call
671                        // factory.onConnectionClose(this);
672                        if (sessionTaskRunner != null) {
673                            sessionTaskRunner.shutdown();
674                        }
675                        closed.set(true);
676                        closing.set(false);
677                    }
678                }
679            } finally {
680                try {
681                    if (executor != null){
682                        executor.shutdown();
683                    }
684                }catch(Throwable e) {
685                    LOG.error("Error shutting down thread pool " + e,e);
686                }
687                factoryStats.removeConnection(this);
688            }
689        }
690    
691        /**
692         * Tells the broker to terminate its VM. This can be used to cleanly
693         * terminate a broker running in a standalone java process. Server must have
694         * property enable.vm.shutdown=true defined to allow this to work.
695         */
696        // TODO : org.apache.activemq.message.BrokerAdminCommand not yet
697        // implemented.
698        /*
699         * public void terminateBrokerVM() throws JMSException { BrokerAdminCommand
700         * command = new BrokerAdminCommand();
701         * command.setCommand(BrokerAdminCommand.SHUTDOWN_SERVER_VM);
702         * asyncSendPacket(command); }
703         */
704    
705        /**
706         * Create a durable connection consumer for this connection (optional
707         * operation). This is an expert facility not used by regular JMS clients.
708         * 
709         * @param topic topic to access
710         * @param subscriptionName durable subscription name
711         * @param messageSelector only messages with properties matching the message
712         *                selector expression are delivered. A value of null or an
713         *                empty string indicates that there is no message selector
714         *                for the message consumer.
715         * @param sessionPool the server session pool to associate with this durable
716         *                connection consumer
717         * @param maxMessages the maximum number of messages that can be assigned to
718         *                a server session at one time
719         * @return the durable connection consumer
720         * @throws JMSException if the <CODE>Connection</CODE> object fails to
721         *                 create a connection consumer due to some internal error
722         *                 or invalid arguments for <CODE>sessionPool</CODE> and
723         *                 <CODE>messageSelector</CODE>.
724         * @throws javax.jms.InvalidDestinationException if an invalid destination
725         *                 is specified.
726         * @throws javax.jms.InvalidSelectorException if the message selector is
727         *                 invalid.
728         * @see javax.jms.ConnectionConsumer
729         * @since 1.1
730         */
731        public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages)
732            throws JMSException {
733            return this.createDurableConnectionConsumer(topic, subscriptionName, messageSelector, sessionPool, maxMessages, false);
734        }
735    
736        /**
737         * Create a durable connection consumer for this connection (optional
738         * operation). This is an expert facility not used by regular JMS clients.
739         * 
740         * @param topic topic to access
741         * @param subscriptionName durable subscription name
742         * @param messageSelector only messages with properties matching the message
743         *                selector expression are delivered. A value of null or an
744         *                empty string indicates that there is no message selector
745         *                for the message consumer.
746         * @param sessionPool the server session pool to associate with this durable
747         *                connection consumer
748         * @param maxMessages the maximum number of messages that can be assigned to
749         *                a server session at one time
750         * @param noLocal set true if you want to filter out messages published
751         *                locally
752         * @return the durable connection consumer
753         * @throws JMSException if the <CODE>Connection</CODE> object fails to
754         *                 create a connection consumer due to some internal error
755         *                 or invalid arguments for <CODE>sessionPool</CODE> and
756         *                 <CODE>messageSelector</CODE>.
757         * @throws javax.jms.InvalidDestinationException if an invalid destination
758         *                 is specified.
759         * @throws javax.jms.InvalidSelectorException if the message selector is
760         *                 invalid.
761         * @see javax.jms.ConnectionConsumer
762         * @since 1.1
763         */
764        public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages,
765                                                                  boolean noLocal) throws JMSException {
766            checkClosedOrFailed();
767            ensureConnectionInfoSent();
768            SessionId sessionId = new SessionId(info.getConnectionId(), -1);
769            ConsumerInfo info = new ConsumerInfo(new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()));
770            info.setDestination(ActiveMQMessageTransformation.transformDestination(topic));
771            info.setSubscriptionName(subscriptionName);
772            info.setSelector(messageSelector);
773            info.setPrefetchSize(maxMessages);
774            info.setDispatchAsync(isDispatchAsync());
775    
776            // Allows the options on the destination to configure the consumerInfo
777            if (info.getDestination().getOptions() != null) {
778                Map<String, String> options = new HashMap<String, String>(info.getDestination().getOptions());
779                IntrospectionSupport.setProperties(this.info, options, "consumer.");
780            }
781    
782            return new ActiveMQConnectionConsumer(this, sessionPool, info);
783        }
784    
785        // Properties
786        // -------------------------------------------------------------------------
787    
788        /**
789         * Returns true if this connection has been started
790         * 
791         * @return true if this Connection is started
792         */
793        public boolean isStarted() {
794            return started.get();
795        }
796    
797        /**
798         * Returns true if the connection is closed
799         */
800        public boolean isClosed() {
801            return closed.get();
802        }
803    
804        /**
805         * Returns true if the connection is in the process of being closed
806         */
807        public boolean isClosing() {
808            return closing.get();
809        }
810    
811        /**
812         * Returns true if the underlying transport has failed
813         */
814        public boolean isTransportFailed() {
815            return transportFailed.get();
816        }
817    
818        /**
819         * @return Returns the prefetchPolicy.
820         */
821        public ActiveMQPrefetchPolicy getPrefetchPolicy() {
822            return prefetchPolicy;
823        }
824    
825        /**
826         * Sets the <a
827         * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch
828         * policy</a> for consumers created by this connection.
829         */
830        public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
831            this.prefetchPolicy = prefetchPolicy;
832        }
833    
834        /**
835         */
836        public Transport getTransportChannel() {
837            return transport;
838        }
839    
840        /**
841         * @return Returns the clientID of the connection, forcing one to be
842         *         generated if one has not yet been configured.
843         */
844        public String getInitializedClientID() throws JMSException {
845            ensureConnectionInfoSent();
846            return info.getClientId();
847        }
848    
849        /**
850         * @return Returns the timeStampsDisableByDefault.
851         */
852        public boolean isDisableTimeStampsByDefault() {
853            return disableTimeStampsByDefault;
854        }
855    
856        /**
857         * Sets whether or not timestamps on messages should be disabled or not. If
858         * you disable them it adds a small performance boost.
859         */
860        public void setDisableTimeStampsByDefault(boolean timeStampsDisableByDefault) {
861            this.disableTimeStampsByDefault = timeStampsDisableByDefault;
862        }
863    
864        /**
865         * @return Returns the dispatchOptimizedMessage.
866         */
867        public boolean isOptimizedMessageDispatch() {
868            return optimizedMessageDispatch;
869        }
870    
871        /**
872         * If this flag is set then an larger prefetch limit is used - only
873         * applicable for durable topic subscribers.
874         */
875        public void setOptimizedMessageDispatch(boolean dispatchOptimizedMessage) {
876            this.optimizedMessageDispatch = dispatchOptimizedMessage;
877        }
878    
879        /**
880         * @return Returns the closeTimeout.
881         */
882        public int getCloseTimeout() {
883            return closeTimeout;
884        }
885    
886        /**
887         * Sets the timeout before a close is considered complete. Normally a
888         * close() on a connection waits for confirmation from the broker; this
889         * allows that operation to timeout to save the client hanging if there is
890         * no broker
891         */
892        public void setCloseTimeout(int closeTimeout) {
893            this.closeTimeout = closeTimeout;
894        }
895    
896        /**
897         * @return ConnectionInfo
898         */
899        public ConnectionInfo getConnectionInfo() {
900            return this.info;
901        }
902    
903        public boolean isUseRetroactiveConsumer() {
904            return useRetroactiveConsumer;
905        }
906    
907        /**
908         * Sets whether or not retroactive consumers are enabled. Retroactive
909         * consumers allow non-durable topic subscribers to receive old messages
910         * that were published before the non-durable subscriber started.
911         */
912        public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) {
913            this.useRetroactiveConsumer = useRetroactiveConsumer;
914        }
915    
916        public boolean isNestedMapAndListEnabled() {
917            return nestedMapAndListEnabled;
918        }
919    
920        /**
921         * Enables/disables whether or not Message properties and MapMessage entries
922         * support <a
923         * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested
924         * Structures</a> of Map and List objects
925         */
926        public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) {
927            this.nestedMapAndListEnabled = structuredMapsEnabled;
928        }
929    
930        public boolean isExclusiveConsumer() {
931            return exclusiveConsumer;
932        }
933    
934        /**
935         * Enables or disables whether or not queue consumers should be exclusive or
936         * not for example to preserve ordering when not using <a
937         * href="http://activemq.apache.org/message-groups.html">Message Groups</a>
938         * 
939         * @param exclusiveConsumer
940         */
941        public void setExclusiveConsumer(boolean exclusiveConsumer) {
942            this.exclusiveConsumer = exclusiveConsumer;
943        }
944    
945        /**
946         * Adds a transport listener so that a client can be notified of events in
947         * the underlying transport
948         */
949        public void addTransportListener(TransportListener transportListener) {
950            transportListeners.add(transportListener);
951        }
952    
953        public void removeTransportListener(TransportListener transportListener) {
954            transportListeners.remove(transportListener);
955        }
956    
957        public boolean isUseDedicatedTaskRunner() {
958            return useDedicatedTaskRunner;
959        }
960        
961        public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
962            this.useDedicatedTaskRunner = useDedicatedTaskRunner;
963        }
964    
965        public TaskRunnerFactory getSessionTaskRunner() {
966            synchronized (this) {
967                if (sessionTaskRunner == null) {
968                    sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000, isUseDedicatedTaskRunner());
969                }
970            }
971            return sessionTaskRunner;
972        }
973    
974        public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) {
975            this.sessionTaskRunner = sessionTaskRunner;
976        }
977    
978        public MessageTransformer getTransformer() {
979            return transformer;
980        }
981    
982        /**
983         * Sets the transformer used to transform messages before they are sent on
984         * to the JMS bus or when they are received from the bus but before they are
985         * delivered to the JMS client
986         */
987        public void setTransformer(MessageTransformer transformer) {
988            this.transformer = transformer;
989        }
990    
991        /**
992         * @return the statsEnabled
993         */
994        public boolean isStatsEnabled() {
995            return this.stats.isEnabled();
996        }
997    
998        /**
999         * @param statsEnabled the statsEnabled to set
1000         */
1001        public void setStatsEnabled(boolean statsEnabled) {
1002            this.stats.setEnabled(statsEnabled);
1003        }
1004    
1005        /**
1006         * Returns the {@link DestinationSource} object which can be used to listen to destinations
1007         * being created or destroyed or to enquire about the current destinations available on the broker
1008         *
1009         * @return a lazily created destination source
1010         * @throws JMSException
1011         */
1012        public DestinationSource getDestinationSource() throws JMSException {
1013            if (destinationSource == null) {
1014                destinationSource = new DestinationSource(this);
1015                destinationSource.start();
1016            }
1017            return destinationSource;
1018        }
1019    
1020        // Implementation methods
1021        // -------------------------------------------------------------------------
1022    
1023        /**
1024         * Used internally for adding Sessions to the Connection
1025         * 
1026         * @param session
1027         * @throws JMSException
1028         * @throws JMSException
1029         */
1030        protected void addSession(ActiveMQSession session) throws JMSException {
1031            this.sessions.add(session);
1032            if (sessions.size() > 1 || session.isTransacted()) {
1033                optimizedMessageDispatch = false;
1034            }
1035        }
1036    
1037        /**
1038         * Used interanlly for removing Sessions from a Connection
1039         * 
1040         * @param session
1041         */
1042        protected void removeSession(ActiveMQSession session) {
1043            this.sessions.remove(session);
1044            this.removeDispatcher(session);
1045        }
1046    
1047        /**
1048         * Add a ConnectionConsumer
1049         * 
1050         * @param connectionConsumer
1051         * @throws JMSException
1052         */
1053        protected void addConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) throws JMSException {
1054            this.connectionConsumers.add(connectionConsumer);
1055        }
1056    
1057        /**
1058         * Remove a ConnectionConsumer
1059         * 
1060         * @param connectionConsumer
1061         */
1062        protected void removeConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) {
1063            this.connectionConsumers.remove(connectionConsumer);
1064            this.removeDispatcher(connectionConsumer);
1065        }
1066    
1067        /**
1068         * Creates a <CODE>TopicSession</CODE> object.
1069         * 
1070         * @param transacted indicates whether the session is transacted
1071         * @param acknowledgeMode indicates whether the consumer or the client will
1072         *                acknowledge any messages it receives; ignored if the
1073         *                session is transacted. Legal values are
1074         *                <code>Session.AUTO_ACKNOWLEDGE</code>,
1075         *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1076         *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1077         * @return a newly created topic session
1078         * @throws JMSException if the <CODE>TopicConnection</CODE> object fails
1079         *                 to create a session due to some internal error or lack of
1080         *                 support for the specific transaction and acknowledgement
1081         *                 mode.
1082         * @see Session#AUTO_ACKNOWLEDGE
1083         * @see Session#CLIENT_ACKNOWLEDGE
1084         * @see Session#DUPS_OK_ACKNOWLEDGE
1085         */
1086        public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException {
1087            return new ActiveMQTopicSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
1088        }
1089    
1090        /**
1091         * Creates a connection consumer for this connection (optional operation).
1092         * This is an expert facility not used by regular JMS clients.
1093         * 
1094         * @param topic the topic to access
1095         * @param messageSelector only messages with properties matching the message
1096         *                selector expression are delivered. A value of null or an
1097         *                empty string indicates that there is no message selector
1098         *                for the message consumer.
1099         * @param sessionPool the server session pool to associate with this
1100         *                connection consumer
1101         * @param maxMessages the maximum number of messages that can be assigned to
1102         *                a server session at one time
1103         * @return the connection consumer
1104         * @throws JMSException if the <CODE>TopicConnection</CODE> object fails
1105         *                 to create a connection consumer due to some internal
1106         *                 error or invalid arguments for <CODE>sessionPool</CODE>
1107         *                 and <CODE>messageSelector</CODE>.
1108         * @throws javax.jms.InvalidDestinationException if an invalid topic is
1109         *                 specified.
1110         * @throws javax.jms.InvalidSelectorException if the message selector is
1111         *                 invalid.
1112         * @see javax.jms.ConnectionConsumer
1113         */
1114        public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1115            return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, false);
1116        }
1117    
1118        /**
1119         * Creates a connection consumer for this connection (optional operation).
1120         * This is an expert facility not used by regular JMS clients.
1121         * 
1122         * @param queue the queue to access
1123         * @param messageSelector only messages with properties matching the message
1124         *                selector expression are delivered. A value of null or an
1125         *                empty string indicates that there is no message selector
1126         *                for the message consumer.
1127         * @param sessionPool the server session pool to associate with this
1128         *                connection consumer
1129         * @param maxMessages the maximum number of messages that can be assigned to
1130         *                a server session at one time
1131         * @return the connection consumer
1132         * @throws JMSException if the <CODE>QueueConnection</CODE> object fails
1133         *                 to create a connection consumer due to some internal
1134         *                 error or invalid arguments for <CODE>sessionPool</CODE>
1135         *                 and <CODE>messageSelector</CODE>.
1136         * @throws javax.jms.InvalidDestinationException if an invalid queue is
1137         *                 specified.
1138         * @throws javax.jms.InvalidSelectorException if the message selector is
1139         *                 invalid.
1140         * @see javax.jms.ConnectionConsumer
1141         */
1142        public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1143            return createConnectionConsumer(queue, messageSelector, sessionPool, maxMessages, false);
1144        }
1145    
1146        /**
1147         * Creates a connection consumer for this connection (optional operation).
1148         * This is an expert facility not used by regular JMS clients.
1149         * 
1150         * @param destination the destination to access
1151         * @param messageSelector only messages with properties matching the message
1152         *                selector expression are delivered. A value of null or an
1153         *                empty string indicates that there is no message selector
1154         *                for the message consumer.
1155         * @param sessionPool the server session pool to associate with this
1156         *                connection consumer
1157         * @param maxMessages the maximum number of messages that can be assigned to
1158         *                a server session at one time
1159         * @return the connection consumer
1160         * @throws JMSException if the <CODE>Connection</CODE> object fails to
1161         *                 create a connection consumer due to some internal error
1162         *                 or invalid arguments for <CODE>sessionPool</CODE> and
1163         *                 <CODE>messageSelector</CODE>.
1164         * @throws javax.jms.InvalidDestinationException if an invalid destination
1165         *                 is specified.
1166         * @throws javax.jms.InvalidSelectorException if the message selector is
1167         *                 invalid.
1168         * @see javax.jms.ConnectionConsumer
1169         * @since 1.1
1170         */
1171        public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1172            return createConnectionConsumer(destination, messageSelector, sessionPool, maxMessages, false);
1173        }
1174    
1175        public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages, boolean noLocal)
1176            throws JMSException {
1177    
1178            checkClosedOrFailed();
1179            ensureConnectionInfoSent();
1180    
1181            ConsumerId consumerId = createConsumerId();
1182            ConsumerInfo consumerInfo = new ConsumerInfo(consumerId);
1183            consumerInfo.setDestination(ActiveMQMessageTransformation.transformDestination(destination));
1184            consumerInfo.setSelector(messageSelector);
1185            consumerInfo.setPrefetchSize(maxMessages);
1186            consumerInfo.setNoLocal(noLocal);
1187            consumerInfo.setDispatchAsync(isDispatchAsync());
1188    
1189            // Allows the options on the destination to configure the consumerInfo
1190            if (consumerInfo.getDestination().getOptions() != null) {
1191                Map<String, String> options = new HashMap<String, String>(consumerInfo.getDestination().getOptions());
1192                IntrospectionSupport.setProperties(consumerInfo, options, "consumer.");
1193            }
1194    
1195            return new ActiveMQConnectionConsumer(this, sessionPool, consumerInfo);
1196        }
1197    
1198        /**
1199         * @return
1200         */
1201        private ConsumerId createConsumerId() {
1202            return new ConsumerId(connectionSessionId, consumerIdGenerator.getNextSequenceId());
1203        }
1204    
1205        /**
1206         * @return
1207         */
1208        private ProducerId createProducerId() {
1209            return new ProducerId(connectionSessionId, producerIdGenerator.getNextSequenceId());
1210        }
1211    
1212        /**
1213         * Creates a <CODE>QueueSession</CODE> object.
1214         * 
1215         * @param transacted indicates whether the session is transacted
1216         * @param acknowledgeMode indicates whether the consumer or the client will
1217         *                acknowledge any messages it receives; ignored if the
1218         *                session is transacted. Legal values are
1219         *                <code>Session.AUTO_ACKNOWLEDGE</code>,
1220         *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1221         *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1222         * @return a newly created queue session
1223         * @throws JMSException if the <CODE>QueueConnection</CODE> object fails
1224         *                 to create a session due to some internal error or lack of
1225         *                 support for the specific transaction and acknowledgement
1226         *                 mode.
1227         * @see Session#AUTO_ACKNOWLEDGE
1228         * @see Session#CLIENT_ACKNOWLEDGE
1229         * @see Session#DUPS_OK_ACKNOWLEDGE
1230         */
1231        public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException {
1232            return new ActiveMQQueueSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
1233        }
1234    
1235        /**
1236         * Ensures that the clientID was manually specified and not auto-generated.
1237         * If the clientID was not specified this method will throw an exception.
1238         * This method is used to ensure that the clientID + durableSubscriber name
1239         * are used correctly.
1240         * 
1241         * @throws JMSException
1242         */
1243        public void checkClientIDWasManuallySpecified() throws JMSException {
1244            if (!userSpecifiedClientID) {
1245                throw new JMSException("You cannot create a durable subscriber without specifying a unique clientID on a Connection");
1246            }
1247        }
1248    
1249        /**
1250         * send a Packet through the Connection - for internal use only
1251         * 
1252         * @param command
1253         * @throws JMSException
1254         */
1255        public void asyncSendPacket(Command command) throws JMSException {
1256            if (isClosed()) {
1257                throw new ConnectionClosedException();
1258            } else {
1259                doAsyncSendPacket(command);
1260            }
1261        }
1262    
1263            private void doAsyncSendPacket(Command command) throws JMSException {
1264                    try {
1265                        this.transport.oneway(command);
1266                    } catch (IOException e) {
1267                        throw JMSExceptionSupport.create(e);
1268                    }
1269            }
1270    
1271        /**
1272         * Send a packet through a Connection - for internal use only
1273         * 
1274         * @param command
1275         * @return
1276         * @throws JMSException
1277         */
1278        public Response syncSendPacket(Command command) throws JMSException {
1279            if (isClosed()) {
1280                throw new ConnectionClosedException();
1281            } else {
1282    
1283                try {
1284                    Response response = (Response)this.transport.request(command);
1285                    if (response.isException()) {
1286                        ExceptionResponse er = (ExceptionResponse)response;
1287                        if (er.getException() instanceof JMSException) {
1288                            throw (JMSException)er.getException();
1289                        } else {
1290                            if (isClosed()||closing.get()) {
1291                                LOG.debug("Received an exception but connection is closing");
1292                            }
1293                            JMSException jmsEx = null;
1294                            try {
1295                             jmsEx = JMSExceptionSupport.create(er.getException());
1296                            }catch(Throwable e) {
1297                                LOG.error("Caught an exception trying to create a JMSException for " +er.getException(),e);
1298                            }
1299                            if(jmsEx !=null) {
1300                                throw jmsEx;
1301                            }
1302                        }
1303                    }
1304                    return response;
1305                } catch (IOException e) {
1306                    throw JMSExceptionSupport.create(e);
1307                }
1308            }
1309        }
1310    
1311        /**
1312         * Send a packet through a Connection - for internal use only
1313         * 
1314         * @param command
1315         * @return
1316         * @throws JMSException
1317         */
1318        public Response syncSendPacket(Command command, int timeout) throws JMSException {
1319            if (isClosed() || closing.get()) {
1320                throw new ConnectionClosedException();
1321            } else {
1322                return doSyncSendPacket(command, timeout);
1323            }
1324        }
1325    
1326            private Response doSyncSendPacket(Command command, int timeout)
1327                            throws JMSException {
1328                    try {
1329                        Response response = (Response) (timeout > 0
1330                        ? this.transport.request(command, timeout) 
1331                        : this.transport.request(command));
1332                        if (response != null && response.isException()) {
1333                            ExceptionResponse er = (ExceptionResponse)response;
1334                            if (er.getException() instanceof JMSException) {
1335                                throw (JMSException)er.getException();
1336                            } else {
1337                                throw JMSExceptionSupport.create(er.getException());
1338                            }
1339                        }
1340                        return response;
1341                    } catch (IOException e) {
1342                        throw JMSExceptionSupport.create(e);
1343                    }
1344            }
1345    
1346        /**
1347         * @return statistics for this Connection
1348         */
1349        public StatsImpl getStats() {
1350            return stats;
1351        }
1352    
1353        /**
1354         * simply throws an exception if the Connection is already closed or the
1355         * Transport has failed
1356         * 
1357         * @throws JMSException
1358         */
1359        protected synchronized void checkClosedOrFailed() throws JMSException {
1360            checkClosed();
1361            if (transportFailed.get()) {
1362                throw new ConnectionFailedException(firstFailureError);
1363            }
1364        }
1365    
1366        /**
1367         * simply throws an exception if the Connection is already closed
1368         * 
1369         * @throws JMSException
1370         */
1371        protected synchronized void checkClosed() throws JMSException {
1372            if (closed.get()) {
1373                throw new ConnectionClosedException();
1374            }
1375        }
1376    
1377        /**
1378         * Send the ConnectionInfo to the Broker
1379         * 
1380         * @throws JMSException
1381         */
1382        protected void ensureConnectionInfoSent() throws JMSException {
1383            synchronized(this.ensureConnectionInfoSentMutex) {
1384                // Can we skip sending the ConnectionInfo packet??
1385                if (isConnectionInfoSentToBroker || closed.get()) {
1386                    return;
1387                }
1388                //TODO shouldn't this check be on userSpecifiedClientID rather than the value of clientID?
1389                if (info.getClientId() == null || info.getClientId().trim().length() == 0) {
1390                    info.setClientId(clientIdGenerator.generateId());
1391                }
1392                syncSendPacket(info.copy());
1393        
1394                this.isConnectionInfoSentToBroker = true;
1395                // Add a temp destination advisory consumer so that
1396                // We know what the valid temporary destinations are on the
1397                // broker without having to do an RPC to the broker.
1398        
1399                ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1), consumerIdGenerator.getNextSequenceId());
1400                if (watchTopicAdvisories) {
1401                    advisoryConsumer = new AdvisoryConsumer(this, consumerId);
1402                }
1403            }
1404        }
1405    
1406        public synchronized boolean isWatchTopicAdvisories() {
1407            return watchTopicAdvisories;
1408        }
1409    
1410        public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) {
1411            this.watchTopicAdvisories = watchTopicAdvisories;
1412        }
1413    
1414        /**
1415         * @return Returns the useAsyncSend.
1416         */
1417        public boolean isUseAsyncSend() {
1418            return useAsyncSend;
1419        }
1420    
1421        /**
1422         * Forces the use of <a
1423         * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which
1424         * adds a massive performance boost; but means that the send() method will
1425         * return immediately whether the message has been sent or not which could
1426         * lead to message loss.
1427         */
1428        public void setUseAsyncSend(boolean useAsyncSend) {
1429            this.useAsyncSend = useAsyncSend;
1430        }
1431    
1432        /**
1433         * @return true if always sync send messages
1434         */
1435        public boolean isAlwaysSyncSend() {
1436            return this.alwaysSyncSend;
1437        }
1438    
1439        /**
1440         * Set true if always require messages to be sync sent
1441         * 
1442         * @param alwaysSyncSend
1443         */
1444        public void setAlwaysSyncSend(boolean alwaysSyncSend) {
1445            this.alwaysSyncSend = alwaysSyncSend;
1446        }
1447        
1448        /**
1449         * @return the messagePrioritySupported
1450         */
1451        public boolean isMessagePrioritySupported() {
1452            return this.messagePrioritySupported;
1453        }
1454    
1455        /**
1456         * @param messagePrioritySupported the messagePrioritySupported to set
1457         */
1458        public void setMessagePrioritySupported(boolean messagePrioritySupported) {
1459            this.messagePrioritySupported = messagePrioritySupported;
1460        }
1461    
1462        /**
1463         * Cleans up this connection so that it's state is as if the connection was
1464         * just created. This allows the Resource Adapter to clean up a connection
1465         * so that it can be reused without having to close and recreate the
1466         * connection.
1467         */
1468        public void cleanup() throws JMSException {
1469    
1470            if (advisoryConsumer != null && !isTransportFailed()) {
1471                advisoryConsumer.dispose();
1472                advisoryConsumer = null;
1473            }
1474    
1475            for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
1476                ActiveMQSession s = i.next();
1477                s.dispose();
1478            }
1479            for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
1480                ActiveMQConnectionConsumer c = i.next();
1481                c.dispose();
1482            }
1483            for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) {
1484                ActiveMQInputStream c = i.next();
1485                c.dispose();
1486            }
1487            for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) {
1488                ActiveMQOutputStream c = i.next();
1489                c.dispose();
1490            }
1491    
1492            if (isConnectionInfoSentToBroker) {
1493                if (!transportFailed.get() && !closing.get()) {
1494                    syncSendPacket(info.createRemoveCommand());
1495                }
1496                isConnectionInfoSentToBroker = false;
1497            }
1498            if (userSpecifiedClientID) {
1499                info.setClientId(null);
1500                userSpecifiedClientID = false;
1501            }
1502            clientIDSet = false;
1503    
1504            started.set(false);
1505        }
1506    
1507        /**
1508         * Changes the associated username/password that is associated with this
1509         * connection. If the connection has been used, you must called cleanup()
1510         * before calling this method.
1511         * 
1512         * @throws IllegalStateException if the connection is in used.
1513         */
1514        public void changeUserInfo(String userName, String password) throws JMSException {
1515            if (isConnectionInfoSentToBroker) {
1516                throw new IllegalStateException("changeUserInfo used Connection is not allowed");
1517            }
1518            this.info.setUserName(userName);
1519            this.info.setPassword(password);
1520        }
1521    
1522        /**
1523         * @return Returns the resourceManagerId.
1524         * @throws JMSException
1525         */
1526        public String getResourceManagerId() throws JMSException {
1527            waitForBrokerInfo();
1528            if (brokerInfo == null) {
1529                throw new JMSException("Connection failed before Broker info was received.");
1530            }
1531            return brokerInfo.getBrokerId().getValue();
1532        }
1533    
1534        /**
1535         * Returns the broker name if one is available or null if one is not
1536         * available yet.
1537         */
1538        public String getBrokerName() {
1539            try {
1540                brokerInfoReceived.await(5, TimeUnit.SECONDS);
1541                if (brokerInfo == null) {
1542                    return null;
1543                }
1544                return brokerInfo.getBrokerName();
1545            } catch (InterruptedException e) {
1546                Thread.currentThread().interrupt();
1547                return null;
1548            }
1549        }
1550    
1551        /**
1552         * Returns the broker information if it is available or null if it is not
1553         * available yet.
1554         */
1555        public BrokerInfo getBrokerInfo() {
1556            return brokerInfo;
1557        }
1558    
1559        /**
1560         * @return Returns the RedeliveryPolicy.
1561         * @throws JMSException
1562         */
1563        public RedeliveryPolicy getRedeliveryPolicy() throws JMSException {
1564            return redeliveryPolicy;
1565        }
1566    
1567        /**
1568         * Sets the redelivery policy to be used when messages are rolled back
1569         */
1570        public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
1571            this.redeliveryPolicy = redeliveryPolicy;
1572        }
1573    
1574        public BlobTransferPolicy getBlobTransferPolicy() {
1575            if (blobTransferPolicy == null) {
1576                blobTransferPolicy = createBlobTransferPolicy();
1577            }
1578            return blobTransferPolicy;
1579        }
1580    
1581        /**
1582         * Sets the policy used to describe how out-of-band BLOBs (Binary Large
1583         * OBjects) are transferred from producers to brokers to consumers
1584         */
1585        public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
1586            this.blobTransferPolicy = blobTransferPolicy;
1587        }
1588    
1589        /**
1590         * @return Returns the alwaysSessionAsync.
1591         */
1592        public boolean isAlwaysSessionAsync() {
1593            return alwaysSessionAsync;
1594        }
1595    
1596        /**
1597         * If this flag is set then a separate thread is not used for dispatching
1598         * messages for each Session in the Connection. However, a separate thread
1599         * is always used if there is more than one session, or the session isn't in
1600         * auto acknowledge or duplicates ok mode
1601         */
1602        public void setAlwaysSessionAsync(boolean alwaysSessionAsync) {
1603            this.alwaysSessionAsync = alwaysSessionAsync;
1604        }
1605    
1606        /**
1607         * @return Returns the optimizeAcknowledge.
1608         */
1609        public boolean isOptimizeAcknowledge() {
1610            return optimizeAcknowledge;
1611        }
1612    
1613        /**
1614         * Enables an optimised acknowledgement mode where messages are acknowledged
1615         * in batches rather than individually
1616         * 
1617         * @param optimizeAcknowledge The optimizeAcknowledge to set.
1618         */
1619        public void setOptimizeAcknowledge(boolean optimizeAcknowledge) {
1620            this.optimizeAcknowledge = optimizeAcknowledge;
1621        }
1622    
1623        public long getWarnAboutUnstartedConnectionTimeout() {
1624            return warnAboutUnstartedConnectionTimeout;
1625        }
1626    
1627        /**
1628         * Enables the timeout from a connection creation to when a warning is
1629         * generated if the connection is not properly started via {@link #start()}
1630         * and a message is received by a consumer. It is a very common gotcha to
1631         * forget to <a
1632         * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start
1633         * the connection</a> so this option makes the default case to create a
1634         * warning if the user forgets. To disable the warning just set the value to <
1635         * 0 (say -1).
1636         */
1637        public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) {
1638            this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout;
1639        }
1640        
1641        /**
1642         * @return the sendTimeout
1643         */
1644        public int getSendTimeout() {
1645            return sendTimeout;
1646        }
1647    
1648        /**
1649         * @param sendTimeout the sendTimeout to set
1650         */
1651        public void setSendTimeout(int sendTimeout) {
1652            this.sendTimeout = sendTimeout;
1653        }
1654        
1655        /**
1656         * @return the sendAcksAsync
1657         */
1658        public boolean isSendAcksAsync() {
1659            return sendAcksAsync;
1660        }
1661    
1662        /**
1663         * @param sendAcksAsync the sendAcksAsync to set
1664         */
1665        public void setSendAcksAsync(boolean sendAcksAsync) {
1666            this.sendAcksAsync = sendAcksAsync;
1667        }
1668    
1669    
1670        /**
1671         * Returns the time this connection was created
1672         */
1673        public long getTimeCreated() {
1674            return timeCreated;
1675        }
1676    
1677        private void waitForBrokerInfo() throws JMSException {
1678            try {
1679                brokerInfoReceived.await();
1680            } catch (InterruptedException e) {
1681                Thread.currentThread().interrupt();
1682                throw JMSExceptionSupport.create(e);
1683            }
1684        }
1685    
1686        // Package protected so that it can be used in unit tests
1687        public Transport getTransport() {
1688            return transport;
1689        }
1690    
1691        public void addProducer(ProducerId producerId, ActiveMQMessageProducer producer) {
1692            producers.put(producerId, producer);
1693        }
1694    
1695        public void removeProducer(ProducerId producerId) {
1696            producers.remove(producerId);
1697        }
1698    
1699        public void addDispatcher(ConsumerId consumerId, ActiveMQDispatcher dispatcher) {
1700            dispatchers.put(consumerId, dispatcher);
1701        }
1702    
1703        public void removeDispatcher(ConsumerId consumerId) {
1704            dispatchers.remove(consumerId);
1705        }
1706    
1707        /**
1708         * @param o - the command to consume
1709         */
1710        public void onCommand(final Object o) {
1711            final Command command = (Command)o;
1712            if (!closed.get() && command != null) {
1713                try {
1714                    command.visit(new CommandVisitorAdapter() {
1715                        @Override
1716                        public Response processMessageDispatch(MessageDispatch md) throws Exception {
1717                            waitForTransportInterruptionProcessingToComplete();
1718                            ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId());
1719                            if (dispatcher != null) {
1720                                // Copy in case a embedded broker is dispatching via
1721                                // vm://
1722                                // md.getMessage() == null to signal end of queue
1723                                // browse.
1724                                Message msg = md.getMessage();
1725                                if (msg != null) {
1726                                    msg = msg.copy();
1727                                    msg.setReadOnlyBody(true);
1728                                    msg.setReadOnlyProperties(true);
1729                                    msg.setRedeliveryCounter(md.getRedeliveryCounter());
1730                                    msg.setConnection(ActiveMQConnection.this);
1731                                    md.setMessage(msg);
1732                                }
1733                                dispatcher.dispatch(md);
1734                            }
1735                            return null;
1736                        }
1737    
1738                        @Override
1739                        public Response processProducerAck(ProducerAck pa) throws Exception {
1740                            if (pa != null && pa.getProducerId() != null) {
1741                                ActiveMQMessageProducer producer = producers.get(pa.getProducerId());
1742                                if (producer != null) {
1743                                    producer.onProducerAck(pa);
1744                                }
1745                            }
1746                            return null;
1747                        }
1748    
1749                        @Override
1750                        public Response processBrokerInfo(BrokerInfo info) throws Exception {
1751                            brokerInfo = info;
1752                            brokerInfoReceived.countDown();
1753                            optimizeAcknowledge &= !brokerInfo.isFaultTolerantConfiguration();
1754                            getBlobTransferPolicy().setBrokerUploadUrl(info.getBrokerUploadUrl());
1755                            return null;
1756                        }
1757    
1758                        @Override
1759                        public Response processConnectionError(final ConnectionError error) throws Exception {
1760                            executor.execute(new Runnable() {
1761                                public void run() {
1762                                    onAsyncException(error.getException());
1763                                }
1764                            });
1765                            return null;
1766                        }
1767    
1768                        @Override
1769                        public Response processControlCommand(ControlCommand command) throws Exception {
1770                            onControlCommand(command);
1771                            return null;
1772                        }
1773    
1774                        @Override
1775                        public Response processConnectionControl(ConnectionControl control) throws Exception {
1776                            onConnectionControl((ConnectionControl)command);
1777                            return null;
1778                        }
1779    
1780                        @Override
1781                        public Response processConsumerControl(ConsumerControl control) throws Exception {
1782                            onConsumerControl((ConsumerControl)command);
1783                            return null;
1784                        }
1785    
1786                        @Override
1787                        public Response processWireFormat(WireFormatInfo info) throws Exception {
1788                            onWireFormatInfo((WireFormatInfo)command);
1789                            return null;
1790                        }
1791                    });
1792                } catch (Exception e) {
1793                    onClientInternalException(e);
1794                }
1795    
1796            }
1797            for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
1798                TransportListener listener = iter.next();
1799                listener.onCommand(command);
1800            }
1801        }
1802    
1803        protected void onWireFormatInfo(WireFormatInfo info) {
1804            protocolVersion.set(info.getVersion());
1805        }
1806    
1807        /**
1808         * Handles async client internal exceptions.
1809         * A client internal exception is usually one that has been thrown
1810         * by a container runtime component during asynchronous processing of a
1811         * message that does not affect the connection itself.
1812         * This method notifies the <code>ClientInternalExceptionListener</code> by invoking
1813         * its <code>onException</code> method, if one has been registered with this connection.
1814         * 
1815         * @param error the exception that the problem
1816         */
1817        public void onClientInternalException(final Throwable error) {
1818            if ( !closed.get() && !closing.get() ) {
1819                if ( this.clientInternalExceptionListener != null ) {
1820                    executor.execute(new Runnable() {
1821                        public void run() {
1822                            ActiveMQConnection.this.clientInternalExceptionListener.onException(error);
1823                        }
1824                    });
1825                } else {
1826                    LOG.debug("Async client internal exception occurred with no exception listener registered: " 
1827                            + error, error);
1828                }
1829            }
1830        }
1831        /**
1832         * Used for handling async exceptions
1833         * 
1834         * @param error
1835         */
1836        public void onAsyncException(Throwable error) {
1837            if (!closed.get() && !closing.get()) {
1838                if (this.exceptionListener != null) {
1839    
1840                    if (!(error instanceof JMSException)) {
1841                        error = JMSExceptionSupport.create(error);
1842                    }
1843                    final JMSException e = (JMSException)error;
1844    
1845                    executor.execute(new Runnable() {
1846                        public void run() {
1847                            ActiveMQConnection.this.exceptionListener.onException(e);
1848                        }
1849                    });
1850    
1851                } else {
1852                    LOG.debug("Async exception with no exception listener: " + error, error);
1853                }
1854            }
1855        }
1856    
1857        public void onException(final IOException error) {
1858                    onAsyncException(error);
1859                    if (!closing.get() && !closed.get()) {
1860                            executor.execute(new Runnable() {
1861                                    public void run() {
1862                                            transportFailed(error);
1863                                            ServiceSupport.dispose(ActiveMQConnection.this.transport);
1864                                            brokerInfoReceived.countDown();
1865                                            try {
1866                                                    cleanup();
1867                                            } catch (JMSException e) {
1868                                                    LOG.warn("Exception during connection cleanup, " + e, e);
1869                                            }
1870                                            for (Iterator<TransportListener> iter = transportListeners
1871                                                            .iterator(); iter.hasNext();) {
1872                                                    TransportListener listener = iter.next();
1873                                                    listener.onException(error);
1874                                            }
1875                                    }
1876                            });
1877                    }
1878            }
1879    
1880        public void transportInterupted() {
1881            this.transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.size() - (advisoryConsumer != null ? 1:0));
1882            if (LOG.isDebugEnabled()) {
1883                LOG.debug("transport interrupted, dispatchers: " + transportInterruptionProcessingComplete.getCount());
1884            }
1885            signalInterruptionProcessingNeeded();
1886    
1887            for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
1888                ActiveMQSession s = i.next();
1889                s.clearMessagesInProgress();
1890            }
1891            
1892            for (ActiveMQConnectionConsumer connectionConsumer : this.connectionConsumers) {
1893                connectionConsumer.clearMessagesInProgress();    
1894            }
1895            
1896            for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
1897                TransportListener listener = iter.next();
1898                listener.transportInterupted();
1899            }
1900        }
1901    
1902        public void transportResumed() {
1903            for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
1904                TransportListener listener = iter.next();
1905                listener.transportResumed();
1906            }
1907        }
1908    
1909        /**
1910         * Create the DestinationInfo object for the temporary destination.
1911         * 
1912         * @param topic - if its true topic, else queue.
1913         * @return DestinationInfo
1914         * @throws JMSException
1915         */
1916        protected ActiveMQTempDestination createTempDestination(boolean topic) throws JMSException {
1917    
1918            // Check if Destination info is of temporary type.
1919            ActiveMQTempDestination dest;
1920            if (topic) {
1921                dest = new ActiveMQTempTopic(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
1922            } else {
1923                dest = new ActiveMQTempQueue(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
1924            }
1925    
1926            DestinationInfo info = new DestinationInfo();
1927            info.setConnectionId(this.info.getConnectionId());
1928            info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
1929            info.setDestination(dest);
1930            syncSendPacket(info);
1931    
1932            dest.setConnection(this);
1933            activeTempDestinations.put(dest, dest);
1934            return dest;
1935        }
1936    
1937        /**
1938         * @param destination
1939         * @throws JMSException
1940         */
1941        public void deleteTempDestination(ActiveMQTempDestination destination) throws JMSException {
1942    
1943            checkClosedOrFailed();
1944    
1945            for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
1946                ActiveMQSession s = i.next();
1947                if (s.isInUse(destination)) {
1948                    throw new JMSException("A consumer is consuming from the temporary destination");
1949                }
1950            }
1951    
1952            activeTempDestinations.remove(destination);
1953    
1954            DestinationInfo destInfo = new DestinationInfo();
1955            destInfo.setConnectionId(this.info.getConnectionId());
1956            destInfo.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
1957            destInfo.setDestination(destination);
1958            destInfo.setTimeout(0);
1959            syncSendPacket(destInfo);
1960        }
1961    
1962        public boolean isDeleted(ActiveMQDestination dest) {
1963    
1964            // If we are not watching the advisories.. then
1965            // we will assume that the temp destination does exist.
1966            if (advisoryConsumer == null) {
1967                return false;
1968            }
1969    
1970            return !activeTempDestinations.contains(dest);
1971        }
1972    
1973        public boolean isCopyMessageOnSend() {
1974            return copyMessageOnSend;
1975        }
1976    
1977        public LongSequenceGenerator getLocalTransactionIdGenerator() {
1978            return localTransactionIdGenerator;
1979        }
1980    
1981        public boolean isUseCompression() {
1982            return useCompression;
1983        }
1984    
1985        /**
1986         * Enables the use of compression of the message bodies
1987         */
1988        public void setUseCompression(boolean useCompression) {
1989            this.useCompression = useCompression;
1990        }
1991    
1992        public void destroyDestination(ActiveMQDestination destination) throws JMSException {
1993    
1994            checkClosedOrFailed();
1995            ensureConnectionInfoSent();
1996    
1997            DestinationInfo info = new DestinationInfo();
1998            info.setConnectionId(this.info.getConnectionId());
1999            info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
2000            info.setDestination(destination);
2001            info.setTimeout(0);
2002            syncSendPacket(info);
2003    
2004        }
2005    
2006        public boolean isDispatchAsync() {
2007            return dispatchAsync;
2008        }
2009    
2010        /**
2011         * Enables or disables the default setting of whether or not consumers have
2012         * their messages <a
2013         * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched
2014         * synchronously or asynchronously by the broker</a>. For non-durable
2015         * topics for example we typically dispatch synchronously by default to
2016         * minimize context switches which boost performance. However sometimes its
2017         * better to go slower to ensure that a single blocked consumer socket does
2018         * not block delivery to other consumers.
2019         * 
2020         * @param asyncDispatch If true then consumers created on this connection
2021         *                will default to having their messages dispatched
2022         *                asynchronously. The default value is false.
2023         */
2024        public void setDispatchAsync(boolean asyncDispatch) {
2025            this.dispatchAsync = asyncDispatch;
2026        }
2027    
2028        public boolean isObjectMessageSerializationDefered() {
2029            return objectMessageSerializationDefered;
2030        }
2031    
2032        /**
2033         * When an object is set on an ObjectMessage, the JMS spec requires the
2034         * object to be serialized by that set method. Enabling this flag causes the
2035         * object to not get serialized. The object may subsequently get serialized
2036         * if the message needs to be sent over a socket or stored to disk.
2037         */
2038        public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) {
2039            this.objectMessageSerializationDefered = objectMessageSerializationDefered;
2040        }
2041    
2042        public InputStream createInputStream(Destination dest) throws JMSException {
2043            return createInputStream(dest, null);
2044        }
2045    
2046        public InputStream createInputStream(Destination dest, String messageSelector) throws JMSException {
2047            return createInputStream(dest, messageSelector, false);
2048        }
2049    
2050        public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal) throws JMSException {
2051            return createInputStream(dest, messageSelector, noLocal,  -1);
2052        }
2053    
2054    
2055    
2056        public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal, long timeout) throws JMSException {
2057            return doCreateInputStream(dest, messageSelector, noLocal, null, timeout);
2058        }
2059        
2060        public InputStream createDurableInputStream(Topic dest, String name) throws JMSException {
2061            return createInputStream(dest, null, false);
2062        }
2063    
2064        public InputStream createDurableInputStream(Topic dest, String name, String messageSelector) throws JMSException {
2065            return createDurableInputStream(dest, name, messageSelector, false);
2066        }
2067    
2068        public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal) throws JMSException {
2069            return createDurableInputStream(dest, name, messageSelector, noLocal, -1);
2070        }
2071    
2072        public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal, long timeout) throws JMSException {
2073            return doCreateInputStream(dest, messageSelector, noLocal, name, timeout);
2074        }
2075        
2076        private InputStream doCreateInputStream(Destination dest, String messageSelector, boolean noLocal, String subName, long timeout) throws JMSException {
2077            checkClosedOrFailed();
2078            ensureConnectionInfoSent();
2079            return new ActiveMQInputStream(this, createConsumerId(), ActiveMQDestination.transform(dest), messageSelector, noLocal, subName, prefetchPolicy.getInputStreamPrefetch(), timeout);
2080        }
2081    
2082        /**
2083         * Creates a persistent output stream; individual messages will be written
2084         * to disk/database by the broker
2085         */
2086        public OutputStream createOutputStream(Destination dest) throws JMSException {
2087            return createOutputStream(dest, null, ActiveMQMessage.DEFAULT_DELIVERY_MODE, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
2088        }
2089    
2090        /**
2091         * Creates a non persistent output stream; messages will not be written to
2092         * disk
2093         */
2094        public OutputStream createNonPersistentOutputStream(Destination dest) throws JMSException {
2095            return createOutputStream(dest, null, DeliveryMode.NON_PERSISTENT, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
2096        }
2097    
2098        /**
2099         * Creates an output stream allowing full control over the delivery mode,
2100         * the priority and time to live of the messages and the properties added to
2101         * messages on the stream.
2102         * 
2103         * @param streamProperties defines a map of key-value pairs where the keys
2104         *                are strings and the values are primitive values (numbers
2105         *                and strings) which are appended to the messages similarly
2106         *                to using the
2107         *                {@link javax.jms.Message#setObjectProperty(String, Object)}
2108         *                method
2109         */
2110        public OutputStream createOutputStream(Destination dest, Map<String, Object> streamProperties, int deliveryMode, int priority, long timeToLive) throws JMSException {
2111            checkClosedOrFailed();
2112            ensureConnectionInfoSent();
2113            return new ActiveMQOutputStream(this, createProducerId(), ActiveMQDestination.transform(dest), streamProperties, deliveryMode, priority, timeToLive);
2114        }
2115    
2116        /**
2117         * Unsubscribes a durable subscription that has been created by a client.
2118         * <P>
2119         * This method deletes the state being maintained on behalf of the
2120         * subscriber by its provider.
2121         * <P>
2122         * It is erroneous for a client to delete a durable subscription while there
2123         * is an active <CODE>MessageConsumer </CODE> or
2124         * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
2125         * message is part of a pending transaction or has not been acknowledged in
2126         * the session.
2127         * 
2128         * @param name the name used to identify this subscription
2129         * @throws JMSException if the session fails to unsubscribe to the durable
2130         *                 subscription due to some internal error.
2131         * @throws InvalidDestinationException if an invalid subscription name is
2132         *                 specified.
2133         * @since 1.1
2134         */
2135        public void unsubscribe(String name) throws InvalidDestinationException, JMSException {
2136            checkClosedOrFailed();
2137            RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
2138            rsi.setConnectionId(getConnectionInfo().getConnectionId());
2139            rsi.setSubscriptionName(name);
2140            rsi.setClientId(getConnectionInfo().getClientId());
2141            syncSendPacket(rsi);
2142        }
2143    
2144        /**
2145         * Internal send method optimized: - It does not copy the message - It can
2146         * only handle ActiveMQ messages. - You can specify if the send is async or
2147         * sync - Does not allow you to send /w a transaction.
2148         */
2149        void send(ActiveMQDestination destination, ActiveMQMessage msg, MessageId messageId, int deliveryMode, int priority, long timeToLive, boolean async) throws JMSException {
2150            checkClosedOrFailed();
2151    
2152            if (destination.isTemporary() && isDeleted(destination)) {
2153                throw new JMSException("Cannot publish to a deleted Destination: " + destination);
2154            }
2155    
2156            msg.setJMSDestination(destination);
2157            msg.setJMSDeliveryMode(deliveryMode);
2158            long expiration = 0L;
2159    
2160            if (!isDisableTimeStampsByDefault()) {
2161                long timeStamp = System.currentTimeMillis();
2162                msg.setJMSTimestamp(timeStamp);
2163                if (timeToLive > 0) {
2164                    expiration = timeToLive + timeStamp;
2165                }
2166            }
2167    
2168            msg.setJMSExpiration(expiration);
2169            msg.setJMSPriority(priority);
2170    
2171            msg.setJMSRedelivered(false);
2172            msg.setMessageId(messageId);
2173    
2174            msg.onSend();
2175    
2176            msg.setProducerId(msg.getMessageId().getProducerId());
2177    
2178            if (LOG.isDebugEnabled()) {
2179                LOG.debug("Sending message: " + msg);
2180            }
2181    
2182            if (async) {
2183                asyncSendPacket(msg);
2184            } else {
2185                syncSendPacket(msg);
2186            }
2187    
2188        }
2189    
2190        public void addOutputStream(ActiveMQOutputStream stream) {
2191            outputStreams.add(stream);
2192        }
2193    
2194        public void removeOutputStream(ActiveMQOutputStream stream) {
2195            outputStreams.remove(stream);
2196        }
2197    
2198        public void addInputStream(ActiveMQInputStream stream) {
2199            inputStreams.add(stream);
2200        }
2201    
2202        public void removeInputStream(ActiveMQInputStream stream) {
2203            inputStreams.remove(stream);
2204        }
2205    
2206        protected void onControlCommand(ControlCommand command) {
2207            String text = command.getCommand();
2208            if (text != null) {
2209                if (text.equals("shutdown")) {
2210                    LOG.info("JVM told to shutdown");
2211                    System.exit(0);
2212                }
2213            }
2214        }
2215    
2216        protected void onConnectionControl(ConnectionControl command) {
2217            if (command.isFaultTolerant()) {
2218                this.optimizeAcknowledge = false;
2219                for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2220                    ActiveMQSession s = i.next();
2221                    s.setOptimizeAcknowledge(false);
2222                }
2223            }
2224        }
2225    
2226        protected void onConsumerControl(ConsumerControl command) {
2227            if (command.isClose()) {
2228                for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2229                    ActiveMQSession s = i.next();
2230                    s.close(command.getConsumerId());
2231                }
2232            } else {
2233                for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2234                    ActiveMQSession s = i.next();
2235                    s.setPrefetchSize(command.getConsumerId(), command.getPrefetch());
2236                }
2237            }
2238        }
2239    
2240        protected void transportFailed(IOException error) {
2241            transportFailed.set(true);
2242            if (firstFailureError == null) {
2243                firstFailureError = error;
2244            }
2245        }
2246    
2247        /**
2248         * Should a JMS message be copied to a new JMS Message object as part of the
2249         * send() method in JMS. This is enabled by default to be compliant with the
2250         * JMS specification. You can disable it if you do not mutate JMS messages
2251         * after they are sent for a performance boost
2252         */
2253        public void setCopyMessageOnSend(boolean copyMessageOnSend) {
2254            this.copyMessageOnSend = copyMessageOnSend;
2255        }
2256    
2257        @Override
2258        public String toString() {
2259            return "ActiveMQConnection {id=" + info.getConnectionId() + ",clientId=" + info.getClientId() + ",started=" + started.get() + "}";
2260        }
2261    
2262        protected BlobTransferPolicy createBlobTransferPolicy() {
2263            return new BlobTransferPolicy();
2264        }
2265    
2266        public int getProtocolVersion() {
2267            return protocolVersion.get();
2268        }
2269    
2270        public int getProducerWindowSize() {
2271            return producerWindowSize;
2272        }
2273    
2274        public void setProducerWindowSize(int producerWindowSize) {
2275            this.producerWindowSize = producerWindowSize;
2276        }
2277    
2278        public void setAuditDepth(int auditDepth) {
2279            connectionAudit.setAuditDepth(auditDepth);
2280            }
2281    
2282        public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
2283            connectionAudit.setAuditMaximumProducerNumber(auditMaximumProducerNumber);
2284            }
2285    
2286        protected void removeDispatcher(ActiveMQDispatcher dispatcher) {
2287            connectionAudit.removeDispatcher(dispatcher);
2288        }
2289    
2290        protected boolean isDuplicate(ActiveMQDispatcher dispatcher, Message message) {
2291            return checkForDuplicates && connectionAudit.isDuplicate(dispatcher, message);
2292        }
2293    
2294        protected void rollbackDuplicate(ActiveMQDispatcher dispatcher, Message message) {
2295            connectionAudit.rollbackDuplicate(dispatcher, message);
2296        }
2297    
2298            public IOException getFirstFailureError() {
2299                    return firstFailureError;
2300            }
2301            
2302            protected void waitForTransportInterruptionProcessingToComplete() throws InterruptedException {
2303                CountDownLatch cdl = this.transportInterruptionProcessingComplete;
2304                if (cdl != null) {
2305                if (!closed.get() && !transportFailed.get() && cdl.getCount()>0) {
2306                    LOG.warn("dispatch paused, waiting for outstanding dispatch interruption processing (" + cdl.getCount() + ") to complete..");
2307                    cdl.await(10, TimeUnit.SECONDS);
2308                }
2309                signalInterruptionProcessingComplete();
2310            }
2311        }
2312            
2313            protected void transportInterruptionProcessingComplete() {
2314                CountDownLatch cdl = this.transportInterruptionProcessingComplete;
2315                if (cdl != null) {
2316                    cdl.countDown();
2317                    try {
2318                        signalInterruptionProcessingComplete();
2319                    } catch (InterruptedException ignored) {}
2320                }
2321            }
2322    
2323        private void signalInterruptionProcessingComplete() throws InterruptedException {
2324            CountDownLatch cdl = this.transportInterruptionProcessingComplete;
2325            if (cdl.getCount()==0) {
2326                if (LOG.isDebugEnabled()) {
2327                    LOG.debug("transportInterruptionProcessingComplete for: " + this.getConnectionInfo().getConnectionId());
2328                }
2329                this.transportInterruptionProcessingComplete = null;
2330    
2331                FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
2332                if (failoverTransport != null) {
2333                    failoverTransport.connectionInterruptProcessingComplete(this.getConnectionInfo().getConnectionId());
2334                    if (LOG.isDebugEnabled()) {
2335                        LOG.debug("notified failover transport (" + failoverTransport
2336                                + ") of interruption completion for: " + this.getConnectionInfo().getConnectionId());
2337                    }
2338                }
2339    
2340            }
2341        }
2342    
2343        private void signalInterruptionProcessingNeeded() {
2344            FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
2345            if (failoverTransport != null) {
2346                failoverTransport.getStateTracker().transportInterrupted(this.getConnectionInfo().getConnectionId());
2347                if (LOG.isDebugEnabled()) {
2348                    LOG.debug("notified failover transport (" + failoverTransport
2349                            + ") of pending interruption processing for: " + this.getConnectionInfo().getConnectionId());
2350                }
2351            }
2352        }
2353    
2354        /*
2355         * specify the amount of time in milliseconds that a consumer with a transaction pending recovery
2356         * will wait to receive re dispatched messages.
2357         * default value is 0 so there is no wait by default.
2358         */
2359        public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) {
2360            this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod;
2361        }
2362        
2363        public long getConsumerFailoverRedeliveryWaitPeriod() {
2364            return consumerFailoverRedeliveryWaitPeriod;
2365        }
2366        
2367        protected Scheduler getScheduler() {
2368            return this.scheduler;
2369        }
2370        
2371        protected ThreadPoolExecutor getExecutor() {
2372            return this.executor;
2373        }
2374    
2375        /**
2376         * @return the checkForDuplicates
2377         */
2378        public boolean isCheckForDuplicates() {
2379            return this.checkForDuplicates;
2380        }
2381    
2382        /**
2383         * @param checkForDuplicates the checkForDuplicates to set
2384         */
2385        public void setCheckForDuplicates(boolean checkForDuplicates) {
2386            this.checkForDuplicates = checkForDuplicates;
2387        }
2388    
2389    }