001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker;
018    
019    import java.io.IOException;
020    import java.net.URI;
021    import java.util.HashMap;
022    import java.util.Iterator;
023    import java.util.LinkedList;
024    import java.util.List;
025    import java.util.Map;
026    import java.util.Properties;
027    import java.util.concurrent.ConcurrentHashMap;
028    import java.util.concurrent.CopyOnWriteArrayList;
029    import java.util.concurrent.CountDownLatch;
030    import java.util.concurrent.TimeUnit;
031    import java.util.concurrent.atomic.AtomicBoolean;
032    import java.util.concurrent.atomic.AtomicInteger;
033    import java.util.concurrent.atomic.AtomicReference;
034    import java.util.concurrent.locks.ReentrantReadWriteLock;
035    
036    import javax.management.ObjectName;
037    import javax.transaction.xa.XAResource;
038    
039    import org.apache.activemq.broker.ft.MasterBroker;
040    import org.apache.activemq.broker.region.ConnectionStatistics;
041    import org.apache.activemq.broker.region.RegionBroker;
042    import org.apache.activemq.command.BrokerId;
043    import org.apache.activemq.command.BrokerInfo;
044    import org.apache.activemq.command.Command;
045    import org.apache.activemq.command.CommandTypes;
046    import org.apache.activemq.command.ConnectionControl;
047    import org.apache.activemq.command.ConnectionError;
048    import org.apache.activemq.command.ConnectionId;
049    import org.apache.activemq.command.ConnectionInfo;
050    import org.apache.activemq.command.ConsumerControl;
051    import org.apache.activemq.command.ConsumerId;
052    import org.apache.activemq.command.ConsumerInfo;
053    import org.apache.activemq.command.ControlCommand;
054    import org.apache.activemq.command.DataArrayResponse;
055    import org.apache.activemq.command.DestinationInfo;
056    import org.apache.activemq.command.ExceptionResponse;
057    import org.apache.activemq.command.FlushCommand;
058    import org.apache.activemq.command.IntegerResponse;
059    import org.apache.activemq.command.KeepAliveInfo;
060    import org.apache.activemq.command.Message;
061    import org.apache.activemq.command.MessageAck;
062    import org.apache.activemq.command.MessageDispatch;
063    import org.apache.activemq.command.MessageDispatchNotification;
064    import org.apache.activemq.command.MessagePull;
065    import org.apache.activemq.command.ProducerAck;
066    import org.apache.activemq.command.ProducerId;
067    import org.apache.activemq.command.ProducerInfo;
068    import org.apache.activemq.command.RemoveSubscriptionInfo;
069    import org.apache.activemq.command.Response;
070    import org.apache.activemq.command.SessionId;
071    import org.apache.activemq.command.SessionInfo;
072    import org.apache.activemq.command.ShutdownInfo;
073    import org.apache.activemq.command.TransactionId;
074    import org.apache.activemq.command.TransactionInfo;
075    import org.apache.activemq.command.WireFormatInfo;
076    import org.apache.activemq.network.*;
077    import org.apache.activemq.security.MessageAuthorizationPolicy;
078    import org.apache.activemq.state.CommandVisitor;
079    import org.apache.activemq.state.ConnectionState;
080    import org.apache.activemq.state.ConsumerState;
081    import org.apache.activemq.state.ProducerState;
082    import org.apache.activemq.state.SessionState;
083    import org.apache.activemq.state.TransactionState;
084    import org.apache.activemq.thread.DefaultThreadPools;
085    import org.apache.activemq.thread.Task;
086    import org.apache.activemq.thread.TaskRunner;
087    import org.apache.activemq.thread.TaskRunnerFactory;
088    import org.apache.activemq.transaction.Transaction;
089    import org.apache.activemq.transport.DefaultTransportListener;
090    import org.apache.activemq.transport.ResponseCorrelator;
091    import org.apache.activemq.transport.Transport;
092    import org.apache.activemq.transport.TransportDisposedIOException;
093    import org.apache.activemq.transport.TransportFactory;
094    import org.apache.activemq.util.*;
095    import org.slf4j.Logger;
096    import org.slf4j.LoggerFactory;
097    import org.slf4j.MDC;
098    
099    public class TransportConnection implements Connection, Task, CommandVisitor {
100        private static final Logger LOG = LoggerFactory.getLogger(TransportConnection.class);
101        private static final Logger TRANSPORTLOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Transport");
102        private static final Logger SERVICELOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Service");
103        // Keeps track of the broker and connector that created this connection.
104        protected final Broker broker;
105        protected final TransportConnector connector;
106        // Keeps track of the state of the connections.
107        // protected final ConcurrentHashMap localConnectionStates=new
108        // ConcurrentHashMap();
109        protected final Map<ConnectionId, ConnectionState> brokerConnectionStates;
110        // The broker and wireformat info that was exchanged.
111        protected BrokerInfo brokerInfo;
112        protected final List<Command> dispatchQueue = new LinkedList<Command>();
113        protected TaskRunner taskRunner;
114        protected final AtomicReference<IOException> transportException = new AtomicReference<IOException>();
115        protected AtomicBoolean dispatchStopped = new AtomicBoolean(false);
116        private MasterBroker masterBroker;
117        private final Transport transport;
118        private MessageAuthorizationPolicy messageAuthorizationPolicy;
119        private WireFormatInfo wireFormatInfo;
120        // Used to do async dispatch.. this should perhaps be pushed down into the
121        // transport layer..
122        private boolean inServiceException;
123        private final ConnectionStatistics statistics = new ConnectionStatistics();
124        private boolean manageable;
125        private boolean slow;
126        private boolean markedCandidate;
127        private boolean blockedCandidate;
128        private boolean blocked;
129        private boolean connected;
130        private boolean active;
131        private boolean starting;
132        private boolean pendingStop;
133        private long timeStamp;
134        private final AtomicBoolean stopping = new AtomicBoolean(false);
135        private final CountDownLatch stopped = new CountDownLatch(1);
136        private final AtomicBoolean asyncException = new AtomicBoolean(false);
137        private final Map<ProducerId, ProducerBrokerExchange> producerExchanges = new HashMap<ProducerId, ProducerBrokerExchange>();
138        private final Map<ConsumerId, ConsumerBrokerExchange> consumerExchanges = new HashMap<ConsumerId, ConsumerBrokerExchange>();
139        private final CountDownLatch dispatchStoppedLatch = new CountDownLatch(1);
140        private ConnectionContext context;
141        private boolean networkConnection;
142        private boolean faultTolerantConnection;
143        private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
144        private DemandForwardingBridge duplexBridge;
145        private final TaskRunnerFactory taskRunnerFactory;
146        private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister();
147        private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock();
148        private String duplexNetworkConnectorId;
149    
150        /**
151         * @param connector
152         * @param transport
153         * @param broker
154         * @param taskRunnerFactory
155         *            - can be null if you want direct dispatch to the transport
156         *            else commands are sent async.
157         */
158        public TransportConnection(TransportConnector connector, final Transport transport, Broker broker,
159                TaskRunnerFactory taskRunnerFactory) {
160            this.connector = connector;
161            this.broker = broker;
162            this.messageAuthorizationPolicy = connector.getMessageAuthorizationPolicy();
163            RegionBroker rb = (RegionBroker) broker.getAdaptor(RegionBroker.class);
164            brokerConnectionStates = rb.getConnectionStates();
165            if (connector != null) {
166                this.statistics.setParent(connector.getStatistics());
167            }
168            this.taskRunnerFactory = taskRunnerFactory;
169            this.transport = transport;
170            this.transport.setTransportListener(new DefaultTransportListener() {
171                @Override
172                public void onCommand(Object o) {
173                    serviceLock.readLock().lock();
174                    try {
175                        if (!(o instanceof Command)) {
176                            throw new RuntimeException("Protocol violation - Command corrupted: " + o.toString());
177                        }
178                        Command command = (Command) o;
179                        Response response = service(command);
180                        if (response != null) {
181                            dispatchSync(response);
182                        }
183                    } finally {
184                        serviceLock.readLock().unlock();
185                    }
186                }
187    
188                @Override
189                public void onException(IOException exception) {
190                    serviceLock.readLock().lock();
191                    try {
192                        serviceTransportException(exception);
193                    } finally {
194                        serviceLock.readLock().unlock();
195                    }
196                }
197            });
198            connected = true;
199        }
200    
201        /**
202         * Returns the number of messages to be dispatched to this connection
203         * 
204         * @return size of dispatch queue
205         */
206        public int getDispatchQueueSize() {
207            synchronized (dispatchQueue) {
208                return dispatchQueue.size();
209            }
210        }
211    
212        public void serviceTransportException(IOException e) {
213            BrokerService bService = connector.getBrokerService();
214            if (bService.isShutdownOnSlaveFailure()) {
215                if (brokerInfo != null) {
216                    if (brokerInfo.isSlaveBroker()) {
217                        LOG.error("Slave has exception: " + e.getMessage() + " shutting down master now.", e);
218                        try {
219                            doStop();
220                            bService.stop();
221                        } catch (Exception ex) {
222                            LOG.warn("Failed to stop the master", ex);
223                        }
224                    }
225                }
226            }
227            if (!stopping.get()) {
228                transportException.set(e);
229                if (TRANSPORTLOG.isDebugEnabled()) {
230                    TRANSPORTLOG.debug("Transport failed: " + e, e);
231                } else if (TRANSPORTLOG.isInfoEnabled()) {
232                    TRANSPORTLOG.info("Transport failed: " + e);
233                }
234                stopAsync();
235            }
236        }
237    
238        /**
239         * Calls the serviceException method in an async thread. Since handling a
240         * service exception closes a socket, we should not tie up broker threads
241         * since client sockets may hang or cause deadlocks.
242         * 
243         * @param e
244         */
245        public void serviceExceptionAsync(final IOException e) {
246            if (asyncException.compareAndSet(false, true)) {
247                new Thread("Async Exception Handler") {
248                    @Override
249                    public void run() {
250                        serviceException(e);
251                    }
252                }.start();
253            }
254        }
255    
256        /**
257         * Closes a clients connection due to a detected error. Errors are ignored
258         * if: the client is closing or broker is closing. Otherwise, the connection
259         * error transmitted to the client before stopping it's transport.
260         */
261        public void serviceException(Throwable e) {
262            // are we a transport exception such as not being able to dispatch
263            // synchronously to a transport
264            if (e instanceof IOException) {
265                serviceTransportException((IOException) e);
266            } else if (e.getClass() == BrokerStoppedException.class) {
267                // Handle the case where the broker is stopped
268                // But the client is still connected.
269                if (!stopping.get()) {
270                    if (SERVICELOG.isDebugEnabled()) {
271                        SERVICELOG.debug("Broker has been stopped.  Notifying client and closing his connection.");
272                    }
273                    ConnectionError ce = new ConnectionError();
274                    ce.setException(e);
275                    dispatchSync(ce);
276                    // Wait a little bit to try to get the output buffer to flush
277                    // the exption notification to the client.
278                    try {
279                        Thread.sleep(500);
280                    } catch (InterruptedException ie) {
281                        Thread.currentThread().interrupt();
282                    }
283                    // Worst case is we just kill the connection before the
284                    // notification gets to him.
285                    stopAsync();
286                }
287            } else if (!stopping.get() && !inServiceException) {
288                inServiceException = true;
289                try {
290                    SERVICELOG.warn("Async error occurred: " + e, e);
291                    ConnectionError ce = new ConnectionError();
292                    ce.setException(e);
293                    dispatchAsync(ce);
294                } finally {
295                    inServiceException = false;
296                }
297            }
298        }
299    
300        public Response service(Command command) {
301            MDC.put("activemq.connector", connector.getUri().toString());
302            Response response = null;
303            boolean responseRequired = command.isResponseRequired();
304            int commandId = command.getCommandId();
305            try {
306                response = command.visit(this);
307            } catch (Throwable e) {
308                if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) {
309                    SERVICELOG.debug("Error occured while processing " + (responseRequired ? "sync" : "async")
310                            + " command: " + command + ", exception: " + e, e);
311                }
312                if (responseRequired) {
313                    response = new ExceptionResponse(e);
314                } else {
315                    serviceException(e);
316                }
317            }
318            if (responseRequired) {
319                if (response == null) {
320                    response = new Response();
321                }
322                response.setCorrelationId(commandId);
323            }
324            // The context may have been flagged so that the response is not
325            // sent.
326            if (context != null) {
327                if (context.isDontSendReponse()) {
328                    context.setDontSendReponse(false);
329                    response = null;
330                }
331                context = null;
332            }
333            MDC.remove("activemq.connector");
334            return response;
335        }
336    
337        public Response processKeepAlive(KeepAliveInfo info) throws Exception {
338            return null;
339        }
340    
341        public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception {
342            broker.removeSubscription(lookupConnectionState(info.getConnectionId()).getContext(), info);
343            return null;
344        }
345    
346        public Response processWireFormat(WireFormatInfo info) throws Exception {
347            wireFormatInfo = info;
348            protocolVersion.set(info.getVersion());
349            return null;
350        }
351    
352        public Response processShutdown(ShutdownInfo info) throws Exception {
353            stopAsync();
354            return null;
355        }
356    
357        public Response processFlush(FlushCommand command) throws Exception {
358            return null;
359        }
360    
361        public Response processBeginTransaction(TransactionInfo info) throws Exception {
362            TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
363            context = null;
364            if (cs != null) {
365                context = cs.getContext();
366            }
367            if (cs == null) {
368                throw new NullPointerException("Context is null");
369            }
370            // Avoid replaying dup commands
371            if (cs.getTransactionState(info.getTransactionId()) == null) {
372                cs.addTransactionState(info.getTransactionId());
373                broker.beginTransaction(context, info.getTransactionId());
374            }
375            return null;
376        }
377    
378        public Response processEndTransaction(TransactionInfo info) throws Exception {
379            // No need to do anything. This packet is just sent by the client
380            // make sure he is synced with the server as commit command could
381            // come from a different connection.
382            return null;
383        }
384    
385        public Response processPrepareTransaction(TransactionInfo info) throws Exception {
386            TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
387            context = null;
388            if (cs != null) {
389                context = cs.getContext();
390            }
391            if (cs == null) {
392                throw new NullPointerException("Context is null");
393            }
394            TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
395            if (transactionState == null) {
396                throw new IllegalStateException("Cannot prepare a transaction that had not been started or previously returned XA_RDONLY: "
397                        + info.getTransactionId());
398            }
399            // Avoid dups.
400            if (!transactionState.isPrepared()) {
401                transactionState.setPrepared(true);
402                int result = broker.prepareTransaction(context, info.getTransactionId());
403                transactionState.setPreparedResult(result);
404                if (result == XAResource.XA_RDONLY) {
405                    // we are done, no further rollback or commit from TM
406                    cs.removeTransactionState(info.getTransactionId());
407                }
408                IntegerResponse response = new IntegerResponse(result);
409                return response;
410            } else {
411                IntegerResponse response = new IntegerResponse(transactionState.getPreparedResult());
412                return response;
413            }
414        }
415    
416        public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
417            TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
418            context = cs.getContext();
419            cs.removeTransactionState(info.getTransactionId());
420            broker.commitTransaction(context, info.getTransactionId(), true);
421            return null;
422        }
423    
424        public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
425            TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
426            context = cs.getContext();
427            cs.removeTransactionState(info.getTransactionId());
428            broker.commitTransaction(context, info.getTransactionId(), false);
429            return null;
430        }
431    
432        public Response processRollbackTransaction(TransactionInfo info) throws Exception {
433            TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
434            context = cs.getContext();
435            cs.removeTransactionState(info.getTransactionId());
436            broker.rollbackTransaction(context, info.getTransactionId());
437            return null;
438        }
439    
440        public Response processForgetTransaction(TransactionInfo info) throws Exception {
441            TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
442            context = cs.getContext();
443            broker.forgetTransaction(context, info.getTransactionId());
444            return null;
445        }
446    
447        public Response processRecoverTransactions(TransactionInfo info) throws Exception {
448            TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
449            context = cs.getContext();
450            TransactionId[] preparedTransactions = broker.getPreparedTransactions(context);
451            return new DataArrayResponse(preparedTransactions);
452        }
453    
454        public Response processMessage(Message messageSend) throws Exception {
455            ProducerId producerId = messageSend.getProducerId();
456            ProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId);
457            if (producerExchange.canDispatch(messageSend)) {
458                broker.send(producerExchange, messageSend);
459            }
460            return null;
461        }
462    
463        public Response processMessageAck(MessageAck ack) throws Exception {
464            ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(ack.getConsumerId());
465            broker.acknowledge(consumerExchange, ack);
466            return null;
467        }
468    
469        public Response processMessagePull(MessagePull pull) throws Exception {
470            return broker.messagePull(lookupConnectionState(pull.getConsumerId()).getContext(), pull);
471        }
472    
473        public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception {
474            broker.processDispatchNotification(notification);
475            return null;
476        }
477    
478        public Response processAddDestination(DestinationInfo info) throws Exception {
479            TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
480            broker.addDestinationInfo(cs.getContext(), info);
481            if (info.getDestination().isTemporary()) {
482                cs.addTempDestination(info);
483            }
484            return null;
485        }
486    
487        public Response processRemoveDestination(DestinationInfo info) throws Exception {
488            TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
489            broker.removeDestinationInfo(cs.getContext(), info);
490            if (info.getDestination().isTemporary()) {
491                cs.removeTempDestination(info.getDestination());
492            }
493            return null;
494        }
495    
496        public Response processAddProducer(ProducerInfo info) throws Exception {
497            SessionId sessionId = info.getProducerId().getParentId();
498            ConnectionId connectionId = sessionId.getParentId();
499            TransportConnectionState cs = lookupConnectionState(connectionId);
500            SessionState ss = cs.getSessionState(sessionId);
501            if (ss == null) {
502                throw new IllegalStateException("Cannot add a producer to a session that had not been registered: "
503                        + sessionId);
504            }
505            // Avoid replaying dup commands
506            if (!ss.getProducerIds().contains(info.getProducerId())) {
507                broker.addProducer(cs.getContext(), info);
508                try {
509                    ss.addProducer(info);
510                } catch (IllegalStateException e) {
511                    broker.removeProducer(cs.getContext(), info);
512                }
513            }
514            return null;
515        }
516    
517        public Response processRemoveProducer(ProducerId id) throws Exception {
518            SessionId sessionId = id.getParentId();
519            ConnectionId connectionId = sessionId.getParentId();
520            TransportConnectionState cs = lookupConnectionState(connectionId);
521            SessionState ss = cs.getSessionState(sessionId);
522            if (ss == null) {
523                throw new IllegalStateException("Cannot remove a producer from a session that had not been registered: "
524                        + sessionId);
525            }
526            ProducerState ps = ss.removeProducer(id);
527            if (ps == null) {
528                throw new IllegalStateException("Cannot remove a producer that had not been registered: " + id);
529            }
530            removeProducerBrokerExchange(id);
531            broker.removeProducer(cs.getContext(), ps.getInfo());
532            return null;
533        }
534    
535        public Response processAddConsumer(ConsumerInfo info) throws Exception {
536            SessionId sessionId = info.getConsumerId().getParentId();
537            ConnectionId connectionId = sessionId.getParentId();
538            TransportConnectionState cs = lookupConnectionState(connectionId);
539            SessionState ss = cs.getSessionState(sessionId);
540            if (ss == null) {
541                throw new IllegalStateException(broker.getBrokerName()
542                        + " Cannot add a consumer to a session that had not been registered: " + sessionId);
543            }
544            // Avoid replaying dup commands
545            if (!ss.getConsumerIds().contains(info.getConsumerId())) {
546                broker.addConsumer(cs.getContext(), info);
547                try {
548                    ss.addConsumer(info);
549                } catch (IllegalStateException e) {
550                    broker.removeConsumer(cs.getContext(), info);
551                }
552            }
553            return null;
554        }
555    
556        public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) throws Exception {
557            SessionId sessionId = id.getParentId();
558            ConnectionId connectionId = sessionId.getParentId();
559            TransportConnectionState cs = lookupConnectionState(connectionId);
560            if (cs == null) {
561                throw new IllegalStateException("Cannot remove a consumer from a connection that had not been registered: "
562                        + connectionId);
563            }
564            SessionState ss = cs.getSessionState(sessionId);
565            if (ss == null) {
566                throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: "
567                        + sessionId);
568            }
569            ConsumerState consumerState = ss.removeConsumer(id);
570            if (consumerState == null) {
571                throw new IllegalStateException("Cannot remove a consumer that had not been registered: " + id);
572            }
573            ConsumerInfo info = consumerState.getInfo();
574            info.setLastDeliveredSequenceId(lastDeliveredSequenceId);
575            broker.removeConsumer(cs.getContext(), consumerState.getInfo());
576            removeConsumerBrokerExchange(id);
577            return null;
578        }
579    
580        public Response processAddSession(SessionInfo info) throws Exception {
581            ConnectionId connectionId = info.getSessionId().getParentId();
582            TransportConnectionState cs = lookupConnectionState(connectionId);
583            // Avoid replaying dup commands
584            if (cs != null && !cs.getSessionIds().contains(info.getSessionId())) {
585                broker.addSession(cs.getContext(), info);
586                try {
587                    cs.addSession(info);
588                } catch (IllegalStateException e) {
589                    e.printStackTrace();
590                    broker.removeSession(cs.getContext(), info);
591                }
592            }
593            return null;
594        }
595    
596        public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) throws Exception {
597            ConnectionId connectionId = id.getParentId();
598            TransportConnectionState cs = lookupConnectionState(connectionId);
599            if (cs == null) {
600                throw new IllegalStateException("Cannot remove session from connection that had not been registered: " + connectionId);
601            }
602            SessionState session = cs.getSessionState(id);
603            if (session == null) {
604                throw new IllegalStateException("Cannot remove session that had not been registered: " + id);
605            }
606            // Don't let new consumers or producers get added while we are closing
607            // this down.
608            session.shutdown();
609            // Cascade the connection stop to the consumers and producers.
610            for (Iterator iter = session.getConsumerIds().iterator(); iter.hasNext();) {
611                ConsumerId consumerId = (ConsumerId) iter.next();
612                try {
613                    processRemoveConsumer(consumerId, lastDeliveredSequenceId);
614                } catch (Throwable e) {
615                    LOG.warn("Failed to remove consumer: " + consumerId + ". Reason: " + e, e);
616                }
617            }
618            for (Iterator iter = session.getProducerIds().iterator(); iter.hasNext();) {
619                ProducerId producerId = (ProducerId) iter.next();
620                try {
621                    processRemoveProducer(producerId);
622                } catch (Throwable e) {
623                    LOG.warn("Failed to remove producer: " + producerId + ". Reason: " + e, e);
624                }
625            }
626            cs.removeSession(id);
627            broker.removeSession(cs.getContext(), session.getInfo());
628            return null;
629        }
630    
631        public Response processAddConnection(ConnectionInfo info) throws Exception {
632            // if the broker service has slave attached, wait for the slave to be
633            // attached to allow client connection. slave connection is fine
634            if (!info.isBrokerMasterConnector() && connector.getBrokerService().isWaitForSlave()
635                    && connector.getBrokerService().getSlaveStartSignal().getCount() == 1) {
636                ServiceSupport.dispose(transport);
637                return new ExceptionResponse(new Exception("Master's slave not attached yet."));
638            }
639            // Older clients should have been defaulting this field to true.. but
640            // they were not.
641            if (wireFormatInfo != null && wireFormatInfo.getVersion() <= 2) {
642                info.setClientMaster(true);
643            }
644            TransportConnectionState state;
645            // Make sure 2 concurrent connections by the same ID only generate 1
646            // TransportConnectionState object.
647            synchronized (brokerConnectionStates) {
648                state = (TransportConnectionState) brokerConnectionStates.get(info.getConnectionId());
649                if (state == null) {
650                    state = new TransportConnectionState(info, this);
651                    brokerConnectionStates.put(info.getConnectionId(), state);
652                }
653                state.incrementReference();
654            }
655            // If there are 2 concurrent connections for the same connection id,
656            // then last one in wins, we need to sync here
657            // to figure out the winner.
658            synchronized (state.getConnectionMutex()) {
659                if (state.getConnection() != this) {
660                    LOG.debug("Killing previous stale connection: " + state.getConnection().getRemoteAddress());
661                    state.getConnection().stop();
662                    LOG.debug("Connection " + getRemoteAddress() + " taking over previous connection: "
663                            + state.getConnection().getRemoteAddress());
664                    state.setConnection(this);
665                    state.reset(info);
666                }
667            }
668            registerConnectionState(info.getConnectionId(), state);
669            LOG.debug("Setting up new connection id: " + info.getConnectionId() + ", address: " + getRemoteAddress());
670            this.faultTolerantConnection=info.isFaultTolerant();
671            // Setup the context.
672            String clientId = info.getClientId();
673            context = new ConnectionContext();
674            context.setBroker(broker);
675            context.setClientId(clientId);
676            context.setClientMaster(info.isClientMaster());
677            context.setConnection(this);
678            context.setConnectionId(info.getConnectionId());
679            context.setConnector(connector);
680            context.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy());
681            context.setNetworkConnection(networkConnection);
682            context.setFaultTolerant(faultTolerantConnection);
683            context.setTransactions(new ConcurrentHashMap<TransactionId, Transaction>());
684            context.setUserName(info.getUserName());
685            context.setWireFormatInfo(wireFormatInfo);
686            context.setReconnect(info.isFailoverReconnect());
687            this.manageable = info.isManageable();
688            state.setContext(context);
689            state.setConnection(this);
690           
691            try {
692                broker.addConnection(context, info);
693            } catch (Exception e) {
694                synchronized (brokerConnectionStates) {
695                    brokerConnectionStates.remove(info.getConnectionId());
696                }
697                unregisterConnectionState(info.getConnectionId());
698                LOG.warn("Failed to add Connection " + info.getConnectionId() + ", reason: " +  e.toString());
699                if (LOG.isDebugEnabled()) {
700                    LOG.debug("Exception detail:", e);
701                }
702                throw e;
703            }
704            if (info.isManageable()) {
705                // send ConnectionCommand
706                ConnectionControl command = this.connector.getConnectionControl();
707                command.setFaultTolerant(broker.isFaultTolerantConfiguration());
708                dispatchAsync(command);
709            }
710            return null;
711        }
712    
713        public synchronized Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId)
714                throws InterruptedException {
715            LOG.debug("remove connection id: " + id);
716            TransportConnectionState cs = lookupConnectionState(id);
717            if (cs != null) {
718                // Don't allow things to be added to the connection state while we
719                // are
720                // shutting down.
721                cs.shutdown();
722                // Cascade the connection stop to the sessions.
723                for (Iterator iter = cs.getSessionIds().iterator(); iter.hasNext();) {
724                    SessionId sessionId = (SessionId) iter.next();
725                    try {
726                        processRemoveSession(sessionId, lastDeliveredSequenceId);
727                    } catch (Throwable e) {
728                        SERVICELOG.warn("Failed to remove session " + sessionId, e);
729                    }
730                }
731                // Cascade the connection stop to temp destinations.
732                for (Iterator iter = cs.getTempDestinations().iterator(); iter.hasNext();) {
733                    DestinationInfo di = (DestinationInfo) iter.next();
734                    try {
735                        broker.removeDestination(cs.getContext(), di.getDestination(), 0);
736                    } catch (Throwable e) {
737                        SERVICELOG.warn("Failed to remove tmp destination " + di.getDestination(), e);
738                    }
739                    iter.remove();
740                }
741                try {
742                    broker.removeConnection(cs.getContext(), cs.getInfo(), null);
743                } catch (Throwable e) {
744                    SERVICELOG.warn("Failed to remove connection " + cs.getInfo() + ", reason: " + e.toString());
745                    if (LOG.isDebugEnabled()) {
746                        SERVICELOG.debug("Exception detail:", e);
747                    }
748                }
749                TransportConnectionState state = unregisterConnectionState(id);
750                if (state != null) {
751                    synchronized (brokerConnectionStates) {
752                        // If we are the last reference, we should remove the state
753                        // from the broker.
754                        if (state.decrementReference() == 0) {
755                            brokerConnectionStates.remove(id);
756                        }
757                    }
758                }
759            }
760            return null;
761        }
762    
763        public Response processProducerAck(ProducerAck ack) throws Exception {
764            // A broker should not get ProducerAck messages.
765            return null;
766        }
767    
768        public Connector getConnector() {
769            return connector;
770        }
771    
772        public void dispatchSync(Command message) {
773            // getStatistics().getEnqueues().increment();
774            try {
775                processDispatch(message);
776            } catch (IOException e) {
777                serviceExceptionAsync(e);
778            }
779        }
780    
781        public void dispatchAsync(Command message) {
782            if (!stopping.get()) {
783                // getStatistics().getEnqueues().increment();
784                if (taskRunner == null) {
785                    dispatchSync(message);
786                } else {
787                    synchronized (dispatchQueue) {
788                        dispatchQueue.add(message);
789                    }
790                    try {
791                        taskRunner.wakeup();
792                    } catch (InterruptedException e) {
793                        Thread.currentThread().interrupt();
794                    }
795                }
796            } else {
797                if (message.isMessageDispatch()) {
798                    MessageDispatch md = (MessageDispatch) message;
799                    Runnable sub = md.getTransmitCallback();
800                    broker.postProcessDispatch(md);
801                    if (sub != null) {
802                        sub.run();
803                    }
804                }
805            }
806        }
807    
808        protected void processDispatch(Command command) throws IOException {
809            final MessageDispatch messageDispatch = (MessageDispatch) (command.isMessageDispatch() ? command : null);
810            try {
811                if (!stopping.get()) {
812                    if (messageDispatch != null) {
813                        broker.preProcessDispatch(messageDispatch);
814                    }
815                    dispatch(command);
816                }
817            } finally {
818                if (messageDispatch != null) {
819                    Runnable sub = messageDispatch.getTransmitCallback();
820                    broker.postProcessDispatch(messageDispatch);
821                    if (sub != null) {
822                        sub.run();
823                    }
824                }
825                // getStatistics().getDequeues().increment();
826            }
827        }
828    
829        public boolean iterate() {
830            try {
831                if (stopping.get()) {
832                    if (dispatchStopped.compareAndSet(false, true)) {
833                        if (transportException.get() == null) {
834                            try {
835                                dispatch(new ShutdownInfo());
836                            } catch (Throwable ignore) {
837                            }
838                        }
839                        dispatchStoppedLatch.countDown();
840                    }
841                    return false;
842                }
843                if (!dispatchStopped.get()) {
844                    Command command = null;
845                    synchronized (dispatchQueue) {
846                        if (dispatchQueue.isEmpty()) {
847                            return false;
848                        }
849                        command = dispatchQueue.remove(0);
850                    }
851                    processDispatch(command);
852                    return true;
853                }
854                return false;
855            } catch (IOException e) {
856                if (dispatchStopped.compareAndSet(false, true)) {
857                    dispatchStoppedLatch.countDown();
858                }
859                serviceExceptionAsync(e);
860                return false;
861            }
862        }
863    
864        /**
865         * Returns the statistics for this connection
866         */
867        public ConnectionStatistics getStatistics() {
868            return statistics;
869        }
870    
871        public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
872            return messageAuthorizationPolicy;
873        }
874    
875        public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
876            this.messageAuthorizationPolicy = messageAuthorizationPolicy;
877        }
878    
879        public boolean isManageable() {
880            return manageable;
881        }
882    
883        public void start() throws Exception {
884            starting = true;
885            try {
886                synchronized (this) {
887                    if (taskRunnerFactory != null) {
888                        taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: "
889                                + getRemoteAddress());
890                    } else {
891                        taskRunner = null;
892                    }
893                    transport.start();
894                    active = true;
895                    BrokerInfo info = connector.getBrokerInfo().copy();
896                    if (connector.isUpdateClusterClients()) {
897                        info.setPeerBrokerInfos(this.broker.getPeerBrokerInfos());
898                    } else {
899                        info.setPeerBrokerInfos(null);
900                    }
901                    dispatchAsync(info);
902                    
903                    connector.onStarted(this);
904                }
905            } catch (Exception e) {
906                // Force clean up on an error starting up.
907                stop();
908                throw e;
909            } finally {
910                // stop() can be called from within the above block,
911                // but we want to be sure start() completes before
912                // stop() runs, so queue the stop until right now:
913                starting = false;
914                if (pendingStop) {
915                    LOG.debug("Calling the delayed stop()");
916                    stop();
917                }
918            }
919        }
920    
921        public void stop() throws Exception {
922            synchronized (this) {
923                pendingStop = true;
924                if (starting) {
925                    LOG.debug("stop() called in the middle of start(). Delaying...");
926                    return;
927                }
928            }
929            stopAsync();
930            while (!stopped.await(5, TimeUnit.SECONDS)) {
931                LOG.info("The connection to '" + transport.getRemoteAddress() + "' is taking a long time to shutdown.");
932            }
933        }
934    
935        public void stopAsync() {
936            // If we're in the middle of starting
937            // then go no further... for now.
938            if (stopping.compareAndSet(false, true)) {
939                // Let all the connection contexts know we are shutting down
940                // so that in progress operations can notice and unblock.
941                List<TransportConnectionState> connectionStates = listConnectionStates();
942                for (TransportConnectionState cs : connectionStates) {
943                    cs.getContext().getStopping().set(true);
944                }
945                try {
946                    final Map context = MDCHelper.getCopyOfContextMap();
947                    DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable(){
948                        public void run() {
949                            serviceLock.writeLock().lock();
950                            try {
951                                MDCHelper.setContextMap(context);
952                                doStop();
953                            } catch (Throwable e) {
954                                LOG.debug("Error occured while shutting down a connection to '" + transport.getRemoteAddress()
955                                        + "': ", e);
956                            } finally {
957                                stopped.countDown();
958                                serviceLock.writeLock().unlock();
959                            }
960                        }
961                    }, "StopAsync:" + transport.getRemoteAddress());
962                } catch (Throwable t) {
963                    LOG.warn("cannot create async transport stopper thread.. not waiting for stop to complete, reason:", t);
964                    stopped.countDown();
965                }
966            }
967        }
968    
969        @Override
970        public String toString() {
971            return "Transport Connection to: " + transport.getRemoteAddress();
972        }
973    
974        protected void doStop() throws Exception, InterruptedException {
975            LOG.debug("Stopping connection: " + transport.getRemoteAddress());
976            connector.onStopped(this);
977            try {
978                synchronized (this) {
979                    if (masterBroker != null) {
980                        masterBroker.stop();
981                    }
982                    if (duplexBridge != null) {
983                        duplexBridge.stop();
984                    }
985                }
986            } catch (Exception ignore) {
987                LOG.trace("Exception caught stopping", ignore);
988            }
989            try {
990                transport.stop();
991                LOG.debug("Stopped transport: " + transport.getRemoteAddress());
992            } catch (Exception e) {
993                LOG.debug("Could not stop transport: " + e, e);
994            }
995            if (taskRunner != null) {
996                taskRunner.shutdown(1);
997            }
998            active = false;
999            // Run the MessageDispatch callbacks so that message references get
1000            // cleaned up.
1001            synchronized (dispatchQueue) {
1002                for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext();) {
1003                    Command command = iter.next();
1004                    if (command.isMessageDispatch()) {
1005                        MessageDispatch md = (MessageDispatch) command;
1006                        Runnable sub = md.getTransmitCallback();
1007                        broker.postProcessDispatch(md);
1008                        if (sub != null) {
1009                            sub.run();
1010                        }
1011                    }
1012                }
1013                dispatchQueue.clear();
1014            }
1015            //
1016            // Remove all logical connection associated with this connection
1017            // from the broker.
1018            if (!broker.isStopped()) {
1019                List<TransportConnectionState> connectionStates = listConnectionStates();
1020                connectionStates = listConnectionStates();
1021                for (TransportConnectionState cs : connectionStates) {
1022                    cs.getContext().getStopping().set(true);
1023                    try {
1024                        LOG.debug("Cleaning up connection resources: " + getRemoteAddress());
1025                        processRemoveConnection(cs.getInfo().getConnectionId(), 0l);
1026                    } catch (Throwable ignore) {
1027                        ignore.printStackTrace();
1028                    }
1029                }
1030            }
1031            LOG.debug("Connection Stopped: " + getRemoteAddress());
1032        }
1033    
1034        /**
1035         * @return Returns the blockedCandidate.
1036         */
1037        public boolean isBlockedCandidate() {
1038            return blockedCandidate;
1039        }
1040    
1041        /**
1042         * @param blockedCandidate
1043         *            The blockedCandidate to set.
1044         */
1045        public void setBlockedCandidate(boolean blockedCandidate) {
1046            this.blockedCandidate = blockedCandidate;
1047        }
1048    
1049        /**
1050         * @return Returns the markedCandidate.
1051         */
1052        public boolean isMarkedCandidate() {
1053            return markedCandidate;
1054        }
1055    
1056        /**
1057         * @param markedCandidate
1058         *            The markedCandidate to set.
1059         */
1060        public void setMarkedCandidate(boolean markedCandidate) {
1061            this.markedCandidate = markedCandidate;
1062            if (!markedCandidate) {
1063                timeStamp = 0;
1064                blockedCandidate = false;
1065            }
1066        }
1067    
1068        /**
1069         * @param slow
1070         *            The slow to set.
1071         */
1072        public void setSlow(boolean slow) {
1073            this.slow = slow;
1074        }
1075    
1076        /**
1077         * @return true if the Connection is slow
1078         */
1079        public boolean isSlow() {
1080            return slow;
1081        }
1082    
1083        /**
1084         * @return true if the Connection is potentially blocked
1085         */
1086        public boolean isMarkedBlockedCandidate() {
1087            return markedCandidate;
1088        }
1089    
1090        /**
1091         * Mark the Connection, so we can deem if it's collectable on the next sweep
1092         */
1093        public void doMark() {
1094            if (timeStamp == 0) {
1095                timeStamp = System.currentTimeMillis();
1096            }
1097        }
1098    
1099        /**
1100         * @return if after being marked, the Connection is still writing
1101         */
1102        public boolean isBlocked() {
1103            return blocked;
1104        }
1105    
1106        /**
1107         * @return true if the Connection is connected
1108         */
1109        public boolean isConnected() {
1110            return connected;
1111        }
1112    
1113        /**
1114         * @param blocked
1115         *            The blocked to set.
1116         */
1117        public void setBlocked(boolean blocked) {
1118            this.blocked = blocked;
1119        }
1120    
1121        /**
1122         * @param connected
1123         *            The connected to set.
1124         */
1125        public void setConnected(boolean connected) {
1126            this.connected = connected;
1127        }
1128    
1129        /**
1130         * @return true if the Connection is active
1131         */
1132        public boolean isActive() {
1133            return active;
1134        }
1135    
1136        /**
1137         * @param active
1138         *            The active to set.
1139         */
1140        public void setActive(boolean active) {
1141            this.active = active;
1142        }
1143    
1144        /**
1145         * @return true if the Connection is starting
1146         */
1147        public synchronized boolean isStarting() {
1148            return starting;
1149        }
1150    
1151        public synchronized boolean isNetworkConnection() {
1152            return networkConnection;
1153        }
1154        
1155        public boolean isFaultTolerantConnection() {
1156           return this.faultTolerantConnection;
1157        }
1158    
1159        protected synchronized void setStarting(boolean starting) {
1160            this.starting = starting;
1161        }
1162    
1163        /**
1164         * @return true if the Connection needs to stop
1165         */
1166        public synchronized boolean isPendingStop() {
1167            return pendingStop;
1168        }
1169    
1170        protected synchronized void setPendingStop(boolean pendingStop) {
1171            this.pendingStop = pendingStop;
1172        }
1173    
1174        public Response processBrokerInfo(BrokerInfo info) {
1175            if (info.isSlaveBroker()) {
1176                BrokerService bService = connector.getBrokerService();
1177                // Do we only support passive slaves - or does the slave want to be
1178                // passive ?
1179                boolean passive = bService.isPassiveSlave() || info.isPassiveSlave();
1180                if (passive == false) {
1181                    
1182                    // stream messages from this broker (the master) to
1183                    // the slave
1184                    MutableBrokerFilter parent = (MutableBrokerFilter) broker.getAdaptor(MutableBrokerFilter.class);
1185                    masterBroker = new MasterBroker(parent, transport);
1186                    masterBroker.startProcessing();
1187                }
1188                LOG.info((passive?"Passive":"Active")+" Slave Broker " + info.getBrokerName() + " is attached");
1189                bService.slaveConnectionEstablished();
1190            } else if (info.isNetworkConnection() && info.isDuplexConnection()) {
1191                // so this TransportConnection is the rear end of a network bridge
1192                // We have been requested to create a two way pipe ...
1193                try {
1194                    Properties properties = MarshallingSupport.stringToProperties(info.getNetworkProperties());
1195                    Map<String, String> props = createMap(properties);
1196                    NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
1197                    IntrospectionSupport.setProperties(config, props, "");
1198                    config.setBrokerName(broker.getBrokerName());
1199    
1200                    // check for existing duplex connection hanging about
1201    
1202                    // We first look if existing network connection already exists for the same broker Id and network connector name
1203                    // It's possible in case of brief network fault to have this transport connector side of the connection always active
1204                    // and the duplex network connector side wanting to open a new one
1205                    // In this case, the old connection must be broken
1206                    String duplexNetworkConnectorId = config.getName() + "@" + info.getBrokerId(); 
1207                    CopyOnWriteArrayList<TransportConnection> connections = this.connector.getConnections();
1208                    synchronized (connections) {
1209                        for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext();) {
1210                            TransportConnection c = iter.next();
1211                            if ((c != this) && (duplexNetworkConnectorId.equals(c.getDuplexNetworkConnectorId()))) {
1212                                LOG.warn("Stopping an existing active duplex connection [" + c + "] for network connector (" + duplexNetworkConnectorId + ").");
1213                                c.stopAsync();
1214                                // better to wait for a bit rather than get connection id already in use and failure to start new bridge
1215                                c.getStopped().await(1, TimeUnit.SECONDS);
1216                            }
1217                        }
1218                        setDuplexNetworkConnectorId(duplexNetworkConnectorId);
1219                    }
1220                    URI uri = broker.getVmConnectorURI();
1221                    HashMap<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
1222                    map.put("network", "true");
1223                    map.put("async", "false");
1224                    uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
1225                    Transport localTransport = TransportFactory.connect(uri);
1226                    Transport remoteBridgeTransport = new ResponseCorrelator(transport);
1227                    String duplexName = localTransport.toString();
1228                    if (duplexName.contains("#")) {
1229                        duplexName = duplexName.substring(duplexName.lastIndexOf("#"));
1230                    }
1231                    MBeanNetworkListener listener = new MBeanNetworkListener(broker.getBrokerService(), broker.getBrokerService().createDuplexNetworkConnectorObjectName(duplexName));
1232                    listener.setCreatedByDuplex(true);
1233                    duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport, remoteBridgeTransport, listener);
1234                    duplexBridge.setBrokerService(broker.getBrokerService());
1235                    // now turn duplex off this side
1236                    info.setDuplexConnection(false);
1237                    duplexBridge.setCreatedByDuplex(true);
1238                    duplexBridge.duplexStart(this, brokerInfo, info);
1239                    LOG.info("Started responder end of duplex bridge " + duplexNetworkConnectorId);
1240                    return null;
1241                } catch (TransportDisposedIOException e) {
1242                    LOG.warn("Duplex bridge " + duplexNetworkConnectorId + " was stopped before it was correctly started.");
1243                    return null;
1244                } catch (Exception e) {
1245                    LOG.error("Failed to create responder end of duplex network bridge " + duplexNetworkConnectorId , e);
1246                    return null;
1247                }
1248            }
1249            // We only expect to get one broker info command per connection
1250            if (this.brokerInfo != null) {
1251                LOG.warn("Unexpected extra broker info command received: " + info);
1252            }
1253            this.brokerInfo = info;
1254            networkConnection = true;
1255            List<TransportConnectionState> connectionStates = listConnectionStates();
1256            for (TransportConnectionState cs : connectionStates) {
1257                cs.getContext().setNetworkConnection(true);
1258            }
1259            return null;
1260        }
1261    
1262        @SuppressWarnings("unchecked")
1263        private HashMap<String, String> createMap(Properties properties) {
1264            return new HashMap(properties);
1265        }
1266    
1267        protected void dispatch(Command command) throws IOException {
1268            try {
1269                setMarkedCandidate(true);
1270                transport.oneway(command);
1271            } finally {
1272                setMarkedCandidate(false);
1273            }
1274        }
1275    
1276        public String getRemoteAddress() {
1277            return transport.getRemoteAddress();
1278        }
1279    
1280        public String getConnectionId() {
1281            List<TransportConnectionState> connectionStates = listConnectionStates();
1282            for (TransportConnectionState cs : connectionStates) {
1283                if (cs.getInfo().getClientId() != null) {
1284                    return cs.getInfo().getClientId();
1285                }
1286                return cs.getInfo().getConnectionId().toString();
1287            }
1288            return null;
1289        }
1290            
1291        public void updateClient(ConnectionControl control) {
1292            if (isActive() && isBlocked() == false && isFaultTolerantConnection() && this.wireFormatInfo != null
1293                    && this.wireFormatInfo.getVersion() >= 6) {
1294                dispatchAsync(control);
1295            }
1296        }
1297    
1298        private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id) throws IOException {
1299            ProducerBrokerExchange result = producerExchanges.get(id);
1300            if (result == null) {
1301                synchronized (producerExchanges) {
1302                    result = new ProducerBrokerExchange();
1303                    TransportConnectionState state = lookupConnectionState(id);              
1304                    context = state.getContext();
1305                    if (context.isReconnect()) {
1306                        result.setLastStoredSequenceId(broker.getBrokerService().getPersistenceAdapter().getLastProducerSequenceId(id));
1307                    }
1308                    result.setConnectionContext(context);
1309                    SessionState ss = state.getSessionState(id.getParentId());
1310                    if (ss != null) {
1311                        result.setProducerState(ss.getProducerState(id));
1312                        ProducerState producerState = ss.getProducerState(id);
1313                        if (producerState != null && producerState.getInfo() != null) {
1314                            ProducerInfo info = producerState.getInfo();
1315                            result.setMutable(info.getDestination() == null || info.getDestination().isComposite());
1316                        }
1317                    }
1318                    producerExchanges.put(id, result);
1319                }
1320            } else {
1321                context = result.getConnectionContext();
1322            }
1323            return result;
1324        }
1325    
1326        private void removeProducerBrokerExchange(ProducerId id) {
1327            synchronized (producerExchanges) {
1328                producerExchanges.remove(id);
1329            }
1330        }
1331    
1332        private ConsumerBrokerExchange getConsumerBrokerExchange(ConsumerId id) {
1333            ConsumerBrokerExchange result = consumerExchanges.get(id);
1334            if (result == null) {
1335                synchronized (consumerExchanges) {
1336                    result = new ConsumerBrokerExchange();
1337                    TransportConnectionState state = lookupConnectionState(id);
1338                    context = state.getContext();
1339                    result.setConnectionContext(context);
1340                    SessionState ss = state.getSessionState(id.getParentId());
1341                    if (ss != null) {
1342                        ConsumerState cs = ss.getConsumerState(id);
1343                        if (cs != null) {
1344                            ConsumerInfo info = cs.getInfo();
1345                            if (info != null) {
1346                                if (info.getDestination() != null && info.getDestination().isPattern()) {
1347                                    result.setWildcard(true);
1348                                }
1349                            }
1350                        }
1351                    }
1352                    consumerExchanges.put(id, result);
1353                }
1354            }
1355            return result;
1356        }
1357    
1358        private void removeConsumerBrokerExchange(ConsumerId id) {
1359            synchronized (consumerExchanges) {
1360                consumerExchanges.remove(id);
1361            }
1362        }
1363    
1364        public int getProtocolVersion() {
1365            return protocolVersion.get();
1366        }
1367    
1368        public Response processControlCommand(ControlCommand command) throws Exception {
1369            String control = command.getCommand();
1370            if (control != null && control.equals("shutdown")) {
1371                System.exit(0);
1372            }
1373            return null;
1374        }
1375    
1376        public Response processMessageDispatch(MessageDispatch dispatch) throws Exception {
1377            return null;
1378        }
1379    
1380        public Response processConnectionControl(ConnectionControl control) throws Exception {
1381            if (control != null) {
1382                faultTolerantConnection = control.isFaultTolerant();
1383            }
1384            return null;
1385        }
1386    
1387        public Response processConnectionError(ConnectionError error) throws Exception {
1388            return null;
1389        }
1390    
1391        public Response processConsumerControl(ConsumerControl control) throws Exception {
1392            ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(control.getConsumerId());
1393            broker.processConsumerControl(consumerExchange, control);
1394            return null;
1395        }
1396    
1397        protected synchronized TransportConnectionState registerConnectionState(ConnectionId connectionId,
1398                TransportConnectionState state) {
1399            TransportConnectionState cs = null;
1400            if (!connectionStateRegister.isEmpty() && !connectionStateRegister.doesHandleMultipleConnectionStates()) {
1401                // swap implementations
1402                TransportConnectionStateRegister newRegister = new MapTransportConnectionStateRegister();
1403                newRegister.intialize(connectionStateRegister);
1404                connectionStateRegister = newRegister;
1405            }
1406            cs = connectionStateRegister.registerConnectionState(connectionId, state);
1407            return cs;
1408        }
1409    
1410        protected synchronized TransportConnectionState unregisterConnectionState(ConnectionId connectionId) {
1411            return connectionStateRegister.unregisterConnectionState(connectionId);
1412        }
1413    
1414        protected synchronized List<TransportConnectionState> listConnectionStates() {
1415            return connectionStateRegister.listConnectionStates();
1416        }
1417    
1418        protected synchronized TransportConnectionState lookupConnectionState(String connectionId) {
1419            return connectionStateRegister.lookupConnectionState(connectionId);
1420        }
1421    
1422        protected synchronized TransportConnectionState lookupConnectionState(ConsumerId id) {
1423            return connectionStateRegister.lookupConnectionState(id);
1424        }
1425    
1426        protected synchronized TransportConnectionState lookupConnectionState(ProducerId id) {
1427            return connectionStateRegister.lookupConnectionState(id);
1428        }
1429    
1430        protected synchronized TransportConnectionState lookupConnectionState(SessionId id) {
1431            return connectionStateRegister.lookupConnectionState(id);
1432        }
1433    
1434        protected synchronized TransportConnectionState lookupConnectionState(ConnectionId connectionId) {
1435            return connectionStateRegister.lookupConnectionState(connectionId);
1436        }
1437    
1438        protected synchronized void setDuplexNetworkConnectorId(String duplexNetworkConnectorId) {
1439            this.duplexNetworkConnectorId = duplexNetworkConnectorId;
1440        }
1441    
1442        protected synchronized String getDuplexNetworkConnectorId() {
1443            return this.duplexNetworkConnectorId;
1444        }
1445        
1446        protected CountDownLatch getStopped() {
1447            return stopped;
1448        }
1449    }