/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.broker.region.cursors;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.Message;
import org.apache.activemq.usage.SystemUsage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Pending message cursor (messages awaiting dispatch to a consumer) for a
 * durable topic subscriber.
 * <p>
 * The cursor is a composite: it owns one {@link TopicStorePrefetch} per added
 * destination for persistent messages plus a single cursor for non-persistent
 * messages, and delegates to them. Configuration setters are propagated to
 * every child cursor.
 */
public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {

    private static final Logger LOG = LoggerFactory.getLogger(StoreDurableSubscriberCursor.class);

    /** Marker meaning that no lowest cache priority is currently recorded. */
    private static final int UNKNOWN = -1;

    private final String clientId;
    private final String subscriberName;

    /** Per-destination store cursors, keyed by the destination they were added for. */
    private final Map<Destination, TopicStorePrefetch> topics = new HashMap<Destination, TopicStorePrefetch>();

    /** All child cursors (the non-persistent one plus every entry of {@link #topics}). */
    private final List<PendingMessageCursor> storePrefetches = new CopyOnWriteArrayList<PendingMessageCursor>();

    private final PendingMessageCursor nonPersistent;

    /** Child cursor selected by the most recent {@link #hasNext()}; may be {@code null}. */
    private PendingMessageCursor currentCursor;

    private final DurableTopicSubscription subscription;

    /**
     * Lowest priority reported by a topic cursor at the moment its cache was
     * enabled for immediate priority dispatch, or {@link #UNKNOWN}.
     */
    private int cacheCurrentLowestPriority = UNKNOWN;

    private boolean immediatePriorityDispatch = true;

    /**
     * @param broker Broker for this cursor
     * @param clientId clientId for this cursor
     * @param subscriberName subscriber name for this cursor
     * @param maxBatchSize batch size applied to the non-persistent cursor
     * @param subscription subscription for this cursor
     */
    public StoreDurableSubscriberCursor(Broker broker, String clientId, String subscriberName, int maxBatchSize,
            DurableTopicSubscription subscription) {
        super(AbstractPendingMessageCursor.isPrioritizedMessageSubscriber(broker, subscription));
        this.subscription = subscription;
        this.clientId = clientId;
        this.subscriberName = subscriberName;
        if (broker.getBrokerService().isPersistent()) {
            this.nonPersistent = new FilePendingMessageCursor(broker, clientId + subscriberName, this.prioritizedMessages);
        } else {
            this.nonPersistent = new VMPendingMessageCursor(this.prioritizedMessages);
        }

        this.nonPersistent.setMaxBatchSize(maxBatchSize);
        this.nonPersistent.setSystemUsage(systemUsage);
        this.storePrefetches.add(this.nonPersistent);

        if (prioritizedMessages) {
            // prioritized subscribers get a ten times deeper audit window
            setMaxAuditDepth(10 * getMaxAuditDepth());
        }
    }

    /**
     * Starts this cursor and every child cursor, sharing this cursor's message
     * audit with each of them. Does nothing if already started.
     *
     * @throws Exception if this cursor or a child cursor fails to start
     */
    @Override
    public synchronized void start() throws Exception {
        if (!isStarted()) {
            super.start();
            for (PendingMessageCursor tsp : storePrefetches) {
                tsp.setMessageAudit(getMessageAudit());
                tsp.start();
            }
        }
    }

    /**
     * Stops this cursor. When the subscription keeps durable subs active the
     * cursors are not stopped; they are garbage collected and their audits
     * cleared instead. Does nothing if not started.
     *
     * @throws Exception if this cursor or a child cursor fails to stop
     */
    @Override
    public synchronized void stop() throws Exception {
        if (isStarted()) {
            if (subscription.isKeepDurableSubsActive()) {
                super.gc();
                super.getMessageAudit().clear();
                for (PendingMessageCursor tsp : storePrefetches) {
                    tsp.gc();
                    tsp.getMessageAudit().clear();
                }
            } else {
                super.stop();
                for (PendingMessageCursor tsp : storePrefetches) {
                    tsp.stop();
                }
            }
        }
    }

    /**
     * Add a destination. A {@code null} destination or an advisory topic is
     * ignored; otherwise a store cursor is created for it, configured from this
     * cursor and started if this cursor is already started.
     *
     * @param context
     * @param destination
     * @throws Exception
     */
    @Override
    public synchronized void add(ConnectionContext context, Destination destination) throws Exception {
        if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination())) {
            TopicStorePrefetch tsp = new TopicStorePrefetch(this.subscription, (Topic) destination, clientId, subscriberName);
            tsp.setMaxBatchSize(destination.getMaxPageSize());
            tsp.setSystemUsage(systemUsage);
            tsp.setMessageAudit(getMessageAudit());
            tsp.setEnableAudit(isEnableAudit());
            tsp.setMemoryUsageHighWaterMark(getMemoryUsageHighWaterMark());
            topics.put(destination, tsp);
            storePrefetches.add(tsp);
            if (isStarted()) {
                tsp.start();
            }
        }
    }

    /**
     * remove a destination
     *
     * @param context
     * @param destination
     * @return always an empty list
     * @throws Exception
     */
    @Override
    public synchronized List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
        PendingMessageCursor tsp = topics.remove(destination);
        if (tsp != null) {
            storePrefetches.remove(tsp);
        }
        return Collections.<MessageReference>emptyList();
    }

    /**
     * @return true if there are no pending messages
     */
    @Override
    public synchronized boolean isEmpty() {
        for (PendingMessageCursor tsp : storePrefetches) {
            if (!tsp.isEmpty()) {
                return false;
            }
        }
        return true;
    }

    /**
     * @param destination destination to check
     * @return true if no store cursor is registered for the destination or that
     *         cursor is empty
     */
    @Override
    public synchronized boolean isEmpty(Destination destination) {
        boolean result = true;
        TopicStorePrefetch tsp = topics.get(destination);
        if (tsp != null) {
            result = tsp.isEmpty();
        }
        return result;
    }

    /**
     * Informs the Broker if the subscription needs intervention to recover
     * its state, e.g. DurableTopicSubscriber may do
     *
     * @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor
     * @return true if recovery required; this implementation always returns false
     */
    @Override
    public boolean isRecoveryRequired() {
        return false;
    }

    /**
     * Appends a message. Non-persistent messages go to the non-persistent
     * cursor, but only while this cursor is started. Persistent messages go to
     * the store cursor of their region destination, if one is registered; on
     * the way the store cursor's cache may be switched on or off depending on
     * the message priority.
     *
     * @param node message reference to add; {@code null} is ignored
     * @throws Exception if a child cursor fails to add the message
     */
    @Override
    public synchronized void addMessageLast(MessageReference node) throws Exception {
        if (node != null) {
            Message msg = node.getMessage();
            if (isStarted()) {
                if (!msg.isPersistent()) {
                    nonPersistent.addMessageLast(node);
                }
            }
            if (msg.isPersistent()) {
                Destination dest = msg.getRegionDestination();
                TopicStorePrefetch tsp = topics.get(dest);
                if (tsp != null) {
                    // cache can become high priority cache for immediate dispatch
                    final int priority = msg.getPriority();
                    if (isStarted() && this.prioritizedMessages && immediatePriorityDispatch && !tsp.isCacheEnabled()) {
                        if (priority > tsp.getCurrentLowestPriority()) {
                            if (LOG.isTraceEnabled()) {
                                LOG.trace("enabling cache for cursor on high priority message " + priority
                                        + ", current lowest: " + tsp.getCurrentLowestPriority());
                            }
                            tsp.setCacheEnabled(true);
                            cacheCurrentLowestPriority = tsp.getCurrentLowestPriority();
                        }
                    } else if (cacheCurrentLowestPriority != UNKNOWN && priority <= cacheCurrentLowestPriority) {
                        // go to the store to get next priority message as lower priority messages may be recovered
                        // already and need to be acked in sequence order
                        if (LOG.isTraceEnabled()) {
                            LOG.trace("disabling/clearing cache for cursor on lower priority message "
                                    + priority + ", tsp current lowest: " + tsp.getCurrentLowestPriority()
                                    + " cache lowest: " + cacheCurrentLowestPriority);
                        }
                        tsp.setCacheEnabled(false);
                        cacheCurrentLowestPriority = UNKNOWN;
                    }
                    tsp.addMessageLast(node);
                }
            }

        }
    }

    /**
     * @return the subscription's keep-durable-subs-active setting
     */
    @Override
    public boolean isTransient() {
        return subscription.isKeepDurableSubsActive();
    }

    /**
     * Puts a message at the head of the matching child cursor: the
     * non-persistent cursor for non-persistent messages, otherwise the store
     * cursor of the message's region destination if one is registered.
     *
     * @param node message reference to add; {@code null} is ignored
     * @throws Exception if the child cursor fails to add the message
     */
    @Override
    public void addMessageFirst(MessageReference node) throws Exception {
        // for keep durable subs active, need to deal with redispatch
        if (node != null) {
            Message msg = node.getMessage();
            if (!msg.isPersistent()) {
                nonPersistent.addMessageFirst(node);
            } else {
                Destination dest = msg.getRegionDestination();
                TopicStorePrefetch tsp = topics.get(dest);
                if (tsp != null) {
                    tsp.addMessageFirst(node);
                }
            }
        }
    }

    /**
     * Appends a recovered message to the non-persistent cursor.
     *
     * @param node recovered message reference
     * @throws Exception if the non-persistent cursor fails to add the message
     */
    @Override
    public synchronized void addRecoveredMessage(MessageReference node) throws Exception {
        nonPersistent.addMessageLast(node);
    }

    /** Clears every child cursor. */
    @Override
    public synchronized void clear() {
        for (PendingMessageCursor tsp : storePrefetches) {
            tsp.clear();
        }
    }

    /**
     * Selects the child cursor to read from (see {@link #getNextCursor()}) and
     * reports whether it has a message.
     *
     * @return true if a child cursor was selected and it has a next message
     * @throws RuntimeException wrapping any exception raised while selecting
     *         the cursor
     */
    @Override
    public synchronized boolean hasNext() {
        try {
            currentCursor = getNextCursor();
        } catch (Exception e) {
            LOG.error("Failed to get current cursor ", e);
            throw new RuntimeException(e);
        }
        return currentCursor != null && currentCursor.hasNext();
    }

    /**
     * @return the next message of the currently selected child cursor, or
     *         {@code null} if no cursor is selected
     */
    @Override
    public synchronized MessageReference next() {
        return currentCursor != null ? currentCursor.next() : null;
    }

    /** Delegates to the currently selected child cursor, if any. */
    @Override
    public synchronized void remove() {
        if (currentCursor != null) {
            currentCursor.remove();
        }
    }

    /**
     * Removes the message from the currently selected child cursor, if any;
     * other child cursors are not consulted.
     *
     * @param node message reference to remove
     */
    @Override
    public synchronized void remove(MessageReference node) {
        if (currentCursor != null) {
            currentCursor.remove(node);
        }
    }

    /** Resets every child cursor. */
    @Override
    public synchronized void reset() {
        for (PendingMessageCursor storePrefetch : storePrefetches) {
            storePrefetch.reset();
        }
    }

    /** Releases every child cursor. */
    @Override
    public synchronized void release() {
        for (PendingMessageCursor storePrefetch : storePrefetches) {
            storePrefetch.release();
        }
    }

    /**
     * @return the sum of the sizes of all child cursors
     */
    @Override
    public synchronized int size() {
        int pendingCount = 0;
        for (PendingMessageCursor tsp : storePrefetches) {
            pendingCount += tsp.size();
        }
        return pendingCount;
    }

    /**
     * Applies the batch size to every child cursor and then to this cursor.
     *
     * @param newMaxBatchSize new maximum batch size
     */
    @Override
    public void setMaxBatchSize(int newMaxBatchSize) {
        for (PendingMessageCursor storePrefetch : storePrefetches) {
            storePrefetch.setMaxBatchSize(newMaxBatchSize);
        }
        super.setMaxBatchSize(newMaxBatchSize);
    }

    /** Garbage collects every child cursor and forgets the recorded cache priority. */
    @Override
    public synchronized void gc() {
        for (PendingMessageCursor tsp : storePrefetches) {
            tsp.gc();
        }
        cacheCurrentLowestPriority = UNKNOWN;
    }

    /**
     * Applies the usage manager to this cursor and every child cursor.
     *
     * @param usageManager system usage to apply
     */
    @Override
    public void setSystemUsage(SystemUsage usageManager) {
        super.setSystemUsage(usageManager);
        for (PendingMessageCursor tsp : storePrefetches) {
            tsp.setSystemUsage(usageManager);
        }
    }

    /**
     * Applies the high water mark to this cursor and every child cursor.
     *
     * @param memoryUsageHighWaterMark new memory usage high water mark
     */
    @Override
    public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
        super.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
        for (PendingMessageCursor cursor : storePrefetches) {
            cursor.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
        }
    }

    /**
     * Applies the maximum number of audited producers to this cursor and every
     * child cursor.
     *
     * @param maxProducersToAudit new maximum number of producers to audit
     */
    @Override
    public void setMaxProducersToAudit(int maxProducersToAudit) {
        super.setMaxProducersToAudit(maxProducersToAudit);
        for (PendingMessageCursor cursor : storePrefetches) {
            // propagate the producer limit itself (previously the audit depth was re-applied here)
            cursor.setMaxProducersToAudit(maxProducersToAudit);
        }
    }

    /**
     * Applies the audit depth to this cursor and every child cursor.
     *
     * @param maxAuditDepth new maximum audit depth
     */
    @Override
    public void setMaxAuditDepth(int maxAuditDepth) {
        super.setMaxAuditDepth(maxAuditDepth);
        for (PendingMessageCursor cursor : storePrefetches) {
            cursor.setMaxAuditDepth(maxAuditDepth);
        }
    }

    /**
     * Applies the audit flag to this cursor and every child cursor.
     *
     * @param enableAudit whether auditing is enabled
     */
    @Override
    public void setEnableAudit(boolean enableAudit) {
        super.setEnableAudit(enableAudit);
        for (PendingMessageCursor cursor : storePrefetches) {
            cursor.setEnableAudit(enableAudit);
        }
    }

    /**
     * Applies the cache flag to this cursor and every child cursor.
     *
     * @param useCache whether the cache is used
     */
    @Override
    public void setUseCache(boolean useCache) {
        super.setUseCache(useCache);
        for (PendingMessageCursor cursor : storePrefetches) {
            cursor.setUseCache(useCache);
        }
    }

    /**
     * Returns the child cursor to read from. The current cursor is kept while
     * it is non-empty; otherwise the first child cursor that has a next message
     * is selected and the head of the child list is moved to its tail.
     *
     * @return the selected child cursor, or {@code null} if none has a message
     * @throws Exception declared for subclasses and callers; see {@link #hasNext()}
     */
    protected synchronized PendingMessageCursor getNextCursor() throws Exception {
        if (currentCursor == null || currentCursor.isEmpty()) {
            currentCursor = null;
            for (PendingMessageCursor tsp : storePrefetches) {
                if (tsp.hasNext()) {
                    currentCursor = tsp;
                    break;
                }
            }
            // round-robin
            if (storePrefetches.size() > 1) {
                PendingMessageCursor first = storePrefetches.remove(0);
                storePrefetches.add(first);
            }
        }
        return currentCursor;
    }

    @Override
    public String toString() {
        return "StoreDurableSubscriber(" + clientId + ":" + subscriberName + ")";
    }

    /**
     * @return whether a high priority message may enable a store cursor's cache
     *         in {@link #addMessageLast(MessageReference)}
     */
    public boolean isImmediatePriorityDispatch() {
        return immediatePriorityDispatch;
    }

    /**
     * @param immediatePriorityDispatch whether a high priority message may
     *        enable a store cursor's cache in
     *        {@link #addMessageLast(MessageReference)}
     */
    public void setImmediatePriorityDispatch(boolean immediatePriorityDispatch) {
        this.immediatePriorityDispatch = immediatePriorityDispatch;
    }

}