001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region.cursors;
018    
019    import java.util.Collections;
020    import java.util.LinkedList;
021    import java.util.List;
022    import java.util.Set;
023    import org.apache.activemq.ActiveMQMessageAudit;
024    import org.apache.activemq.broker.Broker;
025    import org.apache.activemq.broker.ConnectionContext;
026    import org.apache.activemq.broker.region.BaseDestination;
027    import org.apache.activemq.broker.region.Destination;
028    import org.apache.activemq.broker.region.MessageReference;
029    import org.apache.activemq.broker.region.Subscription;
030    import org.apache.activemq.command.MessageId;
031    import org.apache.activemq.usage.SystemUsage;
032    
033    /**
034     * Abstract method holder for pending message (messages awaiting disptach to a
035     * consumer) cursor
036     * 
037     * 
038     */
039    public abstract class AbstractPendingMessageCursor implements PendingMessageCursor {
040        protected int memoryUsageHighWaterMark = 70;
041        protected int maxBatchSize = BaseDestination.MAX_PAGE_SIZE;
042        protected SystemUsage systemUsage;
043        protected int maxProducersToAudit = BaseDestination.MAX_PRODUCERS_TO_AUDIT;
044        protected int maxAuditDepth = BaseDestination.MAX_AUDIT_DEPTH;
045        protected boolean enableAudit=true;
046        protected ActiveMQMessageAudit audit;
047        protected boolean useCache=true;
048        private boolean cacheEnabled=true;
049        private boolean started=false;
050        protected MessageReference last = null;
051        protected final boolean prioritizedMessages;
052        
053        public AbstractPendingMessageCursor(boolean prioritizedMessages) {
054            this.prioritizedMessages=prioritizedMessages;
055        }
056      
057    
058        public synchronized void start() throws Exception  {
059            if (!started && enableAudit && audit==null) {
060                audit= new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
061            }
062            started=true;
063        }
064    
065        public synchronized void stop() throws Exception  {
066            started=false;
067            audit=null;
068            gc();
069        }
070    
071        public void add(ConnectionContext context, Destination destination) throws Exception {
072        }
073    
074        @SuppressWarnings("unchecked")
075        public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
076            return Collections.EMPTY_LIST;
077        }
078    
079        public boolean isRecoveryRequired() {
080            return true;
081        }
082    
083        public void addMessageFirst(MessageReference node) throws Exception {
084        }
085    
086        public void addMessageLast(MessageReference node) throws Exception {
087        }
088        
089        public boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception {
090            addMessageLast(node);
091            return true;
092        }
093    
094        public void addRecoveredMessage(MessageReference node) throws Exception {
095            addMessageLast(node);
096        }
097    
098        public void clear() {
099        }
100    
101        public boolean hasNext() {
102            return false;
103        }
104    
105        public boolean isEmpty() {
106            return false;
107        }
108    
109        public boolean isEmpty(Destination destination) {
110            return isEmpty();
111        }
112    
113        public MessageReference next() {
114            return null;
115        }
116    
117        public void remove() {
118        }
119    
120        public void reset() {
121        }
122    
123        public int size() {
124            return 0;
125        }
126    
127        public int getMaxBatchSize() {
128            return maxBatchSize;
129        }
130    
131        public void setMaxBatchSize(int maxBatchSize) {
132            this.maxBatchSize = maxBatchSize;
133        }
134    
135        protected void fillBatch() throws Exception {
136        }
137    
138        public void resetForGC() {
139            reset();
140        }
141    
142        public void remove(MessageReference node) {
143        }
144    
145        public void gc() {
146        }
147    
148        public void setSystemUsage(SystemUsage usageManager) {
149            this.systemUsage = usageManager;
150        }
151    
152        public boolean hasSpace() {
153            return systemUsage != null ? (systemUsage.getMemoryUsage().getPercentUsage() < memoryUsageHighWaterMark) : true;
154        }
155    
156        public boolean isFull() {
157            return systemUsage != null ? systemUsage.getMemoryUsage().isFull() : false;
158        }
159    
160        public void release() {
161        }
162    
163        public boolean hasMessagesBufferedToDeliver() {
164            return false;
165        }
166    
167        /**
168         * @return the memoryUsageHighWaterMark
169         */
170        public int getMemoryUsageHighWaterMark() {
171            return memoryUsageHighWaterMark;
172        }
173    
174        /**
175         * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set
176         */
177        public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
178            this.memoryUsageHighWaterMark = memoryUsageHighWaterMark;
179        }
180    
181        /**
182         * @return the usageManager
183         */
184        public SystemUsage getSystemUsage() {
185            return this.systemUsage;
186        }
187    
188        /**
189         * destroy the cursor
190         * 
191         * @throws Exception
192         */
193        public void destroy() throws Exception {
194            stop();
195        }
196    
197        /**
198         * Page in a restricted number of messages
199         * 
200         * @param maxItems maximum number of messages to return
201         * @return a list of paged in messages
202         */
203        public LinkedList<MessageReference> pageInList(int maxItems) {
204            throw new RuntimeException("Not supported");
205        }
206    
207        /**
208         * @return the maxProducersToAudit
209         */
210        public int getMaxProducersToAudit() {
211            return maxProducersToAudit;
212        }
213    
214        /**
215         * @param maxProducersToAudit the maxProducersToAudit to set
216         */
217        public synchronized void setMaxProducersToAudit(int maxProducersToAudit) {
218            this.maxProducersToAudit = maxProducersToAudit;
219            if (audit != null) {
220                audit.setMaximumNumberOfProducersToTrack(maxProducersToAudit);
221            }
222        }
223    
224        /**
225         * @return the maxAuditDepth
226         */
227        public int getMaxAuditDepth() {
228            return maxAuditDepth;
229        }
230        
231    
232        /**
233         * @param maxAuditDepth the maxAuditDepth to set
234         */
235        public synchronized void setMaxAuditDepth(int maxAuditDepth) {
236            this.maxAuditDepth = maxAuditDepth;
237            if (audit != null) {
238                audit.setAuditDepth(maxAuditDepth);
239            }
240        }
241        
242        
243        /**
244         * @return the enableAudit
245         */
246        public boolean isEnableAudit() {
247            return enableAudit;
248        }
249    
250        /**
251         * @param enableAudit the enableAudit to set
252         */
253        public synchronized void setEnableAudit(boolean enableAudit) {
254            this.enableAudit = enableAudit;
255            if (enableAudit && started && audit==null) {
256                audit= new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
257            }
258        }
259        
260        public boolean isTransient() {
261            return false;
262        }
263        
264           
265        /**
266         * set the audit
267         * @param audit new audit component
268         */
269        public void setMessageAudit(ActiveMQMessageAudit audit) {
270            this.audit=audit;
271        }
272        
273        
274        /**
275         * @return the audit
276         */
277        public ActiveMQMessageAudit getMessageAudit() {
278            return audit;
279        }
280        
281        public boolean isUseCache() {
282            return useCache;
283        }
284    
285        public void setUseCache(boolean useCache) {
286            this.useCache = useCache;
287        }
288    
289        public synchronized boolean isDuplicate(MessageId messageId) {
290            boolean unique = recordUniqueId(messageId);
291            rollback(messageId);
292            return !unique;
293        }
294        
295        /**
296         * records a message id and checks if it is a duplicate
297         * @param messageId
298         * @return true if id is unique, false otherwise.
299         */
300        public synchronized boolean recordUniqueId(MessageId messageId) {
301            if (!enableAudit || audit==null) {
302                return true;
303            }
304            return !audit.isDuplicate(messageId);
305        }
306        
307        public synchronized void rollback(MessageId id) {
308            if (audit != null) {
309                audit.rollback(id);
310            }
311        }
312        
313        protected synchronized boolean isStarted() {
314            return started;
315        }
316        
317        public static boolean isPrioritizedMessageSubscriber(Broker broker,Subscription sub) {
318            boolean result = false;
319            Set<Destination> destinations = broker.getDestinations(sub.getActiveMQDestination());
320            if (destinations != null) {
321                for (Destination dest:destinations) {
322                    if (dest.isPrioritizedMessages()) {
323                        result = true;
324                        break;
325                    }
326                }
327            }
328            return result;
329    
330        }
331    
332        public synchronized boolean isCacheEnabled() {
333            return cacheEnabled;
334        }
335    
336        public synchronized void setCacheEnabled(boolean val) {
337            cacheEnabled = val;
338        }
339    }