/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.broker.region;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.Response;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.usage.SystemUsage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A subscription that honors the pre-fetch option of the ConsumerInfo.
 * <p>
 * Messages handed to the subscription are first stored in the
 * {@link #pending} cursor and are moved to the {@link #dispatched} list when
 * they are sent to the consumer. The number of messages that may sit in the
 * dispatched list is bounded by the consumer's prefetch size plus
 * {@link #prefetchExtension}.
 * <p>
 * Locking as visible in this class: the pending cursor is accessed under
 * {@code pendingLock}; structural changes that must be seen consistently by
 * ack handling and subscription removal are made under {@code dispatchLock}.
 * Where both are held, {@code pendingLock} is acquired first.
 */
public abstract class PrefetchSubscription extends AbstractSubscription {

    private static final Logger LOG = LoggerFactory.getLogger(PrefetchSubscription.class);

    /** Scheduler obtained from the broker; used to schedule pull timeouts. */
    protected final Scheduler scheduler;

    /** Messages that have been added to this subscription but not yet dispatched. */
    protected PendingMessageCursor pending;
    /** Messages that have been dispatched to the consumer and not yet removed by an ack. */
    protected final List<MessageReference> dispatched = new CopyOnWriteArrayList<MessageReference>();
    /** Number of extra messages, beyond the prefetch size, that may currently be dispatched. */
    protected int prefetchExtension;
    /** When false, delivered acks and transacted acks do not extend the prefetch window. */
    protected boolean usePrefetchExtension = true;
    /** Count of messages added to the pending cursor via {@link #add(MessageReference)}. */
    protected long enqueueCounter;
    /** Count of real (non NULL) messages dispatched to the consumer. */
    protected long dispatchCounter;
    /** Count of messages removed by standard, individual or poison acks. */
    protected long dequeueCounter;
    private int maxProducersToAudit=32;
    private int maxAuditDepth=2048;
    protected final SystemUsage usageManager;
    /** Guards access to the {@link #pending} cursor. */
    protected final Object pendingLock = new Object();
    /** Guards coordinated updates of the {@link #dispatched} list. */
    private final Object dispatchLock = new Object();
    /** Released by the first call to {@link #dispatch(MessageReference)}; acks arriving earlier are ignored on a master. */
    private final CountDownLatch okForAckAsDispatchDone = new CountDownLatch(1);

    /**
     * Creates a subscription that stores its pending messages in the supplied cursor.
     *
     * @param broker the broker; also the source of the {@link Scheduler}
     * @param usageManager usage manager applied to cursors set via {@link #setPending(PendingMessageCursor)}
     * @param context the connection context of the consumer
     * @param info the consumer description
     * @param cursor the cursor used to hold pending messages
     * @throws InvalidSelectorException declared by the superclass constructor
     */
    public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException {
        super(broker,context, info);
        this.usageManager=usageManager;
        pending = cursor;
        this.scheduler = broker.getScheduler();
    }

    /**
     * Creates a subscription backed by a new {@link VMPendingMessageCursor}.
     *
     * @param broker the broker
     * @param usageManager usage manager applied to cursors set via {@link #setPending(PendingMessageCursor)}
     * @param context the connection context of the consumer
     * @param info the consumer description
     * @throws InvalidSelectorException declared by the superclass constructor
     */
    public PrefetchSubscription(Broker broker,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
        this(broker,usageManager,context, info, new VMPendingMessageCursor(false));
    }

    /**
     * Allows a message to be pulled on demand by a client.
     * <p>
     * Only acts when the prefetch size is 0 and this broker is not a slave.
     * The prefetch extension is increased by one, the destinations are asked
     * to iterate and pending messages are dispatched. If nothing was
     * dispatched, a NULL message is sent immediately when the pull timeout is
     * -1, or a timeout task is scheduled when the pull timeout is positive.
     *
     * @param context the connection context (not used by this implementation)
     * @param pull the pull request; only its timeout is read
     * @return always {@code null}
     * @throws Exception if adding or dispatching a message fails
     */
    public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
        // The slave should not deliver pull messages.
        // TODO: when the slave becomes a master, it should send a NULL message
        // to all the consumers to 'wake them up' in case they were waiting for
        // a message.
        if (getPrefetchSize() == 0 && !isSlave()) {
            final long dispatchCounterBeforePull;
            // NOTE(review): dispatchCounter is read here under the monitor of
            // 'this', while pullTimeout() reads it under pendingLock - confirm
            // that the mixed locking is intended.
            synchronized(this) {
                prefetchExtension++;
                dispatchCounterBeforePull = dispatchCounter;
            }

            // Have the destination push us some messages.
            for (Destination dest : destinations) {
                dest.iterate();
            }
            dispatchPending();

            synchronized(this) {
                // If there was nothing dispatched.. we may need to setup a timeout.
                if (dispatchCounterBeforePull == dispatchCounter) {
                    // immediate timeout used by receiveNoWait()
                    if (pull.getTimeout() == -1) {
                        // Send a NULL message.
                        add(QueueMessageReference.NULL_MESSAGE);
                        dispatchPending();
                    }
                    if (pull.getTimeout() > 0) {
                        scheduler.executeAfterDelay(new Runnable() {

                            public void run() {
                                pullTimeout(dispatchCounterBeforePull);
                            }
                        }, pull.getTimeout());
                    }
                }
            }
        }
        return null;
    }

    /**
     * Occurs when a pull times out. If nothing has been dispatched since the
     * timeout was setup, then send the NULL message.
     *
     * @param dispatchCounterBeforePull value of the dispatch counter when the pull was issued
     */
    final void pullTimeout(long dispatchCounterBeforePull) {
        synchronized (pendingLock) {
            if (dispatchCounterBeforePull == dispatchCounter) {
                try {
                    add(QueueMessageReference.NULL_MESSAGE);
                    dispatchPending();
                } catch (Exception e) {
                    // Runs on a scheduler thread: hand the failure to the connection.
                    context.getConnection().serviceException(e);
                }
            }
        }
    }

    /**
     * Appends a message to the pending cursor and then tries to dispatch.
     * <p>
     * The message is silently ignored when its destination is no longer one
     * of this subscription's destinations, unless it is the NULL message.
     *
     * @param node the message reference to add
     * @throws Exception if the cursor or the dispatch fails
     */
    public void add(MessageReference node) throws Exception {
        synchronized (pendingLock) {
            // The destination may have just been removed...
            if( !destinations.contains(node.getRegionDestination()) && node!=QueueMessageReference.NULL_MESSAGE) {
                // perhaps we should inform the caller that we are no longer valid to dispatch to?
                return;
            }
            enqueueCounter++;
            pending.addMessageLast(node);
        }
        dispatchPending();
    }

    /**
     * Moves the message named by the notification from the pending cursor to
     * the dispatched list.
     *
     * @param mdn the dispatch notification identifying the message
     * @throws JMSException if the message is not found in the pending cursor
     * @throws Exception if the cursor fails
     */
    public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception {
        synchronized(pendingLock) {
            try {
                pending.reset();
                while (pending.hasNext()) {
                    MessageReference node = pending.next();
                    node.decrementReferenceCount();
                    if (node.getMessageId().equals(mdn.getMessageId())) {
                        // Synchronize between dispatched list and removal of messages from pending list
                        // related to remove subscription action
                        synchronized(dispatchLock) {
                            pending.remove();
                            createMessageDispatch(node, node.getMessage());
                            dispatched.add(node);
                            onDispatch(node, node.getMessage());
                        }
                        return;
                    }
                }
            } finally {
                pending.release();
            }
        }
        throw new JMSException(
                "Slave broker out of sync with master: Dispatched message ("
                        + mdn.getMessageId() + ") was not in the pending list for "
                        + mdn.getConsumerId() + " on " + mdn.getDestination().getPhysicalName());
    }

    /**
     * Processes an acknowledgment against the dispatched list.
     * <p>
     * On a master, an ack that arrives before the first dispatch is logged
     * and ignored. Otherwise the ack is handled according to its type
     * (standard, individual, delivered, redelivered or poison). When the ack
     * is correlated with a dispatched message, the owning destination is
     * woken up and pending messages are dispatched.
     *
     * @param context the connection context the ack arrived on
     * @param ack the acknowledgment
     * @throws JMSException if the ack cannot be correlated with the dispatched
     *         list (delivered, redelivered and poison acks, and any
     *         uncorrelated ack on a slave), if a standard ack does not match
     *         the dispatched list, or if a poison ack is transacted
     * @throws Exception if processing a message fails
     */
    public final void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception {
        // Handle the standard acknowledgment case.
        boolean callDispatchMatched = false;
        Destination destination = null;

        if (!isSlave()) {
            // Zero timeout: a non-blocking check of whether dispatch() has run yet.
            if (!okForAckAsDispatchDone.await(0L, TimeUnit.MILLISECONDS)) {
                // suppress unexpected ack exception in this expected case
                LOG.warn("Ignoring ack received before dispatch; result of failover with an outstanding ack. Acked messages will be replayed if present on this broker. Ignored ack: {}", ack);
                return;
            }
        }
        if (LOG.isTraceEnabled()) {
            LOG.trace("ack:" + ack);
        }
        synchronized(dispatchLock) {
            if (ack.isStandardAck()) {
                // First check if the ack matches the dispatched. When using failover this might
                // not be the case. We don't ever want to ack the wrong messages.
                assertAckMatchesDispatched(ack);

                // Acknowledge all dispatched messages up till the message id of
                // the acknowledgment.
                int index = 0;
                boolean inAckRange = false;
                List<MessageReference> removeList = new ArrayList<MessageReference>();
                for (final MessageReference node : dispatched) {
                    MessageId messageId = node.getMessageId();
                    if (ack.getFirstMessageId() == null
                            || ack.getFirstMessageId().equals(messageId)) {
                        inAckRange = true;
                    }
                    if (inAckRange) {
                        // Don't remove the nodes until we are committed.
                        if (!context.isInTransaction()) {
                            dequeueCounter++;
                            node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
                            removeList.add(node);
                        } else {
                            // setup a Synchronization to remove nodes from the
                            // dispatched list.
                            context.getTransaction().addSynchronization(
                                    new Synchronization() {

                                        @Override
                                        public void afterCommit()
                                                throws Exception {
                                            synchronized(dispatchLock) {
                                                dequeueCounter++;
                                                dispatched.remove(node);
                                                node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
                                            }
                                        }

                                        @Override
                                        public void afterRollback() throws Exception {
                                            synchronized(dispatchLock) {
                                                if (isSlave()) {
                                                    node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
                                                } else {
                                                    // poisonAck will decrement - otherwise still inflight on client
                                                }
                                            }
                                        }
                                    });
                        }
                        index++;
                        acknowledge(context, ack, node);
                        if (ack.getLastMessageId().equals(messageId)) {
                            // contract prefetch if dispatch required a pull
                            if (getPrefetchSize() == 0) {
                                prefetchExtension = Math.max(0, prefetchExtension - index);
                            } else if (usePrefetchExtension && context.isInTransaction()) {
                                // extend prefetch window only if not a pulling consumer
                                prefetchExtension = Math.max(prefetchExtension, index);
                            }
                            destination = node.getRegionDestination();
                            callDispatchMatched = true;
                            break;
                        }
                    }
                }
                for (final MessageReference node : removeList) {
                    dispatched.remove(node);
                }
                // this only happens after a reconnect - get an ack which is not
                // valid
                if (!callDispatchMatched) {
                    LOG.warn("Could not correlate acknowledgment with dispatched message: {}", ack);
                }
            } else if (ack.isIndividualAck()) {
                // Message was delivered and acknowledge - but only delete the
                // individual message
                for (final MessageReference node : dispatched) {
                    MessageId messageId = node.getMessageId();
                    if (ack.getLastMessageId().equals(messageId)) {
                        // this should never be within a transaction
                        dequeueCounter++;
                        node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
                        destination = node.getRegionDestination();
                        acknowledge(context, ack, node);
                        dispatched.remove(node);
                        prefetchExtension = Math.max(0, prefetchExtension - 1);
                        callDispatchMatched = true;
                        break;
                    }
                }
            } else if (ack.isDeliveredAck()) {
                // Message was delivered but not acknowledged: update pre-fetch
                // counters.
                int index = 0;
                // The iterator walks a snapshot of the copy-on-write list, so
                // removing from 'dispatched' inside the loop is safe.
                for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); index++) {
                    final MessageReference node = iter.next();
                    if (node.isExpired()) {
                        if (broker.isExpired(node)) {
                            node.getRegionDestination().messageExpired(context, this, node);
                        }
                        dispatched.remove(node);
                        node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
                    }
                    if (ack.getLastMessageId().equals(node.getMessageId())) {
                        if (usePrefetchExtension) {
                            prefetchExtension = Math.max(prefetchExtension, index + 1);
                        }
                        destination = node.getRegionDestination();
                        callDispatchMatched = true;
                        break;
                    }
                }
                if (!callDispatchMatched) {
                    throw new JMSException(
                            "Could not correlate acknowledgment with dispatched message: "
                                    + ack);
                }
            } else if (ack.isRedeliveredAck()) {
                // Message was re-delivered but it was not yet considered to be
                // a DLQ message.
                boolean inAckRange = false;
                for (final MessageReference node : dispatched) {
                    MessageId messageId = node.getMessageId();
                    if (ack.getFirstMessageId() == null
                            || ack.getFirstMessageId().equals(messageId)) {
                        inAckRange = true;
                    }
                    if (inAckRange) {
                        if (ack.getLastMessageId().equals(messageId)) {
                            destination = node.getRegionDestination();
                            callDispatchMatched = true;
                            break;
                        }
                    }
                }
                if (!callDispatchMatched) {
                    throw new JMSException(
                            "Could not correlate acknowledgment with dispatched message: "
                                    + ack);
                }
            } else if (ack.isPoisonAck()) {
                // TODO: what if the message is already in a DLQ???
                // Handle the poison ACK case: we need to send the message to a
                // DLQ
                if (ack.isInTransaction()) {
                    throw new JMSException("Poison ack cannot be transacted: "
                            + ack);
                }
                int index = 0;
                boolean inAckRange = false;
                List<MessageReference> removeList = new ArrayList<MessageReference>();
                for (final MessageReference node : dispatched) {
                    MessageId messageId = node.getMessageId();
                    if (ack.getFirstMessageId() == null
                            || ack.getFirstMessageId().equals(messageId)) {
                        inAckRange = true;
                    }
                    if (inAckRange) {
                        if (ack.getPoisonCause() != null) {
                            node.getMessage().setProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY,
                                    ack.getPoisonCause().toString());
                        }
                        sendToDLQ(context, node);
                        node.getRegionDestination().getDestinationStatistics()
                                .getInflight().decrement();
                        removeList.add(node);
                        dequeueCounter++;
                        index++;
                        acknowledge(context, ack, node);
                        if (ack.getLastMessageId().equals(messageId)) {
                            // 'index' already counts every message removed by
                            // this ack (it is incremented above), so contract
                            // the window by exactly that amount, as the
                            // standard-ack branch does. The previous
                            // 'index + 1' shrank the window by one too many.
                            prefetchExtension = Math.max(0, prefetchExtension - index);
                            destination = node.getRegionDestination();
                            callDispatchMatched = true;
                            break;
                        }
                    }
                }
                for (final MessageReference node : removeList) {
                    dispatched.remove(node);
                }
                if (!callDispatchMatched) {
                    throw new JMSException(
                            "Could not correlate acknowledgment with dispatched message: "
                                    + ack);
                }
            }
        }
        if (callDispatchMatched && destination != null) {
            destination.wakeup();
            dispatchPending();
        } else {
            if (isSlave()) {
                throw new JMSException(
                        "Slave broker out of sync with master: Acknowledgment ("
                                + ack + ") was not in the dispatch list: "
                                + dispatched);
            } else {
                LOG.debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): {}", ack);
            }
        }
    }

    /**
     * Checks an ack versus the contents of the dispatched list.
     * <p>
     * Verifies that the first and last message ids of the ack (when present)
     * are found in the dispatched list and, for non-transacted acks, that the
     * number of messages in that range equals the ack's message count.
     *
     * @param ack the acknowledgment to verify
     * @throws JMSException if it does not match
     */
    protected void assertAckMatchesDispatched(MessageAck ack) throws JMSException {
        MessageId firstAckedMsg = ack.getFirstMessageId();
        MessageId lastAckedMsg = ack.getLastMessageId();
        int checkCount = 0;
        boolean checkFoundStart = false;
        boolean checkFoundEnd = false;
        for (MessageReference node : dispatched) {

            if (firstAckedMsg == null) {
                // No explicit start: the range begins at the head of the list.
                checkFoundStart = true;
            } else if (!checkFoundStart && firstAckedMsg.equals(node.getMessageId())) {
                checkFoundStart = true;
            }

            if (checkFoundStart) {
                checkCount++;
            }

            if (lastAckedMsg != null && lastAckedMsg.equals(node.getMessageId())) {
                checkFoundEnd = true;
                break;
            }
        }
        if (!checkFoundStart && firstAckedMsg != null) {
            throw new JMSException("Unmatched acknowledge: " + ack
                    + "; Could not find Message-ID " + firstAckedMsg
                    + " in dispatched-list (start of ack)");
        }
        if (!checkFoundEnd && lastAckedMsg != null) {
            throw new JMSException("Unmatched acknowledge: " + ack
                    + "; Could not find Message-ID " + lastAckedMsg
                    + " in dispatched-list (end of ack)");
        }
        if (ack.getMessageCount() != checkCount && !ack.isInTransaction()) {
            throw new JMSException("Unmatched acknowledge: " + ack
                    + "; Expected message count (" + ack.getMessageCount()
                    + ") differs from count in dispatched-list (" + checkCount
                    + ")");
        }
    }

    /**
     * Hands a message to the root broker's dead letter queue handling.
     *
     * @param context the connection context
     * @param node the message to send to the dead letter queue
     * @throws IOException
     * @throws Exception
     */
    protected void sendToDLQ(final ConnectionContext context, final MessageReference node) throws IOException, Exception {
        broker.getRoot().sendToDeadLetterQueue(context, node, this);
    }

    /**
     * @return the number of messages currently in the dispatched list
     */
    public int getInFlightSize() {
        return dispatched.size();
    }

    /**
     * Used to determine if the broker can dispatch to the consumer.
     *
     * @return true when the dispatched count, less the prefetch extension,
     *         has reached the consumer's prefetch size
     */
    public boolean isFull() {
        return dispatched.size() - prefetchExtension >= info.getPrefetchSize();
    }

    /**
     * @return true when 60% or more room is left for dispatching messages
     */
    public boolean isLowWaterMark() {
        return (dispatched.size() - prefetchExtension) <= (info.getPrefetchSize() * .4);
    }

    /**
     * @return true when 10% or less room is left for dispatching messages
     */
    public boolean isHighWaterMark() {
        return (dispatched.size() - prefetchExtension) >= (info.getPrefetchSize() * .9);
    }

    /**
     * @return prefetch size plus prefetch extension minus the dispatched
     *         count; zero or negative when the subscription is full
     */
    @Override
    public int countBeforeFull() {
        return info.getPrefetchSize() + prefetchExtension - dispatched.size();
    }

    /**
     * @return the size reported by the pending cursor
     */
    public int getPendingQueueSize() {
        return pending.size();
    }

    /**
     * @return the number of messages currently in the dispatched list
     */
    public int getDispatchedQueueSize() {
        return dispatched.size();
    }

    /**
     * @return the dequeue counter
     */
    public long getDequeueCounter() {
        return dequeueCounter;
    }

    /**
     * @return the dispatch counter
     */
    public long getDispatchedCounter() {
        return dispatchCounter;
    }

    /**
     * @return the enqueue counter
     */
    public long getEnqueueCounter() {
        return enqueueCounter;
    }

    /**
     * @return whatever the pending cursor reports for recovery being required
     */
    @Override
    public boolean isRecoveryRequired() {
        return pending.isRecoveryRequired();
    }

    /**
     * @return the cursor holding the pending messages
     */
    public PendingMessageCursor getPending() {
        return this.pending;
    }

    /**
     * Replaces the pending cursor. A non-null cursor is configured with this
     * subscription's usage manager and cursor memory high water mark.
     *
     * @param pending the new cursor; may be null
     */
    public void setPending(PendingMessageCursor pending) {
        this.pending = pending;
        if (this.pending!=null) {
            this.pending.setSystemUsage(usageManager);
            this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
        }
    }

    /**
     * Registers a destination with this subscription and with the pending cursor.
     *
     * @param context the connection context
     * @param destination the destination being added
     * @throws Exception if the superclass or the cursor fails
     */
    @Override
    public void add(ConnectionContext context, Destination destination) throws Exception {
        synchronized(pendingLock) {
            super.add(context, destination);
            pending.add(context, destination);
        }
    }

    /**
     * Removes a destination from this subscription.
     * <p>
     * Messages for the destination are taken out of both the pending cursor
     * and the dispatched list, and the destination's dispatched and inflight
     * statistics are reduced by the number of dispatched messages removed.
     *
     * @param context the connection context
     * @param destination the destination being removed
     * @return the pending and dispatched message references that were removed
     * @throws Exception if the superclass or the cursor fails
     */
    @Override
    public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
        List<MessageReference> rc = new ArrayList<MessageReference>();
        synchronized(pendingLock) {
            super.remove(context, destination);
            // Here is a potential problem concerning Inflight stat:
            // Messages not already committed or rolled back may not be removed from dispatched list at the moment
            // Except if each commit or rollback callback action comes before remove of subscriber.
            rc.addAll(pending.remove(context, destination));

            // Synchronized to DispatchLock
            synchronized(dispatchLock) {
                ArrayList<MessageReference> references = new ArrayList<MessageReference>();
                for (MessageReference r : dispatched) {
                    if( r.getRegionDestination() == destination) {
                        references.add(r);
                    }
                }
                rc.addAll(references);
                destination.getDestinationStatistics().getDispatched().subtract(references.size());
                destination.getDestinationStatistics().getInflight().subtract(references.size());
                dispatched.removeAll(references);
            }
        }
        return rc;
    }

    /**
     * Dispatches pending messages until the subscription is full, the cursor
     * is exhausted, or the number of free slots computed on entry has been
     * used. Does nothing on a slave.
     * <p>
     * When there is no free slot on entry and the subscription is not already
     * marked slow, it is marked as a slow consumer and every destination is
     * notified.
     *
     * @throws IOException if dispatching a message fails
     */
    protected void dispatchPending() throws IOException {
        if (!isSlave()) {
            synchronized(pendingLock) {
                try {
                    int numberToDispatch = countBeforeFull();
                    if (numberToDispatch > 0) {
                        setSlowConsumer(false);
                        setPendingBatchSize(pending, numberToDispatch);
                        int count = 0;
                        pending.reset();
                        while (pending.hasNext() && !isFull()
                                && count < numberToDispatch) {
                            MessageReference node = pending.next();
                            if (node == null) {
                                break;
                            }

                            // Synchronize between dispatched list and remove of message from pending list
                            // related to remove subscription action
                            synchronized(dispatchLock) {
                                pending.remove();
                                node.decrementReferenceCount();
                                if( !isDropped(node) && canDispatch(node)) {

                                    // Message may have been sitting in the pending
                                    // list a while waiting for the consumer to ack the message.
                                    if (node!=QueueMessageReference.NULL_MESSAGE && node.isExpired()) {
                                        //increment number to dispatch
                                        numberToDispatch++;
                                        if (broker.isExpired(node)) {
                                            node.getRegionDestination().messageExpired(context, this, node);
                                        }
                                        continue;
                                    }
                                    dispatch(node);
                                    count++;
                                }
                            }
                        }
                    } else if (!isSlowConsumer()) {
                        setSlowConsumer(true);
                        for (Destination dest :destinations) {
                            dest.slowConsumer(context, this);
                        }
                    }
                } finally {
                    pending.release();
                }
            }
        }
    }

    /**
     * Sets the maximum batch size of the given cursor.
     *
     * @param pending the cursor to configure
     * @param numberToDispatch the batch size to set
     */
    protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch) {
        pending.setMaxBatchSize(numberToDispatch);
    }

    /**
     * Sends a single message to the consumer.
     * <p>
     * Real messages are counted and added to the dispatched list; the NULL
     * message instead reduces the prefetch extension by one. The message is
     * sent asynchronously or synchronously according to the consumer info.
     *
     * @param node the message reference to dispatch
     * @return true if the message was handed to the connection; false when
     *         the reference has no message or this broker is a slave
     * @throws IOException declared for implementations and callers; see the connection's dispatch methods
     */
    protected boolean dispatch(final MessageReference node) throws IOException {
        final Message message = node.getMessage();
        if (message == null) {
            return false;
        }

        // From now on acks are accepted, see acknowledge().
        okForAckAsDispatchDone.countDown();

        // No reentrant lock - Patch needed to IndirectMessageReference on method lock
        if (!isSlave()) {

            MessageDispatch md = createMessageDispatch(node, message);
            // NULL messages don't count... they don't get Acked.
            if (node != QueueMessageReference.NULL_MESSAGE) {
                dispatchCounter++;
                dispatched.add(node);
            } else {
                prefetchExtension = Math.max(0, prefetchExtension - 1);
            }
            if (info.isDispatchAsync()) {
                md.setTransmitCallback(new Runnable() {

                    public void run() {
                        // Since the message gets queued up in async dispatch,
                        // we don't want to
                        // decrease the reference count until it gets put on the
                        // wire.
                        onDispatch(node, message);
                    }
                });
                context.getConnection().dispatchAsync(md);
            } else {
                context.getConnection().dispatchSync(md);
                onDispatch(node, message);
            }
            return true;
        } else {
            return false;
        }
    }

    /**
     * Called once a message has been dispatched. Updates the destination's
     * dispatched and inflight statistics for real messages and, for
     * asynchronous consumers, tries to dispatch more pending messages.
     *
     * @param node the dispatched message reference
     * @param message the dispatched message
     */
    protected void onDispatch(final MessageReference node, final Message message) {
        if (node.getRegionDestination() != null) {
            if (node != QueueMessageReference.NULL_MESSAGE) {
                node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
                node.getRegionDestination().getDestinationStatistics().getInflight().increment();
                if (LOG.isTraceEnabled()) {
                    LOG.trace(info.getConsumerId() + " dispatched: " + message.getMessageId() + " - "
                            + message.getDestination() + ", dispatched: " + dispatchCounter + ", inflight: " + dispatched.size());
                }
            }
        }

        if (info.isDispatchAsync()) {
            try {
                dispatchPending();
            } catch (IOException e) {
                context.getConnection().serviceExceptionAsync(e);
            }
        }
    }

    /**
     * inform the MessageConsumer on the client to change its prefetch
     * <p>
     * Nothing is sent unless the context has a connection that is manageable.
     *
     * @param newPrefetch the prefetch value to send to the consumer
     */
    public void updateConsumerPrefetch(int newPrefetch) {
        if (context != null && context.getConnection() != null && context.getConnection().isManageable()) {
            ConsumerControl cc = new ConsumerControl();
            cc.setConsumerId(info.getConsumerId());
            cc.setPrefetch(newPrefetch);
            context.getConnection().dispatchAsync(cc);
        }
    }

    /**
     * Builds the dispatch command for a message. The NULL message produces a
     * command with no message and no destination.
     *
     * @param node the message reference being dispatched
     * @param message the message to place in the command
     * @return MessageDispatch
     */
    protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
        if (node == QueueMessageReference.NULL_MESSAGE) {
            MessageDispatch md = new MessageDispatch();
            md.setMessage(null);
            md.setConsumerId(info.getConsumerId());
            md.setDestination(null);
            return md;
        } else {
            MessageDispatch md = new MessageDispatch();
            md.setConsumerId(info.getConsumerId());
            md.setDestination(node.getRegionDestination().getActiveMQDestination());
            md.setMessage(message);
            md.setRedeliveryCounter(node.getRedeliveryCounter());
            return md;
        }
    }

    /**
     * Use when a matched message is about to be dispatched to the client.
     *
     * @param node the message reference about to be dispatched
     * @return false if the message should not be dispatched to the client
     *         (another sub may have already dispatched it for example).
     * @throws IOException
     */
    protected abstract boolean canDispatch(MessageReference node) throws IOException;

    /**
     * Consulted by {@link #dispatchPending()} before dispatching a message.
     *
     * @param node the message reference about to be dispatched
     * @return true if the message must not be dispatched
     */
    protected abstract boolean isDropped(MessageReference node);

    /**
     * Used during acknowledgment to remove the message.
     *
     * @param context the connection context the ack arrived on
     * @param ack the acknowledgment being processed
     * @param node the message reference covered by the ack
     * @throws IOException
     */
    protected abstract void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference node) throws IOException;


    /**
     * @return the configured maximum number of producers to audit
     */
    public int getMaxProducersToAudit() {
        return maxProducersToAudit;
    }

    /**
     * @param maxProducersToAudit the maximum number of producers to audit
     */
    public void setMaxProducersToAudit(int maxProducersToAudit) {
        this.maxProducersToAudit = maxProducersToAudit;
    }

    /**
     * @return the configured maximum audit depth
     */
    public int getMaxAuditDepth() {
        return maxAuditDepth;
    }

    /**
     * @param maxAuditDepth the maximum audit depth
     */
    public void setMaxAuditDepth(int maxAuditDepth) {
        this.maxAuditDepth = maxAuditDepth;
    }

    /**
     * @return true if delivered and transacted acks may extend the prefetch window
     */
    public boolean isUsePrefetchExtension() {
        return usePrefetchExtension;
    }

    /**
     * @param usePrefetchExtension whether delivered and transacted acks may extend the prefetch window
     */
    public void setUsePrefetchExtension(boolean usePrefetchExtension) {
        this.usePrefetchExtension = usePrefetchExtension;
    }
}