001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker.jmx; 018 019 import java.io.IOException; 020 import java.util.ArrayList; 021 import java.util.HashMap; 022 import java.util.Hashtable; 023 import java.util.Iterator; 024 import java.util.List; 025 import java.util.Map; 026 import java.util.Set; 027 import java.util.Map.Entry; 028 import java.util.concurrent.ConcurrentHashMap; 029 import java.util.concurrent.CopyOnWriteArraySet; 030 import java.util.concurrent.ThreadPoolExecutor; 031 import javax.management.InstanceNotFoundException; 032 import javax.management.MalformedObjectNameException; 033 import javax.management.ObjectName; 034 import javax.management.openmbean.CompositeData; 035 import javax.management.openmbean.CompositeDataSupport; 036 import javax.management.openmbean.CompositeType; 037 import javax.management.openmbean.OpenDataException; 038 import javax.management.openmbean.TabularData; 039 import javax.management.openmbean.TabularDataSupport; 040 import javax.management.openmbean.TabularType; 041 import org.apache.activemq.broker.Broker; 042 import org.apache.activemq.broker.BrokerService; 043 import org.apache.activemq.broker.ConnectionContext; 044 import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory; 045 import org.apache.activemq.broker.region.Destination; 046 import org.apache.activemq.broker.region.DestinationFactory; 047 import org.apache.activemq.broker.region.DestinationFactoryImpl; 048 import org.apache.activemq.broker.region.DestinationInterceptor; 049 import org.apache.activemq.broker.region.Queue; 050 import org.apache.activemq.broker.region.Region; 051 import org.apache.activemq.broker.region.RegionBroker; 052 import org.apache.activemq.broker.region.Subscription; 053 import org.apache.activemq.broker.region.Topic; 054 import org.apache.activemq.broker.region.TopicRegion; 055 import org.apache.activemq.broker.region.TopicSubscription; 056 import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy; 057 import org.apache.activemq.broker.region.policy.SlowConsumerStrategy; 058 import org.apache.activemq.command.ActiveMQDestination; 059 import org.apache.activemq.command.ActiveMQMessage; 060 import org.apache.activemq.command.ActiveMQTopic; 061 import org.apache.activemq.command.ConsumerInfo; 062 import org.apache.activemq.command.Message; 063 import org.apache.activemq.command.MessageId; 064 import org.apache.activemq.command.SubscriptionInfo; 065 import org.apache.activemq.store.MessageRecoveryListener; 066 import org.apache.activemq.store.PersistenceAdapter; 067 import org.apache.activemq.store.TopicMessageStore; 068 import org.apache.activemq.thread.Scheduler; 069 import org.apache.activemq.thread.TaskRunnerFactory; 070 import org.apache.activemq.usage.SystemUsage; 071 import org.apache.activemq.util.JMXSupport; 072 import org.apache.activemq.util.ServiceStopper; 073 import org.apache.activemq.util.SubscriptionKey; 074 import org.slf4j.Logger; 075 import org.slf4j.LoggerFactory; 076 077 public class ManagedRegionBroker extends RegionBroker { 078 private static final Logger LOG = LoggerFactory.getLogger(ManagedRegionBroker.class); 079 private final ManagementContext managementContext; 080 private final ObjectName brokerObjectName; 081 private final Map<ObjectName, DestinationView> topics = new ConcurrentHashMap<ObjectName, DestinationView>(); 082 private final Map<ObjectName, DestinationView> queues = new ConcurrentHashMap<ObjectName, DestinationView>(); 083 private final Map<ObjectName, DestinationView> temporaryQueues = new ConcurrentHashMap<ObjectName, DestinationView>(); 084 private final Map<ObjectName, DestinationView> temporaryTopics = new ConcurrentHashMap<ObjectName, DestinationView>(); 085 private final Map<ObjectName, SubscriptionView> queueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>(); 086 private final Map<ObjectName, SubscriptionView> topicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>(); 087 private final Map<ObjectName, SubscriptionView> durableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>(); 088 private final Map<ObjectName, SubscriptionView> inactiveDurableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>(); 089 private final Map<ObjectName, SubscriptionView> temporaryQueueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>(); 090 private final Map<ObjectName, SubscriptionView> temporaryTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>(); 091 private final Map<SubscriptionKey, ObjectName> subscriptionKeys = new ConcurrentHashMap<SubscriptionKey, ObjectName>(); 092 private final Map<Subscription, ObjectName> subscriptionMap = new ConcurrentHashMap<Subscription, ObjectName>(); 093 private final Set<ObjectName> registeredMBeans = new CopyOnWriteArraySet<ObjectName>(); 094 /* This is the first broker in the broker interceptor chain. */ 095 private Broker contextBroker; 096 097 public ManagedRegionBroker(BrokerService brokerService, ManagementContext context, ObjectName brokerObjectName, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager, 098 DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor,Scheduler scheduler,ThreadPoolExecutor executor) throws IOException { 099 super(brokerService, taskRunnerFactory, memoryManager, destinationFactory, destinationInterceptor,scheduler,executor); 100 this.managementContext = context; 101 this.brokerObjectName = brokerObjectName; 102 } 103 104 @Override 105 public void start() throws Exception { 106 super.start(); 107 // build all existing durable subscriptions 108 buildExistingSubscriptions(); 109 } 110 111 @Override 112 protected void doStop(ServiceStopper stopper) { 113 super.doStop(stopper); 114 // lets remove any mbeans not yet removed 115 for (Iterator<ObjectName> iter = registeredMBeans.iterator(); iter.hasNext();) { 116 ObjectName name = iter.next(); 117 try { 118 managementContext.unregisterMBean(name); 119 } catch (InstanceNotFoundException e) { 120 LOG.warn("The MBean: " + name + " is no longer registered with JMX"); 121 } catch (Exception e) { 122 stopper.onException(this, e); 123 } 124 } 125 registeredMBeans.clear(); 126 } 127 128 @Override 129 protected Region createQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 130 return new ManagedQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 131 } 132 133 @Override 134 protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 135 return new ManagedTempQueueRegion(this, brokerService, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 136 } 137 138 @Override 139 protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 140 return new ManagedTempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 141 } 142 143 @Override 144 protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 145 return new ManagedTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 146 } 147 148 public void register(ActiveMQDestination destName, Destination destination) { 149 // TODO refactor to allow views for custom destinations 150 try { 151 ObjectName objectName = createObjectName(destName); 152 DestinationView view; 153 if (destination instanceof Queue) { 154 view = new QueueView(this, (Queue)destination); 155 } else if (destination instanceof Topic) { 156 view = new TopicView(this, (Topic)destination); 157 } else { 158 view = null; 159 LOG.warn("JMX View is not supported for custom destination: " + destination); 160 } 161 if (view != null) { 162 registerDestination(objectName, destName, view); 163 } 164 } catch (Exception e) { 165 LOG.error("Failed to register destination " + destName, e); 166 } 167 } 168 169 public void unregister(ActiveMQDestination destName) { 170 try { 171 ObjectName objectName = createObjectName(destName); 172 unregisterDestination(objectName); 173 } catch (Exception e) { 174 LOG.error("Failed to unregister " + destName, e); 175 } 176 } 177 178 public ObjectName registerSubscription(ConnectionContext context, Subscription sub) { 179 String connectionClientId = context.getClientId(); 180 ObjectName brokerJmxObjectName = brokerObjectName; 181 String objectNameStr = getSubscriptionObjectName(sub, connectionClientId, brokerJmxObjectName); 182 SubscriptionKey key = new SubscriptionKey(context.getClientId(), sub.getConsumerInfo().getSubscriptionName()); 183 try { 184 ObjectName objectName = new ObjectName(objectNameStr); 185 SubscriptionView view; 186 if (sub.getConsumerInfo().getConsumerId().getConnectionId().equals("OFFLINE")) { 187 // add offline subscribers to inactive list 188 SubscriptionInfo info = new SubscriptionInfo(); 189 info.setClientId(context.getClientId()); 190 info.setSubscriptionName(sub.getConsumerInfo().getSubscriptionName()); 191 info.setDestination(sub.getConsumerInfo().getDestination()); 192 addInactiveSubscription(key, info); 193 } else { 194 if (sub.getConsumerInfo().isDurable()) { 195 view = new DurableSubscriptionView(this, context.getClientId(), sub); 196 } else { 197 if (sub instanceof TopicSubscription) { 198 view = new TopicSubscriptionView(context.getClientId(), (TopicSubscription) sub); 199 } else { 200 view = new SubscriptionView(context.getClientId(), sub); 201 } 202 } 203 registerSubscription(objectName, sub.getConsumerInfo(), key, view); 204 } 205 subscriptionMap.put(sub, objectName); 206 return objectName; 207 } catch (Exception e) { 208 LOG.error("Failed to register subscription " + sub, e); 209 return null; 210 } 211 } 212 213 public static String getSubscriptionObjectName(Subscription sub, String connectionClientId, ObjectName brokerJmxObjectName) { 214 Hashtable map = brokerJmxObjectName.getKeyPropertyList(); 215 String brokerDomain = brokerJmxObjectName.getDomain(); 216 String objectNameStr = brokerDomain + ":" + "BrokerName=" + map.get("BrokerName") + ",Type=Subscription,"; 217 String destinationType = "destinationType=" + sub.getConsumerInfo().getDestination().getDestinationTypeAsString(); 218 String destinationName = "destinationName=" + JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getDestination().getPhysicalName()); 219 String clientId = "clientId=" + JMXSupport.encodeObjectNamePart(connectionClientId); 220 String persistentMode = "persistentMode="; 221 String consumerId = ""; 222 if (sub.getConsumerInfo().isDurable()) { 223 persistentMode += "Durable,subscriptionID=" + JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getSubscriptionName()); 224 } else { 225 persistentMode += "Non-Durable"; 226 if (sub.getConsumerInfo() != null && sub.getConsumerInfo().getConsumerId() != null) { 227 consumerId = ",consumerId=" + JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getConsumerId().toString()); 228 } 229 } 230 objectNameStr += persistentMode + ","; 231 objectNameStr += destinationType + ","; 232 objectNameStr += destinationName + ","; 233 objectNameStr += clientId; 234 objectNameStr += consumerId; 235 return objectNameStr; 236 } 237 238 @Override 239 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 240 Subscription sub = super.addConsumer(context, info); 241 SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName()); 242 ObjectName inactiveName = subscriptionKeys.get(subscriptionKey); 243 if (inactiveName != null) { 244 // if it was inactive, register it 245 registerSubscription(context, sub); 246 } 247 return sub; 248 } 249 250 @Override 251 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 252 for (Subscription sub : subscriptionMap.keySet()) { 253 if (sub.getConsumerInfo().equals(info)) { 254 // unregister all consumer subs 255 unregisterSubscription(subscriptionMap.get(sub), true); 256 } 257 } 258 super.removeConsumer(context, info); 259 } 260 261 public void unregisterSubscription(Subscription sub) { 262 ObjectName name = subscriptionMap.remove(sub); 263 if (name != null) { 264 try { 265 SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName()); 266 ObjectName inactiveName = subscriptionKeys.get(subscriptionKey); 267 if (inactiveName != null) { 268 inactiveDurableTopicSubscribers.remove(inactiveName); 269 managementContext.unregisterMBean(inactiveName); 270 } 271 } catch (Exception e) { 272 LOG.error("Failed to unregister subscription " + sub, e); 273 } 274 } 275 } 276 277 protected void registerDestination(ObjectName key, ActiveMQDestination dest, DestinationView view) throws Exception { 278 if (dest.isQueue()) { 279 if (dest.isTemporary()) { 280 temporaryQueues.put(key, view); 281 } else { 282 queues.put(key, view); 283 } 284 } else { 285 if (dest.isTemporary()) { 286 temporaryTopics.put(key, view); 287 } else { 288 topics.put(key, view); 289 } 290 } 291 try { 292 AnnotatedMBean.registerMBean(managementContext, view, key); 293 registeredMBeans.add(key); 294 } catch (Throwable e) { 295 LOG.warn("Failed to register MBean: " + key); 296 LOG.debug("Failure reason: " + e, e); 297 } 298 } 299 300 protected void unregisterDestination(ObjectName key) throws Exception { 301 302 DestinationView view = null; 303 removeAndRemember(topics, key, view); 304 removeAndRemember(queues, key, view); 305 removeAndRemember(temporaryQueues, key, view); 306 removeAndRemember(temporaryTopics, key, view); 307 if (registeredMBeans.remove(key)) { 308 try { 309 managementContext.unregisterMBean(key); 310 } catch (Throwable e) { 311 LOG.warn("Failed to unregister MBean: " + key); 312 LOG.debug("Failure reason: " + e, e); 313 } 314 } 315 if (view != null) { 316 key = view.getSlowConsumerStrategy(); 317 if (key!= null && registeredMBeans.remove(key)) { 318 try { 319 managementContext.unregisterMBean(key); 320 } catch (Throwable e) { 321 LOG.warn("Failed to unregister slow consumer strategy MBean: " + key); 322 LOG.debug("Failure reason: " + e, e); 323 } 324 } 325 } 326 } 327 328 private void removeAndRemember(Map<ObjectName, DestinationView> map, ObjectName key, DestinationView view) { 329 DestinationView candidate = map.remove(key); 330 if (candidate != null && view == null) { 331 view = candidate; 332 } 333 } 334 335 protected void registerSubscription(ObjectName key, ConsumerInfo info, SubscriptionKey subscriptionKey, SubscriptionView view) throws Exception { 336 ActiveMQDestination dest = info.getDestination(); 337 if (dest.isQueue()) { 338 if (dest.isTemporary()) { 339 temporaryQueueSubscribers.put(key, view); 340 } else { 341 queueSubscribers.put(key, view); 342 } 343 } else { 344 if (dest.isTemporary()) { 345 temporaryTopicSubscribers.put(key, view); 346 } else { 347 if (info.isDurable()) { 348 durableTopicSubscribers.put(key, view); 349 // unregister any inactive durable subs 350 try { 351 ObjectName inactiveName = subscriptionKeys.get(subscriptionKey); 352 if (inactiveName != null) { 353 inactiveDurableTopicSubscribers.remove(inactiveName); 354 registeredMBeans.remove(inactiveName); 355 managementContext.unregisterMBean(inactiveName); 356 } 357 } catch (Throwable e) { 358 LOG.error("Unable to unregister inactive durable subscriber: " + subscriptionKey, e); 359 } 360 } else { 361 topicSubscribers.put(key, view); 362 } 363 } 364 } 365 366 try { 367 AnnotatedMBean.registerMBean(managementContext, view, key); 368 registeredMBeans.add(key); 369 } catch (Throwable e) { 370 LOG.warn("Failed to register MBean: " + key); 371 LOG.debug("Failure reason: " + e, e); 372 } 373 374 } 375 376 protected void unregisterSubscription(ObjectName key, boolean addToInactive) throws Exception { 377 queueSubscribers.remove(key); 378 topicSubscribers.remove(key); 379 temporaryQueueSubscribers.remove(key); 380 temporaryTopicSubscribers.remove(key); 381 if (registeredMBeans.remove(key)) { 382 try { 383 managementContext.unregisterMBean(key); 384 } catch (Throwable e) { 385 LOG.warn("Failed to unregister MBean: " + key); 386 LOG.debug("Failure reason: " + e, e); 387 } 388 } 389 DurableSubscriptionView view = (DurableSubscriptionView)durableTopicSubscribers.remove(key); 390 if (view != null) { 391 // need to put this back in the inactive list 392 SubscriptionKey subscriptionKey = new SubscriptionKey(view.getClientId(), view.getSubscriptionName()); 393 if (addToInactive) { 394 SubscriptionInfo info = new SubscriptionInfo(); 395 info.setClientId(subscriptionKey.getClientId()); 396 info.setSubscriptionName(subscriptionKey.getSubscriptionName()); 397 info.setDestination(new ActiveMQTopic(view.getDestinationName())); 398 addInactiveSubscription(subscriptionKey, info); 399 } 400 } 401 } 402 403 protected void buildExistingSubscriptions() throws Exception { 404 Map<SubscriptionKey, SubscriptionInfo> subscriptions = new HashMap<SubscriptionKey, SubscriptionInfo>(); 405 Set destinations = destinationFactory.getDestinations(); 406 if (destinations != null) { 407 for (Iterator iter = destinations.iterator(); iter.hasNext();) { 408 ActiveMQDestination dest = (ActiveMQDestination)iter.next(); 409 if (dest.isTopic()) { 410 SubscriptionInfo[] infos = destinationFactory.getAllDurableSubscriptions((ActiveMQTopic)dest); 411 if (infos != null) { 412 for (int i = 0; i < infos.length; i++) { 413 SubscriptionInfo info = infos[i]; 414 SubscriptionKey key = new SubscriptionKey(info); 415 if (!alreadyKnown(key)) { 416 LOG.debug("Restoring durable subscription mbean: " + info); 417 subscriptions.put(key, info); 418 } 419 } 420 } 421 } 422 } 423 } 424 for (Iterator i = subscriptions.entrySet().iterator(); i.hasNext();) { 425 Map.Entry entry = (Entry)i.next(); 426 SubscriptionKey key = (SubscriptionKey)entry.getKey(); 427 SubscriptionInfo info = (SubscriptionInfo)entry.getValue(); 428 addInactiveSubscription(key, info); 429 } 430 } 431 432 private boolean alreadyKnown(SubscriptionKey key) { 433 boolean known = false; 434 known = ((TopicRegion) getTopicRegion()).durableSubscriptionExists(key); 435 if (LOG.isTraceEnabled()) { 436 LOG.trace("Sub with key: " + key + ", " + (known ? "": "not") + " already registered"); 437 } 438 return known; 439 } 440 441 protected void addInactiveSubscription(SubscriptionKey key, SubscriptionInfo info) { 442 Hashtable map = brokerObjectName.getKeyPropertyList(); 443 try { 444 ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName") + "," + "Type=Subscription," + "active=false," 445 + "name=" + JMXSupport.encodeObjectNamePart(key.toString()) + ""); 446 SubscriptionView view = new InactiveDurableSubscriptionView(this, key.getClientId(), info); 447 448 try { 449 AnnotatedMBean.registerMBean(managementContext, view, objectName); 450 registeredMBeans.add(objectName); 451 } catch (Throwable e) { 452 LOG.warn("Failed to register MBean: " + key); 453 LOG.debug("Failure reason: " + e, e); 454 } 455 456 inactiveDurableTopicSubscribers.put(objectName, view); 457 subscriptionKeys.put(key, objectName); 458 } catch (Exception e) { 459 LOG.error("Failed to register subscription " + info, e); 460 } 461 } 462 463 public CompositeData[] browse(SubscriptionView view) throws OpenDataException { 464 List<Message> messages = getSubscriberMessages(view); 465 CompositeData c[] = new CompositeData[messages.size()]; 466 for (int i = 0; i < c.length; i++) { 467 try { 468 c[i] = OpenTypeSupport.convert(messages.get(i)); 469 } catch (Throwable e) { 470 LOG.error("failed to browse : " + view, e); 471 } 472 } 473 return c; 474 } 475 476 public TabularData browseAsTable(SubscriptionView view) throws OpenDataException { 477 OpenTypeFactory factory = OpenTypeSupport.getFactory(ActiveMQMessage.class); 478 List<Message> messages = getSubscriberMessages(view); 479 CompositeType ct = factory.getCompositeType(); 480 TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] {"JMSMessageID"}); 481 TabularDataSupport rc = new TabularDataSupport(tt); 482 for (int i = 0; i < messages.size(); i++) { 483 rc.put(new CompositeDataSupport(ct, factory.getFields(messages.get(i)))); 484 } 485 return rc; 486 } 487 488 protected List<Message> getSubscriberMessages(SubscriptionView view) { 489 // TODO It is very dangerous operation for big backlogs 490 if (!(destinationFactory instanceof DestinationFactoryImpl)) { 491 throw new RuntimeException("unsupported by " + destinationFactory); 492 } 493 PersistenceAdapter adapter = ((DestinationFactoryImpl)destinationFactory).getPersistenceAdapter(); 494 final List<Message> result = new ArrayList<Message>(); 495 try { 496 ActiveMQTopic topic = new ActiveMQTopic(view.getDestinationName()); 497 TopicMessageStore store = adapter.createTopicMessageStore(topic); 498 store.recover(new MessageRecoveryListener() { 499 public boolean recoverMessage(Message message) throws Exception { 500 result.add(message); 501 return true; 502 } 503 504 public boolean recoverMessageReference(MessageId messageReference) throws Exception { 505 throw new RuntimeException("Should not be called."); 506 } 507 508 public boolean hasSpace() { 509 return true; 510 } 511 512 public boolean isDuplicate(MessageId id) { 513 return false; 514 } 515 }); 516 } catch (Throwable e) { 517 LOG.error("Failed to browse messages for Subscription " + view, e); 518 } 519 return result; 520 521 } 522 523 protected ObjectName[] getTopics() { 524 Set<ObjectName> set = topics.keySet(); 525 return set.toArray(new ObjectName[set.size()]); 526 } 527 528 protected ObjectName[] getQueues() { 529 Set<ObjectName> set = queues.keySet(); 530 return set.toArray(new ObjectName[set.size()]); 531 } 532 533 protected ObjectName[] getTemporaryTopics() { 534 Set<ObjectName> set = temporaryTopics.keySet(); 535 return set.toArray(new ObjectName[set.size()]); 536 } 537 538 protected ObjectName[] getTemporaryQueues() { 539 Set<ObjectName> set = temporaryQueues.keySet(); 540 return set.toArray(new ObjectName[set.size()]); 541 } 542 543 protected ObjectName[] getTopicSubscribers() { 544 Set<ObjectName> set = topicSubscribers.keySet(); 545 return set.toArray(new ObjectName[set.size()]); 546 } 547 548 protected ObjectName[] getDurableTopicSubscribers() { 549 Set<ObjectName> set = durableTopicSubscribers.keySet(); 550 return set.toArray(new ObjectName[set.size()]); 551 } 552 553 protected ObjectName[] getQueueSubscribers() { 554 Set<ObjectName> set = queueSubscribers.keySet(); 555 return set.toArray(new ObjectName[set.size()]); 556 } 557 558 protected ObjectName[] getTemporaryTopicSubscribers() { 559 Set<ObjectName> set = temporaryTopicSubscribers.keySet(); 560 return set.toArray(new ObjectName[set.size()]); 561 } 562 563 protected ObjectName[] getTemporaryQueueSubscribers() { 564 Set<ObjectName> set = temporaryQueueSubscribers.keySet(); 565 return set.toArray(new ObjectName[set.size()]); 566 } 567 568 protected ObjectName[] getInactiveDurableTopicSubscribers() { 569 Set<ObjectName> set = inactiveDurableTopicSubscribers.keySet(); 570 return set.toArray(new ObjectName[set.size()]); 571 } 572 573 public Broker getContextBroker() { 574 return contextBroker; 575 } 576 577 public void setContextBroker(Broker contextBroker) { 578 this.contextBroker = contextBroker; 579 } 580 581 protected ObjectName createObjectName(ActiveMQDestination destName) throws MalformedObjectNameException { 582 // Build the object name for the destination 583 Hashtable map = brokerObjectName.getKeyPropertyList(); 584 ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName") + "," + "Type=" 585 + JMXSupport.encodeObjectNamePart(destName.getDestinationTypeAsString()) + "," + "Destination=" 586 + JMXSupport.encodeObjectNamePart(destName.getPhysicalName())); 587 return objectName; 588 } 589 590 public ObjectName registerSlowConsumerStrategy(AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException { 591 ObjectName objectName = null; 592 try { 593 objectName = createObjectName(strategy); 594 if (!registeredMBeans.contains(objectName)) { 595 AbortSlowConsumerStrategyView view = new AbortSlowConsumerStrategyView(this, strategy); 596 AnnotatedMBean.registerMBean(managementContext, view, objectName); 597 registeredMBeans.add(objectName); 598 } 599 } catch (Exception e) { 600 LOG.warn("Failed to register MBean: " + strategy); 601 LOG.debug("Failure reason: " + e, e); 602 } 603 return objectName; 604 } 605 606 private ObjectName createObjectName(AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException{ 607 Hashtable map = brokerObjectName.getKeyPropertyList(); 608 ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName") + "," 609 + "Type=SlowConsumerStrategy," + "InstanceName=" + JMXSupport.encodeObjectNamePart(strategy.getName())); 610 return objectName; 611 } 612 613 public ObjectName getSubscriberObjectName(Subscription key) { 614 return subscriptionMap.get(key); 615 } 616 617 public Subscription getSubscriber(ObjectName key) { 618 Subscription sub = null; 619 for (Entry<Subscription, ObjectName> entry: subscriptionMap.entrySet()) { 620 if (entry.getValue().equals(key)) { 621 sub = entry.getKey(); 622 break; 623 } 624 } 625 return sub; 626 } 627 }