001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker.region; 018 019 import java.io.IOException; 020 import java.util.Iterator; 021 import java.util.concurrent.ConcurrentHashMap; 022 import java.util.concurrent.atomic.AtomicBoolean; 023 024 import javax.jms.InvalidSelectorException; 025 import javax.jms.JMSException; 026 027 import org.apache.activemq.broker.Broker; 028 import org.apache.activemq.broker.ConnectionContext; 029 import org.apache.activemq.broker.region.cursors.PendingMessageCursor; 030 import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor; 031 import org.apache.activemq.command.ActiveMQDestination; 032 import org.apache.activemq.command.ConsumerInfo; 033 import org.apache.activemq.command.Message; 034 import org.apache.activemq.command.MessageAck; 035 import org.apache.activemq.command.MessageDispatch; 036 import org.apache.activemq.command.MessageId; 037 import org.apache.activemq.filter.MessageEvaluationContext; 038 import org.apache.activemq.store.TopicMessageStore; 039 import org.apache.activemq.usage.SystemUsage; 040 import org.apache.activemq.usage.Usage; 041 import org.apache.activemq.usage.UsageListener; 042 import org.apache.activemq.util.SubscriptionKey; 043 import org.slf4j.Logger; 044 import org.slf4j.LoggerFactory; 045 046 public class DurableTopicSubscription extends PrefetchSubscription implements UsageListener { 047 048 private static final Logger LOG = LoggerFactory.getLogger(DurableTopicSubscription.class); 049 private final ConcurrentHashMap<MessageId, Integer> redeliveredMessages = new ConcurrentHashMap<MessageId, Integer>(); 050 private final ConcurrentHashMap<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>(); 051 private final SubscriptionKey subscriptionKey; 052 private final boolean keepDurableSubsActive; 053 private AtomicBoolean active = new AtomicBoolean(); 054 055 public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive) 056 throws JMSException { 057 super(broker,usageManager, context, info); 058 this.pending = new StoreDurableSubscriberCursor(broker,context.getClientId(), info.getSubscriptionName(), info.getPrefetchSize(), this); 059 this.pending.setSystemUsage(usageManager); 060 this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark()); 061 this.keepDurableSubsActive = keepDurableSubsActive; 062 subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName()); 063 064 } 065 066 public final boolean isActive() { 067 return active.get(); 068 } 069 070 public boolean isFull() { 071 return !active.get() || super.isFull(); 072 } 073 074 public void gc() { 075 } 076 077 /** 078 * store will have a pending ack for all durables, irrespective of the selector 079 * so we need to ack if node is un-matched 080 */ 081 public void unmatched(MessageReference node) throws IOException { 082 MessageAck ack = new MessageAck(); 083 ack.setAckType(MessageAck.UNMATCHED_ACK_TYPE); 084 ack.setMessageID(node.getMessageId()); 085 node.getRegionDestination().acknowledge(this.getContext(), this, ack, node); 086 } 087 088 @Override 089 protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch) { 090 // statically configured via maxPageSize 091 } 092 093 public void add(ConnectionContext context, Destination destination) throws Exception { 094 super.add(context, destination); 095 // do it just once per destination 096 if (destinations.containsKey(destination.getActiveMQDestination())) { 097 return; 098 } 099 destinations.put(destination.getActiveMQDestination(), destination); 100 101 if (active.get() || keepDurableSubsActive) { 102 Topic topic = (Topic)destination; 103 topic.activate(context, this); 104 if (pending.isEmpty(topic)) { 105 topic.recoverRetroactiveMessages(context, this); 106 } 107 this.enqueueCounter+=pending.size(); 108 } else if (destination.getMessageStore() != null) { 109 TopicMessageStore store = (TopicMessageStore)destination.getMessageStore(); 110 try { 111 this.enqueueCounter+=store.getMessageCount(subscriptionKey.getClientId(),subscriptionKey.getSubscriptionName()); 112 } catch (IOException e) { 113 JMSException jmsEx = new JMSException("Failed to retrieve eunqueueCount from store "+ e); 114 jmsEx.setLinkedException(e); 115 throw jmsEx; 116 } 117 } 118 dispatchPending(); 119 } 120 121 public void activate(SystemUsage memoryManager, ConnectionContext context, 122 ConsumerInfo info) throws Exception { 123 if (!active.get()) { 124 this.context = context; 125 this.info = info; 126 LOG.debug("Activating " + this); 127 if (!keepDurableSubsActive) { 128 for (Iterator<Destination> iter = destinations.values() 129 .iterator(); iter.hasNext();) { 130 Topic topic = (Topic) iter.next(); 131 add(context, topic); 132 topic.activate(context, this); 133 } 134 } 135 synchronized (pendingLock) { 136 pending.setSystemUsage(memoryManager); 137 pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark()); 138 pending.setMaxAuditDepth(getMaxAuditDepth()); 139 pending.setMaxProducersToAudit(getMaxProducersToAudit()); 140 pending.start(); 141 // If nothing was in the persistent store, then try to use the 142 // recovery policy. 143 if (pending.isEmpty()) { 144 for (Iterator<Destination> iter = destinations.values() 145 .iterator(); iter.hasNext();) { 146 Topic topic = (Topic) iter.next(); 147 topic.recoverRetroactiveMessages(context, this); 148 } 149 } 150 } 151 this.active.set(true); 152 dispatchPending(); 153 this.usageManager.getMemoryUsage().addUsageListener(this); 154 } 155 } 156 157 public void deactivate(boolean keepDurableSubsActive) throws Exception { 158 LOG.debug("Deactivating " + this); 159 active.set(false); 160 this.usageManager.getMemoryUsage().removeUsageListener(this); 161 synchronized (pending) { 162 pending.stop(); 163 } 164 if (!keepDurableSubsActive) { 165 for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) { 166 Topic topic = (Topic)iter.next(); 167 topic.deactivate(context, this); 168 } 169 } 170 171 for (final MessageReference node : dispatched) { 172 // Mark the dispatched messages as redelivered for next time. 173 Integer count = redeliveredMessages.get(node.getMessageId()); 174 if (count != null) { 175 redeliveredMessages.put(node.getMessageId(), Integer.valueOf(count.intValue() + 1)); 176 } else { 177 redeliveredMessages.put(node.getMessageId(), Integer.valueOf(1)); 178 } 179 if (keepDurableSubsActive&& pending.isTransient()) { 180 synchronized (pending) { 181 pending.addMessageFirst(node); 182 } 183 } else { 184 node.decrementReferenceCount(); 185 } 186 } 187 synchronized(dispatched) { 188 dispatched.clear(); 189 } 190 if (!keepDurableSubsActive && pending.isTransient()) { 191 synchronized (pending) { 192 try { 193 pending.reset(); 194 while (pending.hasNext()) { 195 MessageReference node = pending.next(); 196 node.decrementReferenceCount(); 197 pending.remove(); 198 } 199 } finally { 200 pending.release(); 201 } 202 } 203 } 204 prefetchExtension = 0; 205 } 206 207 208 protected MessageDispatch createMessageDispatch(MessageReference node, Message message) { 209 MessageDispatch md = super.createMessageDispatch(node, message); 210 Integer count = redeliveredMessages.get(node.getMessageId()); 211 if (count != null) { 212 md.setRedeliveryCounter(count.intValue()); 213 } 214 return md; 215 } 216 217 public void add(MessageReference node) throws Exception { 218 if (!active.get() && !keepDurableSubsActive) { 219 return; 220 } 221 super.add(node); 222 } 223 224 protected void dispatchPending() throws IOException { 225 if (isActive()) { 226 super.dispatchPending(); 227 } 228 } 229 230 protected void doAddRecoveredMessage(MessageReference message) throws Exception { 231 synchronized(pending) { 232 pending.addRecoveredMessage(message); 233 } 234 } 235 236 public int getPendingQueueSize() { 237 if (active.get() || keepDurableSubsActive) { 238 return super.getPendingQueueSize(); 239 } 240 // TODO: need to get from store 241 return 0; 242 } 243 244 public void setSelector(String selector) throws InvalidSelectorException { 245 throw new UnsupportedOperationException("You cannot dynamically change the selector for durable topic subscriptions"); 246 } 247 248 protected boolean canDispatch(MessageReference node) { 249 return isActive(); 250 } 251 252 protected void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node) throws IOException { 253 node.getRegionDestination().acknowledge(context, this, ack, node); 254 redeliveredMessages.remove(node.getMessageId()); 255 node.decrementReferenceCount(); 256 } 257 258 259 public synchronized String toString() { 260 return "DurableTopicSubscription-" + getSubscriptionKey() + ", id=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", total=" + enqueueCounter + ", pending=" 261 + getPendingQueueSize() + ", dispatched=" + dispatchCounter + ", inflight=" + dispatched.size() + ", prefetchExtension=" + this.prefetchExtension; 262 } 263 264 public SubscriptionKey getSubscriptionKey() { 265 return subscriptionKey; 266 } 267 268 /** 269 * Release any references that we are holding. 270 */ 271 public void destroy() { 272 synchronized (pending) { 273 try { 274 275 pending.reset(); 276 while (pending.hasNext()) { 277 MessageReference node = pending.next(); 278 node.decrementReferenceCount(); 279 } 280 281 } finally { 282 pending.release(); 283 pending.clear(); 284 } 285 } 286 synchronized(dispatched) { 287 for (Iterator iter = dispatched.iterator(); iter.hasNext();) { 288 MessageReference node = (MessageReference) iter.next(); 289 node.decrementReferenceCount(); 290 } 291 dispatched.clear(); 292 } 293 setSlowConsumer(false); 294 } 295 296 public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) { 297 if (oldPercentUsage > newPercentUsage && oldPercentUsage >= 90) { 298 try { 299 dispatchPending(); 300 } catch (IOException e) { 301 LOG.warn("problem calling dispatchMatched", e); 302 } 303 } 304 } 305 306 protected boolean isDropped(MessageReference node) { 307 return false; 308 } 309 310 public boolean isKeepDurableSubsActive() { 311 return keepDurableSubsActive; 312 } 313 }