001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region.cursors;
018    
019    import java.util.Collections;
020    import java.util.HashMap;
021    import java.util.List;
022    import java.util.Map;
023    import java.util.concurrent.CopyOnWriteArrayList;
024    import org.apache.activemq.advisory.AdvisorySupport;
025    import org.apache.activemq.broker.Broker;
026    import org.apache.activemq.broker.ConnectionContext;
027    import org.apache.activemq.broker.region.Destination;
028    import org.apache.activemq.broker.region.DurableTopicSubscription;
029    import org.apache.activemq.broker.region.MessageReference;
030    import org.apache.activemq.broker.region.Topic;
031    import org.apache.activemq.command.Message;
032    import org.apache.activemq.usage.SystemUsage;
033    import org.slf4j.Logger;
034    import org.slf4j.LoggerFactory;
035    
036    /**
037     * persist pending messages pending message (messages awaiting dispatch to a
038     * consumer) cursor
039     * 
040     * 
041     */
042    public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
043    
044        private static final Logger LOG = LoggerFactory.getLogger(StoreDurableSubscriberCursor.class);
045        private static final int UNKNOWN = -1;
046        private final String clientId;
047        private final String subscriberName;
048        private final Map<Destination, TopicStorePrefetch> topics = new HashMap<Destination, TopicStorePrefetch>();
049        private final List<PendingMessageCursor> storePrefetches = new CopyOnWriteArrayList<PendingMessageCursor>();
050        private final PendingMessageCursor nonPersistent;
051        private PendingMessageCursor currentCursor;
052        private final DurableTopicSubscription subscription;
053        private int cacheCurrentLowestPriority = UNKNOWN;
054        private boolean immediatePriorityDispatch = true;
055        /**
056         * @param broker Broker for this cursor
057         * @param clientId clientId for this cursor
058         * @param subscriberName subscriber name for this cursor
059         * @param maxBatchSize currently ignored
060         * @param subscription  subscription for this cursor
061         */
062        public StoreDurableSubscriberCursor(Broker broker,String clientId, String subscriberName,int maxBatchSize, DurableTopicSubscription subscription) {
063            super(AbstractPendingMessageCursor.isPrioritizedMessageSubscriber(broker,subscription));
064            this.subscription=subscription;
065            this.clientId = clientId;
066            this.subscriberName = subscriberName;
067            if (broker.getBrokerService().isPersistent()) {
068                this.nonPersistent = new FilePendingMessageCursor(broker,clientId + subscriberName,this.prioritizedMessages);
069            } else {
070                this.nonPersistent = new VMPendingMessageCursor(this.prioritizedMessages);
071            }
072            
073            this.nonPersistent.setMaxBatchSize(maxBatchSize);
074            this.nonPersistent.setSystemUsage(systemUsage);
075            this.storePrefetches.add(this.nonPersistent);
076    
077            if (prioritizedMessages) {
078                setMaxAuditDepth(10*getMaxAuditDepth());
079            }
080        }
081    
082        @Override
083        public synchronized void start() throws Exception {
084            if (!isStarted()) {
085                super.start();
086                for (PendingMessageCursor tsp : storePrefetches) {
087                    tsp.setMessageAudit(getMessageAudit());
088                    tsp.start();
089                }
090            }
091        }
092    
093        @Override
094        public synchronized void stop() throws Exception {
095            if (isStarted()) {
096                if (subscription.isKeepDurableSubsActive()) {
097                    super.gc();
098                    super.getMessageAudit().clear();
099                    for (PendingMessageCursor tsp : storePrefetches) {
100                        tsp.gc();
101                        tsp.getMessageAudit().clear();
102                    }
103                } else {
104                    super.stop();
105                    for (PendingMessageCursor tsp : storePrefetches) {
106                        tsp.stop();
107                    }
108                }
109            }
110        }
111    
112        /**
113         * Add a destination
114         * 
115         * @param context
116         * @param destination
117         * @throws Exception
118         */
119        @Override
120        public synchronized void add(ConnectionContext context, Destination destination) throws Exception {
121            if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination())) {
122                TopicStorePrefetch tsp = new TopicStorePrefetch(this.subscription,(Topic)destination, clientId, subscriberName);
123                tsp.setMaxBatchSize(destination.getMaxPageSize());
124                tsp.setSystemUsage(systemUsage);
125                tsp.setMessageAudit(getMessageAudit());
126                tsp.setEnableAudit(isEnableAudit());
127                tsp.setMemoryUsageHighWaterMark(getMemoryUsageHighWaterMark());
128                topics.put(destination, tsp);
129                storePrefetches.add(tsp);
130                if (isStarted()) {
131                    tsp.start();
132                }
133            }
134        }
135    
136        /**
137         * remove a destination
138         * 
139         * @param context
140         * @param destination
141         * @throws Exception
142         */
143        @Override
144        public synchronized List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
145            PendingMessageCursor tsp = topics.remove(destination);
146            if (tsp != null) {
147                storePrefetches.remove(tsp);
148            }
149            return Collections.EMPTY_LIST;
150        }
151    
152        /**
153         * @return true if there are no pending messages
154         */
155        @Override
156        public synchronized boolean isEmpty() {
157            for (PendingMessageCursor tsp : storePrefetches) {
158                if( !tsp.isEmpty() )
159                    return false;
160            }
161            return true;
162        }
163    
164        @Override
165        public synchronized boolean isEmpty(Destination destination) {
166            boolean result = true;
167            TopicStorePrefetch tsp = topics.get(destination);
168            if (tsp != null) {
169                result = tsp.isEmpty();
170            }
171            return result;
172        }
173    
174        /**
175         * Informs the Broker if the subscription needs to intervention to recover
176         * it's state e.g. DurableTopicSubscriber may do
177         * 
178         * @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor
179         * @return true if recovery required
180         */
181        @Override
182        public boolean isRecoveryRequired() {
183            return false;
184        }
185    
186        @Override
187        public synchronized void addMessageLast(MessageReference node) throws Exception {
188            if (node != null) {
189                Message msg = node.getMessage();
190                if (isStarted()) {
191                    if (!msg.isPersistent()) {
192                        nonPersistent.addMessageLast(node);
193                    }
194                }
195                if (msg.isPersistent()) {
196                    Destination dest = msg.getRegionDestination();
197                    TopicStorePrefetch tsp = topics.get(dest);
198                    if (tsp != null) {
199                        // cache can become high priority cache for immediate dispatch
200                        final int priority = msg.getPriority();
201                        if (isStarted() && this.prioritizedMessages && immediatePriorityDispatch && !tsp.isCacheEnabled()) {
202                            if (priority > tsp.getCurrentLowestPriority()) {
203                                if (LOG.isTraceEnabled()) {
204                                    LOG.trace("enabling cache for cursor on high priority message " + priority
205                                            + ", current lowest: " + tsp.getCurrentLowestPriority());
206                                }
207                                tsp.setCacheEnabled(true);
208                                cacheCurrentLowestPriority = tsp.getCurrentLowestPriority();
209                            }
210                        } else if (cacheCurrentLowestPriority != UNKNOWN && priority <= cacheCurrentLowestPriority) {
211                            // go to the store to get next priority message as lower priority messages may be recovered
212                            // already and need to acked sequence order
213                            if (LOG.isTraceEnabled()) {
214                                LOG.trace("disabling/clearing cache for cursor on lower priority message "
215                                        + priority + ", tsp current lowest: " + tsp.getCurrentLowestPriority()
216                                        + " cache lowest: " + cacheCurrentLowestPriority);
217                            }
218                            tsp.setCacheEnabled(false);
219                            cacheCurrentLowestPriority = UNKNOWN;
220                        }
221                        tsp.addMessageLast(node);
222                    }
223                }
224    
225            }
226        }
227    
228        @Override
229        public boolean isTransient() {
230            return subscription.isKeepDurableSubsActive();
231        }
232    
233        @Override
234        public void addMessageFirst(MessageReference node) throws Exception {
235            // for keep durable subs active, need to deal with redispatch
236            if (node != null) {
237                Message msg = node.getMessage();
238                if (!msg.isPersistent()) {
239                    nonPersistent.addMessageFirst(node);
240                } else {
241                    Destination dest = msg.getRegionDestination();
242                    TopicStorePrefetch tsp = topics.get(dest);
243                    if (tsp != null) {
244                        tsp.addMessageFirst(node);
245                    }
246                }
247            }
248        }
249    
250        @Override
251        public synchronized void addRecoveredMessage(MessageReference node) throws Exception {
252            nonPersistent.addMessageLast(node);
253        }
254    
255        @Override
256        public synchronized void clear() {
257            for (PendingMessageCursor tsp : storePrefetches) {
258                tsp.clear();
259            }
260        }
261    
262        @Override
263        public synchronized boolean hasNext() {
264            boolean result = true;
265            if (result) {
266                try {
267                    currentCursor = getNextCursor();
268                } catch (Exception e) {
269                    LOG.error("Failed to get current cursor ", e);
270                    throw new RuntimeException(e);
271                }
272                result = currentCursor != null ? currentCursor.hasNext() : false;
273            }
274            return result;
275        }
276    
277        @Override
278        public synchronized MessageReference next() {
279            MessageReference result = currentCursor != null ? currentCursor.next() : null;
280            return result;
281        }
282    
283        @Override
284        public synchronized void remove() {
285            if (currentCursor != null) {
286                currentCursor.remove();
287            }
288        }
289    
290        @Override
291        public synchronized void remove(MessageReference node) {
292            if (currentCursor != null) {
293                currentCursor.remove(node);
294            }
295        }
296    
297        @Override
298        public synchronized void reset() {
299            for (PendingMessageCursor storePrefetch : storePrefetches) {
300                storePrefetch.reset();
301            }
302        }
303    
304        @Override
305        public synchronized void release() {
306            for (PendingMessageCursor storePrefetch : storePrefetches) {
307                storePrefetch.release();
308            }
309        }
310    
311        @Override
312        public synchronized int size() {
313            int pendingCount=0;
314            for (PendingMessageCursor tsp : storePrefetches) {
315                pendingCount += tsp.size();
316            }
317            return pendingCount;
318        }
319    
320        @Override
321        public void setMaxBatchSize(int newMaxBatchSize) {
322            for (PendingMessageCursor storePrefetch : storePrefetches) {
323                storePrefetch.setMaxBatchSize(newMaxBatchSize);
324            }
325            super.setMaxBatchSize(newMaxBatchSize);
326        }
327    
328        @Override
329        public synchronized void gc() {
330            for (PendingMessageCursor tsp : storePrefetches) {
331                tsp.gc();
332            }
333            cacheCurrentLowestPriority = UNKNOWN;
334        }
335    
336        @Override
337        public void setSystemUsage(SystemUsage usageManager) {
338            super.setSystemUsage(usageManager);
339            for (PendingMessageCursor tsp : storePrefetches) {
340                tsp.setSystemUsage(usageManager);
341            }
342        }
343        
344        @Override
345        public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
346            super.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
347            for (PendingMessageCursor cursor : storePrefetches) {
348                cursor.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
349            }
350        }
351        
352        @Override
353        public void setMaxProducersToAudit(int maxProducersToAudit) {
354            super.setMaxProducersToAudit(maxProducersToAudit);
355            for (PendingMessageCursor cursor : storePrefetches) {
356                cursor.setMaxAuditDepth(maxAuditDepth);
357            }
358        }
359    
360        @Override
361        public void setMaxAuditDepth(int maxAuditDepth) {
362            super.setMaxAuditDepth(maxAuditDepth);
363            for (PendingMessageCursor cursor : storePrefetches) {
364                cursor.setMaxAuditDepth(maxAuditDepth);
365            }
366        }
367        
368        @Override
369        public void setEnableAudit(boolean enableAudit) {
370            super.setEnableAudit(enableAudit);
371            for (PendingMessageCursor cursor : storePrefetches) {
372                cursor.setEnableAudit(enableAudit);
373            }
374        }
375        
376        @Override
377        public  void setUseCache(boolean useCache) {
378            super.setUseCache(useCache);
379            for (PendingMessageCursor cursor : storePrefetches) {
380                cursor.setUseCache(useCache);
381            }
382        }
383        
384        protected synchronized PendingMessageCursor getNextCursor() throws Exception {
385            if (currentCursor == null || currentCursor.isEmpty()) {
386                currentCursor = null;
387                for (PendingMessageCursor tsp : storePrefetches) {
388                    if (tsp.hasNext()) {
389                        currentCursor = tsp;
390                        break;
391                    }
392                }
393                // round-robin
394                if (storePrefetches.size()>1) {
395                    PendingMessageCursor first = storePrefetches.remove(0);
396                    storePrefetches.add(first);
397                }
398            }
399            return currentCursor;
400        }
401        
402        @Override
403        public String toString() {
404            return "StoreDurableSubscriber(" + clientId + ":" + subscriberName + ")";
405        }
406    
407        public boolean isImmediatePriorityDispatch() {
408            return immediatePriorityDispatch;
409        }
410    
411        public void setImmediatePriorityDispatch(boolean immediatePriorityDispatch) {
412            this.immediatePriorityDispatch = immediatePriorityDispatch;
413        }
414    
415    }