001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region.cursors;
018    
019    import org.apache.activemq.broker.Broker;
020    import org.apache.activemq.broker.region.MessageReference;
021    import org.apache.activemq.broker.region.Queue;
022    import org.apache.activemq.command.Message;
023    import org.apache.activemq.usage.SystemUsage;
024    import org.slf4j.Logger;
025    import org.slf4j.LoggerFactory;
026    
027    /**
028     * Store based Cursor for Queues
029     * 
030     * 
031     */
032    public class StoreQueueCursor extends AbstractPendingMessageCursor {
033    
034        private static final Logger LOG = LoggerFactory.getLogger(StoreQueueCursor.class);
035        private final Broker broker;
036        private int pendingCount;
037        private final Queue queue;
038        private PendingMessageCursor nonPersistent;
039        private final QueueStorePrefetch persistent;
040        private boolean started;
041        private PendingMessageCursor currentCursor;
042    
043        /**
044         * Construct
045         * @param broker 
046         * @param queue
047         */
048        public StoreQueueCursor(Broker broker,Queue queue) {
049            super((queue != null ? queue.isPrioritizedMessages():false));
050            this.broker=broker;
051            this.queue = queue;
052            this.persistent = new QueueStorePrefetch(queue);
053            currentCursor = persistent;
054        }
055    
056        public synchronized void start() throws Exception {
057            started = true;
058            super.start();
059            if (nonPersistent == null) {
060                if (broker.getBrokerService().isPersistent()) {
061                    nonPersistent = new FilePendingMessageCursor(broker,queue.getName(),this.prioritizedMessages);
062                }else {
063                    nonPersistent = new VMPendingMessageCursor(this.prioritizedMessages);
064                }
065                nonPersistent.setMaxBatchSize(getMaxBatchSize());
066                nonPersistent.setSystemUsage(systemUsage);
067                nonPersistent.setEnableAudit(isEnableAudit());
068                nonPersistent.setMaxAuditDepth(getMaxAuditDepth());
069                nonPersistent.setMaxProducersToAudit(getMaxProducersToAudit());
070            }
071            nonPersistent.setMessageAudit(getMessageAudit());
072            nonPersistent.start();
073            persistent.setMessageAudit(getMessageAudit());
074            persistent.start();
075            pendingCount = persistent.size() + nonPersistent.size();
076        }
077    
078        public synchronized void stop() throws Exception {
079            started = false;
080            if (nonPersistent != null) {
081                nonPersistent.stop();
082                nonPersistent.gc();
083            }
084            persistent.stop();
085            persistent.gc();
086            super.stop();
087            pendingCount = 0;
088        }
089    
090        public synchronized void addMessageLast(MessageReference node) throws Exception {
091            if (node != null) {
092                Message msg = node.getMessage();
093                if (started) {
094                    pendingCount++;
095                    if (!msg.isPersistent()) {
096                        nonPersistent.addMessageLast(node);
097                    }
098                }
099                if (msg.isPersistent()) {
100                    persistent.addMessageLast(node);
101                }
102            }
103        }
104        
105        public synchronized void addMessageFirst(MessageReference node) throws Exception {
106            if (node != null) {
107                Message msg = node.getMessage();
108                if (started) {
109                    pendingCount++;
110                    if (!msg.isPersistent()) {
111                        nonPersistent.addMessageFirst(node);
112                    }
113                }
114                if (msg.isPersistent()) {
115                    persistent.addMessageFirst(node);
116                }
117            }
118        }
119    
120        public synchronized void clear() {
121            pendingCount = 0;
122        }
123    
124        public synchronized boolean hasNext() {
125            try {
126                getNextCursor();
127            } catch (Exception e) {
128                LOG.error("Failed to get current cursor ", e);
129                throw new RuntimeException(e);
130           }
131           return currentCursor != null ? currentCursor.hasNext() : false;
132        }
133    
134        public synchronized MessageReference next() {
135            MessageReference result = currentCursor != null ? currentCursor.next() : null;
136            return result;
137        }
138    
139        public synchronized void remove() {
140            if (currentCursor != null) {
141                currentCursor.remove();
142            }
143            pendingCount--;
144        }
145    
146        public synchronized void remove(MessageReference node) {
147            if (!node.isPersistent()) {
148                nonPersistent.remove(node);
149            } else {
150                persistent.remove(node);
151            }
152            pendingCount--;
153        }
154    
155        public synchronized void reset() {
156            nonPersistent.reset();
157            persistent.reset();
158            pendingCount = persistent.size() + nonPersistent.size();        
159        }
160        
161        public void release() {
162            nonPersistent.release();
163            persistent.release();
164        }
165    
166    
167        public synchronized int size() {
168            if (pendingCount < 0) {
169                pendingCount = persistent.size() + nonPersistent.size();
170            }
171            return pendingCount;
172        }
173    
174        public synchronized boolean isEmpty() {
175            // if negative, more messages arrived in store since last reset so non empty
176            return pendingCount == 0;
177        }
178    
179        /**
180         * Informs the Broker if the subscription needs to intervention to recover
181         * it's state e.g. DurableTopicSubscriber may do
182         * 
183         * @see org.apache.activemq.broker.region.cursors.PendingMessageCursor
184         * @return true if recovery required
185         */
186        public boolean isRecoveryRequired() {
187            return false;
188        }
189    
190        /**
191         * @return the nonPersistent Cursor
192         */
193        public PendingMessageCursor getNonPersistent() {
194            return this.nonPersistent;
195        }
196    
197        /**
198         * @param nonPersistent cursor to set
199         */
200        public void setNonPersistent(PendingMessageCursor nonPersistent) {
201            this.nonPersistent = nonPersistent;
202        }
203    
204        public void setMaxBatchSize(int maxBatchSize) {
205            persistent.setMaxBatchSize(maxBatchSize);
206            if (nonPersistent != null) {
207                nonPersistent.setMaxBatchSize(maxBatchSize);
208            }
209            super.setMaxBatchSize(maxBatchSize);
210        }
211        
212        
213        public void setMaxProducersToAudit(int maxProducersToAudit) {
214            super.setMaxProducersToAudit(maxProducersToAudit);
215            if (persistent != null) {
216                persistent.setMaxProducersToAudit(maxProducersToAudit);
217            }
218            if (nonPersistent != null) {
219                nonPersistent.setMaxProducersToAudit(maxProducersToAudit);
220            }
221        }
222    
223        public void setMaxAuditDepth(int maxAuditDepth) {
224            super.setMaxAuditDepth(maxAuditDepth);
225            if (persistent != null) {
226                persistent.setMaxAuditDepth(maxAuditDepth);
227            }
228            if (nonPersistent != null) {
229                nonPersistent.setMaxAuditDepth(maxAuditDepth);
230            }
231        }
232        
233        public void setEnableAudit(boolean enableAudit) {
234            super.setEnableAudit(enableAudit);
235            if (persistent != null) {
236                persistent.setEnableAudit(enableAudit);
237            }
238            if (nonPersistent != null) {
239                nonPersistent.setEnableAudit(enableAudit);
240            }
241        }
242        
243        @Override
244        public void setUseCache(boolean useCache) {
245            super.setUseCache(useCache);
246            if (persistent != null) {
247                persistent.setUseCache(useCache);
248            }
249            if (nonPersistent != null) {
250                nonPersistent.setUseCache(useCache);
251            }
252        }
253        
254        @Override
255        public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
256            super.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
257            if (persistent != null) {
258                persistent.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
259            }
260            if (nonPersistent != null) {
261                nonPersistent.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
262            }
263        }
264    
265    
266    
267        public synchronized void gc() {
268            if (persistent != null) {
269                persistent.gc();
270            }
271            if (nonPersistent != null) {
272                nonPersistent.gc();
273            }
274            pendingCount = persistent.size() + nonPersistent.size();
275        }
276    
277        public void setSystemUsage(SystemUsage usageManager) {
278            super.setSystemUsage(usageManager);
279            if (persistent != null) {
280                persistent.setSystemUsage(usageManager);
281            }
282            if (nonPersistent != null) {
283                nonPersistent.setSystemUsage(usageManager);
284            }
285        }
286    
287        protected synchronized PendingMessageCursor getNextCursor() throws Exception {
288            if (currentCursor == null || !currentCursor.hasMessagesBufferedToDeliver()) {
289                currentCursor = currentCursor == persistent ? nonPersistent : persistent;
290                // sanity check
291                if (currentCursor.isEmpty()) {
292                    currentCursor = currentCursor == persistent ? nonPersistent : persistent;
293                }
294            }
295            return currentCursor;
296        }
297    
298        @Override
299        public boolean isCacheEnabled() {
300            boolean cacheEnabled = isUseCache();
301            if (cacheEnabled) {
302                if (persistent != null) {
303                    cacheEnabled &= persistent.isCacheEnabled();
304                }
305                if (nonPersistent != null) {
306                    cacheEnabled &= nonPersistent.isCacheEnabled();
307                }
308                setCacheEnabled(cacheEnabled);
309            }
310            return cacheEnabled;
311        }
312    }