001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.advisory;
018    
019    import java.util.Iterator;
020    import java.util.Map;
021    import java.util.Set;
022    import java.util.concurrent.ConcurrentHashMap;
023    import org.apache.activemq.broker.Broker;
024    import org.apache.activemq.broker.BrokerFilter;
025    import org.apache.activemq.broker.ConnectionContext;
026    import org.apache.activemq.broker.ProducerBrokerExchange;
027    import org.apache.activemq.broker.region.Destination;
028    import org.apache.activemq.broker.region.MessageReference;
029    import org.apache.activemq.broker.region.Subscription;
030    import org.apache.activemq.broker.region.TopicSubscription;
031    import org.apache.activemq.command.*;
032    import org.apache.activemq.security.SecurityContext;
033    import org.apache.activemq.state.ProducerState;
034    import org.apache.activemq.usage.Usage;
035    import org.apache.activemq.util.IdGenerator;
036    import org.apache.activemq.util.LongSequenceGenerator;
037    import org.slf4j.Logger;
038    import org.slf4j.LoggerFactory;
039    
040    /**
041     * This broker filter handles tracking the state of the broker for purposes of
042     * publishing advisory messages to advisory consumers.
043     * 
044     * 
045     */
046    public class AdvisoryBroker extends BrokerFilter {
047    
048        private static final Logger LOG = LoggerFactory.getLogger(AdvisoryBroker.class);
049        private static final IdGenerator ID_GENERATOR = new IdGenerator();
050    
051        protected final ConcurrentHashMap<ConnectionId, ConnectionInfo> connections = new ConcurrentHashMap<ConnectionId, ConnectionInfo>();
052        protected final ConcurrentHashMap<ConsumerId, ConsumerInfo> consumers = new ConcurrentHashMap<ConsumerId, ConsumerInfo>();
053        protected final ConcurrentHashMap<ProducerId, ProducerInfo> producers = new ConcurrentHashMap<ProducerId, ProducerInfo>();
054        protected final ConcurrentHashMap<ActiveMQDestination, DestinationInfo> destinations = new ConcurrentHashMap<ActiveMQDestination, DestinationInfo>();
055        protected final ProducerId advisoryProducerId = new ProducerId();
056        
057        private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
058        
059        public AdvisoryBroker(Broker next) {
060            super(next);
061            advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
062        }
063    
064        @Override
065        public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
066            super.addConnection(context, info);
067    
068            ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
069            //do not distribute usernames or passwords in advisory
070            ConnectionInfo copy = info.copy();
071            copy.setUserName("");
072            copy.setPassword("");
073            fireAdvisory(context, topic, copy);
074            connections.put(copy.getConnectionId(), copy);
075        }
076    
077        @Override
078        public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
079            Subscription answer = super.addConsumer(context, info);
080            
081            // Don't advise advisory topics.
082            if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
083                ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination());
084                consumers.put(info.getConsumerId(), info);
085                fireConsumerAdvisory(context, info.getDestination(), topic, info);
086            } else {
087                // We need to replay all the previously collected state objects
088                // for this newly added consumer.
089                if (AdvisorySupport.isConnectionAdvisoryTopic(info.getDestination())) {
090                    // Replay the connections.
091                    for (Iterator<ConnectionInfo> iter = connections.values().iterator(); iter.hasNext();) {
092                        ConnectionInfo value = iter.next();
093                        ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
094                        fireAdvisory(context, topic, value, info.getConsumerId());
095                    }
096                }
097    
098                // We need to replay all the previously collected destination
099                // objects
100                // for this newly added consumer.
101                if (AdvisorySupport.isDestinationAdvisoryTopic(info.getDestination())) {
102                    // Replay the destinations.
103                    for (Iterator<DestinationInfo> iter = destinations.values().iterator(); iter.hasNext();) {
104                        DestinationInfo value = iter.next();
105                        ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(value.getDestination());
106                        fireAdvisory(context, topic, value, info.getConsumerId());
107                    }
108                }
109    
110                // Replay the producers.
111                if (AdvisorySupport.isProducerAdvisoryTopic(info.getDestination())) {
112                    for (Iterator<ProducerInfo> iter = producers.values().iterator(); iter.hasNext();) {
113                        ProducerInfo value = iter.next();
114                        ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(value.getDestination());
115                        fireProducerAdvisory(context, value.getDestination(),topic, value, info.getConsumerId());
116                    }
117                }
118    
119                // Replay the consumers.
120                if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination())) {
121                    for (Iterator<ConsumerInfo> iter = consumers.values().iterator(); iter.hasNext();) {
122                        ConsumerInfo value = iter.next();
123                        ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(value.getDestination());
124                        fireConsumerAdvisory(context,value.getDestination(), topic, value, info.getConsumerId());
125                    }
126                }
127            }
128            return answer;
129        }
130    
131        @Override
132        public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
133            super.addProducer(context, info);
134    
135            // Don't advise advisory topics.
136            if (info.getDestination() != null && !AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
137                ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(info.getDestination());
138                fireProducerAdvisory(context, info.getDestination(), topic, info);
139                producers.put(info.getProducerId(), info);
140            }
141        }
142    
143        @Override
144        public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean create) throws Exception {
145            Destination answer = super.addDestination(context, destination,create);
146            if (!AdvisorySupport.isAdvisoryTopic(destination)) {
147                DestinationInfo info = new DestinationInfo(context.getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination);
148                DestinationInfo previous = destinations.putIfAbsent(destination, info);
149                if( previous==null ) {
150                    ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
151                    fireAdvisory(context, topic, info);
152                }
153            }
154            return answer;
155        }
156    
157        @Override
158        public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
159            ActiveMQDestination destination = info.getDestination();
160            next.addDestinationInfo(context, info);
161    
162            if (!AdvisorySupport.isAdvisoryTopic(destination)) {
163                DestinationInfo previous = destinations.putIfAbsent(destination, info);
164                if( previous==null ) {
165                    ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
166                    fireAdvisory(context, topic, info);
167                }
168            }
169        }
170    
171        @Override
172        public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
173            super.removeDestination(context, destination, timeout);
174            DestinationInfo info = destinations.remove(destination);
175            if (info != null) {
176                info.setDestination(destination);
177                info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
178                ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
179                fireAdvisory(context, topic, info);
180                try {
181                    next.removeDestination(context, AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()), -1);
182                } catch (Exception expectedIfDestinationDidNotExistYet) {                
183                }
184                try {
185                    next.removeDestination(context, AdvisorySupport.getProducerAdvisoryTopic(info.getDestination()), -1);
186                } catch (Exception expectedIfDestinationDidNotExistYet) {
187                }
188            }
189    
190        }
191    
192        @Override
193        public void removeDestinationInfo(ConnectionContext context, DestinationInfo destInfo) throws Exception {
194            super.removeDestinationInfo(context, destInfo);   
195            DestinationInfo info = destinations.remove(destInfo.getDestination());
196            if (info != null) {
197                info.setDestination(destInfo.getDestination());
198                info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
199                ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destInfo.getDestination());
200                fireAdvisory(context, topic, info);
201                try {
202                    next.removeDestination(context, AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()), -1);
203                } catch (Exception expectedIfDestinationDidNotExistYet) {
204                }
205                try {
206                    next.removeDestination(context, AdvisorySupport.getProducerAdvisoryTopic(info.getDestination()), -1);
207                
208                } catch (Exception expectedIfDestinationDidNotExistYet) {
209                }
210            }
211    
212        }
213    
214        @Override
215        public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
216            super.removeConnection(context, info, error);
217    
218            ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
219            fireAdvisory(context, topic, info.createRemoveCommand());
220            connections.remove(info.getConnectionId());
221        }
222    
223        @Override
224        public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
225            super.removeConsumer(context, info);
226    
227            // Don't advise advisory topics.
228            ActiveMQDestination dest = info.getDestination();
229            if (!AdvisorySupport.isAdvisoryTopic(dest)) {
230                ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(dest);
231                consumers.remove(info.getConsumerId());
232                if (!dest.isTemporary() || destinations.containsKey(dest)) {
233                    fireConsumerAdvisory(context,dest, topic, info.createRemoveCommand());
234                }
235            }
236        }
237    
238        @Override
239        public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
240            super.removeProducer(context, info);
241    
242            // Don't advise advisory topics.
243            ActiveMQDestination dest = info.getDestination();
244            if (info.getDestination() != null && !AdvisorySupport.isAdvisoryTopic(dest)) {
245                ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(dest);
246                producers.remove(info.getProducerId());
247                if (!dest.isTemporary() || destinations.contains(dest)) {
248                    fireProducerAdvisory(context, dest,topic, info.createRemoveCommand());
249                }
250            }
251        }
252    
253        @Override
254        public void messageExpired(ConnectionContext context, MessageReference messageReference, Subscription subscription) {
255            super.messageExpired(context, messageReference, subscription);
256            try {
257                if(!messageReference.isAdvisory()) {
258                    ActiveMQTopic topic = AdvisorySupport.getExpiredMessageTopic(messageReference.getMessage().getDestination());
259                    Message payload = messageReference.getMessage().copy();
260                    payload.clearBody();
261                    ActiveMQMessage advisoryMessage = new ActiveMQMessage();
262                    advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString());
263                    fireAdvisory(context, topic, payload, null, advisoryMessage);
264                }
265            } catch (Exception e) {
266                handleFireFailure("expired", e);
267            }
268        }
269        
270        @Override
271        public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
272            super.messageConsumed(context, messageReference);
273            try {
274                if(!messageReference.isAdvisory()) {
275                    ActiveMQTopic topic = AdvisorySupport.getMessageConsumedAdvisoryTopic(messageReference.getMessage().getDestination());
276                    Message payload = messageReference.getMessage().copy();
277                    payload.clearBody();
278                    fireAdvisory(context, topic,payload);
279                }
280            } catch (Exception e) {
281                handleFireFailure("consumed", e);
282            }
283        }
284        
285        @Override
286        public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
287            super.messageDelivered(context, messageReference);
288            try {
289                if (!messageReference.isAdvisory()) {
290                    ActiveMQTopic topic = AdvisorySupport.getMessageDeliveredAdvisoryTopic(messageReference.getMessage().getDestination());
291                    Message payload = messageReference.getMessage().copy();
292                    payload.clearBody();
293                    fireAdvisory(context, topic,payload);
294                }
295            } catch (Exception e) {
296                handleFireFailure("delivered", e);
297            }
298        }
299        
300        @Override
301        public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
302            super.messageDiscarded(context, sub, messageReference);
303            try {
304                if (!messageReference.isAdvisory()) {
305                    ActiveMQTopic topic = AdvisorySupport.getMessageDiscardedAdvisoryTopic(messageReference.getMessage().getDestination());
306                    Message payload = messageReference.getMessage().copy();
307                    payload.clearBody();
308                    ActiveMQMessage advisoryMessage = new ActiveMQMessage();
309                    if (sub instanceof TopicSubscription) {
310                        advisoryMessage.setIntProperty(AdvisorySupport.MSG_PROPERTY_DISCARDED_COUNT, ((TopicSubscription)sub).discarded());
311                    }
312                    advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, sub.getConsumerInfo().getConsumerId().toString());
313                    fireAdvisory(context, topic, payload, null, advisoryMessage);
314                }
315            } catch (Exception e) {
316                handleFireFailure("discarded", e);
317            }
318        }
319        
320        @Override
321        public void slowConsumer(ConnectionContext context, Destination destination,Subscription subs) {
322            super.slowConsumer(context, destination,subs);
323            try {
324                ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(destination.getActiveMQDestination());
325                ActiveMQMessage advisoryMessage = new ActiveMQMessage();
326                advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, subs.getConsumerInfo().getConsumerId().toString());
327                fireAdvisory(context, topic, subs.getConsumerInfo(), null, advisoryMessage);
328            } catch (Exception e) {
329                handleFireFailure("slow consumer", e);
330            }
331        }
332        
333        @Override
334        public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) {
335            super.fastProducer(context, producerInfo);
336            try {
337                ActiveMQTopic topic = AdvisorySupport.getFastProducerAdvisoryTopic(producerInfo.getDestination());
338                ActiveMQMessage advisoryMessage = new ActiveMQMessage();
339                advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_PRODUCER_ID, producerInfo.getProducerId().toString());
340                fireAdvisory(context, topic, producerInfo, null, advisoryMessage);
341            } catch (Exception e) {
342                handleFireFailure("fast producer", e);
343            }
344        }
345        
346        @Override
347        public void isFull(ConnectionContext context, Destination destination, Usage usage) {
348            super.isFull(context, destination, usage);
349            if (AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination()) == false) {
350                try {
351    
352                    ActiveMQTopic topic = AdvisorySupport.getFullAdvisoryTopic(destination.getActiveMQDestination());
353                    ActiveMQMessage advisoryMessage = new ActiveMQMessage();
354                    advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_USAGE_NAME, usage.getName());
355                    fireAdvisory(context, topic, null, null, advisoryMessage);
356    
357                } catch (Exception e) {
358                    handleFireFailure("is full", e);
359                }
360            }
361        }
362        
363        @Override
364        public void nowMasterBroker() {   
365            super.nowMasterBroker();
366            try {
367                ActiveMQTopic topic = AdvisorySupport.getMasterBrokerAdvisoryTopic();
368                ActiveMQMessage advisoryMessage = new ActiveMQMessage();                       
369                ConnectionContext context = new ConnectionContext();
370                context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
371                context.setBroker(getBrokerService().getBroker());
372                fireAdvisory(context, topic,null,null,advisoryMessage);
373            } catch (Exception e) {
374                handleFireFailure("now master broker", e);
375            }
376        }
377        
378        @Override
379        public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
380                                          Subscription subscription){
381            super.sendToDeadLetterQueue(context, messageReference, subscription);
382            try {
383                if(!messageReference.isAdvisory()) {
384                    ActiveMQTopic topic = AdvisorySupport.getMessageDLQdAdvisoryTopic(messageReference.getMessage().getDestination());
385                    Message payload = messageReference.getMessage().copy();
386                    payload.clearBody();
387                    fireAdvisory(context, topic,payload);
388                }
389            } catch (Exception e) {
390                handleFireFailure("add to DLQ", e);
391            } 
392        }
393    
394        @Override
395        public void networkBridgeStarted(BrokerInfo brokerInfo, boolean createdByDuplex) {
396            try {
397             if (brokerInfo != null) {
398                 ActiveMQMessage advisoryMessage = new ActiveMQMessage();
399                 advisoryMessage.setBooleanProperty("started", true);
400                 advisoryMessage.setBooleanProperty("createdByDuplex", createdByDuplex);
401    
402                 ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic();
403    
404                 ConnectionContext context = new ConnectionContext();
405                 context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
406                 context.setBroker(getBrokerService().getBroker());
407                 fireAdvisory(context, topic, brokerInfo, null, advisoryMessage);
408             }
409            } catch (Exception e) {
410                handleFireFailure("network bridge started", e);
411            }
412        }
413    
414        @Override
415        public void networkBridgeStopped(BrokerInfo brokerInfo) {
416            try {
417             if (brokerInfo != null) {
418                 ActiveMQMessage advisoryMessage = new ActiveMQMessage();
419                 advisoryMessage.setBooleanProperty("started", false);
420    
421                 ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic();
422    
423                 ConnectionContext context = new ConnectionContext();
424                 context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
425                 context.setBroker(getBrokerService().getBroker());
426                 fireAdvisory(context, topic, brokerInfo, null, advisoryMessage);
427             }
428            } catch (Exception e) {
429                handleFireFailure("network bridge stopped", e);
430            }
431        }
432    
433        private void handleFireFailure(String message, Throwable cause) {
434            LOG.warn("Failed to fire "  + message + " advisory, reason: " + cause);
435            if (LOG.isDebugEnabled()) {
436                LOG.debug(message + " detail", cause);
437            }
438        }
439    
440        protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command) throws Exception {
441            fireAdvisory(context, topic, command, null);
442        }
443    
444        protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception {
445            ActiveMQMessage advisoryMessage = new ActiveMQMessage();
446            fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
447        }
448    
449        protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQDestination consumerDestination,ActiveMQTopic topic, Command command) throws Exception {
450            fireConsumerAdvisory(context, consumerDestination,topic, command, null);
451        }
452    
453        protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQDestination consumerDestination,ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception {
454            ActiveMQMessage advisoryMessage = new ActiveMQMessage();
455            int count = 0;
456            Set<Destination>set = getDestinations(consumerDestination);
457            if (set != null) {
458                for (Destination dest:set) {
459                    count += dest.getDestinationStatistics().getConsumers().getCount();
460                }
461            }
462            advisoryMessage.setIntProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_COUNT, count);
463            
464            fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
465        }
466    
467        protected void fireProducerAdvisory(ConnectionContext context,ActiveMQDestination producerDestination, ActiveMQTopic topic, Command command) throws Exception {
468            fireProducerAdvisory(context,producerDestination, topic, command, null);
469        }
470    
471        protected void fireProducerAdvisory(ConnectionContext context, ActiveMQDestination producerDestination,ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception {
472            ActiveMQMessage advisoryMessage = new ActiveMQMessage();
473            int count = 0;
474            if (producerDestination != null) {
475                Set<Destination> set = getDestinations(producerDestination);
476                if (set != null) {
477                    for (Destination dest : set) {
478                        count += dest.getDestinationStatistics().getProducers().getCount();
479                    }
480                }
481            }
482            advisoryMessage.setIntProperty("producerCount", count);
483            fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
484        }
485    
486        protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId, ActiveMQMessage advisoryMessage) throws Exception {
487            if (getBrokerService().isStarted()) {
488                //set properties
489                advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_NAME, getBrokerName());
490                String id = getBrokerId() != null ? getBrokerId().getValue() : "NOT_SET";
491                advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, id);
492                
493                String url = getBrokerService().getVmConnectorURI().toString();
494                if (getBrokerService().getDefaultSocketURIString() != null) {
495                    url = getBrokerService().getDefaultSocketURIString();
496                }
497                advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL, url);
498                
499                //set the data structure
500                advisoryMessage.setDataStructure(command);
501                advisoryMessage.setPersistent(false);
502                advisoryMessage.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
503                advisoryMessage.setMessageId(new MessageId(advisoryProducerId, messageIdGenerator.getNextSequenceId()));
504                advisoryMessage.setTargetConsumerId(targetConsumerId);
505                advisoryMessage.setDestination(topic);
506                advisoryMessage.setResponseRequired(false);
507                advisoryMessage.setProducerId(advisoryProducerId);
508                boolean originalFlowControl = context.isProducerFlowControl();
509                final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
510                producerExchange.setConnectionContext(context);
511                producerExchange.setMutable(true);
512                producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
513                try {
514                    context.setProducerFlowControl(false);
515                    next.send(producerExchange, advisoryMessage);
516                } finally {
517                    context.setProducerFlowControl(originalFlowControl);
518                }
519            }
520        }
521    
522        public Map<ConnectionId, ConnectionInfo> getAdvisoryConnections() {
523            return connections;
524        }
525    
526        public Map<ConsumerId, ConsumerInfo> getAdvisoryConsumers() {
527            return consumers;
528        }
529    
530        public Map<ProducerId, ProducerInfo> getAdvisoryProducers() {
531            return producers;
532        }
533    
534        public Map<ActiveMQDestination, DestinationInfo> getAdvisoryDestinations() {
535            return destinations;
536        }
537    }