001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.advisory;
018    
019    import javax.jms.Destination;
020    import javax.jms.JMSException;
021    import org.apache.activemq.ActiveMQMessageTransformation;
022    import org.apache.activemq.command.ActiveMQDestination;
023    import org.apache.activemq.command.ActiveMQTopic;
024    public final class AdvisorySupport {
025        public static final String ADVISORY_TOPIC_PREFIX = "ActiveMQ.Advisory.";
026        public static final ActiveMQTopic CONNECTION_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX
027                + "Connection");
028        public static final ActiveMQTopic QUEUE_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "Queue");
029        public static final ActiveMQTopic TOPIC_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "Topic");
030        public static final ActiveMQTopic TEMP_QUEUE_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "TempQueue");
031        public static final ActiveMQTopic TEMP_TOPIC_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "TempTopic");
032        public static final String PRODUCER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Producer.";
033        public static final String QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Queue.";
034        public static final String TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Topic.";
035        public static final String CONSUMER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Consumer.";
036        public static final String QUEUE_CONSUMER_ADVISORY_TOPIC_PREFIX = CONSUMER_ADVISORY_TOPIC_PREFIX + "Queue.";
037        public static final String TOPIC_CONSUMER_ADVISORY_TOPIC_PREFIX = CONSUMER_ADVISORY_TOPIC_PREFIX + "Topic.";
038        public static final String EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Expired.Topic.";
039        public static final String EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Expired.Queue.";
040        public static final String NO_TOPIC_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NoConsumer.Topic.";
041        public static final String NO_QUEUE_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NoConsumer.Queue.";
042        public static final String SLOW_CONSUMER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "SlowConsumer.";
043        public static final String FAST_PRODUCER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "FastProducer.";
044        public static final String MESSAGE_DISCAREDED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDiscarded.";
045        public static final String FULL_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "FULL.";
046        public static final String MESSAGE_DELIVERED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDelivered.";
047        public static final String MESSAGE_CONSUMED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageConsumed.";
048        public static final String MESSAGE_DLQ_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDLQd.";
049        public static final String MASTER_BROKER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MasterBroker";
050        public static final String NETWORK_BRIDGE_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NetworkBridge";
051        public static final String AGENT_TOPIC = "ActiveMQ.Agent";
052        public static final String ADIVSORY_MESSAGE_TYPE = "Advisory";
053        public static final String MSG_PROPERTY_ORIGIN_BROKER_ID = "originBrokerId";
054        public static final String MSG_PROPERTY_ORIGIN_BROKER_NAME = "originBrokerName";
055        public static final String MSG_PROPERTY_ORIGIN_BROKER_URL = "originBrokerURL";
056        public static final String MSG_PROPERTY_USAGE_NAME = "usageName";
057        public static final String MSG_PROPERTY_CONSUMER_ID = "consumerId";
058        public static final String MSG_PROPERTY_PRODUCER_ID = "producerId";
059        public static final String MSG_PROPERTY_MESSAGE_ID = "orignalMessageId";
060        public static final String MSG_PROPERTY_CONSUMER_COUNT = "consumerCount";
061        public static final String MSG_PROPERTY_DISCARDED_COUNT = "discardedCount";
062        
063        public static final ActiveMQTopic TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC = new ActiveMQTopic(
064                TEMP_QUEUE_ADVISORY_TOPIC.getPhysicalName() + "," + TEMP_TOPIC_ADVISORY_TOPIC.getPhysicalName());
065        private static final ActiveMQTopic AGENT_TOPIC_DESTINATION = new ActiveMQTopic(AGENT_TOPIC);
066    
067        private AdvisorySupport() {
068        }
069    
070        public static ActiveMQTopic getConnectionAdvisoryTopic() {
071            return CONNECTION_ADVISORY_TOPIC;
072        }
073    
074        public static ActiveMQTopic getConsumerAdvisoryTopic(Destination destination) throws JMSException {
075            return getConsumerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
076        }
077    
078        public static ActiveMQTopic getConsumerAdvisoryTopic(ActiveMQDestination destination) {
079            if (destination.isQueue()) {
080                return new ActiveMQTopic(QUEUE_CONSUMER_ADVISORY_TOPIC_PREFIX + destination.getPhysicalName());
081            } else {
082                return new ActiveMQTopic(TOPIC_CONSUMER_ADVISORY_TOPIC_PREFIX + destination.getPhysicalName());
083            }
084        }
085    
086        public static ActiveMQTopic getProducerAdvisoryTopic(Destination destination) throws JMSException {
087            return getProducerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
088        }
089    
090        public static ActiveMQTopic getProducerAdvisoryTopic(ActiveMQDestination destination) {
091            if (destination.isQueue()) {
092                return new ActiveMQTopic(QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX + destination.getPhysicalName());
093            } else {
094                return new ActiveMQTopic(TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX + destination.getPhysicalName());
095            }
096        }
097    
098        public static ActiveMQTopic getExpiredMessageTopic(Destination destination) throws JMSException {
099            return getExpiredMessageTopic(ActiveMQMessageTransformation.transformDestination(destination));
100        }
101    
102        public static ActiveMQTopic getExpiredMessageTopic(ActiveMQDestination destination) {
103            if (destination.isQueue()) {
104                return getExpiredQueueMessageAdvisoryTopic(destination);
105            }
106            return getExpiredTopicMessageAdvisoryTopic(destination);
107        }
108    
109        public static ActiveMQTopic getExpiredTopicMessageAdvisoryTopic(ActiveMQDestination destination) {
110            String name = EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX + destination.getPhysicalName();
111            return new ActiveMQTopic(name);
112        }
113    
114        public static ActiveMQTopic getExpiredQueueMessageAdvisoryTopic(Destination destination) throws JMSException {
115            return getExpiredQueueMessageAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
116        }
117    
118        public static ActiveMQTopic getExpiredQueueMessageAdvisoryTopic(ActiveMQDestination destination) {
119            String name = EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX + destination.getPhysicalName();
120            return new ActiveMQTopic(name);
121        }
122    
123        public static ActiveMQTopic getNoTopicConsumersAdvisoryTopic(Destination destination) throws JMSException {
124            return getNoTopicConsumersAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
125        }
126    
127        public static ActiveMQTopic getNoTopicConsumersAdvisoryTopic(ActiveMQDestination destination) {
128            String name = NO_TOPIC_CONSUMERS_TOPIC_PREFIX + destination.getPhysicalName();
129            return new ActiveMQTopic(name);
130        }
131    
132        public static ActiveMQTopic getNoQueueConsumersAdvisoryTopic(Destination destination) throws JMSException {
133            return getNoQueueConsumersAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
134        }
135    
136        public static ActiveMQTopic getNoQueueConsumersAdvisoryTopic(ActiveMQDestination destination) {
137            String name = NO_QUEUE_CONSUMERS_TOPIC_PREFIX + destination.getPhysicalName();
138            return new ActiveMQTopic(name);
139        }
140    
141        public static ActiveMQTopic getSlowConsumerAdvisoryTopic(Destination destination) throws JMSException {
142            return getSlowConsumerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
143        }
144    
145        public static ActiveMQTopic getSlowConsumerAdvisoryTopic(ActiveMQDestination destination) {
146            String name = SLOW_CONSUMER_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "."
147                    + destination.getPhysicalName();
148            return new ActiveMQTopic(name);
149        }
150    
151        public static ActiveMQTopic getFastProducerAdvisoryTopic(Destination destination) throws JMSException {
152            return getFastProducerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
153        }
154    
155        public static ActiveMQTopic getFastProducerAdvisoryTopic(ActiveMQDestination destination) {
156            String name = FAST_PRODUCER_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "."
157                    + destination.getPhysicalName();
158            return new ActiveMQTopic(name);
159        }
160    
161        public static ActiveMQTopic getMessageDiscardedAdvisoryTopic(Destination destination) throws JMSException {
162            return getMessageDiscardedAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
163        }
164    
165        public static ActiveMQTopic getMessageDiscardedAdvisoryTopic(ActiveMQDestination destination) {
166            String name = MESSAGE_DISCAREDED_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "."
167                    + destination.getPhysicalName();
168            return new ActiveMQTopic(name);
169        }
170    
171        public static ActiveMQTopic getMessageDeliveredAdvisoryTopic(Destination destination) throws JMSException {
172            return getMessageDeliveredAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
173        }
174    
175        public static ActiveMQTopic getMessageDeliveredAdvisoryTopic(ActiveMQDestination destination) {
176            String name = MESSAGE_DELIVERED_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "."
177                    + destination.getPhysicalName();
178            return new ActiveMQTopic(name);
179        }
180    
181        public static ActiveMQTopic getMessageConsumedAdvisoryTopic(Destination destination) throws JMSException {
182            return getMessageConsumedAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
183        }
184    
185        public static ActiveMQTopic getMessageConsumedAdvisoryTopic(ActiveMQDestination destination) {
186            String name = MESSAGE_CONSUMED_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "."
187                    + destination.getPhysicalName();
188            return new ActiveMQTopic(name);
189        }
190        
191        public static ActiveMQTopic getMessageDLQdAdvisoryTopic(ActiveMQDestination destination) {
192            String name = MESSAGE_DLQ_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "."
193                    + destination.getPhysicalName();
194            return new ActiveMQTopic(name);
195        }
196    
197        public static ActiveMQTopic getMasterBrokerAdvisoryTopic(Destination destination) throws JMSException {
198            return getMasterBrokerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
199        }
200    
201        public static ActiveMQTopic getMasterBrokerAdvisoryTopic() {
202            return new ActiveMQTopic(MASTER_BROKER_TOPIC_PREFIX);
203        }
204    
205        public static ActiveMQTopic getNetworkBridgeAdvisoryTopic() {
206            return new ActiveMQTopic(NETWORK_BRIDGE_TOPIC_PREFIX);
207        }
208    
209        public static ActiveMQTopic getFullAdvisoryTopic(Destination destination) throws JMSException {
210            return getFullAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
211        }
212    
213        public static ActiveMQTopic getFullAdvisoryTopic(ActiveMQDestination destination) {
214            String name = FULL_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "."
215                    + destination.getPhysicalName();
216            return new ActiveMQTopic(name);
217        }
218    
219        public static ActiveMQTopic getDestinationAdvisoryTopic(Destination destination) throws JMSException {
220            return getDestinationAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
221        }
222    
223        public static ActiveMQTopic getDestinationAdvisoryTopic(ActiveMQDestination destination) {
224            switch (destination.getDestinationType()) {
225            case ActiveMQDestination.QUEUE_TYPE:
226                return QUEUE_ADVISORY_TOPIC;
227            case ActiveMQDestination.TOPIC_TYPE:
228                return TOPIC_ADVISORY_TOPIC;
229            case ActiveMQDestination.TEMP_QUEUE_TYPE:
230                return TEMP_QUEUE_ADVISORY_TOPIC;
231            case ActiveMQDestination.TEMP_TOPIC_TYPE:
232                return TEMP_TOPIC_ADVISORY_TOPIC;
233            default:
234                throw new RuntimeException("Unknown destination type: " + destination.getDestinationType());
235            }
236        }
237    
238        public static boolean isDestinationAdvisoryTopic(Destination destination) throws JMSException {
239            return isDestinationAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
240        }
241    
242        public static boolean isDestinationAdvisoryTopic(ActiveMQDestination destination) {
243            if (destination.isComposite()) {
244                ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
245                for (int i = 0; i < compositeDestinations.length; i++) {
246                    if (isDestinationAdvisoryTopic(compositeDestinations[i])) {
247                        return true;
248                    }
249                }
250                return false;
251            } else {
252                return destination.equals(TEMP_QUEUE_ADVISORY_TOPIC) || destination.equals(TEMP_TOPIC_ADVISORY_TOPIC)
253                        || destination.equals(QUEUE_ADVISORY_TOPIC) || destination.equals(TOPIC_ADVISORY_TOPIC);
254            }
255        }
256    
257        public static boolean isAdvisoryTopic(Destination destination) throws JMSException {
258            return isAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
259        }
260    
261        public static boolean isAdvisoryTopic(ActiveMQDestination destination) {
262            if (destination.isComposite()) {
263                ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
264                for (int i = 0; i < compositeDestinations.length; i++) {
265                    if (isAdvisoryTopic(compositeDestinations[i])) {
266                        return true;
267                    }
268                }
269                return false;
270            } else {
271                return destination.isTopic() && destination.getPhysicalName().startsWith(ADVISORY_TOPIC_PREFIX);
272            }
273        }
274    
275        public static boolean isConnectionAdvisoryTopic(Destination destination) throws JMSException {
276            return isConnectionAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
277        }
278    
279        public static boolean isConnectionAdvisoryTopic(ActiveMQDestination destination) {
280            if (destination.isComposite()) {
281                ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
282                for (int i = 0; i < compositeDestinations.length; i++) {
283                    if (isConnectionAdvisoryTopic(compositeDestinations[i])) {
284                        return true;
285                    }
286                }
287                return false;
288            } else {
289                return destination.equals(CONNECTION_ADVISORY_TOPIC);
290            }
291        }
292    
293        public static boolean isProducerAdvisoryTopic(Destination destination) throws JMSException {
294            return isProducerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
295        }
296    
297        public static boolean isProducerAdvisoryTopic(ActiveMQDestination destination) {
298            if (destination.isComposite()) {
299                ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
300                for (int i = 0; i < compositeDestinations.length; i++) {
301                    if (isProducerAdvisoryTopic(compositeDestinations[i])) {
302                        return true;
303                    }
304                }
305                return false;
306            } else {
307                return destination.isTopic() && destination.getPhysicalName().startsWith(PRODUCER_ADVISORY_TOPIC_PREFIX);
308            }
309        }
310    
311        public static boolean isConsumerAdvisoryTopic(Destination destination) throws JMSException {
312            return isConsumerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
313        }
314    
315        public static boolean isConsumerAdvisoryTopic(ActiveMQDestination destination) {
316            if (destination.isComposite()) {
317                ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
318                for (int i = 0; i < compositeDestinations.length; i++) {
319                    if (isConsumerAdvisoryTopic(compositeDestinations[i])) {
320                        return true;
321                    }
322                }
323                return false;
324            } else {
325                return destination.isTopic() && destination.getPhysicalName().startsWith(CONSUMER_ADVISORY_TOPIC_PREFIX);
326            }
327        }
328    
329        public static boolean isSlowConsumerAdvisoryTopic(Destination destination) throws JMSException {
330            return isSlowConsumerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
331        }
332    
333        public static boolean isSlowConsumerAdvisoryTopic(ActiveMQDestination destination) {
334            if (destination.isComposite()) {
335                ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
336                for (int i = 0; i < compositeDestinations.length; i++) {
337                    if (isSlowConsumerAdvisoryTopic(compositeDestinations[i])) {
338                        return true;
339                    }
340                }
341                return false;
342            } else {
343                return destination.isTopic() && destination.getPhysicalName().startsWith(SLOW_CONSUMER_TOPIC_PREFIX);
344            }
345        }
346    
347        public static boolean isFastProducerAdvisoryTopic(Destination destination) throws JMSException {
348            return isFastProducerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
349        }
350    
351        public static boolean isFastProducerAdvisoryTopic(ActiveMQDestination destination) {
352            if (destination.isComposite()) {
353                ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
354                for (int i = 0; i < compositeDestinations.length; i++) {
355                    if (isFastProducerAdvisoryTopic(compositeDestinations[i])) {
356                        return true;
357                    }
358                }
359                return false;
360            } else {
361                return destination.isTopic() && destination.getPhysicalName().startsWith(FAST_PRODUCER_TOPIC_PREFIX);
362            }
363        }
364    
365        public static boolean isMessageConsumedAdvisoryTopic(Destination destination) throws JMSException {
366            return isMessageConsumedAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
367        }
368    
369        public static boolean isMessageConsumedAdvisoryTopic(ActiveMQDestination destination) {
370            if (destination.isComposite()) {
371                ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
372                for (int i = 0; i < compositeDestinations.length; i++) {
373                    if (isMessageConsumedAdvisoryTopic(compositeDestinations[i])) {
374                        return true;
375                    }
376                }
377                return false;
378            } else {
379                return destination.isTopic() && destination.getPhysicalName().startsWith(MESSAGE_CONSUMED_TOPIC_PREFIX);
380            }
381        }
382    
383        public static boolean isMasterBrokerAdvisoryTopic(Destination destination) throws JMSException {
384            return isMasterBrokerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
385        }
386    
387        public static boolean isMasterBrokerAdvisoryTopic(ActiveMQDestination destination) {
388            if (destination.isComposite()) {
389                ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
390                for (int i = 0; i < compositeDestinations.length; i++) {
391                    if (isMasterBrokerAdvisoryTopic(compositeDestinations[i])) {
392                        return true;
393                    }
394                }
395                return false;
396            } else {
397                return destination.isTopic() && destination.getPhysicalName().startsWith(MASTER_BROKER_TOPIC_PREFIX);
398            }
399        }
400    
401        public static boolean isMessageDeliveredAdvisoryTopic(Destination destination) throws JMSException {
402            return isMessageDeliveredAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
403        }
404    
405        public static boolean isMessageDeliveredAdvisoryTopic(ActiveMQDestination destination) {
406            if (destination.isComposite()) {
407                ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
408                for (int i = 0; i < compositeDestinations.length; i++) {
409                    if (isMessageDeliveredAdvisoryTopic(compositeDestinations[i])) {
410                        return true;
411                    }
412                }
413                return false;
414            } else {
415                return destination.isTopic() && destination.getPhysicalName().startsWith(MESSAGE_DELIVERED_TOPIC_PREFIX);
416            }
417        }
418    
419        public static boolean isMessageDiscardedAdvisoryTopic(Destination destination) throws JMSException {
420            return isMessageDiscardedAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
421        }
422    
423        public static boolean isMessageDiscardedAdvisoryTopic(ActiveMQDestination destination) {
424            if (destination.isComposite()) {
425                ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
426                for (int i = 0; i < compositeDestinations.length; i++) {
427                    if (isMessageDiscardedAdvisoryTopic(compositeDestinations[i])) {
428                        return true;
429                    }
430                }
431                return false;
432            } else {
433                return destination.isTopic() && destination.getPhysicalName().startsWith(MESSAGE_DISCAREDED_TOPIC_PREFIX);
434            }
435        }
436    
437        public static boolean isFullAdvisoryTopic(Destination destination) throws JMSException {
438            return isFullAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
439        }
440    
441        public static boolean isFullAdvisoryTopic(ActiveMQDestination destination) {
442            if (destination.isComposite()) {
443                ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
444                for (int i = 0; i < compositeDestinations.length; i++) {
445                    if (isFullAdvisoryTopic(compositeDestinations[i])) {
446                        return true;
447                    }
448                }
449                return false;
450            } else {
451                return destination.isTopic() && destination.getPhysicalName().startsWith(FULL_TOPIC_PREFIX);
452            }
453        }
454    
455        /**
456         * Returns the agent topic which is used to send commands to the broker
457         */
458        public static Destination getAgentDestination() {
459            return AGENT_TOPIC_DESTINATION;
460        }
461    }