001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker.region.cursors; 018 019 import java.util.Collections; 020 import java.util.LinkedList; 021 import java.util.List; 022 import java.util.Set; 023 import org.apache.activemq.ActiveMQMessageAudit; 024 import org.apache.activemq.broker.Broker; 025 import org.apache.activemq.broker.ConnectionContext; 026 import org.apache.activemq.broker.region.BaseDestination; 027 import org.apache.activemq.broker.region.Destination; 028 import org.apache.activemq.broker.region.MessageReference; 029 import org.apache.activemq.broker.region.Subscription; 030 import org.apache.activemq.command.MessageId; 031 import org.apache.activemq.usage.SystemUsage; 032 033 /** 034 * Abstract method holder for pending message (messages awaiting disptach to a 035 * consumer) cursor 036 * 037 * 038 */ 039 public abstract class AbstractPendingMessageCursor implements PendingMessageCursor { 040 protected int memoryUsageHighWaterMark = 70; 041 protected int maxBatchSize = BaseDestination.MAX_PAGE_SIZE; 042 protected SystemUsage systemUsage; 043 protected int maxProducersToAudit = BaseDestination.MAX_PRODUCERS_TO_AUDIT; 044 protected int maxAuditDepth = BaseDestination.MAX_AUDIT_DEPTH; 045 protected boolean enableAudit=true; 046 protected ActiveMQMessageAudit audit; 047 protected boolean useCache=true; 048 private boolean cacheEnabled=true; 049 private boolean started=false; 050 protected MessageReference last = null; 051 protected final boolean prioritizedMessages; 052 053 public AbstractPendingMessageCursor(boolean prioritizedMessages) { 054 this.prioritizedMessages=prioritizedMessages; 055 } 056 057 058 public synchronized void start() throws Exception { 059 if (!started && enableAudit && audit==null) { 060 audit= new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit); 061 } 062 started=true; 063 } 064 065 public synchronized void stop() throws Exception { 066 started=false; 067 audit=null; 068 gc(); 069 } 070 071 public void add(ConnectionContext context, Destination destination) throws Exception { 072 } 073 074 @SuppressWarnings("unchecked") 075 public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception { 076 return Collections.EMPTY_LIST; 077 } 078 079 public boolean isRecoveryRequired() { 080 return true; 081 } 082 083 public void addMessageFirst(MessageReference node) throws Exception { 084 } 085 086 public void addMessageLast(MessageReference node) throws Exception { 087 } 088 089 public boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception { 090 addMessageLast(node); 091 return true; 092 } 093 094 public void addRecoveredMessage(MessageReference node) throws Exception { 095 addMessageLast(node); 096 } 097 098 public void clear() { 099 } 100 101 public boolean hasNext() { 102 return false; 103 } 104 105 public boolean isEmpty() { 106 return false; 107 } 108 109 public boolean isEmpty(Destination destination) { 110 return isEmpty(); 111 } 112 113 public MessageReference next() { 114 return null; 115 } 116 117 public void remove() { 118 } 119 120 public void reset() { 121 } 122 123 public int size() { 124 return 0; 125 } 126 127 public int getMaxBatchSize() { 128 return maxBatchSize; 129 } 130 131 public void setMaxBatchSize(int maxBatchSize) { 132 this.maxBatchSize = maxBatchSize; 133 } 134 135 protected void fillBatch() throws Exception { 136 } 137 138 public void resetForGC() { 139 reset(); 140 } 141 142 public void remove(MessageReference node) { 143 } 144 145 public void gc() { 146 } 147 148 public void setSystemUsage(SystemUsage usageManager) { 149 this.systemUsage = usageManager; 150 } 151 152 public boolean hasSpace() { 153 return systemUsage != null ? (systemUsage.getMemoryUsage().getPercentUsage() < memoryUsageHighWaterMark) : true; 154 } 155 156 public boolean isFull() { 157 return systemUsage != null ? systemUsage.getMemoryUsage().isFull() : false; 158 } 159 160 public void release() { 161 } 162 163 public boolean hasMessagesBufferedToDeliver() { 164 return false; 165 } 166 167 /** 168 * @return the memoryUsageHighWaterMark 169 */ 170 public int getMemoryUsageHighWaterMark() { 171 return memoryUsageHighWaterMark; 172 } 173 174 /** 175 * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set 176 */ 177 public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) { 178 this.memoryUsageHighWaterMark = memoryUsageHighWaterMark; 179 } 180 181 /** 182 * @return the usageManager 183 */ 184 public SystemUsage getSystemUsage() { 185 return this.systemUsage; 186 } 187 188 /** 189 * destroy the cursor 190 * 191 * @throws Exception 192 */ 193 public void destroy() throws Exception { 194 stop(); 195 } 196 197 /** 198 * Page in a restricted number of messages 199 * 200 * @param maxItems maximum number of messages to return 201 * @return a list of paged in messages 202 */ 203 public LinkedList<MessageReference> pageInList(int maxItems) { 204 throw new RuntimeException("Not supported"); 205 } 206 207 /** 208 * @return the maxProducersToAudit 209 */ 210 public int getMaxProducersToAudit() { 211 return maxProducersToAudit; 212 } 213 214 /** 215 * @param maxProducersToAudit the maxProducersToAudit to set 216 */ 217 public synchronized void setMaxProducersToAudit(int maxProducersToAudit) { 218 this.maxProducersToAudit = maxProducersToAudit; 219 if (audit != null) { 220 audit.setMaximumNumberOfProducersToTrack(maxProducersToAudit); 221 } 222 } 223 224 /** 225 * @return the maxAuditDepth 226 */ 227 public int getMaxAuditDepth() { 228 return maxAuditDepth; 229 } 230 231 232 /** 233 * @param maxAuditDepth the maxAuditDepth to set 234 */ 235 public synchronized void setMaxAuditDepth(int maxAuditDepth) { 236 this.maxAuditDepth = maxAuditDepth; 237 if (audit != null) { 238 audit.setAuditDepth(maxAuditDepth); 239 } 240 } 241 242 243 /** 244 * @return the enableAudit 245 */ 246 public boolean isEnableAudit() { 247 return enableAudit; 248 } 249 250 /** 251 * @param enableAudit the enableAudit to set 252 */ 253 public synchronized void setEnableAudit(boolean enableAudit) { 254 this.enableAudit = enableAudit; 255 if (enableAudit && started && audit==null) { 256 audit= new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit); 257 } 258 } 259 260 public boolean isTransient() { 261 return false; 262 } 263 264 265 /** 266 * set the audit 267 * @param audit new audit component 268 */ 269 public void setMessageAudit(ActiveMQMessageAudit audit) { 270 this.audit=audit; 271 } 272 273 274 /** 275 * @return the audit 276 */ 277 public ActiveMQMessageAudit getMessageAudit() { 278 return audit; 279 } 280 281 public boolean isUseCache() { 282 return useCache; 283 } 284 285 public void setUseCache(boolean useCache) { 286 this.useCache = useCache; 287 } 288 289 public synchronized boolean isDuplicate(MessageId messageId) { 290 boolean unique = recordUniqueId(messageId); 291 rollback(messageId); 292 return !unique; 293 } 294 295 /** 296 * records a message id and checks if it is a duplicate 297 * @param messageId 298 * @return true if id is unique, false otherwise. 299 */ 300 public synchronized boolean recordUniqueId(MessageId messageId) { 301 if (!enableAudit || audit==null) { 302 return true; 303 } 304 return !audit.isDuplicate(messageId); 305 } 306 307 public synchronized void rollback(MessageId id) { 308 if (audit != null) { 309 audit.rollback(id); 310 } 311 } 312 313 protected synchronized boolean isStarted() { 314 return started; 315 } 316 317 public static boolean isPrioritizedMessageSubscriber(Broker broker,Subscription sub) { 318 boolean result = false; 319 Set<Destination> destinations = broker.getDestinations(sub.getActiveMQDestination()); 320 if (destinations != null) { 321 for (Destination dest:destinations) { 322 if (dest.isPrioritizedMessages()) { 323 result = true; 324 break; 325 } 326 } 327 } 328 return result; 329 330 } 331 332 public synchronized boolean isCacheEnabled() { 333 return cacheEnabled; 334 } 335 336 public synchronized void setCacheEnabled(boolean val) { 337 cacheEnabled = val; 338 } 339 }