001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region;
018    
019    import java.io.IOException;
020    import java.net.URI;
021    import java.util.ArrayList;
022    import java.util.Collections;
023    import java.util.HashMap;
024    import java.util.List;
025    import java.util.Map;
026    import java.util.Set;
027    import java.util.concurrent.ConcurrentHashMap;
028    import java.util.concurrent.CopyOnWriteArrayList;
029    import java.util.concurrent.ThreadPoolExecutor;
030    import javax.jms.InvalidClientIDException;
031    import javax.jms.JMSException;
032    import org.apache.activemq.advisory.AdvisorySupport;
033    import org.apache.activemq.broker.Broker;
034    import org.apache.activemq.broker.BrokerService;
035    import org.apache.activemq.broker.Connection;
036    import org.apache.activemq.broker.ConnectionContext;
037    import org.apache.activemq.broker.ConsumerBrokerExchange;
038    import org.apache.activemq.broker.EmptyBroker;
039    import org.apache.activemq.broker.ProducerBrokerExchange;
040    import org.apache.activemq.broker.TransportConnector;
041    import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
042    import org.apache.activemq.broker.region.policy.PolicyMap;
043    import org.apache.activemq.command.ActiveMQDestination;
044    import org.apache.activemq.command.BrokerId;
045    import org.apache.activemq.command.BrokerInfo;
046    import org.apache.activemq.command.ConnectionId;
047    import org.apache.activemq.command.ConnectionInfo;
048    import org.apache.activemq.command.ConsumerControl;
049    import org.apache.activemq.command.ConsumerInfo;
050    import org.apache.activemq.command.DestinationInfo;
051    import org.apache.activemq.command.Message;
052    import org.apache.activemq.command.MessageAck;
053    import org.apache.activemq.command.MessageDispatch;
054    import org.apache.activemq.command.MessageDispatchNotification;
055    import org.apache.activemq.command.MessagePull;
056    import org.apache.activemq.command.ProducerInfo;
057    import org.apache.activemq.command.RemoveSubscriptionInfo;
058    import org.apache.activemq.command.Response;
059    import org.apache.activemq.command.TransactionId;
060    import org.apache.activemq.state.ConnectionState;
061    import org.apache.activemq.store.kahadb.plist.PListStore;
062    import org.apache.activemq.thread.Scheduler;
063    import org.apache.activemq.thread.TaskRunnerFactory;
064    import org.apache.activemq.usage.SystemUsage;
065    import org.apache.activemq.util.BrokerSupport;
066    import org.apache.activemq.util.IdGenerator;
067    import org.apache.activemq.util.InetAddressUtil;
068    import org.apache.activemq.util.LongSequenceGenerator;
069    import org.apache.activemq.util.ServiceStopper;
070    import org.slf4j.Logger;
071    import org.slf4j.LoggerFactory;
072    
073    /**
074     * Routes Broker operations to the correct messaging regions for processing.
075     * 
076     * 
077     */
078    public class RegionBroker extends EmptyBroker {
079        public static final String ORIGINAL_EXPIRATION = "originalExpiration";
080        private static final Logger LOG = LoggerFactory.getLogger(RegionBroker.class);
081        private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator();
082    
083        protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
084        protected DestinationFactory destinationFactory;
085        protected final Map<ConnectionId, ConnectionState> connectionStates = Collections.synchronizedMap(new HashMap<ConnectionId, ConnectionState>());
086    
087        private final Region queueRegion;
088        private final Region topicRegion;
089        private final Region tempQueueRegion;
090        private final Region tempTopicRegion;
091        protected final BrokerService brokerService;
092        private boolean started;
093        private boolean keepDurableSubsActive;
094    
095        private final CopyOnWriteArrayList<Connection> connections = new CopyOnWriteArrayList<Connection>();
096        private final Map<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
097        private final Map<BrokerId, BrokerInfo> brokerInfos = new HashMap<BrokerId, BrokerInfo>();
098    
099        private final LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
100        private BrokerId brokerId;
101        private String brokerName;
102        private final Map<String, ConnectionContext> clientIdSet = new HashMap<String, ConnectionContext>();
103        private final DestinationInterceptor destinationInterceptor;
104        private ConnectionContext adminConnectionContext;
105        private final Scheduler scheduler;
106        private final ThreadPoolExecutor executor;
107        
108        private final Runnable purgeInactiveDestinationsTask = new Runnable() {
109            public void run() {
110                purgeInactiveDestinations();
111            }
112        };
113    
/**
 * Builds the broker and its queue, topic, temporary-queue and temporary-topic
 * regions through the overridable {@code create*Region} methods.
 *
 * @param brokerService          owning broker service
 * @param taskRunnerFactory      passed on to every region
 * @param memoryManager          passed on to every region
 * @param destinationFactory     required; also supplies the initial sequence id
 * @param destinationInterceptor stored for later use by addConsumer; may be {@code null}
 * @param scheduler              used to run the periodic destination purge
 * @param executor               stored as given
 * @throws IllegalArgumentException if {@code destinationFactory} is {@code null}
 * @throws IOException           declared by the signature; presumably raised while reading
 *                               the last sequence id from the factory (confirm)
 */
public RegionBroker(BrokerService brokerService, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager, DestinationFactory destinationFactory,
                    DestinationInterceptor destinationInterceptor, Scheduler scheduler, ThreadPoolExecutor executor) throws IOException {
    if (destinationFactory == null) {
        throw new IllegalArgumentException("null destinationFactory");
    }
    this.brokerService = brokerService;
    this.scheduler = scheduler;
    this.executor = executor;

    this.sequenceGenerator.setLastSequenceId(destinationFactory.getLastMessageBrokerSequenceId());
    this.destinationFactory = destinationFactory;

    this.queueRegion = createQueueRegion(memoryManager, taskRunnerFactory, destinationFactory);
    this.topicRegion = createTopicRegion(memoryManager, taskRunnerFactory, destinationFactory);
    // The interceptor is assigned between the permanent and the temporary regions, as before.
    this.destinationInterceptor = destinationInterceptor;
    this.tempQueueRegion = createTempQueueRegion(memoryManager, taskRunnerFactory, destinationFactory);
    this.tempTopicRegion = createTempTopicRegion(memoryManager, taskRunnerFactory, destinationFactory);
}
130    
131        @Override
132        public Map<ActiveMQDestination, Destination> getDestinationMap() {
133            Map<ActiveMQDestination, Destination> answer = getQueueRegion().getDestinationMap();
134            answer.putAll(getTopicRegion().getDestinationMap());
135            return answer;
136        }
137    
138        @Override
139        public Set <Destination> getDestinations(ActiveMQDestination destination) {
140            switch (destination.getDestinationType()) {
141            case ActiveMQDestination.QUEUE_TYPE:
142                return queueRegion.getDestinations(destination);
143            case ActiveMQDestination.TOPIC_TYPE:
144                return topicRegion.getDestinations(destination);
145            case ActiveMQDestination.TEMP_QUEUE_TYPE:
146                return tempQueueRegion.getDestinations(destination);
147            case ActiveMQDestination.TEMP_TOPIC_TYPE:
148                return tempTopicRegion.getDestinations(destination);
149            default:
150                return Collections.emptySet();
151            }
152        }
153    
154        @Override
155        public Broker getAdaptor(Class type) {
156            if (type.isInstance(this)) {
157                return this;
158            }
159            return null;
160        }
161    
162        public Region getQueueRegion() {
163            return queueRegion;
164        }
165    
166        public Region getTempQueueRegion() {
167            return tempQueueRegion;
168        }
169    
170        public Region getTempTopicRegion() {
171            return tempTopicRegion;
172        }
173    
174        public Region getTopicRegion() {
175            return topicRegion;
176        }
177    
178        protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
179            return new TempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
180        }
181    
182        protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
183            return new TempQueueRegion(this, brokerService, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
184        }
185    
186        protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
187            return new TopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
188        }
189    
190        protected Region createQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
191            return new QueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
192        }
193    
194        @Override
195        public void start() throws Exception {
196            ((TopicRegion)topicRegion).setKeepDurableSubsActive(keepDurableSubsActive);
197            started = true;
198            queueRegion.start();
199            topicRegion.start();
200            tempQueueRegion.start();
201            tempTopicRegion.start();
202            int period = this.brokerService.getSchedulePeriodForDestinationPurge();
203            if (period > 0) {
204                this.scheduler.executePeriodically(purgeInactiveDestinationsTask, period);
205            }
206        }
207    
208        @Override
209        public void stop() throws Exception {
210            started = false;
211            this.scheduler.cancel(purgeInactiveDestinationsTask);
212            ServiceStopper ss = new ServiceStopper();
213            doStop(ss);
214            ss.throwFirstException();
215            // clear the state
216            clientIdSet.clear();
217            connections.clear();
218            destinations.clear();
219            brokerInfos.clear();
220        }
221    
222        public PolicyMap getDestinationPolicy() {
223            return brokerService != null ? brokerService.getDestinationPolicy() : null;
224        }
225    
226        @Override
227        public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
228            String clientId = info.getClientId();
229            if (clientId == null) {
230                throw new InvalidClientIDException("No clientID specified for connection request");
231            }
232            synchronized (clientIdSet) {
233                ConnectionContext oldContext = clientIdSet.get(clientId);
234                if (oldContext != null) {
235                    if (context.isFaultTolerant() || context.isNetworkConnection()){
236                            //remove the old connection
237                            try{
238                                    removeConnection(oldContext, info, new Exception("remove stale client"));
239                            }catch(Exception e){
240                                    LOG.warn("Failed to remove stale connection ",e);
241                            }
242                    }else{
243                    throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from "
244                                                       + oldContext.getConnection().getRemoteAddress());
245                    }
246                } else {
247                    clientIdSet.put(clientId, context);
248                }
249            }
250    
251            connections.add(context.getConnection());
252        }
253    
254        @Override
255        public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
256            String clientId = info.getClientId();
257            if (clientId == null) {
258                throw new InvalidClientIDException("No clientID specified for connection disconnect request");
259            }
260            synchronized (clientIdSet) {
261                ConnectionContext oldValue = clientIdSet.get(clientId);
262                // we may be removing the duplicate connection, not the first
263                // connection to be created
264                // so lets check that their connection IDs are the same
265                if (oldValue == context) {
266                    if (isEqual(oldValue.getConnectionId(), info.getConnectionId())) {
267                        clientIdSet.remove(clientId);
268                    }
269                }
270            }
271            connections.remove(context.getConnection());
272        }
273    
274        protected boolean isEqual(ConnectionId connectionId, ConnectionId connectionId2) {
275            return connectionId == connectionId2 || (connectionId != null && connectionId.equals(connectionId2));
276        }
277    
278        @Override
279        public Connection[] getClients() throws Exception {
280            ArrayList<Connection> l = new ArrayList<Connection>(connections);
281            Connection rc[] = new Connection[l.size()];
282            l.toArray(rc);
283            return rc;
284        }
285    
286        @Override
287        public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean create) throws Exception {
288    
289            Destination answer;
290    
291            answer = destinations.get(destination);
292            if (answer != null) {
293                return answer;
294            }
295    
296            switch (destination.getDestinationType()) {
297            case ActiveMQDestination.QUEUE_TYPE:
298                answer = queueRegion.addDestination(context, destination,true);
299                break;
300            case ActiveMQDestination.TOPIC_TYPE:
301                answer = topicRegion.addDestination(context, destination,true);
302                break;
303            case ActiveMQDestination.TEMP_QUEUE_TYPE:
304                answer = tempQueueRegion.addDestination(context, destination,create);
305                break;
306            case ActiveMQDestination.TEMP_TOPIC_TYPE:
307                answer = tempTopicRegion.addDestination(context, destination,create);
308                break;
309            default:
310                throw createUnknownDestinationTypeException(destination);
311            }
312    
313            destinations.put(destination, answer);
314            return answer;
315    
316        }
317    
318        @Override
319        public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
320    
321            if (destinations.containsKey(destination)) {
322                switch (destination.getDestinationType()) {
323                case ActiveMQDestination.QUEUE_TYPE:
324                    queueRegion.removeDestination(context, destination, timeout);
325                    removeAdvisoryTopics("Queue.", context, destination, timeout);
326                    break;
327                case ActiveMQDestination.TOPIC_TYPE:
328                    topicRegion.removeDestination(context, destination, timeout);
329                    removeAdvisoryTopics("Topic.", context, destination, timeout);
330                    break;
331                case ActiveMQDestination.TEMP_QUEUE_TYPE:
332                    tempQueueRegion.removeDestination(context, destination, timeout);
333                    break;
334                case ActiveMQDestination.TEMP_TOPIC_TYPE:
335                    tempTopicRegion.removeDestination(context, destination, timeout);
336                    break;
337                default:
338                    throw createUnknownDestinationTypeException(destination);
339                }
340                destinations.remove(destination);
341    
342            }
343    
344        }
345    
346        public void removeAdvisoryTopics(String destinationType, ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
347            if (this.brokerService.isAdvisorySupport()) {
348                String producerAdvisoryTopic = AdvisorySupport.PRODUCER_ADVISORY_TOPIC_PREFIX + destinationType + destination.getPhysicalName();
349                String consumerAdvisoryTopic = AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX + destinationType + destination.getPhysicalName();
350    
351                ActiveMQDestination dests[] = getDestinations();
352                for (ActiveMQDestination dest: dests) {
353                    String name = dest.getPhysicalName();
354                    if ( name.equals(producerAdvisoryTopic) || name.equals(consumerAdvisoryTopic) ) {
355                        try {
356                            removeDestination(context, dest, timeout);
357                        } catch (JMSException ignore) {
358                            // at least ignore the Unknown Destination Type JMSException
359                        }
360                    }
361                }
362            }
363        }
364    
365        @Override
366        public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
367            addDestination(context, info.getDestination(),true);
368    
369        }
370    
371        @Override
372        public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
373            removeDestination(context, info.getDestination(), info.getTimeout());
374    
375        }
376    
377        @Override
378        public ActiveMQDestination[] getDestinations() throws Exception {
379            ArrayList<ActiveMQDestination> l;
380    
381            l = new ArrayList<ActiveMQDestination>(getDestinationMap().keySet());
382    
383            ActiveMQDestination rc[] = new ActiveMQDestination[l.size()];
384            l.toArray(rc);
385            return rc;
386        }
387    
388        @Override
389        public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
390            ActiveMQDestination destination = info.getDestination();
391            synchronized (purgeInactiveDestinationsTask) {
392                if (destination != null) {
393    
394                    // This seems to cause the destination to be added but without
395                    // advisories firing...
396                    context.getBroker().addDestination(context, destination, false);
397                    switch (destination.getDestinationType()) {
398                    case ActiveMQDestination.QUEUE_TYPE:
399                        queueRegion.addProducer(context, info);
400                        break;
401                    case ActiveMQDestination.TOPIC_TYPE:
402                        topicRegion.addProducer(context, info);
403                        break;
404                    case ActiveMQDestination.TEMP_QUEUE_TYPE:
405                        tempQueueRegion.addProducer(context, info);
406                        break;
407                    case ActiveMQDestination.TEMP_TOPIC_TYPE:
408                        tempTopicRegion.addProducer(context, info);
409                        break;
410                    }
411                }
412            }
413        }
414    
415        @Override
416        public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
417            ActiveMQDestination destination = info.getDestination();
418            synchronized (purgeInactiveDestinationsTask) {
419                if (destination != null) {
420                    switch (destination.getDestinationType()) {
421                    case ActiveMQDestination.QUEUE_TYPE:
422                        queueRegion.removeProducer(context, info);
423                        break;
424                    case ActiveMQDestination.TOPIC_TYPE:
425                        topicRegion.removeProducer(context, info);
426                        break;
427                    case ActiveMQDestination.TEMP_QUEUE_TYPE:
428                        tempQueueRegion.removeProducer(context, info);
429                        break;
430                    case ActiveMQDestination.TEMP_TOPIC_TYPE:
431                        tempTopicRegion.removeProducer(context, info);
432                        break;
433                    }
434                }
435            }
436        }
437    
438        @Override
439        public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
440            ActiveMQDestination destination = info.getDestination();
441            if (destinationInterceptor != null) {
442                destinationInterceptor.create(this, context, destination);
443            }
444            synchronized (purgeInactiveDestinationsTask) {
445                switch (destination.getDestinationType()) {
446                case ActiveMQDestination.QUEUE_TYPE:
447                    return queueRegion.addConsumer(context, info);
448    
449                case ActiveMQDestination.TOPIC_TYPE:
450                    return topicRegion.addConsumer(context, info);
451    
452                case ActiveMQDestination.TEMP_QUEUE_TYPE:
453                    return tempQueueRegion.addConsumer(context, info);
454    
455                case ActiveMQDestination.TEMP_TOPIC_TYPE:
456                    return tempTopicRegion.addConsumer(context, info);
457    
458                default:
459                    throw createUnknownDestinationTypeException(destination);
460                }
461            }
462        }
463    
464        @Override
465        public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
466            ActiveMQDestination destination = info.getDestination();
467            synchronized (purgeInactiveDestinationsTask) {
468                switch (destination.getDestinationType()) {
469    
470                case ActiveMQDestination.QUEUE_TYPE:
471                    queueRegion.removeConsumer(context, info);
472                    break;
473                case ActiveMQDestination.TOPIC_TYPE:
474                    topicRegion.removeConsumer(context, info);
475                    break;
476                case ActiveMQDestination.TEMP_QUEUE_TYPE:
477                    tempQueueRegion.removeConsumer(context, info);
478                    break;
479                case ActiveMQDestination.TEMP_TOPIC_TYPE:
480                    tempTopicRegion.removeConsumer(context, info);
481                    break;
482                default:
483                    throw createUnknownDestinationTypeException(destination);
484                }
485            }
486        }
487    
488        @Override
489        public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
490            synchronized (purgeInactiveDestinationsTask) {
491                topicRegion.removeSubscription(context, info);
492            }
493        }
494    
495        @Override
496        public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
497            message.setBrokerInTime(System.currentTimeMillis());
498            if (producerExchange.isMutable() || producerExchange.getRegion() == null
499                    || (producerExchange.getRegion() != null && producerExchange.getRegion().getDestinationMap().get(message.getDestination()) == null)) {
500                ActiveMQDestination destination = message.getDestination();
501                // ensure the destination is registered with the RegionBroker
502                producerExchange.getConnectionContext().getBroker().addDestination(producerExchange.getConnectionContext(), destination,false);
503                Region region;
504                switch (destination.getDestinationType()) {
505                case ActiveMQDestination.QUEUE_TYPE:
506                    region = queueRegion;
507                    break;
508                case ActiveMQDestination.TOPIC_TYPE:
509                    region = topicRegion;
510                    break;
511                case ActiveMQDestination.TEMP_QUEUE_TYPE:
512                    region = tempQueueRegion;
513                    break;
514                case ActiveMQDestination.TEMP_TOPIC_TYPE:
515                    region = tempTopicRegion;
516                    break;
517                default:
518                    throw createUnknownDestinationTypeException(destination);
519                }
520                producerExchange.setRegion(region);
521                producerExchange.setRegionDestination(null);
522            }
523            producerExchange.getRegion().send(producerExchange, message);
524        }
525    
526        @Override
527        public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
528            if (consumerExchange.isWildcard() || consumerExchange.getRegion() == null) {
529                ActiveMQDestination destination = ack.getDestination();
530                Region region;
531                switch (destination.getDestinationType()) {
532                case ActiveMQDestination.QUEUE_TYPE:
533                    region = queueRegion;
534                    break;
535                case ActiveMQDestination.TOPIC_TYPE:
536                    region = topicRegion;
537                    break;
538                case ActiveMQDestination.TEMP_QUEUE_TYPE:
539                    region = tempQueueRegion;
540                    break;
541                case ActiveMQDestination.TEMP_TOPIC_TYPE:
542                    region = tempTopicRegion;
543                    break;
544                default:
545                    throw createUnknownDestinationTypeException(destination);
546                }
547                consumerExchange.setRegion(region);
548            }
549            consumerExchange.getRegion().acknowledge(consumerExchange, ack);
550        }
551    
552        @Override
553        public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
554            ActiveMQDestination destination = pull.getDestination();
555            switch (destination.getDestinationType()) {
556            case ActiveMQDestination.QUEUE_TYPE:
557                return queueRegion.messagePull(context, pull);
558    
559            case ActiveMQDestination.TOPIC_TYPE:
560                return topicRegion.messagePull(context, pull);
561    
562            case ActiveMQDestination.TEMP_QUEUE_TYPE:
563                return tempQueueRegion.messagePull(context, pull);
564    
565            case ActiveMQDestination.TEMP_TOPIC_TYPE:
566                return tempTopicRegion.messagePull(context, pull);
567            default:
568                throw createUnknownDestinationTypeException(destination);
569            }
570        }
571    
572        @Override
573        public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
574            throw new IllegalAccessException("Transaction operation not implemented by this broker.");
575        }
576    
577        @Override
578        public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
579            throw new IllegalAccessException("Transaction operation not implemented by this broker.");
580        }
581    
582        @Override
583        public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
584            throw new IllegalAccessException("Transaction operation not implemented by this broker.");
585        }
586    
587        @Override
588        public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
589            throw new IllegalAccessException("Transaction operation not implemented by this broker.");
590        }
591    
592        @Override
593        public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
594            throw new IllegalAccessException("Transaction operation not implemented by this broker.");
595        }
596    
597        @Override
598        public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
599            throw new IllegalAccessException("Transaction operation not implemented by this broker.");
600        }
601    
602        @Override
603        public void gc() {
604            queueRegion.gc();
605            topicRegion.gc();
606        }
607    
608        @Override
609        public BrokerId getBrokerId() {
610            if (brokerId == null) {
611                brokerId = new BrokerId(BROKER_ID_GENERATOR.generateId());
612            }
613            return brokerId;
614        }
615    
616        public void setBrokerId(BrokerId brokerId) {
617            this.brokerId = brokerId;
618        }
619    
620        @Override
621        public String getBrokerName() {
622            if (brokerName == null) {
623                try {
624                    brokerName = InetAddressUtil.getLocalHostName().toLowerCase();
625                } catch (Exception e) {
626                    brokerName = "localhost";
627                }
628            }
629            return brokerName;
630        }
631    
632        public void setBrokerName(String brokerName) {
633            this.brokerName = brokerName;
634        }
635    
636        public DestinationStatistics getDestinationStatistics() {
637            return destinationStatistics;
638        }
639    
640        protected JMSException createUnknownDestinationTypeException(ActiveMQDestination destination) {
641            return new JMSException("Unknown destination type: " + destination.getDestinationType());
642        }
643    
644        @Override
645        public synchronized void addBroker(Connection connection, BrokerInfo info) {
646            BrokerInfo existing = brokerInfos.get(info.getBrokerId());
647            if (existing == null) {
648                existing = info.copy();
649                existing.setPeerBrokerInfos(null);
650                brokerInfos.put(info.getBrokerId(), existing);
651            }
652            existing.incrementRefCount();
653            LOG.debug(getBrokerName() + " addBroker:" + info.getBrokerName() + " brokerInfo size : " + brokerInfos.size());
654            addBrokerInClusterUpdate();
655        }
656    
657        @Override
658        public synchronized void removeBroker(Connection connection, BrokerInfo info) {
659            if (info != null) {
660                BrokerInfo existing = brokerInfos.get(info.getBrokerId());
661                if (existing != null && existing.decrementRefCount() == 0) {
662                   brokerInfos.remove(info.getBrokerId());
663                }
664                LOG.debug(getBrokerName() + " removeBroker:" + info.getBrokerName() + " brokerInfo size : " + brokerInfos.size());
665                removeBrokerInClusterUpdate();
666            }
667        }
668    
669        @Override
670        public synchronized BrokerInfo[] getPeerBrokerInfos() {
671            BrokerInfo[] result = new BrokerInfo[brokerInfos.size()];
672            result = brokerInfos.values().toArray(result);
673            return result;
674        }
675    
676        @Override
677        public void preProcessDispatch(MessageDispatch messageDispatch) {
678            Message message = messageDispatch.getMessage();
679            if (message != null) {
680                long endTime = System.currentTimeMillis();
681                message.setBrokerOutTime(endTime);
682                if (getBrokerService().isEnableStatistics()) {
683                    long totalTime = endTime - message.getBrokerInTime();
684                    message.getRegionDestination().getDestinationStatistics().getProcessTime().addTime(totalTime);
685                }
686            }
687        }
688    
689        @Override
690        public void postProcessDispatch(MessageDispatch messageDispatch) {
691        }
692    
693        @Override
694        public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
695            ActiveMQDestination destination = messageDispatchNotification.getDestination();
696            switch (destination.getDestinationType()) {
697            case ActiveMQDestination.QUEUE_TYPE:
698                queueRegion.processDispatchNotification(messageDispatchNotification);
699                break;
700            case ActiveMQDestination.TOPIC_TYPE:
701                topicRegion.processDispatchNotification(messageDispatchNotification);
702                break;
703            case ActiveMQDestination.TEMP_QUEUE_TYPE:
704                tempQueueRegion.processDispatchNotification(messageDispatchNotification);
705                break;
706            case ActiveMQDestination.TEMP_TOPIC_TYPE:
707                tempTopicRegion.processDispatchNotification(messageDispatchNotification);
708                break;
709            default:
710                throw createUnknownDestinationTypeException(destination);
711            }
712        }
713    
714        public boolean isSlaveBroker() {
715            return brokerService.isSlave();
716        }
717    
718        @Override
719        public boolean isStopped() {
720            return !started;
721        }
722    
723        @Override
724        public Set<ActiveMQDestination> getDurableDestinations() {
725            return destinationFactory.getDestinations();
726        }
727    
728        protected void doStop(ServiceStopper ss) {
729            ss.stop(queueRegion);
730            ss.stop(topicRegion);
731            ss.stop(tempQueueRegion);
732            ss.stop(tempTopicRegion);
733        }
734    
735        public boolean isKeepDurableSubsActive() {
736            return keepDurableSubsActive;
737        }
738    
739        public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
740            this.keepDurableSubsActive = keepDurableSubsActive;
741        }
742    
743        public DestinationInterceptor getDestinationInterceptor() {
744            return destinationInterceptor;
745        }
746    
747        @Override
748        public ConnectionContext getAdminConnectionContext() {
749            return adminConnectionContext;
750        }
751    
752        @Override
753        public void setAdminConnectionContext(ConnectionContext adminConnectionContext) {
754            this.adminConnectionContext = adminConnectionContext;
755        }
756    
757        public Map<ConnectionId, ConnectionState> getConnectionStates() {
758            return connectionStates;
759        }
760    
761        @Override
762        public PListStore getTempDataStore() {
763            return brokerService.getTempDataStore();
764        }
765    
766        @Override
767        public URI getVmConnectorURI() {
768            return brokerService.getVmConnectorURI();
769        }
770    
771        @Override
772        public void brokerServiceStarted() {
773        }
774    
775        @Override
776        public BrokerService getBrokerService() {
777            return brokerService;
778        }
779    
780        @Override
781        public boolean isExpired(MessageReference messageReference) {
782            boolean expired = false;
783            if (messageReference.isExpired()) {
784                try {
785                    // prevent duplicate expiry processing
786                    Message message = messageReference.getMessage();
787                    synchronized (message) {
788                        expired = stampAsExpired(message);
789                    }
790                } catch (IOException e) {
791                    LOG.warn("unexpected exception on message expiry determination for: " + messageReference, e);
792                }
793            }
794            return expired;
795        }
796       
797        private boolean stampAsExpired(Message message) throws IOException {
798            boolean stamped=false;
799            if (message.getProperty(ORIGINAL_EXPIRATION) == null) {
800                long expiration=message.getExpiration();     
801                message.setProperty(ORIGINAL_EXPIRATION,new Long(expiration));
802                stamped = true;
803            }
804            return stamped;
805        }
806    
807        
808        @Override
809        public void messageExpired(ConnectionContext context, MessageReference node, Subscription subscription) {
810            if (LOG.isDebugEnabled()) {
811                LOG.debug("Message expired " + node);
812            }
813            getRoot().sendToDeadLetterQueue(context, node, subscription);
814        }
815        
816        @Override
817        public void sendToDeadLetterQueue(ConnectionContext context,
818                    MessageReference node, Subscription subscription){
819                    try{
820                            if(node!=null){
821                                    Message message=node.getMessage();
822                                    if(message!=null && node.getRegionDestination()!=null){
823                                            DeadLetterStrategy deadLetterStrategy=node
824                                                    .getRegionDestination().getDeadLetterStrategy();
825                                            if(deadLetterStrategy!=null){
826                                                    if(deadLetterStrategy.isSendToDeadLetterQueue(message)){
827                                                        // message may be inflight to other subscriptions so do not modify
828                                                        message = message.copy();
829                                                        stampAsExpired(message);
830                                                        message.setExpiration(0);
831                                                        if(!message.isPersistent()){
832                                                                message.setPersistent(true);
833                                                                message.setProperty("originalDeliveryMode",
834                                                                            "NON_PERSISTENT");
835                                                            }
836                                                            // The original destination and transaction id do
837                                                            // not get filled when the message is first sent,
838                                                            // it is only populated if the message is routed to
839                                                            // another destination like the DLQ
840                                                            ActiveMQDestination deadLetterDestination=deadLetterStrategy
841                                                                    .getDeadLetterQueueFor(message, subscription);
842                                                            if (context.getBroker()==null) {
843                                                                    context.setBroker(getRoot());
844                                                            }
845                                                            BrokerSupport.resendNoCopy(context,message,
846                                                                    deadLetterDestination);
847                                                    }
848                                            } else {
849                                                if (LOG.isDebugEnabled()) {
850                                                    LOG.debug("Dead Letter message with no DLQ strategy in place, message id: "
851                                        + message.getMessageId() + ", destination: " + message.getDestination());
852                                                }
853                                            }
854                                    }
855                            }
856                    }catch(Exception e){
857                            LOG.warn("Caught an exception sending to DLQ: "+node,e);
858                    }
859            }
860    
861        @Override
862        public Broker getRoot() {
863            try {
864                return getBrokerService().getBroker();
865            } catch (Exception e) {
866                LOG.error("Trying to get Root Broker " + e);
867                throw new RuntimeException("The broker from the BrokerService should not throw an exception");
868            }
869        }
870        
871        /**
872         * @return the broker sequence id
873         */
874        @Override
875        public long getBrokerSequenceId() {
876            synchronized(sequenceGenerator) {
877                return sequenceGenerator.getNextSequenceId();
878            }
879        }
880        
881        
882        @Override
883        public Scheduler getScheduler() {
884            return this.scheduler;
885        }
886        
887        public ThreadPoolExecutor getExecutor() {
888            return this.executor;
889        }
890        
891        @Override
892        public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) {
893            ActiveMQDestination destination = control.getDestination();
894            switch (destination.getDestinationType()) {
895            case ActiveMQDestination.QUEUE_TYPE:
896                queueRegion.processConsumerControl(consumerExchange, control);
897                break;
898    
899            case ActiveMQDestination.TOPIC_TYPE:
900                topicRegion.processConsumerControl(consumerExchange, control);
901                break;
902                
903            case ActiveMQDestination.TEMP_QUEUE_TYPE:
904                tempQueueRegion.processConsumerControl(consumerExchange, control);
905                break;
906                
907            case ActiveMQDestination.TEMP_TOPIC_TYPE:
908                tempTopicRegion.processConsumerControl(consumerExchange, control);
909                break;
910                
911            default:
912                LOG.warn("unmatched destination: " + destination + ", in consumerControl: "  + control);
913            }
914        }
915        
916        protected void addBrokerInClusterUpdate() {
917            List<TransportConnector> connectors = this.brokerService.getTransportConnectors();
918            for (TransportConnector connector : connectors) {
919                if (connector.isUpdateClusterClients()) {
920                    connector.updateClientClusterInfo();
921                }
922            }
923        }
924    
925        protected void removeBrokerInClusterUpdate() {
926            List<TransportConnector> connectors = this.brokerService.getTransportConnectors();
927            for (TransportConnector connector : connectors) {
928                if (connector.isUpdateClusterClients() && connector.isUpdateClusterClientsOnRemove()) {
929                    connector.updateClientClusterInfo();
930                }
931            }
932        }
933        
934        protected void purgeInactiveDestinations() {
935            synchronized (purgeInactiveDestinationsTask) {
936                List<BaseDestination> list = new ArrayList<BaseDestination>();
937                Map<ActiveMQDestination, Destination> map = getDestinationMap();
938                long timeStamp = System.currentTimeMillis();
939                for (Destination d : map.values()) {
940                    if (d instanceof BaseDestination) {
941                        BaseDestination bd = (BaseDestination) d;
942                        bd.markForGC(timeStamp);
943                        if (bd.canGC()) {
944                            list.add(bd);
945                        }
946                    }
947                }
948    
949                if (list.isEmpty() == false) {
950    
951                    ConnectionContext context = BrokerSupport.getConnectionContext(this);
952                    context.setBroker(this);
953    
954                    for (BaseDestination dest : list) {
955                        dest.getLog().info(
956                                dest.getName() + " Inactive for longer than " + dest.getInactiveTimoutBeforeGC()
957                                        + " ms - removing ...");
958                        try {
959                            getRoot().removeDestination(context, dest.getActiveMQDestination(), 0);
960                        } catch (Exception e) {
961                            LOG.error("Failed to remove inactive destination " + dest, e);
962                        }
963                    }
964                }
965            }
966        }
967    }