001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region;
018    
019    import java.util.ArrayList;
020    import java.util.HashMap;
021    import java.util.Iterator;
022    import java.util.List;
023    import java.util.Map;
024    import java.util.Set;
025    import java.util.concurrent.ConcurrentHashMap;
026    import javax.jms.JMSException;
027    import org.apache.activemq.broker.ConnectionContext;
028    import org.apache.activemq.broker.ConsumerBrokerExchange;
029    import org.apache.activemq.broker.DestinationAlreadyExistsException;
030    import org.apache.activemq.broker.ProducerBrokerExchange;
031    import org.apache.activemq.command.ActiveMQDestination;
032    import org.apache.activemq.command.ConsumerControl;
033    import org.apache.activemq.command.ConsumerId;
034    import org.apache.activemq.command.ConsumerInfo;
035    import org.apache.activemq.command.Message;
036    import org.apache.activemq.command.MessageAck;
037    import org.apache.activemq.command.MessageDispatchNotification;
038    import org.apache.activemq.command.MessagePull;
039    import org.apache.activemq.command.ProducerInfo;
040    import org.apache.activemq.command.RemoveSubscriptionInfo;
041    import org.apache.activemq.command.Response;
042    import org.apache.activemq.filter.DestinationFilter;
043    import org.apache.activemq.filter.DestinationMap;
044    import org.apache.activemq.security.SecurityContext;
045    import org.apache.activemq.thread.TaskRunnerFactory;
046    import org.apache.activemq.usage.SystemUsage;
047    import org.slf4j.Logger;
048    import org.slf4j.LoggerFactory;
049    
050    /**
051     * 
052     */
053    public abstract class AbstractRegion implements Region {
054    
055        private static final Logger LOG = LoggerFactory.getLogger(AbstractRegion.class);
056    
057        protected final Map<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
058        protected final DestinationMap destinationMap = new DestinationMap();
059        protected final Map<ConsumerId, Subscription> subscriptions = new ConcurrentHashMap<ConsumerId, Subscription>();
060        protected final SystemUsage usageManager;
061        protected final DestinationFactory destinationFactory;
062        protected final DestinationStatistics destinationStatistics;
063        protected final RegionBroker broker;
064        protected boolean autoCreateDestinations = true;
065        protected final TaskRunnerFactory taskRunnerFactory;
066        protected final Object destinationsMutex = new Object();
067        protected final Map<ConsumerId, Object> consumerChangeMutexMap = new HashMap<ConsumerId, Object>();
068        protected boolean started;
069    
070        public AbstractRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager,
071                TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
072            if (broker == null) {
073                throw new IllegalArgumentException("null broker");
074            }
075            this.broker = broker;
076            this.destinationStatistics = destinationStatistics;
077            this.usageManager = memoryManager;
078            this.taskRunnerFactory = taskRunnerFactory;
079            if (broker == null) {
080                throw new IllegalArgumentException("null destinationFactory");
081            }
082            this.destinationFactory = destinationFactory;
083        }
084    
085        public final void start() throws Exception {
086            started = true;
087    
088            Set<ActiveMQDestination> inactiveDests = getInactiveDestinations();
089            for (Iterator<ActiveMQDestination> iter = inactiveDests.iterator(); iter.hasNext();) {
090                ActiveMQDestination dest = iter.next();
091    
092                ConnectionContext context = new ConnectionContext();
093                context.setBroker(broker.getBrokerService().getBroker());
094                context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
095                context.getBroker().addDestination(context, dest, false);
096            }
097            synchronized (destinationsMutex) {
098                for (Iterator<Destination> i = destinations.values().iterator(); i.hasNext();) {
099                    Destination dest = i.next();
100                    dest.start();
101                }
102            }
103        }
104    
105        public void stop() throws Exception {
106            started = false;
107            synchronized (destinationsMutex) {
108                for (Iterator<Destination> i = destinations.values().iterator(); i.hasNext();) {
109                    Destination dest = i.next();
110                    dest.stop();
111                }
112            }
113            destinations.clear();
114        }
115    
116        public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,
117                boolean createIfTemporary) throws Exception {
118            LOG.debug(broker.getBrokerName() + " adding destination: " + destination);
119            synchronized (destinationsMutex) {
120                Destination dest = destinations.get(destination);
121                if (dest == null) {
122                    if (destination.isTemporary() == false || createIfTemporary) {
123                        dest = createDestination(context, destination);
124                        // intercept if there is a valid interceptor defined
125                        DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor();
126                        if (destinationInterceptor != null) {
127                            dest = destinationInterceptor.intercept(dest);
128                        }
129                        dest.start();
130                        destinations.put(destination, dest);
131                        destinationMap.put(destination, dest);
132                        addSubscriptionsForDestination(context, dest);
133                    }
134                    if (dest == null) {
135                        throw new JMSException("The destination " + destination + " does not exist.");
136                    }
137                }
138                return dest;
139            }
140        }
141    
142        public Map<ConsumerId, Subscription> getSubscriptions() {
143            return subscriptions;
144        }
145    
146        protected List<Subscription> addSubscriptionsForDestination(ConnectionContext context, Destination dest)
147                throws Exception {
148    
149            List<Subscription> rc = new ArrayList<Subscription>();
150            // Add all consumers that are interested in the destination.
151            for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
152                Subscription sub = iter.next();
153                if (sub.matches(dest.getActiveMQDestination())) {
154                    dest.addSubscription(context, sub);
155                    rc.add(sub);
156                }
157            }
158            return rc;
159    
160        }
161    
162        public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout)
163                throws Exception {
164    
165            // No timeout.. then try to shut down right way, fails if there are
166            // current subscribers.
167            if (timeout == 0) {
168                for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
169                    Subscription sub = iter.next();
170                    if (sub.matches(destination)) {
171                        throw new JMSException("Destination still has an active subscription: " + destination);
172                    }
173                }
174            }
175    
176            if (timeout > 0) {
177                // TODO: implement a way to notify the subscribers that we want to
178                // take the down
179                // the destination and that they should un-subscribe.. Then wait up
180                // to timeout time before
181                // dropping the subscription.
182            }
183    
184            LOG.debug("Removing destination: " + destination);
185    
186            synchronized (destinationsMutex) {
187                Destination dest = destinations.remove(destination);
188                if (dest != null) {
189                    // timeout<0 or we timed out, we now force any remaining
190                    // subscriptions to un-subscribe.
191                    for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
192                        Subscription sub = iter.next();
193                        if (sub.matches(destination)) {
194                            dest.removeSubscription(context, sub, 0l);
195                        }
196                    }
197                    destinationMap.removeAll(destination);
198                    dispose(context, dest);
199                    DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor();
200                    if (destinationInterceptor != null) {
201                        destinationInterceptor.remove(dest);
202                    }
203    
204                } else {
205                    LOG.debug("Destination doesn't exist: " + dest);
206                }
207            }
208        }
209    
210        /**
211         * Provide an exact or wildcard lookup of destinations in the region
212         * 
213         * @return a set of matching destination objects.
214         */
215        public Set<Destination> getDestinations(ActiveMQDestination destination) {
216            synchronized (destinationsMutex) {
217                return destinationMap.get(destination);
218            }
219        }
220    
221        public Map<ActiveMQDestination, Destination> getDestinationMap() {
222            synchronized (destinationsMutex) {
223                return new HashMap<ActiveMQDestination, Destination>(destinations);
224            }
225        }
226    
227        public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
228            LOG.debug(broker.getBrokerName() + " adding consumer: " + info.getConsumerId() + " for destination: "
229                    + info.getDestination());
230            ActiveMQDestination destination = info.getDestination();
231            if (destination != null && !destination.isPattern() && !destination.isComposite()) {
232                // lets auto-create the destination
233                lookup(context, destination,true);
234            }
235    
236            Object addGuard;
237            synchronized (consumerChangeMutexMap) {
238                addGuard = consumerChangeMutexMap.get(info.getConsumerId());
239                if (addGuard == null) {
240                    addGuard = new Object();
241                    consumerChangeMutexMap.put(info.getConsumerId(), addGuard);
242                }
243            }
244            synchronized (addGuard) {
245                Subscription o = subscriptions.get(info.getConsumerId());
246                if (o != null) {
247                    LOG
248                            .warn("A duplicate subscription was detected. Clients may be misbehaving. Later warnings you may see about subscription removal are a consequence of this.");
249                    return o;
250                }
251    
252                // We may need to add some destinations that are in persistent store
253                // but not active
254                // in the broker.
255                //
256                // TODO: think about this a little more. This is good cause
257                // destinations are not loaded into
258                // memory until a client needs to use the queue, but a management
259                // agent viewing the
260                // broker will not see a destination that exists in persistent
261                // store. We may want to
262                // eagerly load all destinations into the broker but have an
263                // inactive state for the
264                // destination which has reduced memory usage.
265                //
266                DestinationFilter.parseFilter(info.getDestination());
267    
268                Subscription sub = createSubscription(context, info);
269    
270                subscriptions.put(info.getConsumerId(), sub);
271    
272                // At this point we're done directly manipulating subscriptions,
273                // but we need to retain the synchronized block here. Consider
274                // otherwise what would happen if at this point a second
275                // thread added, then removed, as would be allowed with
276                // no mutex held. Remove is only essentially run once
277                // so everything after this point would be leaked.
278    
279                // Add the subscription to all the matching queues.
280                // But copy the matches first - to prevent deadlocks
281                List<Destination> addList = new ArrayList<Destination>();
282                synchronized (destinationsMutex) {
283                    for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
284                        Destination dest = (Destination) iter.next();
285                        addList.add(dest);
286                    }
287                }
288    
289                for (Destination dest : addList) {
290                    dest.addSubscription(context, sub);
291                }
292    
293                if (info.isBrowser()) {
294                    ((QueueBrowserSubscription) sub).destinationsAdded();
295                }
296    
297                return sub;
298            }
299        }
300    
301        /**
302         * Get all the Destinations that are in storage
303         * 
304         * @return Set of all stored destinations
305         */
306        public Set getDurableDestinations() {
307            return destinationFactory.getDestinations();
308        }
309    
310        /**
311         * @return all Destinations that don't have active consumers
312         */
313        protected Set<ActiveMQDestination> getInactiveDestinations() {
314            Set<ActiveMQDestination> inactiveDests = destinationFactory.getDestinations();
315            synchronized (destinationsMutex) {
316                inactiveDests.removeAll(destinations.keySet());
317            }
318            return inactiveDests;
319        }
320    
321        public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
322            LOG.debug(broker.getBrokerName() + " removing consumer: " + info.getConsumerId() + " for destination: "
323                    + info.getDestination());
324    
325            Subscription sub = subscriptions.remove(info.getConsumerId());
326            // The sub could be removed elsewhere - see ConnectionSplitBroker
327            if (sub != null) {
328    
329                // remove the subscription from all the matching queues.
330                List<Destination> removeList = new ArrayList<Destination>();
331                synchronized (destinationsMutex) {
332                    for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
333                        Destination dest = (Destination) iter.next();
334                        removeList.add(dest);
335    
336                    }
337                }
338                for (Destination dest : removeList) {
339                    dest.removeSubscription(context, sub, info.getLastDeliveredSequenceId());
340                }
341    
342                destroySubscription(sub);
343            }
344            synchronized (consumerChangeMutexMap) {
345                consumerChangeMutexMap.remove(info.getConsumerId());
346            }
347        }
348    
349        protected void destroySubscription(Subscription sub) {
350            sub.destroy();
351        }
352    
353        public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
354            throw new JMSException("Invalid operation.");
355        }
356    
357        public void send(final ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
358            final ConnectionContext context = producerExchange.getConnectionContext();
359    
360            if (producerExchange.isMutable() || producerExchange.getRegionDestination() == null) {
361                final Destination regionDestination = lookup(context, messageSend.getDestination(),false);
362                producerExchange.setRegionDestination(regionDestination);
363            }
364    
365            producerExchange.getRegionDestination().send(producerExchange, messageSend);
366        }
367    
368        public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
369            Subscription sub = consumerExchange.getSubscription();
370            if (sub == null) {
371                sub = subscriptions.get(ack.getConsumerId());
372                if (sub == null) {
373                    if (!consumerExchange.getConnectionContext().isInRecoveryMode()) {
374                        LOG.warn("Ack for non existent subscription, ack:" + ack);
375                        throw new IllegalArgumentException("The subscription does not exist: " + ack.getConsumerId());
376                    } else {
377                        LOG.debug("Ack for non existent subscription in recovery, ack:" + ack);
378                        return;
379                    }
380                }
381                consumerExchange.setSubscription(sub);
382            }
383            sub.acknowledge(consumerExchange.getConnectionContext(), ack);
384        }
385    
386        public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
387            Subscription sub = subscriptions.get(pull.getConsumerId());
388            if (sub == null) {
389                throw new IllegalArgumentException("The subscription does not exist: " + pull.getConsumerId());
390            }
391            return sub.pullMessage(context, pull);
392        }
393    
394        protected Destination lookup(ConnectionContext context, ActiveMQDestination destination,boolean createTemporary) throws Exception {
395            Destination dest = null;
396            synchronized (destinationsMutex) {
397                dest = destinations.get(destination);
398            }
399            if (dest == null) {
400                if (isAutoCreateDestinations()) {
401                    // Try to auto create the destination... re-invoke broker
402                    // from the
403                    // top so that the proper security checks are performed.
404                    try {
405                        context.getBroker().addDestination(context, destination, createTemporary);
406                        dest = addDestination(context, destination, false);
407                    } catch (DestinationAlreadyExistsException e) {
408                        // if the destination already exists then lets ignore
409                        // this error
410                    }
411                    // We should now have the dest created.
412                    synchronized (destinationsMutex) {
413                        dest = destinations.get(destination);
414                    }
415                }
416                if (dest == null) {
417                    throw new JMSException("The destination " + destination + " does not exist.");
418                }
419            }
420            return dest;
421        }
422    
423        public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
424            Subscription sub = subscriptions.get(messageDispatchNotification.getConsumerId());
425            if (sub != null) {
426                sub.processMessageDispatchNotification(messageDispatchNotification);
427            } else {
428                throw new JMSException("Slave broker out of sync with master - Subscription: "
429                        + messageDispatchNotification.getConsumerId() + " on "
430                        + messageDispatchNotification.getDestination() + " does not exist for dispatch of message: "
431                        + messageDispatchNotification.getMessageId());
432            }
433        }
434    
435        /*
436         * For a Queue/TempQueue, dispatch order is imperative to match acks, so the
437         * dispatch is deferred till the notification to ensure that the
438         * subscription chosen by the master is used. AMQ-2102
439         */
440        protected void processDispatchNotificationViaDestination(MessageDispatchNotification messageDispatchNotification)
441                throws Exception {
442            Destination dest = null;
443            synchronized (destinationsMutex) {
444                dest = destinations.get(messageDispatchNotification.getDestination());
445            }
446            if (dest != null) {
447                dest.processDispatchNotification(messageDispatchNotification);
448            } else {
449                throw new JMSException("Slave broker out of sync with master - Destination: "
450                        + messageDispatchNotification.getDestination() + " does not exist for consumer "
451                        + messageDispatchNotification.getConsumerId() + " with message: "
452                        + messageDispatchNotification.getMessageId());
453            }
454        }
455    
456        public void gc() {
457            for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
458                Subscription sub = iter.next();
459                sub.gc();
460            }
461            synchronized (destinationsMutex) {
462                for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
463                    Destination dest = iter.next();
464                    dest.gc();
465                }
466            }
467        }
468    
469        protected abstract Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws Exception;
470    
471        protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination)
472                throws Exception {
473            return destinationFactory.createDestination(context, destination, destinationStatistics);
474        }
475    
476        public boolean isAutoCreateDestinations() {
477            return autoCreateDestinations;
478        }
479    
480        public void setAutoCreateDestinations(boolean autoCreateDestinations) {
481            this.autoCreateDestinations = autoCreateDestinations;
482        }
483    
484        public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
485            synchronized (destinationsMutex) {
486                for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
487                    Destination dest = (Destination) iter.next();
488                    dest.addProducer(context, info);
489                }
490            }
491        }
492    
493        /**
494         * Removes a Producer.
495         * 
496         * @param context
497         *            the environment the operation is being executed under.
498         * @throws Exception
499         *             TODO
500         */
501        public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
502            synchronized (destinationsMutex) {
503                for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
504                    Destination dest = (Destination) iter.next();
505                    dest.removeProducer(context, info);
506                }
507            }
508        }
509    
510        protected void dispose(ConnectionContext context, Destination dest) throws Exception {
511            dest.dispose(context);
512            dest.stop();
513            destinationFactory.removeDestination(dest);
514        }
515    
516        public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) {
517            Subscription sub = subscriptions.get(control.getConsumerId());
518            if (sub != null && sub instanceof AbstractSubscription) {
519                ((AbstractSubscription) sub).setPrefetchSize(control.getPrefetch());
520                if (LOG.isDebugEnabled()) {
521                    LOG.debug("setting prefetch: " + control.getPrefetch() + ", on subscription: "
522                            + control.getConsumerId());
523                }
524                try {
525                    lookup(consumerExchange.getConnectionContext(), control.getDestination(),false).wakeup();
526                } catch (Exception e) {
527                    LOG.warn("failed to deliver consumerControl to destination: " + control.getDestination(), e);
528                }
529            }
530        }
531    }