001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region;
018    
019    import java.io.IOException;
020    import java.util.ArrayList;
021    import java.util.LinkedList;
022    import java.util.List;
023    import java.util.Set;
024    import java.util.concurrent.CancellationException;
025    import java.util.concurrent.ConcurrentHashMap;
026    import java.util.concurrent.CopyOnWriteArrayList;
027    import java.util.concurrent.CopyOnWriteArraySet;
028    import java.util.concurrent.Future;
029    import org.apache.activemq.broker.BrokerService;
030    import org.apache.activemq.broker.ConnectionContext;
031    import org.apache.activemq.broker.ProducerBrokerExchange;
032    import org.apache.activemq.broker.region.policy.DispatchPolicy;
033    import org.apache.activemq.broker.region.policy.NoSubscriptionRecoveryPolicy;
034    import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
035    import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
036    import org.apache.activemq.command.ActiveMQDestination;
037    import org.apache.activemq.command.ExceptionResponse;
038    import org.apache.activemq.command.Message;
039    import org.apache.activemq.command.MessageAck;
040    import org.apache.activemq.command.MessageId;
041    import org.apache.activemq.command.ProducerAck;
042    import org.apache.activemq.command.ProducerInfo;
043    import org.apache.activemq.command.Response;
044    import org.apache.activemq.command.SubscriptionInfo;
045    import org.apache.activemq.filter.MessageEvaluationContext;
046    import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
047    import org.apache.activemq.store.MessageRecoveryListener;
048    import org.apache.activemq.store.TopicMessageStore;
049    import org.apache.activemq.thread.Task;
050    import org.apache.activemq.thread.TaskRunner;
051    import org.apache.activemq.thread.TaskRunnerFactory;
052    import org.apache.activemq.thread.Valve;
053    import org.apache.activemq.transaction.Synchronization;
054    import org.apache.activemq.util.SubscriptionKey;
055    import org.slf4j.Logger;
056    import org.slf4j.LoggerFactory;
057    
058    /**
059     * The Topic is a destination that sends a copy of a message to every active
060     * Subscription registered.
061     * 
062     * 
063     */
064    public class Topic extends BaseDestination implements Task {
065        protected static final Logger LOG = LoggerFactory.getLogger(Topic.class);
066        private final TopicMessageStore topicStore;
067        protected final CopyOnWriteArrayList<Subscription> consumers = new CopyOnWriteArrayList<Subscription>();
068        protected final Valve dispatchValve = new Valve(true);
069        private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
070        private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
071        private final ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubcribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
072        private final TaskRunner taskRunner;
073        private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
074        private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
075            public void run() {
076                try {
077                    Topic.this.taskRunner.wakeup();
078                } catch (InterruptedException e) {
079                }
080            };
081        };
082    
083        public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store,
084                DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception {
085            super(brokerService, store, destination, parentStats);
086            this.topicStore = store;
087            // set default subscription recovery policy
088            subscriptionRecoveryPolicy = new NoSubscriptionRecoveryPolicy();
089            this.taskRunner = taskFactory.createTaskRunner(this, "Topic  " + destination.getPhysicalName());
090        }
091    
092        @Override
093        public void initialize() throws Exception {
094            super.initialize();
095            if (store != null) {
096                // AMQ-2586: Better to leave this stat at zero than to give the user
097                // misleading metrics.
098                // int messageCount = store.getMessageCount();
099                // destinationStatistics.getMessages().setCount(messageCount);
100            }
101        }
102    
103        public List<Subscription> getConsumers() {
104            synchronized (consumers) {
105                return new ArrayList<Subscription>(consumers);
106            }
107        }
108    
109        public boolean lock(MessageReference node, LockOwner sub) {
110            return true;
111        }
112    
113        public void addSubscription(ConnectionContext context, final Subscription sub) throws Exception {
114    
115           super.addSubscription(context, sub);
116    
117            if (!sub.getConsumerInfo().isDurable()) {
118    
119                // Do a retroactive recovery if needed.
120                if (sub.getConsumerInfo().isRetroactive()) {
121    
122                    // synchronize with dispatch method so that no new messages are
123                    // sent
124                    // while we are recovering a subscription to avoid out of order
125                    // messages.
126                    dispatchValve.turnOff();
127                    try {
128    
129                        synchronized (consumers) {
130                            sub.add(context, this);
131                            consumers.add(sub);
132                        }
133                        subscriptionRecoveryPolicy.recover(context, this, sub);
134    
135                    } finally {
136                        dispatchValve.turnOn();
137                    }
138    
139                } else {
140                    synchronized (consumers) {
141                        sub.add(context, this);
142                        consumers.add(sub);
143                    }
144                }
145            } else {
146                sub.add(context, this);
147                DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
148                durableSubcribers.put(dsub.getSubscriptionKey(), dsub);
149            }
150        }
151    
152        public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId)
153                throws Exception {
154            if (!sub.getConsumerInfo().isDurable()) {
155                super.removeSubscription(context, sub, lastDeliveredSequenceId);
156                synchronized (consumers) {
157                    consumers.remove(sub);
158                }
159            }
160            sub.remove(context, this);
161        }
162    
163        public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception {
164            if (topicStore != null) {
165                topicStore.deleteSubscription(key.clientId, key.subscriptionName);
166                DurableTopicSubscription removed = durableSubcribers.remove(key);
167                if (removed != null) {
168                    destinationStatistics.getConsumers().decrement();
169                    // deactivate and remove
170                    removed.deactivate(false);
171                    consumers.remove(removed);
172                }
173            }
174        }
175    
176        public void activate(ConnectionContext context, final DurableTopicSubscription subscription) throws Exception {
177            // synchronize with dispatch method so that no new messages are sent
178            // while
179            // we are recovering a subscription to avoid out of order messages.
180            dispatchValve.turnOff();
181            try {
182    
183                if (topicStore == null) {
184                    return;
185                }
186    
187                // Recover the durable subscription.
188                String clientId = subscription.getSubscriptionKey().getClientId();
189                String subscriptionName = subscription.getSubscriptionKey().getSubscriptionName();
190                String selector = subscription.getConsumerInfo().getSelector();
191                SubscriptionInfo info = topicStore.lookupSubscription(clientId, subscriptionName);
192                if (info != null) {
193                    // Check to see if selector changed.
194                    String s1 = info.getSelector();
195                    if (s1 == null ^ selector == null || (s1 != null && !s1.equals(selector))) {
196                        // Need to delete the subscription
197                        topicStore.deleteSubscription(clientId, subscriptionName);
198                        info = null;
199                    } else {
200                        synchronized (consumers) {
201                            consumers.add(subscription);
202                        }
203                    }
204                }
205                // Do we need to create the subscription?
206                if (info == null) {
207                    info = new SubscriptionInfo();
208                    info.setClientId(clientId);
209                    info.setSelector(selector);
210                    info.setSubscriptionName(subscriptionName);
211                    info.setDestination(getActiveMQDestination());
212                    // This destination is an actual destination id.
213                    info.setSubscribedDestination(subscription.getConsumerInfo().getDestination());
214                    // This destination might be a pattern
215                    synchronized (consumers) {
216                        consumers.add(subscription);
217                        topicStore.addSubsciption(info, subscription.getConsumerInfo().isRetroactive());
218                    }
219                }
220    
221                final MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
222                msgContext.setDestination(destination);
223                if (subscription.isRecoveryRequired()) {
224                    topicStore.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() {
225                        public boolean recoverMessage(Message message) throws Exception {
226                            message.setRegionDestination(Topic.this);
227                            try {
228                                msgContext.setMessageReference(message);
229                                if (subscription.matches(message, msgContext)) {
230                                    subscription.add(message);
231                                }
232                            } catch (IOException e) {
233                                LOG.error("Failed to recover this message " + message);
234                            }
235                            return true;
236                        }
237    
238                        public boolean recoverMessageReference(MessageId messageReference) throws Exception {
239                            throw new RuntimeException("Should not be called.");
240                        }
241    
242                        public boolean hasSpace() {
243                            return true;
244                        }
245    
246                        public boolean isDuplicate(MessageId id) {
247                            return false;
248                        }
249                    });
250                }
251            } finally {
252                dispatchValve.turnOn();
253            }
254        }
255    
256        public void deactivate(ConnectionContext context, DurableTopicSubscription sub) throws Exception {
257            synchronized (consumers) {
258                consumers.remove(sub);
259            }
260            sub.remove(context, this);
261        }
262    
263        protected void recoverRetroactiveMessages(ConnectionContext context, Subscription subscription) throws Exception {
264            if (subscription.getConsumerInfo().isRetroactive()) {
265                subscriptionRecoveryPolicy.recover(context, this, subscription);
266            }
267        }
268    
269        public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
270            final ConnectionContext context = producerExchange.getConnectionContext();
271    
272            final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
273            final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0
274                    && !context.isInRecoveryMode();
275    
276            // There is delay between the client sending it and it arriving at the
277            // destination.. it may have expired.
278            if (message.isExpired()) {
279                broker.messageExpired(context, message, null);
280                getDestinationStatistics().getExpired().increment();
281                if (sendProducerAck) {
282                    ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
283                    context.getConnection().dispatchAsync(ack);
284                }
285                return;
286            }
287    
288            if (memoryUsage.isFull()) {
289                isFull(context, memoryUsage);
290                fastProducer(context, producerInfo);
291    
292                if (isProducerFlowControl() && context.isProducerFlowControl()) {
293    
294                    if (warnOnProducerFlowControl) {
295                        warnOnProducerFlowControl = false;
296                        LOG
297                                .info("Usage Manager memory limit ("
298                                        + memoryUsage.getLimit()
299                                        + ") reached for "
300                                        + getActiveMQDestination().getQualifiedName()
301                                        + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it."
302                                        + " See http://activemq.apache.org/producer-flow-control.html for more info");
303                    }
304    
305                    if (systemUsage.isSendFailIfNoSpace()) {
306                        throw new javax.jms.ResourceAllocationException("Usage Manager memory limit ("
307                                + memoryUsage.getLimit() + ") reached. Stopping producer (" + message.getProducerId()
308                                + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
309                                + " See http://activemq.apache.org/producer-flow-control.html for more info");
310                    }
311    
312                    // We can avoid blocking due to low usage if the producer is
313                    // sending
314                    // a sync message or
315                    // if it is using a producer window
316                    if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
317                        synchronized (messagesWaitingForSpace) {
318                            messagesWaitingForSpace.add(new Runnable() {
319                                public void run() {
320                                    try {
321    
322                                        // While waiting for space to free up... the
323                                        // message may have expired.
324                                        if (message.isExpired()) {
325                                            broker.messageExpired(context, message, null);
326                                            getDestinationStatistics().getExpired().increment();
327                                        } else {
328                                            doMessageSend(producerExchange, message);
329                                        }
330    
331                                        if (sendProducerAck) {
332                                            ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message
333                                                    .getSize());
334                                            context.getConnection().dispatchAsync(ack);
335                                        } else {
336                                            Response response = new Response();
337                                            response.setCorrelationId(message.getCommandId());
338                                            context.getConnection().dispatchAsync(response);
339                                        }
340    
341                                    } catch (Exception e) {
342                                        if (!sendProducerAck && !context.isInRecoveryMode()) {
343                                            ExceptionResponse response = new ExceptionResponse(e);
344                                            response.setCorrelationId(message.getCommandId());
345                                            context.getConnection().dispatchAsync(response);
346                                        }
347                                    }
348    
349                                }
350                            });
351    
352                            registerCallbackForNotFullNotification();
353                            context.setDontSendReponse(true);
354                            return;
355                        }
356    
357                    } else {
358                        // Producer flow control cannot be used, so we have do the
359                        // flow
360                        // control at the broker
361                        // by blocking this thread until there is space available.
362    
363                        if (memoryUsage.isFull()) {
364                            if (context.isInTransaction()) {
365    
366                                int count = 0;
367                                while (!memoryUsage.waitForSpace(1000)) {
368                                    if (context.getStopping().get()) {
369                                        throw new IOException("Connection closed, send aborted.");
370                                    }
371                                    if (count > 2 && context.isInTransaction()) {
372                                        count = 0;
373                                        int size = context.getTransaction().size();
374                                        LOG.warn("Waiting for space to send  transacted message - transaction elements = "
375                                                + size + " need more space to commit. Message = " + message);
376                                    }
377                                }
378                            } else {
379                                waitForSpace(
380                                        context,
381                                        memoryUsage,
382                                        "Usage Manager memory limit reached. Stopping producer ("
383                                                + message.getProducerId()
384                                                + ") to prevent flooding "
385                                                + getActiveMQDestination().getQualifiedName()
386                                                + "."
387                                                + " See http://activemq.apache.org/producer-flow-control.html for more info");
388                            }
389                        }
390    
391                        // The usage manager could have delayed us by the time
392                        // we unblock the message could have expired..
393                        if (message.isExpired()) {
394                            getDestinationStatistics().getExpired().increment();
395                            if (LOG.isDebugEnabled()) {
396                                LOG.debug("Expired message: " + message);
397                            }
398                            return;
399                        }
400                    }
401                }
402            }
403    
404            doMessageSend(producerExchange, message);
405            messageDelivered(context, message);
406            if (sendProducerAck) {
407                ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
408                context.getConnection().dispatchAsync(ack);
409            }
410        }
411    
412        /**
413         * do send the message - this needs to be synchronized to ensure messages
414         * are stored AND dispatched in the right order
415         * 
416         * @param producerExchange
417         * @param message
418         * @throws IOException
419         * @throws Exception
420         */
421        synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message)
422                throws IOException, Exception {
423            final ConnectionContext context = producerExchange.getConnectionContext();
424            message.setRegionDestination(this);
425            message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
426            Future<Object> result = null;
427    
428            if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) {
429                if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
430                    final String logMessage = "Usage Manager Store is Full, " + getStoreUsageHighWaterMark() + "% of "
431                            + systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" + message.getProducerId()
432                            + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
433                            + " See http://activemq.apache.org/producer-flow-control.html for more info";
434                    if (systemUsage.isSendFailIfNoSpace()) {
435                        throw new javax.jms.ResourceAllocationException(logMessage);
436                    }
437    
438                    waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
439                }
440                result = topicStore.asyncAddTopicMessage(context, message);
441            }
442    
443            message.incrementReferenceCount();
444    
445            if (context.isInTransaction()) {
446                context.getTransaction().addSynchronization(new Synchronization() {
447                    @Override
448                    public void afterCommit() throws Exception {
449                        // It could take while before we receive the commit
450                        // operration.. by that time the message could have
451                        // expired..
452                        if (broker.isExpired(message)) {
453                            getDestinationStatistics().getExpired().increment();
454                            broker.messageExpired(context, message, null);
455                            message.decrementReferenceCount();
456                            return;
457                        }
458                        try {
459                            dispatch(context, message);
460                        } finally {
461                            message.decrementReferenceCount();
462                        }
463                    }
464                });
465    
466            } else {
467                try {
468                    dispatch(context, message);
469                } finally {
470                    message.decrementReferenceCount();
471                }
472            }
473            if (result != null && !result.isCancelled()) {
474                try {
475                    result.get();
476                } catch (CancellationException e) {
477                    // ignore - the task has been cancelled if the message
478                    // has already been deleted
479                }
480            }
481    
482        }
483    
484        private boolean canOptimizeOutPersistence() {
485            return durableSubcribers.size() == 0;
486        }
487    
488        @Override
489        public String toString() {
490            return "Topic: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size();
491        }
492    
493        public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack,
494                final MessageReference node) throws IOException {
495            if (topicStore != null && node.isPersistent()) {
496                DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
497                SubscriptionKey key = dsub.getSubscriptionKey();
498                topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId(), ack);
499            }
500            messageConsumed(context, node);
501        }
502    
503        public void gc() {
504        }
505    
506        public Message loadMessage(MessageId messageId) throws IOException {
507            return topicStore != null ? topicStore.getMessage(messageId) : null;
508        }
509    
510        public void start() throws Exception {
511            this.subscriptionRecoveryPolicy.start();
512            if (memoryUsage != null) {
513                memoryUsage.start();
514            }
515    
516        }
517    
518        public void stop() throws Exception {
519            if (taskRunner != null) {
520                taskRunner.shutdown();
521            }
522            this.subscriptionRecoveryPolicy.stop();
523            if (memoryUsage != null) {
524                memoryUsage.stop();
525            }
526            if (this.topicStore != null) {
527                this.topicStore.stop();
528            }
529        }
530    
531        public Message[] browse() {
532            final Set<Message> result = new CopyOnWriteArraySet<Message>();
533            try {
534                if (topicStore != null) {
535                    topicStore.recover(new MessageRecoveryListener() {
536                        public boolean recoverMessage(Message message) throws Exception {
537                            result.add(message);
538                            return true;
539                        }
540    
541                        public boolean recoverMessageReference(MessageId messageReference) throws Exception {
542                            return true;
543                        }
544    
545                        public boolean hasSpace() {
546                            return true;
547                        }
548    
549                        public boolean isDuplicate(MessageId id) {
550                            return false;
551                        }
552                    });
553                    Message[] msgs = subscriptionRecoveryPolicy.browse(getActiveMQDestination());
554                    if (msgs != null) {
555                        for (int i = 0; i < msgs.length; i++) {
556                            result.add(msgs[i]);
557                        }
558                    }
559                }
560            } catch (Throwable e) {
561                LOG.warn("Failed to browse Topic: " + getActiveMQDestination().getPhysicalName(), e);
562            }
563            return result.toArray(new Message[result.size()]);
564        }
565    
566        public boolean iterate() {
567            synchronized (messagesWaitingForSpace) {
568                while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) {
569                    Runnable op = messagesWaitingForSpace.removeFirst();
570                    op.run();
571                }
572    
573                if (!messagesWaitingForSpace.isEmpty()) {
574                    registerCallbackForNotFullNotification();
575                }
576            }
577            return false;
578        }
579    
580        private void registerCallbackForNotFullNotification() {
581            // If the usage manager is not full, then the task will not
582            // get called..
583            if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
584                // so call it directly here.
585                sendMessagesWaitingForSpaceTask.run();
586            }
587        }
588    
589        // Properties
590        // -------------------------------------------------------------------------
591    
592        public DispatchPolicy getDispatchPolicy() {
593            return dispatchPolicy;
594        }
595    
596        public void setDispatchPolicy(DispatchPolicy dispatchPolicy) {
597            this.dispatchPolicy = dispatchPolicy;
598        }
599    
600        public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy() {
601            return subscriptionRecoveryPolicy;
602        }
603    
604        public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy subscriptionRecoveryPolicy) {
605            this.subscriptionRecoveryPolicy = subscriptionRecoveryPolicy;
606        }
607    
608        // Implementation methods
609        // -------------------------------------------------------------------------
610    
611        public final void wakeup() {
612        }
613    
614        protected void dispatch(final ConnectionContext context, Message message) throws Exception {
615            // AMQ-2586: Better to leave this stat at zero than to give the user
616            // misleading metrics.
617            // destinationStatistics.getMessages().increment();
618            destinationStatistics.getEnqueues().increment();
619            dispatchValve.increment();
620            MessageEvaluationContext msgContext = null;
621            try {
622                if (!subscriptionRecoveryPolicy.add(context, message)) {
623                    return;
624                }
625                synchronized (consumers) {
626                    if (consumers.isEmpty()) {
627                        onMessageWithNoConsumers(context, message);
628                        return;
629                    }
630                }
631                msgContext = context.getMessageEvaluationContext();
632                msgContext.setDestination(destination);
633                msgContext.setMessageReference(message);
634                if (!dispatchPolicy.dispatch(message, msgContext, consumers)) {
635                    onMessageWithNoConsumers(context, message);
636                }
637    
638            } finally {
639                dispatchValve.decrement();
640                if (msgContext != null) {
641                    msgContext.clear();
642                }
643            }
644        }
645    
646        public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) {
647            broker.messageExpired(context, reference, subs);
648            // AMQ-2586: Better to leave this stat at zero than to give the user
649            // misleading metrics.
650            // destinationStatistics.getMessages().decrement();
651            destinationStatistics.getEnqueues().decrement();
652            destinationStatistics.getExpired().increment();
653            MessageAck ack = new MessageAck();
654            ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
655            ack.setDestination(destination);
656            ack.setMessageID(reference.getMessageId());
657            try {
658                acknowledge(context, subs, ack, reference);
659            } catch (IOException e) {
660                LOG.error("Failed to remove expired Message from the store ", e);
661            }
662        }
663    
664        @Override
665        protected Logger getLog() {
666            return LOG;
667        }
668    
669    }