001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region;
018    
019    import java.io.IOException;
020    import java.util.ArrayList;
021    import java.util.Iterator;
022    import java.util.List;
023    import java.util.concurrent.CopyOnWriteArrayList;
024    import java.util.concurrent.CountDownLatch;
025    import java.util.concurrent.TimeUnit;
026    import javax.jms.InvalidSelectorException;
027    import javax.jms.JMSException;
028    import org.apache.activemq.broker.Broker;
029    import org.apache.activemq.broker.ConnectionContext;
030    import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
031    import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
032    import org.apache.activemq.command.ActiveMQMessage;
033    import org.apache.activemq.command.ConsumerControl;
034    import org.apache.activemq.command.ConsumerInfo;
035    import org.apache.activemq.command.Message;
036    import org.apache.activemq.command.MessageAck;
037    import org.apache.activemq.command.MessageDispatch;
038    import org.apache.activemq.command.MessageDispatchNotification;
039    import org.apache.activemq.command.MessageId;
040    import org.apache.activemq.command.MessagePull;
041    import org.apache.activemq.command.Response;
042    import org.apache.activemq.thread.Scheduler;
043    import org.apache.activemq.transaction.Synchronization;
044    import org.apache.activemq.usage.SystemUsage;
045    import org.slf4j.Logger;
046    import org.slf4j.LoggerFactory;
047    
048    /**
049     * A subscription that honors the pre-fetch option of the ConsumerInfo.
050     * 
051     * 
052     */
053    public abstract class PrefetchSubscription extends AbstractSubscription {
054    
055        private static final Logger LOG = LoggerFactory.getLogger(PrefetchSubscription.class);
056        protected final Scheduler scheduler;
057        
058        protected PendingMessageCursor pending;
059        protected final List<MessageReference> dispatched = new CopyOnWriteArrayList<MessageReference>();
060        protected int prefetchExtension;
061        protected boolean usePrefetchExtension = true;
062        protected long enqueueCounter;
063        protected long dispatchCounter;
064        protected long dequeueCounter;
065        private int maxProducersToAudit=32;
066        private int maxAuditDepth=2048;
067        protected final SystemUsage usageManager;
068        protected final Object pendingLock = new Object();
069        private final Object dispatchLock = new Object();
070        private final CountDownLatch okForAckAsDispatchDone = new CountDownLatch(1);
071        
072        public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException {
073            super(broker,context, info);
074            this.usageManager=usageManager;
075            pending = cursor;
076            this.scheduler = broker.getScheduler();
077        }
078    
079        public PrefetchSubscription(Broker broker,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
080            this(broker,usageManager,context, info, new VMPendingMessageCursor(false));
081        }
082    
083        /**
084         * Allows a message to be pulled on demand by a client
085         */
086        public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
087            // The slave should not deliver pull messages. TODO: when the slave
088            // becomes a master,
089            // He should send a NULL message to all the consumers to 'wake them up'
090            // in case
091            // they were waiting for a message.
092            if (getPrefetchSize() == 0 && !isSlave()) {
093                final long dispatchCounterBeforePull;
094                    synchronized(this) {
095                            prefetchExtension++;
096                            dispatchCounterBeforePull = dispatchCounter;
097                    }
098                
099                    // Have the destination push us some messages.
100                    for (Destination dest : destinations) {
101                                    dest.iterate();
102                            }
103                    dispatchPending();
104                
105                synchronized(this) {
106                        // If there was nothing dispatched.. we may need to setup a timeout.
107                        if (dispatchCounterBeforePull == dispatchCounter) {
108                            // immediate timeout used by receiveNoWait()
109                            if (pull.getTimeout() == -1) {
110                                // Send a NULL message.
111                                add(QueueMessageReference.NULL_MESSAGE);
112                                dispatchPending();
113                            }
114                            if (pull.getTimeout() > 0) {
115                                scheduler.executeAfterDelay(new Runnable() {
116            
117                                    public void run() {
118                                        pullTimeout(dispatchCounterBeforePull);
119                                    }
120                                }, pull.getTimeout());
121                            }
122                        }
123                }
124            }
125            return null;
126        }
127    
128        /**
129         * Occurs when a pull times out. If nothing has been dispatched since the
130         * timeout was setup, then send the NULL message.
131         */
132        final void pullTimeout(long dispatchCounterBeforePull) {
133            synchronized (pendingLock) {
134                    if (dispatchCounterBeforePull == dispatchCounter) {
135                    try {
136                        add(QueueMessageReference.NULL_MESSAGE);
137                        dispatchPending();
138                    } catch (Exception e) {
139                        context.getConnection().serviceException(e);
140                    }
141                }
142            }
143        }
144    
145        public void add(MessageReference node) throws Exception {
146            synchronized (pendingLock) {
147                // The destination may have just been removed...  
148                if( !destinations.contains(node.getRegionDestination()) && node!=QueueMessageReference.NULL_MESSAGE) {
149                    // perhaps we should inform the caller that we are no longer valid to dispatch to?
150                    return;
151                }
152                enqueueCounter++;
153                pending.addMessageLast(node);    
154            }
155            dispatchPending();
156        }
157    
158        public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception {
159            synchronized(pendingLock) {
160                try {
161                    pending.reset();
162                    while (pending.hasNext()) {
163                        MessageReference node = pending.next();
164                        node.decrementReferenceCount();
165                        if (node.getMessageId().equals(mdn.getMessageId())) {
166                            // Synchronize between dispatched list and removal of messages from pending list
167                            // related to remove subscription action
168                            synchronized(dispatchLock) {
169                                pending.remove();
170                                createMessageDispatch(node, node.getMessage());
171                                dispatched.add(node);
172                                onDispatch(node, node.getMessage());
173                            }
174                            return;
175                        }
176                    }
177                } finally {
178                    pending.release();
179                }
180            }
181            throw new JMSException(
182                    "Slave broker out of sync with master: Dispatched message ("
183                            + mdn.getMessageId() + ") was not in the pending list for "
184                            + mdn.getConsumerId() + " on " + mdn.getDestination().getPhysicalName());
185        }
186    
187        public final void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception {
188            // Handle the standard acknowledgment case.
189            boolean callDispatchMatched = false;
190            Destination destination = null;
191            
192            if (!isSlave()) {
193                if (!okForAckAsDispatchDone.await(0l, TimeUnit.MILLISECONDS)) {
194                    // suppress unexpected ack exception in this expected case
195                    LOG.warn("Ignoring ack received before dispatch; result of failover with an outstanding ack. Acked messages will be replayed if present on this broker. Ignored ack: " + ack);
196                    return;
197                }
198            }
199            if (LOG.isTraceEnabled()) {
200                LOG.trace("ack:" + ack);
201            }
202            synchronized(dispatchLock) {
203                if (ack.isStandardAck()) {
204                    // First check if the ack matches the dispatched. When using failover this might
205                    // not be the case. We don't ever want to ack the wrong messages.
206                    assertAckMatchesDispatched(ack);
207                    
208                    // Acknowledge all dispatched messages up till the message id of
209                    // the acknowledgment.
210                    int index = 0;
211                    boolean inAckRange = false;
212                    List<MessageReference> removeList = new ArrayList<MessageReference>();
213                    for (final MessageReference node : dispatched) {
214                        MessageId messageId = node.getMessageId();
215                        if (ack.getFirstMessageId() == null
216                                || ack.getFirstMessageId().equals(messageId)) {
217                            inAckRange = true;
218                        }
219                        if (inAckRange) {
220                            // Don't remove the nodes until we are committed.  
221                            if (!context.isInTransaction()) {
222                                dequeueCounter++;
223                                node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
224                                removeList.add(node);
225                            } else {
226                                // setup a Synchronization to remove nodes from the
227                                // dispatched list.
228                                context.getTransaction().addSynchronization(
229                                        new Synchronization() {
230    
231                                            @Override
232                                            public void afterCommit()
233                                                    throws Exception {
234                                                synchronized(dispatchLock) {
235                                                    dequeueCounter++;
236                                                    dispatched.remove(node);
237                                                    node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
238                                                }
239                                            }
240    
241                                            @Override
242                                            public void afterRollback() throws Exception {
243                                                synchronized(dispatchLock) {
244                                                    if (isSlave()) {
245                                                        node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
246                                                    } else {
247                                                        // poisionAck will decrement - otherwise still inflight on client
248                                                    }
249                                                }
250                                            }
251                                        });
252                            }
253                            index++;
254                            acknowledge(context, ack, node);
255                            if (ack.getLastMessageId().equals(messageId)) {                  
256                                // contract prefetch if dispatch required a pull
257                                if (getPrefetchSize() == 0) {
258                                    prefetchExtension = Math.max(0, prefetchExtension - index);
259                                } else if (usePrefetchExtension && context.isInTransaction()) {
260                                    // extend prefetch window only if not a pulling consumer
261                                    prefetchExtension = Math.max(prefetchExtension, index);
262                                }
263                                destination = node.getRegionDestination();
264                                callDispatchMatched = true;
265                                break;
266                            }
267                        }
268                    }
269                    for (final MessageReference node : removeList) {
270                        dispatched.remove(node);
271                    }
272                    // this only happens after a reconnect - get an ack which is not
273                    // valid
274                    if (!callDispatchMatched) {
275                        LOG.warn("Could not correlate acknowledgment with dispatched message: "
276                                      + ack);
277                    }
278                } else if (ack.isIndividualAck()) {
279                    // Message was delivered and acknowledge - but only delete the
280                    // individual message
281                    for (final MessageReference node : dispatched) {
282                        MessageId messageId = node.getMessageId();
283                        if (ack.getLastMessageId().equals(messageId)) {
284                            // this should never be within a transaction
285                            dequeueCounter++;
286                            node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
287                            destination = node.getRegionDestination();
288                            acknowledge(context, ack, node);
289                            dispatched.remove(node);
290                            prefetchExtension = Math.max(0, prefetchExtension - 1);
291                            callDispatchMatched = true;
292                            break;
293                        }
294                    }
295                }else if (ack.isDeliveredAck()) {
296                    // Message was delivered but not acknowledged: update pre-fetch
297                    // counters.
298                    int index = 0;
299                    for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); index++) {
300                        final MessageReference node = iter.next();
301                        if (node.isExpired()) {
302                            if (broker.isExpired(node)) {
303                                node.getRegionDestination().messageExpired(context, this, node);
304                            }
305                            dispatched.remove(node);
306                            node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
307                        }
308                        if (ack.getLastMessageId().equals(node.getMessageId())) {
309                            if (usePrefetchExtension) {
310                                prefetchExtension = Math.max(prefetchExtension, index + 1);
311                            }
312                            destination = node.getRegionDestination();
313                            callDispatchMatched = true;
314                            break;
315                        }
316                    }
317                    if (!callDispatchMatched) {
318                        throw new JMSException(
319                                "Could not correlate acknowledgment with dispatched message: "
320                                        + ack);
321                    }
322                } else if (ack.isRedeliveredAck()) {
323                    // Message was re-delivered but it was not yet considered to be
324                    // a DLQ message.
325                    boolean inAckRange = false;
326                    for (final MessageReference node : dispatched) {
327                        MessageId messageId = node.getMessageId();
328                        if (ack.getFirstMessageId() == null
329                                || ack.getFirstMessageId().equals(messageId)) {
330                            inAckRange = true;
331                        }
332                        if (inAckRange) {
333                            if (ack.getLastMessageId().equals(messageId)) {
334                                destination = node.getRegionDestination();
335                                callDispatchMatched = true;
336                                break;
337                            }
338                        }
339                    }
340                    if (!callDispatchMatched) {
341                        throw new JMSException(
342                                "Could not correlate acknowledgment with dispatched message: "
343                                        + ack);
344                    }
345                } else if (ack.isPoisonAck()) {
346                    // TODO: what if the message is already in a DLQ???
347                    // Handle the poison ACK case: we need to send the message to a
348                    // DLQ
349                    if (ack.isInTransaction()) {
350                        throw new JMSException("Poison ack cannot be transacted: "
351                                + ack);
352                    }
353                    int index = 0;
354                    boolean inAckRange = false;
355                    List<MessageReference> removeList = new ArrayList<MessageReference>();
356                    for (final MessageReference node : dispatched) {
357                        MessageId messageId = node.getMessageId();
358                        if (ack.getFirstMessageId() == null
359                                || ack.getFirstMessageId().equals(messageId)) {
360                            inAckRange = true;
361                        }
362                        if (inAckRange) {
363                            if (ack.getPoisonCause() != null) {
364                                node.getMessage().setProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY,
365                                        ack.getPoisonCause().toString());
366                            }
367                            sendToDLQ(context, node);
368                            node.getRegionDestination().getDestinationStatistics()
369                                    .getInflight().decrement();
370                            removeList.add(node);
371                            dequeueCounter++;
372                            index++;
373                            acknowledge(context, ack, node);
374                            if (ack.getLastMessageId().equals(messageId)) {
375                                prefetchExtension = Math.max(0, prefetchExtension
376                                        - (index + 1));
377                                destination = node.getRegionDestination();
378                                callDispatchMatched = true;
379                                break;
380                            }
381                        }
382                    }
383                    for (final MessageReference node : removeList) {
384                        dispatched.remove(node);
385                    }
386                    if (!callDispatchMatched) {
387                        throw new JMSException(
388                                "Could not correlate acknowledgment with dispatched message: "
389                                        + ack);
390                    }
391                }
392            }
393            if (callDispatchMatched && destination != null) {    
394                destination.wakeup();
395                dispatchPending();
396            } else {
397                if (isSlave()) {
398                    throw new JMSException(
399                            "Slave broker out of sync with master: Acknowledgment ("
400                                    + ack + ") was not in the dispatch list: "
401                                    + dispatched);
402                } else {
403                    LOG.debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): "
404                            + ack);
405                }
406            }
407        }
408    
409        /**
410         * Checks an ack versus the contents of the dispatched list.
411         * 
412         * @param ack
413         * @throws JMSException if it does not match
414         */
415            protected void assertAckMatchesDispatched(MessageAck ack) throws JMSException {
416            MessageId firstAckedMsg = ack.getFirstMessageId();
417            MessageId lastAckedMsg = ack.getLastMessageId();
418            int checkCount = 0;
419            boolean checkFoundStart = false;
420            boolean checkFoundEnd = false;
421            for (MessageReference node : dispatched) {
422    
423                if (firstAckedMsg == null) {
424                    checkFoundStart = true;
425                } else if (!checkFoundStart && firstAckedMsg.equals(node.getMessageId())) {
426                    checkFoundStart = true;
427                }
428    
429                if (checkFoundStart) {
430                    checkCount++;
431                }
432    
433                if (lastAckedMsg != null && lastAckedMsg.equals(node.getMessageId())) {
434                    checkFoundEnd = true;
435                    break;
436                }
437            }
438            if (!checkFoundStart && firstAckedMsg != null)
439                throw new JMSException("Unmatched acknowledge: " + ack
440                        + "; Could not find Message-ID " + firstAckedMsg
441                        + " in dispatched-list (start of ack)");
442            if (!checkFoundEnd && lastAckedMsg != null)
443                throw new JMSException("Unmatched acknowledge: " + ack
444                        + "; Could not find Message-ID " + lastAckedMsg
445                        + " in dispatched-list (end of ack)");
446            if (ack.getMessageCount() != checkCount && !ack.isInTransaction()) {
447                throw new JMSException("Unmatched acknowledge: " + ack
448                        + "; Expected message count (" + ack.getMessageCount()
449                        + ") differs from count in dispatched-list (" + checkCount
450                        + ")");
451            }
452        }
453    
454        /**
455         * @param context
456         * @param node
457         * @throws IOException
458         * @throws Exception
459         */
460        protected void sendToDLQ(final ConnectionContext context, final MessageReference node) throws IOException, Exception {
461            broker.getRoot().sendToDeadLetterQueue(context, node, this);
462        }
463        
464        public int getInFlightSize() {
465            return dispatched.size();
466        }
467        
468        /**
469         * Used to determine if the broker can dispatch to the consumer.
470         * 
471         * @return
472         */
473        public boolean isFull() {
474            return dispatched.size() - prefetchExtension >= info.getPrefetchSize();
475        }
476    
477        /**
478         * @return true when 60% or more room is left for dispatching messages
479         */
480        public boolean isLowWaterMark() {
481            return (dispatched.size() - prefetchExtension) <= (info.getPrefetchSize() * .4);
482        }
483    
484        /**
485         * @return true when 10% or less room is left for dispatching messages
486         */
487        public boolean isHighWaterMark() {
488            return (dispatched.size() - prefetchExtension) >= (info.getPrefetchSize() * .9);
489        }
490    
491        @Override
492        public int countBeforeFull() {
493            return info.getPrefetchSize() + prefetchExtension - dispatched.size();
494        }
495    
496        public int getPendingQueueSize() {
497            return pending.size();
498        }
499    
500        public int getDispatchedQueueSize() {
501            return dispatched.size();
502        }
503    
504        public long getDequeueCounter() {
505            return dequeueCounter;
506        }
507    
508        public long getDispatchedCounter() {
509            return dispatchCounter;
510        }
511    
512        public long getEnqueueCounter() {
513            return enqueueCounter;
514        }
515    
516        @Override
517        public boolean isRecoveryRequired() {
518            return pending.isRecoveryRequired();
519        }
520    
521        public PendingMessageCursor getPending() {
522            return this.pending;
523        }
524    
525        public void setPending(PendingMessageCursor pending) {
526            this.pending = pending;
527            if (this.pending!=null) {
528                this.pending.setSystemUsage(usageManager);
529                this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
530            }
531        }
532    
533       @Override
534        public void add(ConnectionContext context, Destination destination) throws Exception {
535            synchronized(pendingLock) {
536                super.add(context, destination);
537                pending.add(context, destination);
538            }
539        }
540    
541        @Override
542        public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
543            List<MessageReference> rc = new ArrayList<MessageReference>();
544            synchronized(pendingLock) {
545                super.remove(context, destination);
546                // Here is a potential problem concerning Inflight stat:
547                // Messages not already committed or rolled back may not be removed from dispatched list at the moment
548                // Except if each commit or rollback callback action comes before remove of subscriber.
549                rc.addAll(pending.remove(context, destination));
550    
551                // Synchronized to DispatchLock
552                synchronized(dispatchLock) {
553                    ArrayList<MessageReference> references = new ArrayList<MessageReference>();
554                        for (MessageReference r : dispatched) {
555                            if( r.getRegionDestination() == destination) {
556                            references.add(r);
557                            }
558                        }
559                    rc.addAll(references);
560                    destination.getDestinationStatistics().getDispatched().subtract(references.size());
561                    destination.getDestinationStatistics().getInflight().subtract(references.size());
562                    dispatched.removeAll(references);
563                }            
564            }
565            return rc;
566        }
567    
568        protected void dispatchPending() throws IOException {
569            if (!isSlave()) {
570               synchronized(pendingLock) {
571                    try {
572                        int numberToDispatch = countBeforeFull();
573                        if (numberToDispatch > 0) {
574                            setSlowConsumer(false);
575                            setPendingBatchSize(pending, numberToDispatch);
576                            int count = 0;
577                            pending.reset();
578                            while (pending.hasNext() && !isFull()
579                                    && count < numberToDispatch) {
580                                MessageReference node = pending.next();
581                                if (node == null) {
582                                    break;
583                                }
584                                
585                                // Synchronize between dispatched list and remove of message from pending list
586                                // related to remove subscription action
587                                synchronized(dispatchLock) {
588                                    pending.remove();
589                                    node.decrementReferenceCount();
590                                    if( !isDropped(node) && canDispatch(node)) {
591    
592                                        // Message may have been sitting in the pending
593                                        // list a while waiting for the consumer to ak the message.
594                                        if (node!=QueueMessageReference.NULL_MESSAGE && node.isExpired()) {
595                                            //increment number to dispatch
596                                            numberToDispatch++;
597                                            if (broker.isExpired(node)) {
598                                                node.getRegionDestination().messageExpired(context, this, node);
599                                            }
600                                            continue;
601                                        }
602                                        dispatch(node);
603                                        count++;
604                                    }
605                                }
606                            }
607                        } else if (!isSlowConsumer()) {
608                            setSlowConsumer(true);
609                            for (Destination dest :destinations) {
610                                dest.slowConsumer(context, this);
611                            }
612                        }
613                    } finally {
614                        pending.release();
615                    }
616                }
617            }
618        }
619    
620        protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch) {
621            pending.setMaxBatchSize(numberToDispatch);
622        }
623    
624        protected boolean dispatch(final MessageReference node) throws IOException {
625            final Message message = node.getMessage();
626            if (message == null) {
627                return false;
628            }
629            
630            okForAckAsDispatchDone.countDown();
631            
632            // No reentrant lock - Patch needed to IndirectMessageReference on method lock
633            if (!isSlave()) {
634    
635                MessageDispatch md = createMessageDispatch(node, message);
636                // NULL messages don't count... they don't get Acked.
637                if (node != QueueMessageReference.NULL_MESSAGE) {
638                    dispatchCounter++;
639                    dispatched.add(node);
640                } else {
641                    prefetchExtension = Math.max(0, prefetchExtension - 1);
642                }
643                if (info.isDispatchAsync()) {
644                    md.setTransmitCallback(new Runnable() {
645    
646                        public void run() {
647                            // Since the message gets queued up in async dispatch,
648                            // we don't want to
649                            // decrease the reference count until it gets put on the
650                            // wire.
651                            onDispatch(node, message);
652                        }
653                    });
654                    context.getConnection().dispatchAsync(md);
655                } else {
656                    context.getConnection().dispatchSync(md);
657                    onDispatch(node, message);
658                }
659                return true;
660            } else {
661                return false;
662            }
663        }
664    
665        protected void onDispatch(final MessageReference node, final Message message) {
666            if (node.getRegionDestination() != null) {
667                if (node != QueueMessageReference.NULL_MESSAGE) {
668                    node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
669                    node.getRegionDestination().getDestinationStatistics().getInflight().increment();   
670                    if (LOG.isTraceEnabled()) {
671                        LOG.trace(info.getConsumerId() + " dispatched: " + message.getMessageId() + " - "
672                                + message.getDestination()  + ", dispatched: " + dispatchCounter + ", inflight: " + dispatched.size());
673                    }
674                }
675            }
676            
677            if (info.isDispatchAsync()) {
678                try {
679                    dispatchPending();
680                } catch (IOException e) {
681                    context.getConnection().serviceExceptionAsync(e);
682                }
683            }
684        }
685    
686        /**
687         * inform the MessageConsumer on the client to change it's prefetch
688         * 
689         * @param newPrefetch
690         */
691        public void updateConsumerPrefetch(int newPrefetch) {
692            if (context != null && context.getConnection() != null && context.getConnection().isManageable()) {
693                ConsumerControl cc = new ConsumerControl();
694                cc.setConsumerId(info.getConsumerId());
695                cc.setPrefetch(newPrefetch);
696                context.getConnection().dispatchAsync(cc);
697            }
698        }
699    
700        /**
701         * @param node
702         * @param message
703         * @return MessageDispatch
704         */
705        protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
706            if (node == QueueMessageReference.NULL_MESSAGE) {
707                MessageDispatch md = new MessageDispatch();
708                md.setMessage(null);
709                md.setConsumerId(info.getConsumerId());
710                md.setDestination(null);
711                return md;
712            } else {
713                MessageDispatch md = new MessageDispatch();
714                md.setConsumerId(info.getConsumerId());
715                md.setDestination(node.getRegionDestination().getActiveMQDestination());
716                md.setMessage(message);
717                md.setRedeliveryCounter(node.getRedeliveryCounter());
718                return md;
719            }
720        }
721    
722        /**
723         * Use when a matched message is about to be dispatched to the client.
724         * 
725         * @param node
726         * @return false if the message should not be dispatched to the client
727         *         (another sub may have already dispatched it for example).
728         * @throws IOException
729         */
730        protected abstract boolean canDispatch(MessageReference node) throws IOException;
731        
732        protected abstract boolean isDropped(MessageReference node);
733    
734        /**
735         * Used during acknowledgment to remove the message.
736         * 
737         * @throws IOException
738         */
739        protected abstract void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference node) throws IOException;
740    
741        
742        public int getMaxProducersToAudit() {
743            return maxProducersToAudit;
744        }
745    
746        public void setMaxProducersToAudit(int maxProducersToAudit) {
747            this.maxProducersToAudit = maxProducersToAudit;
748        }
749    
750        public int getMaxAuditDepth() {
751            return maxAuditDepth;
752        }
753    
754        public void setMaxAuditDepth(int maxAuditDepth) {
755            this.maxAuditDepth = maxAuditDepth;
756        }
757        
758        public boolean isUsePrefetchExtension() {
759            return usePrefetchExtension;
760        }
761    
762        public void setUsePrefetchExtension(boolean usePrefetchExtension) {
763            this.usePrefetchExtension = usePrefetchExtension;
764        }
765    }