001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region;
018    
019    import java.io.IOException;
020    import javax.jms.ResourceAllocationException;
021    import org.apache.activemq.advisory.AdvisorySupport;
022    import org.apache.activemq.broker.Broker;
023    import org.apache.activemq.broker.BrokerService;
024    import org.apache.activemq.broker.ConnectionContext;
025    import org.apache.activemq.broker.ProducerBrokerExchange;
026    import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
027    import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
028    import org.apache.activemq.command.ActiveMQDestination;
029    import org.apache.activemq.command.ActiveMQTopic;
030    import org.apache.activemq.command.Message;
031    import org.apache.activemq.command.MessageDispatchNotification;
032    import org.apache.activemq.command.ProducerInfo;
033    import org.apache.activemq.state.ProducerState;
034    import org.apache.activemq.store.MessageStore;
035    import org.apache.activemq.usage.MemoryUsage;
036    import org.apache.activemq.usage.SystemUsage;
037    import org.apache.activemq.usage.Usage;
038    import org.slf4j.Logger;
039    
040    /**
041     * 
042     */
043    public abstract class BaseDestination implements Destination {
044        /**
045         * The maximum number of messages to page in to the destination from
046         * persistent storage
047         */
048        public static final int MAX_PAGE_SIZE = 200;
049        public static final int MAX_BROWSE_PAGE_SIZE = MAX_PAGE_SIZE * 2;
050        public static final long EXPIRE_MESSAGE_PERIOD = 30 * 1000;
051        public static final long DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC = 60 * 1000;
052        public static final int MAX_PRODUCERS_TO_AUDIT = 64;
053        public static final int MAX_AUDIT_DEPTH = 2048;
054    
055        protected final ActiveMQDestination destination;
056        protected final Broker broker;
057        protected final MessageStore store;
058        protected SystemUsage systemUsage;
059        protected MemoryUsage memoryUsage;
060        private boolean producerFlowControl = true;
061        protected boolean warnOnProducerFlowControl = true;
062        protected long blockedProducerWarningInterval = DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL;
063    
064        private int maxProducersToAudit = 1024;
065        private int maxAuditDepth = 2048;
066        private boolean enableAudit = true;
067        private int maxPageSize = MAX_PAGE_SIZE;
068        private int maxBrowsePageSize = MAX_BROWSE_PAGE_SIZE;
069        private boolean useCache = true;
070        private int minimumMessageSize = 1024;
071        private boolean lazyDispatch = false;
072        private boolean advisoryForSlowConsumers;
073        private boolean advisdoryForFastProducers;
074        private boolean advisoryForDiscardingMessages;
075        private boolean advisoryWhenFull;
076        private boolean advisoryForDelivery;
077        private boolean advisoryForConsumed;
078        private boolean sendAdvisoryIfNoConsumers;
079        protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
080        protected final BrokerService brokerService;
081        protected final Broker regionBroker;
082        protected DeadLetterStrategy deadLetterStrategy = DEFAULT_DEAD_LETTER_STRATEGY;
083        protected long expireMessagesPeriod = EXPIRE_MESSAGE_PERIOD;
084        private int maxExpirePageSize = MAX_BROWSE_PAGE_SIZE;
085        protected int cursorMemoryHighWaterMark = 70;
086        protected int storeUsageHighWaterMark = 100;
087        private SlowConsumerStrategy slowConsumerStrategy;
088        private boolean prioritizedMessages;
089        private long inactiveTimoutBeforeGC = DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
090        private boolean gcIfInactive;
091        private long lastActiveTime=0l;
092        private boolean reduceMemoryFootprint = false;
093    
094        /**
095         * @param broker
096         * @param store
097         * @param destination
098         * @param parentStats
099         * @throws Exception
100         */
101        public BaseDestination(BrokerService brokerService, MessageStore store, ActiveMQDestination destination, DestinationStatistics parentStats) throws Exception {
102            this.brokerService = brokerService;
103            this.broker = brokerService.getBroker();
104            this.store = store;
105            this.destination = destination;
106            // let's copy the enabled property from the parent DestinationStatistics
107            this.destinationStatistics.setEnabled(parentStats.isEnabled());
108            this.destinationStatistics.setParent(parentStats);
109            this.systemUsage = new SystemUsage(brokerService.getProducerSystemUsage(), destination.toString());
110            this.memoryUsage = this.systemUsage.getMemoryUsage();
111            this.memoryUsage.setUsagePortion(1.0f);
112            this.regionBroker = brokerService.getRegionBroker();
113        }
114    
115        /**
116         * initialize the destination
117         * 
118         * @throws Exception
119         */
120        public void initialize() throws Exception {
121            // Let the store know what usage manager we are using so that he can
122            // flush messages to disk when usage gets high.
123            if (store != null) {
124                store.setMemoryUsage(this.memoryUsage);
125            }
126        }
127    
128        /**
129         * @return the producerFlowControl
130         */
131        public boolean isProducerFlowControl() {
132            return producerFlowControl;
133        }
134    
135        /**
136         * @param producerFlowControl the producerFlowControl to set
137         */
138        public void setProducerFlowControl(boolean producerFlowControl) {
139            this.producerFlowControl = producerFlowControl;
140        }
141    
142        /**
143         * Set's the interval at which warnings about producers being blocked by
144         * resource usage will be triggered. Values of 0 or less will disable
145         * warnings
146         * 
147         * @param blockedProducerWarningInterval the interval at which warning about
148         *            blocked producers will be triggered.
149         */
150        public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
151            this.blockedProducerWarningInterval = blockedProducerWarningInterval;
152        }
153    
154        /**
155         * 
156         * @return the interval at which warning about blocked producers will be
157         *         triggered.
158         */
159        public long getBlockedProducerWarningInterval() {
160            return blockedProducerWarningInterval;
161        }
162    
163        /**
164         * @return the maxProducersToAudit
165         */
166        public int getMaxProducersToAudit() {
167            return maxProducersToAudit;
168        }
169    
170        /**
171         * @param maxProducersToAudit the maxProducersToAudit to set
172         */
173        public void setMaxProducersToAudit(int maxProducersToAudit) {
174            this.maxProducersToAudit = maxProducersToAudit;
175        }
176    
177        /**
178         * @return the maxAuditDepth
179         */
180        public int getMaxAuditDepth() {
181            return maxAuditDepth;
182        }
183    
184        /**
185         * @param maxAuditDepth the maxAuditDepth to set
186         */
187        public void setMaxAuditDepth(int maxAuditDepth) {
188            this.maxAuditDepth = maxAuditDepth;
189        }
190    
191        /**
192         * @return the enableAudit
193         */
194        public boolean isEnableAudit() {
195            return enableAudit;
196        }
197    
198        /**
199         * @param enableAudit the enableAudit to set
200         */
201        public void setEnableAudit(boolean enableAudit) {
202            this.enableAudit = enableAudit;
203        }
204    
205        public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
206            destinationStatistics.getProducers().increment();
207            this.lastActiveTime=0l;
208        }
209    
210        public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
211            destinationStatistics.getProducers().decrement();
212        }
213        
214        public void addSubscription(ConnectionContext context, Subscription sub) throws Exception{
215            destinationStatistics.getConsumers().increment();
216            this.lastActiveTime=0l;
217        }
218    
219        public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception{
220            destinationStatistics.getConsumers().decrement();
221        }
222    
223    
224        public final MemoryUsage getMemoryUsage() {
225            return memoryUsage;
226        }
227    
228        public DestinationStatistics getDestinationStatistics() {
229            return destinationStatistics;
230        }
231    
232        public ActiveMQDestination getActiveMQDestination() {
233            return destination;
234        }
235    
236        public final String getName() {
237            return getActiveMQDestination().getPhysicalName();
238        }
239    
240        public final MessageStore getMessageStore() {
241            return store;
242        }
243    
244        public final boolean isActive() {
245            return destinationStatistics.getConsumers().getCount() != 0 || destinationStatistics.getProducers().getCount() != 0;
246        }
247    
248        public int getMaxPageSize() {
249            return maxPageSize;
250        }
251    
252        public void setMaxPageSize(int maxPageSize) {
253            this.maxPageSize = maxPageSize;
254        }
255    
256        public int getMaxBrowsePageSize() {
257            return this.maxBrowsePageSize;
258        }
259    
260        public void setMaxBrowsePageSize(int maxPageSize) {
261            this.maxBrowsePageSize = maxPageSize;
262        }
263    
264        public int getMaxExpirePageSize() {
265            return this.maxExpirePageSize;
266        }
267    
268        public void setMaxExpirePageSize(int maxPageSize) {
269            this.maxExpirePageSize = maxPageSize;
270        }
271    
272        public void setExpireMessagesPeriod(long expireMessagesPeriod) {
273            this.expireMessagesPeriod = expireMessagesPeriod;
274        }
275    
276        public long getExpireMessagesPeriod() {
277            return expireMessagesPeriod;
278        }
279    
280        public boolean isUseCache() {
281            return useCache;
282        }
283    
284        public void setUseCache(boolean useCache) {
285            this.useCache = useCache;
286        }
287    
288        public int getMinimumMessageSize() {
289            return minimumMessageSize;
290        }
291    
292        public void setMinimumMessageSize(int minimumMessageSize) {
293            this.minimumMessageSize = minimumMessageSize;
294        }
295    
296        public boolean isLazyDispatch() {
297            return lazyDispatch;
298        }
299    
300        public void setLazyDispatch(boolean lazyDispatch) {
301            this.lazyDispatch = lazyDispatch;
302        }
303    
304        protected long getDestinationSequenceId() {
305            return regionBroker.getBrokerSequenceId();
306        }
307    
308        /**
309         * @return the advisoryForSlowConsumers
310         */
311        public boolean isAdvisoryForSlowConsumers() {
312            return advisoryForSlowConsumers;
313        }
314    
315        /**
316         * @param advisoryForSlowConsumers the advisoryForSlowConsumers to set
317         */
318        public void setAdvisoryForSlowConsumers(boolean advisoryForSlowConsumers) {
319            this.advisoryForSlowConsumers = advisoryForSlowConsumers;
320        }
321    
322        /**
323         * @return the advisoryForDiscardingMessages
324         */
325        public boolean isAdvisoryForDiscardingMessages() {
326            return advisoryForDiscardingMessages;
327        }
328    
329        /**
330         * @param advisoryForDiscardingMessages the advisoryForDiscardingMessages to
331         *            set
332         */
333        public void setAdvisoryForDiscardingMessages(boolean advisoryForDiscardingMessages) {
334            this.advisoryForDiscardingMessages = advisoryForDiscardingMessages;
335        }
336    
337        /**
338         * @return the advisoryWhenFull
339         */
340        public boolean isAdvisoryWhenFull() {
341            return advisoryWhenFull;
342        }
343    
344        /**
345         * @param advisoryWhenFull the advisoryWhenFull to set
346         */
347        public void setAdvisoryWhenFull(boolean advisoryWhenFull) {
348            this.advisoryWhenFull = advisoryWhenFull;
349        }
350    
351        /**
352         * @return the advisoryForDelivery
353         */
354        public boolean isAdvisoryForDelivery() {
355            return advisoryForDelivery;
356        }
357    
358        /**
359         * @param advisoryForDelivery the advisoryForDelivery to set
360         */
361        public void setAdvisoryForDelivery(boolean advisoryForDelivery) {
362            this.advisoryForDelivery = advisoryForDelivery;
363        }
364    
365        /**
366         * @return the advisoryForConsumed
367         */
368        public boolean isAdvisoryForConsumed() {
369            return advisoryForConsumed;
370        }
371    
372        /**
373         * @param advisoryForConsumed the advisoryForConsumed to set
374         */
375        public void setAdvisoryForConsumed(boolean advisoryForConsumed) {
376            this.advisoryForConsumed = advisoryForConsumed;
377        }
378    
379        /**
380         * @return the advisdoryForFastProducers
381         */
382        public boolean isAdvisdoryForFastProducers() {
383            return advisdoryForFastProducers;
384        }
385    
386        /**
387         * @param advisdoryForFastProducers the advisdoryForFastProducers to set
388         */
389        public void setAdvisdoryForFastProducers(boolean advisdoryForFastProducers) {
390            this.advisdoryForFastProducers = advisdoryForFastProducers;
391        }
392    
393        public boolean isSendAdvisoryIfNoConsumers() {
394            return sendAdvisoryIfNoConsumers;
395        }
396    
397        public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers) {
398            this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers;
399        }
400    
401        /**
402         * @return the dead letter strategy
403         */
404        public DeadLetterStrategy getDeadLetterStrategy() {
405            return deadLetterStrategy;
406        }
407    
408        /**
409         * set the dead letter strategy
410         * 
411         * @param deadLetterStrategy
412         */
413        public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
414            this.deadLetterStrategy = deadLetterStrategy;
415        }
416    
417        public int getCursorMemoryHighWaterMark() {
418            return this.cursorMemoryHighWaterMark;
419        }
420    
421        public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
422            this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark;
423        }
424    
425        /**
426         * called when message is consumed
427         * 
428         * @param context
429         * @param messageReference
430         */
431        public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
432            if (advisoryForConsumed) {
433                broker.messageConsumed(context, messageReference);
434            }
435        }
436    
437        /**
438         * Called when message is delivered to the broker
439         * 
440         * @param context
441         * @param messageReference
442         */
443        public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
444            if (advisoryForDelivery) {
445                broker.messageDelivered(context, messageReference);
446            }
447        }
448    
449        /**
450         * Called when a message is discarded - e.g. running low on memory This will
451         * happen only if the policy is enabled - e.g. non durable topics
452         * 
453         * @param context
454         * @param messageReference
455         */
456        public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
457            if (advisoryForDiscardingMessages) {
458                broker.messageDiscarded(context, sub, messageReference);
459            }
460        }
461    
462        /**
463         * Called when there is a slow consumer
464         * 
465         * @param context
466         * @param subs
467         */
468        public void slowConsumer(ConnectionContext context, Subscription subs) {
469            if (advisoryForSlowConsumers) {
470                broker.slowConsumer(context, this, subs);
471            }
472            if (slowConsumerStrategy != null) {
473                slowConsumerStrategy.slowConsumer(context, subs);
474            }
475        }
476    
477        /**
478         * Called to notify a producer is too fast
479         * 
480         * @param context
481         * @param producerInfo
482         */
483        public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) {
484            if (advisdoryForFastProducers) {
485                broker.fastProducer(context, producerInfo);
486            }
487        }
488    
489        /**
490         * Called when a Usage reaches a limit
491         * 
492         * @param context
493         * @param usage
494         */
495        public void isFull(ConnectionContext context, Usage usage) {
496            if (advisoryWhenFull) {
497                broker.isFull(context, this, usage);
498            }
499        }
500    
501        public void dispose(ConnectionContext context) throws IOException {
502            if (this.store != null) {
503                this.store.removeAllMessages(context);
504                this.store.dispose(context);
505            }
506            this.destinationStatistics.setParent(null);
507            this.memoryUsage.stop();
508        }
509    
510        /**
511         * Provides a hook to allow messages with no consumer to be processed in
512         * some way - such as to send to a dead letter queue or something..
513         */
514        protected void onMessageWithNoConsumers(ConnectionContext context, Message msg) throws Exception {
515            if (!msg.isPersistent()) {
516                if (isSendAdvisoryIfNoConsumers()) {
517                    // allow messages with no consumers to be dispatched to a dead
518                    // letter queue
519                    if (destination.isQueue() || !AdvisorySupport.isAdvisoryTopic(destination)) {
520    
521                        Message message = msg.copy();
522                        // The original destination and transaction id do not get
523                        // filled when the message is first sent,
524                        // it is only populated if the message is routed to another
525                        // destination like the DLQ
526                        if (message.getOriginalDestination() != null) {
527                            message.setOriginalDestination(message.getDestination());
528                        }
529                        if (message.getOriginalTransactionId() != null) {
530                            message.setOriginalTransactionId(message.getTransactionId());
531                        }
532    
533                        ActiveMQTopic advisoryTopic;
534                        if (destination.isQueue()) {
535                            advisoryTopic = AdvisorySupport.getNoQueueConsumersAdvisoryTopic(destination);
536                        } else {
537                            advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination);
538                        }
539                        message.setDestination(advisoryTopic);
540                        message.setTransactionId(null);
541    
542                        // Disable flow control for this since since we don't want
543                        // to block.
544                        boolean originalFlowControl = context.isProducerFlowControl();
545                        try {
546                            context.setProducerFlowControl(false);
547                            ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
548                            producerExchange.setMutable(false);
549                            producerExchange.setConnectionContext(context);
550                            producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
551                            context.getBroker().send(producerExchange, message);
552                        } finally {
553                            context.setProducerFlowControl(originalFlowControl);
554                        }
555    
556                    }
557                }
558            }
559        }
560    
561        public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
562        }
563    
564        public final int getStoreUsageHighWaterMark() {
565            return this.storeUsageHighWaterMark;
566        }
567    
568        public void setStoreUsageHighWaterMark(int storeUsageHighWaterMark) {
569            this.storeUsageHighWaterMark = storeUsageHighWaterMark;
570        }
571    
572        protected final void waitForSpace(ConnectionContext context, Usage<?> usage, String warning) throws IOException, InterruptedException, ResourceAllocationException {
573            waitForSpace(context, usage, 100, warning);
574        }
575        
576        protected final void waitForSpace(ConnectionContext context, Usage<?> usage, int highWaterMark, String warning) throws IOException, InterruptedException, ResourceAllocationException {
577            if (systemUsage.isSendFailIfNoSpace()) {
578                getLog().debug("sendFailIfNoSpace, forcing exception on send: " + warning);
579                throw new ResourceAllocationException(warning);
580            }
581            if (systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
582                if (!usage.waitForSpace(systemUsage.getSendFailIfNoSpaceAfterTimeout(), highWaterMark)) {
583                    getLog().debug("sendFailIfNoSpaceAfterTimeout expired, forcing exception on send: " + warning);
584                    throw new ResourceAllocationException(warning);
585                }
586            } else {
587                long start = System.currentTimeMillis();
588                long nextWarn = start;
589                while (!usage.waitForSpace(1000, highWaterMark)) {
590                    if (context.getStopping().get()) {
591                        throw new IOException("Connection closed, send aborted.");
592                    }
593        
594                    long now = System.currentTimeMillis();
595                    if (now >= nextWarn) {
596                        getLog().info(warning + " (blocking for: " + (now - start) / 1000 + "s)");
597                        nextWarn = now + blockedProducerWarningInterval;
598                    }
599                }
600            }
601        }
602    
603        protected abstract Logger getLog();
604    
605        public void setSlowConsumerStrategy(SlowConsumerStrategy slowConsumerStrategy) {
606            this.slowConsumerStrategy = slowConsumerStrategy;
607        }
608    
609        public SlowConsumerStrategy getSlowConsumerStrategy() {
610            return this.slowConsumerStrategy;
611        }
612    
613       
614        public boolean isPrioritizedMessages() {
615            return this.prioritizedMessages;
616        }
617    
618        public void setPrioritizedMessages(boolean prioritizedMessages) {
619            this.prioritizedMessages = prioritizedMessages;
620            if (store != null) {
621                store.setPrioritizedMessages(prioritizedMessages);
622            }
623        }
624    
625        /**
626         * @return the inactiveTimoutBeforeGC
627         */
628        public long getInactiveTimoutBeforeGC() {
629            return this.inactiveTimoutBeforeGC;
630        }
631    
632        /**
633         * @param inactiveTimoutBeforeGC the inactiveTimoutBeforeGC to set
634         */
635        public void setInactiveTimoutBeforeGC(long inactiveTimoutBeforeGC) {
636            this.inactiveTimoutBeforeGC = inactiveTimoutBeforeGC;
637        }
638    
639        /**
640         * @return the gcIfInactive
641         */
642        public boolean isGcIfInactive() {
643            return this.gcIfInactive;
644        }
645    
646        /**
647         * @param gcIfInactive the gcIfInactive to set
648         */
649        public void setGcIfInactive(boolean gcIfInactive) {
650            this.gcIfInactive = gcIfInactive;
651        }
652        
653        public void markForGC(long timeStamp) {
654            if (isGcIfInactive() && this.lastActiveTime == 0 && isActive() == false
655                    && destinationStatistics.messages.getCount() == 0 && getInactiveTimoutBeforeGC() > 0l) {
656                this.lastActiveTime = timeStamp;
657            }
658        }
659    
660        public boolean canGC() {
661            boolean result = false;
662            if (isGcIfInactive()&& this.lastActiveTime != 0l) {
663                if ((System.currentTimeMillis() - this.lastActiveTime) >= getInactiveTimoutBeforeGC()) {
664                    result = true;
665                }
666            }
667            return result;
668        }
669    
670        public void setReduceMemoryFootprint(boolean reduceMemoryFootprint) {
671            this.reduceMemoryFootprint = reduceMemoryFootprint;
672        }
673    
674        protected boolean isReduceMemoryFootprint() {
675            return this.reduceMemoryFootprint;
676        }
677    }