001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker.region; 018 019 import java.io.IOException; 020 import java.util.ArrayList; 021 import java.util.LinkedList; 022 import java.util.List; 023 import java.util.Set; 024 import java.util.concurrent.CancellationException; 025 import java.util.concurrent.ConcurrentHashMap; 026 import java.util.concurrent.CopyOnWriteArrayList; 027 import java.util.concurrent.CopyOnWriteArraySet; 028 import java.util.concurrent.Future; 029 import org.apache.activemq.broker.BrokerService; 030 import org.apache.activemq.broker.ConnectionContext; 031 import org.apache.activemq.broker.ProducerBrokerExchange; 032 import org.apache.activemq.broker.region.policy.DispatchPolicy; 033 import org.apache.activemq.broker.region.policy.NoSubscriptionRecoveryPolicy; 034 import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy; 035 import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy; 036 import org.apache.activemq.command.ActiveMQDestination; 037 import org.apache.activemq.command.ExceptionResponse; 038 import org.apache.activemq.command.Message; 039 import org.apache.activemq.command.MessageAck; 040 import org.apache.activemq.command.MessageId; 041 import org.apache.activemq.command.ProducerAck; 042 import org.apache.activemq.command.ProducerInfo; 043 import org.apache.activemq.command.Response; 044 import org.apache.activemq.command.SubscriptionInfo; 045 import org.apache.activemq.filter.MessageEvaluationContext; 046 import org.apache.activemq.filter.NonCachedMessageEvaluationContext; 047 import org.apache.activemq.store.MessageRecoveryListener; 048 import org.apache.activemq.store.TopicMessageStore; 049 import org.apache.activemq.thread.Task; 050 import org.apache.activemq.thread.TaskRunner; 051 import org.apache.activemq.thread.TaskRunnerFactory; 052 import org.apache.activemq.thread.Valve; 053 import org.apache.activemq.transaction.Synchronization; 054 import org.apache.activemq.util.SubscriptionKey; 055 import org.slf4j.Logger; 056 import org.slf4j.LoggerFactory; 057 058 /** 059 * The Topic is a destination that sends a copy of a message to every active 060 * Subscription registered. 061 * 062 * 063 */ 064 public class Topic extends BaseDestination implements Task { 065 protected static final Logger LOG = LoggerFactory.getLogger(Topic.class); 066 private final TopicMessageStore topicStore; 067 protected final CopyOnWriteArrayList<Subscription> consumers = new CopyOnWriteArrayList<Subscription>(); 068 protected final Valve dispatchValve = new Valve(true); 069 private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy(); 070 private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy; 071 private final ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubcribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>(); 072 private final TaskRunner taskRunner; 073 private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>(); 074 private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() { 075 public void run() { 076 try { 077 Topic.this.taskRunner.wakeup(); 078 } catch (InterruptedException e) { 079 } 080 }; 081 }; 082 083 public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store, 084 DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception { 085 super(brokerService, store, destination, parentStats); 086 this.topicStore = store; 087 // set default subscription recovery policy 088 subscriptionRecoveryPolicy = new NoSubscriptionRecoveryPolicy(); 089 this.taskRunner = taskFactory.createTaskRunner(this, "Topic " + destination.getPhysicalName()); 090 } 091 092 @Override 093 public void initialize() throws Exception { 094 super.initialize(); 095 if (store != null) { 096 // AMQ-2586: Better to leave this stat at zero than to give the user 097 // misleading metrics. 098 // int messageCount = store.getMessageCount(); 099 // destinationStatistics.getMessages().setCount(messageCount); 100 } 101 } 102 103 public List<Subscription> getConsumers() { 104 synchronized (consumers) { 105 return new ArrayList<Subscription>(consumers); 106 } 107 } 108 109 public boolean lock(MessageReference node, LockOwner sub) { 110 return true; 111 } 112 113 public void addSubscription(ConnectionContext context, final Subscription sub) throws Exception { 114 115 super.addSubscription(context, sub); 116 117 if (!sub.getConsumerInfo().isDurable()) { 118 119 // Do a retroactive recovery if needed. 120 if (sub.getConsumerInfo().isRetroactive()) { 121 122 // synchronize with dispatch method so that no new messages are 123 // sent 124 // while we are recovering a subscription to avoid out of order 125 // messages. 126 dispatchValve.turnOff(); 127 try { 128 129 synchronized (consumers) { 130 sub.add(context, this); 131 consumers.add(sub); 132 } 133 subscriptionRecoveryPolicy.recover(context, this, sub); 134 135 } finally { 136 dispatchValve.turnOn(); 137 } 138 139 } else { 140 synchronized (consumers) { 141 sub.add(context, this); 142 consumers.add(sub); 143 } 144 } 145 } else { 146 sub.add(context, this); 147 DurableTopicSubscription dsub = (DurableTopicSubscription) sub; 148 durableSubcribers.put(dsub.getSubscriptionKey(), dsub); 149 } 150 } 151 152 public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) 153 throws Exception { 154 if (!sub.getConsumerInfo().isDurable()) { 155 super.removeSubscription(context, sub, lastDeliveredSequenceId); 156 synchronized (consumers) { 157 consumers.remove(sub); 158 } 159 } 160 sub.remove(context, this); 161 } 162 163 public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception { 164 if (topicStore != null) { 165 topicStore.deleteSubscription(key.clientId, key.subscriptionName); 166 DurableTopicSubscription removed = durableSubcribers.remove(key); 167 if (removed != null) { 168 destinationStatistics.getConsumers().decrement(); 169 // deactivate and remove 170 removed.deactivate(false); 171 consumers.remove(removed); 172 } 173 } 174 } 175 176 public void activate(ConnectionContext context, final DurableTopicSubscription subscription) throws Exception { 177 // synchronize with dispatch method so that no new messages are sent 178 // while 179 // we are recovering a subscription to avoid out of order messages. 180 dispatchValve.turnOff(); 181 try { 182 183 if (topicStore == null) { 184 return; 185 } 186 187 // Recover the durable subscription. 188 String clientId = subscription.getSubscriptionKey().getClientId(); 189 String subscriptionName = subscription.getSubscriptionKey().getSubscriptionName(); 190 String selector = subscription.getConsumerInfo().getSelector(); 191 SubscriptionInfo info = topicStore.lookupSubscription(clientId, subscriptionName); 192 if (info != null) { 193 // Check to see if selector changed. 194 String s1 = info.getSelector(); 195 if (s1 == null ^ selector == null || (s1 != null && !s1.equals(selector))) { 196 // Need to delete the subscription 197 topicStore.deleteSubscription(clientId, subscriptionName); 198 info = null; 199 } else { 200 synchronized (consumers) { 201 consumers.add(subscription); 202 } 203 } 204 } 205 // Do we need to create the subscription? 206 if (info == null) { 207 info = new SubscriptionInfo(); 208 info.setClientId(clientId); 209 info.setSelector(selector); 210 info.setSubscriptionName(subscriptionName); 211 info.setDestination(getActiveMQDestination()); 212 // This destination is an actual destination id. 213 info.setSubscribedDestination(subscription.getConsumerInfo().getDestination()); 214 // This destination might be a pattern 215 synchronized (consumers) { 216 consumers.add(subscription); 217 topicStore.addSubsciption(info, subscription.getConsumerInfo().isRetroactive()); 218 } 219 } 220 221 final MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext(); 222 msgContext.setDestination(destination); 223 if (subscription.isRecoveryRequired()) { 224 topicStore.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() { 225 public boolean recoverMessage(Message message) throws Exception { 226 message.setRegionDestination(Topic.this); 227 try { 228 msgContext.setMessageReference(message); 229 if (subscription.matches(message, msgContext)) { 230 subscription.add(message); 231 } 232 } catch (IOException e) { 233 LOG.error("Failed to recover this message " + message); 234 } 235 return true; 236 } 237 238 public boolean recoverMessageReference(MessageId messageReference) throws Exception { 239 throw new RuntimeException("Should not be called."); 240 } 241 242 public boolean hasSpace() { 243 return true; 244 } 245 246 public boolean isDuplicate(MessageId id) { 247 return false; 248 } 249 }); 250 } 251 } finally { 252 dispatchValve.turnOn(); 253 } 254 } 255 256 public void deactivate(ConnectionContext context, DurableTopicSubscription sub) throws Exception { 257 synchronized (consumers) { 258 consumers.remove(sub); 259 } 260 sub.remove(context, this); 261 } 262 263 protected void recoverRetroactiveMessages(ConnectionContext context, Subscription subscription) throws Exception { 264 if (subscription.getConsumerInfo().isRetroactive()) { 265 subscriptionRecoveryPolicy.recover(context, this, subscription); 266 } 267 } 268 269 public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception { 270 final ConnectionContext context = producerExchange.getConnectionContext(); 271 272 final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo(); 273 final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0 274 && !context.isInRecoveryMode(); 275 276 // There is delay between the client sending it and it arriving at the 277 // destination.. it may have expired. 278 if (message.isExpired()) { 279 broker.messageExpired(context, message, null); 280 getDestinationStatistics().getExpired().increment(); 281 if (sendProducerAck) { 282 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize()); 283 context.getConnection().dispatchAsync(ack); 284 } 285 return; 286 } 287 288 if (memoryUsage.isFull()) { 289 isFull(context, memoryUsage); 290 fastProducer(context, producerInfo); 291 292 if (isProducerFlowControl() && context.isProducerFlowControl()) { 293 294 if (warnOnProducerFlowControl) { 295 warnOnProducerFlowControl = false; 296 LOG 297 .info("Usage Manager memory limit (" 298 + memoryUsage.getLimit() 299 + ") reached for " 300 + getActiveMQDestination().getQualifiedName() 301 + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it." 302 + " See http://activemq.apache.org/producer-flow-control.html for more info"); 303 } 304 305 if (systemUsage.isSendFailIfNoSpace()) { 306 throw new javax.jms.ResourceAllocationException("Usage Manager memory limit (" 307 + memoryUsage.getLimit() + ") reached. Stopping producer (" + message.getProducerId() 308 + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." 309 + " See http://activemq.apache.org/producer-flow-control.html for more info"); 310 } 311 312 // We can avoid blocking due to low usage if the producer is 313 // sending 314 // a sync message or 315 // if it is using a producer window 316 if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) { 317 synchronized (messagesWaitingForSpace) { 318 messagesWaitingForSpace.add(new Runnable() { 319 public void run() { 320 try { 321 322 // While waiting for space to free up... the 323 // message may have expired. 324 if (message.isExpired()) { 325 broker.messageExpired(context, message, null); 326 getDestinationStatistics().getExpired().increment(); 327 } else { 328 doMessageSend(producerExchange, message); 329 } 330 331 if (sendProducerAck) { 332 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message 333 .getSize()); 334 context.getConnection().dispatchAsync(ack); 335 } else { 336 Response response = new Response(); 337 response.setCorrelationId(message.getCommandId()); 338 context.getConnection().dispatchAsync(response); 339 } 340 341 } catch (Exception e) { 342 if (!sendProducerAck && !context.isInRecoveryMode()) { 343 ExceptionResponse response = new ExceptionResponse(e); 344 response.setCorrelationId(message.getCommandId()); 345 context.getConnection().dispatchAsync(response); 346 } 347 } 348 349 } 350 }); 351 352 registerCallbackForNotFullNotification(); 353 context.setDontSendReponse(true); 354 return; 355 } 356 357 } else { 358 // Producer flow control cannot be used, so we have do the 359 // flow 360 // control at the broker 361 // by blocking this thread until there is space available. 362 363 if (memoryUsage.isFull()) { 364 if (context.isInTransaction()) { 365 366 int count = 0; 367 while (!memoryUsage.waitForSpace(1000)) { 368 if (context.getStopping().get()) { 369 throw new IOException("Connection closed, send aborted."); 370 } 371 if (count > 2 && context.isInTransaction()) { 372 count = 0; 373 int size = context.getTransaction().size(); 374 LOG.warn("Waiting for space to send transacted message - transaction elements = " 375 + size + " need more space to commit. Message = " + message); 376 } 377 } 378 } else { 379 waitForSpace( 380 context, 381 memoryUsage, 382 "Usage Manager memory limit reached. Stopping producer (" 383 + message.getProducerId() 384 + ") to prevent flooding " 385 + getActiveMQDestination().getQualifiedName() 386 + "." 387 + " See http://activemq.apache.org/producer-flow-control.html for more info"); 388 } 389 } 390 391 // The usage manager could have delayed us by the time 392 // we unblock the message could have expired.. 393 if (message.isExpired()) { 394 getDestinationStatistics().getExpired().increment(); 395 if (LOG.isDebugEnabled()) { 396 LOG.debug("Expired message: " + message); 397 } 398 return; 399 } 400 } 401 } 402 } 403 404 doMessageSend(producerExchange, message); 405 messageDelivered(context, message); 406 if (sendProducerAck) { 407 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize()); 408 context.getConnection().dispatchAsync(ack); 409 } 410 } 411 412 /** 413 * do send the message - this needs to be synchronized to ensure messages 414 * are stored AND dispatched in the right order 415 * 416 * @param producerExchange 417 * @param message 418 * @throws IOException 419 * @throws Exception 420 */ 421 synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) 422 throws IOException, Exception { 423 final ConnectionContext context = producerExchange.getConnectionContext(); 424 message.setRegionDestination(this); 425 message.getMessageId().setBrokerSequenceId(getDestinationSequenceId()); 426 Future<Object> result = null; 427 428 if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) { 429 if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) { 430 final String logMessage = "Usage Manager Store is Full, " + getStoreUsageHighWaterMark() + "% of " 431 + systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" + message.getProducerId() 432 + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." 433 + " See http://activemq.apache.org/producer-flow-control.html for more info"; 434 if (systemUsage.isSendFailIfNoSpace()) { 435 throw new javax.jms.ResourceAllocationException(logMessage); 436 } 437 438 waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage); 439 } 440 result = topicStore.asyncAddTopicMessage(context, message); 441 } 442 443 message.incrementReferenceCount(); 444 445 if (context.isInTransaction()) { 446 context.getTransaction().addSynchronization(new Synchronization() { 447 @Override 448 public void afterCommit() throws Exception { 449 // It could take while before we receive the commit 450 // operration.. by that time the message could have 451 // expired.. 452 if (broker.isExpired(message)) { 453 getDestinationStatistics().getExpired().increment(); 454 broker.messageExpired(context, message, null); 455 message.decrementReferenceCount(); 456 return; 457 } 458 try { 459 dispatch(context, message); 460 } finally { 461 message.decrementReferenceCount(); 462 } 463 } 464 }); 465 466 } else { 467 try { 468 dispatch(context, message); 469 } finally { 470 message.decrementReferenceCount(); 471 } 472 } 473 if (result != null && !result.isCancelled()) { 474 try { 475 result.get(); 476 } catch (CancellationException e) { 477 // ignore - the task has been cancelled if the message 478 // has already been deleted 479 } 480 } 481 482 } 483 484 private boolean canOptimizeOutPersistence() { 485 return durableSubcribers.size() == 0; 486 } 487 488 @Override 489 public String toString() { 490 return "Topic: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size(); 491 } 492 493 public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack, 494 final MessageReference node) throws IOException { 495 if (topicStore != null && node.isPersistent()) { 496 DurableTopicSubscription dsub = (DurableTopicSubscription) sub; 497 SubscriptionKey key = dsub.getSubscriptionKey(); 498 topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId(), ack); 499 } 500 messageConsumed(context, node); 501 } 502 503 public void gc() { 504 } 505 506 public Message loadMessage(MessageId messageId) throws IOException { 507 return topicStore != null ? topicStore.getMessage(messageId) : null; 508 } 509 510 public void start() throws Exception { 511 this.subscriptionRecoveryPolicy.start(); 512 if (memoryUsage != null) { 513 memoryUsage.start(); 514 } 515 516 } 517 518 public void stop() throws Exception { 519 if (taskRunner != null) { 520 taskRunner.shutdown(); 521 } 522 this.subscriptionRecoveryPolicy.stop(); 523 if (memoryUsage != null) { 524 memoryUsage.stop(); 525 } 526 if (this.topicStore != null) { 527 this.topicStore.stop(); 528 } 529 } 530 531 public Message[] browse() { 532 final Set<Message> result = new CopyOnWriteArraySet<Message>(); 533 try { 534 if (topicStore != null) { 535 topicStore.recover(new MessageRecoveryListener() { 536 public boolean recoverMessage(Message message) throws Exception { 537 result.add(message); 538 return true; 539 } 540 541 public boolean recoverMessageReference(MessageId messageReference) throws Exception { 542 return true; 543 } 544 545 public boolean hasSpace() { 546 return true; 547 } 548 549 public boolean isDuplicate(MessageId id) { 550 return false; 551 } 552 }); 553 Message[] msgs = subscriptionRecoveryPolicy.browse(getActiveMQDestination()); 554 if (msgs != null) { 555 for (int i = 0; i < msgs.length; i++) { 556 result.add(msgs[i]); 557 } 558 } 559 } 560 } catch (Throwable e) { 561 LOG.warn("Failed to browse Topic: " + getActiveMQDestination().getPhysicalName(), e); 562 } 563 return result.toArray(new Message[result.size()]); 564 } 565 566 public boolean iterate() { 567 synchronized (messagesWaitingForSpace) { 568 while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) { 569 Runnable op = messagesWaitingForSpace.removeFirst(); 570 op.run(); 571 } 572 573 if (!messagesWaitingForSpace.isEmpty()) { 574 registerCallbackForNotFullNotification(); 575 } 576 } 577 return false; 578 } 579 580 private void registerCallbackForNotFullNotification() { 581 // If the usage manager is not full, then the task will not 582 // get called.. 583 if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) { 584 // so call it directly here. 585 sendMessagesWaitingForSpaceTask.run(); 586 } 587 } 588 589 // Properties 590 // ------------------------------------------------------------------------- 591 592 public DispatchPolicy getDispatchPolicy() { 593 return dispatchPolicy; 594 } 595 596 public void setDispatchPolicy(DispatchPolicy dispatchPolicy) { 597 this.dispatchPolicy = dispatchPolicy; 598 } 599 600 public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy() { 601 return subscriptionRecoveryPolicy; 602 } 603 604 public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy subscriptionRecoveryPolicy) { 605 this.subscriptionRecoveryPolicy = subscriptionRecoveryPolicy; 606 } 607 608 // Implementation methods 609 // ------------------------------------------------------------------------- 610 611 public final void wakeup() { 612 } 613 614 protected void dispatch(final ConnectionContext context, Message message) throws Exception { 615 // AMQ-2586: Better to leave this stat at zero than to give the user 616 // misleading metrics. 617 // destinationStatistics.getMessages().increment(); 618 destinationStatistics.getEnqueues().increment(); 619 dispatchValve.increment(); 620 MessageEvaluationContext msgContext = null; 621 try { 622 if (!subscriptionRecoveryPolicy.add(context, message)) { 623 return; 624 } 625 synchronized (consumers) { 626 if (consumers.isEmpty()) { 627 onMessageWithNoConsumers(context, message); 628 return; 629 } 630 } 631 msgContext = context.getMessageEvaluationContext(); 632 msgContext.setDestination(destination); 633 msgContext.setMessageReference(message); 634 if (!dispatchPolicy.dispatch(message, msgContext, consumers)) { 635 onMessageWithNoConsumers(context, message); 636 } 637 638 } finally { 639 dispatchValve.decrement(); 640 if (msgContext != null) { 641 msgContext.clear(); 642 } 643 } 644 } 645 646 public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) { 647 broker.messageExpired(context, reference, subs); 648 // AMQ-2586: Better to leave this stat at zero than to give the user 649 // misleading metrics. 650 // destinationStatistics.getMessages().decrement(); 651 destinationStatistics.getEnqueues().decrement(); 652 destinationStatistics.getExpired().increment(); 653 MessageAck ack = new MessageAck(); 654 ack.setAckType(MessageAck.STANDARD_ACK_TYPE); 655 ack.setDestination(destination); 656 ack.setMessageID(reference.getMessageId()); 657 try { 658 acknowledge(context, subs, ack, reference); 659 } catch (IOException e) { 660 LOG.error("Failed to remove expired Message from the store ", e); 661 } 662 } 663 664 @Override 665 protected Logger getLog() { 666 return LOG; 667 } 668 669 }