001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region.policy;
018    
019    import org.apache.activemq.broker.region.DurableTopicSubscription;
020    import org.apache.activemq.broker.region.Subscription;
021    import org.apache.activemq.command.ActiveMQDestination;
022    import org.apache.activemq.command.ActiveMQQueue;
023    import org.apache.activemq.command.ActiveMQTopic;
024    import org.apache.activemq.command.Message;
025    
026    /**
027     * A {@link DeadLetterStrategy} where each destination has its own individual
028     * DLQ using the subject naming hierarchy.
029     * 
030     * @org.apache.xbean.XBean
031     * 
032     */
033    public class IndividualDeadLetterStrategy extends AbstractDeadLetterStrategy {
034    
035        private String topicPrefix = "ActiveMQ.DLQ.Topic.";
036        private String queuePrefix = "ActiveMQ.DLQ.Queue.";
037        private boolean useQueueForQueueMessages = true;
038        private boolean useQueueForTopicMessages = true;
039        private boolean destinationPerDurableSubscriber;
040    
041        public ActiveMQDestination getDeadLetterQueueFor(Message message,
042                                                         Subscription subscription) {
043            if (message.getDestination().isQueue()) {
044                return createDestination(message, queuePrefix, useQueueForQueueMessages, subscription);
045            } else {
046                return createDestination(message, topicPrefix, useQueueForTopicMessages, subscription);
047            }
048        }
049    
050        // Properties
051        // -------------------------------------------------------------------------
052    
053        public String getQueuePrefix() {
054            return queuePrefix;
055        }
056    
057        /**
058         * Sets the prefix to use for all dead letter queues for queue messages
059         */
060        public void setQueuePrefix(String queuePrefix) {
061            this.queuePrefix = queuePrefix;
062        }
063    
064        public String getTopicPrefix() {
065            return topicPrefix;
066        }
067    
068        /**
069         * Sets the prefix to use for all dead letter queues for topic messages
070         */
071        public void setTopicPrefix(String topicPrefix) {
072            this.topicPrefix = topicPrefix;
073        }
074    
075        public boolean isUseQueueForQueueMessages() {
076            return useQueueForQueueMessages;
077        }
078    
079        /**
080         * Sets whether a queue or topic should be used for queue messages sent to a
081         * DLQ. The default is to use a Queue
082         */
083        public void setUseQueueForQueueMessages(boolean useQueueForQueueMessages) {
084            this.useQueueForQueueMessages = useQueueForQueueMessages;
085        }
086    
087        public boolean isUseQueueForTopicMessages() {
088            return useQueueForTopicMessages;
089        }
090    
091        /**
092         * Sets whether a queue or topic should be used for topic messages sent to a
093         * DLQ. The default is to use a Queue
094         */
095        public void setUseQueueForTopicMessages(boolean useQueueForTopicMessages) {
096            this.useQueueForTopicMessages = useQueueForTopicMessages;
097        }
098    
099        public boolean isDestinationPerDurableSubscriber() {
100            return destinationPerDurableSubscriber;
101        }
102    
103        /**
104         * sets whether durable topic subscriptions are to get individual dead letter destinations.
105         * When true, the DLQ is of the form 'topicPrefix.clientId:subscriptionName'
106         * The default is false.
107         * @param destinationPerDurableSubscriber
108         */
109        public void setDestinationPerDurableSubscriber(boolean destinationPerDurableSubscriber) {
110            this.destinationPerDurableSubscriber = destinationPerDurableSubscriber;
111        }
112    
113        // Implementation methods
114        // -------------------------------------------------------------------------
115        protected ActiveMQDestination createDestination(Message message,
116                                                        String prefix,
117                                                        boolean useQueue,
118                                                        Subscription subscription ) {
119            String name = prefix + message.getDestination().getPhysicalName();
120            if (destinationPerDurableSubscriber && subscription instanceof DurableTopicSubscription) {
121                name += "." + ((DurableTopicSubscription)subscription).getSubscriptionKey();
122            }
123            if (useQueue) {
124                return new ActiveMQQueue(name);
125            } else {
126                return new ActiveMQTopic(name);
127            }
128        }
129    
130    }