001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.network.jms;
018    
019    import java.util.concurrent.atomic.AtomicBoolean;
020    import javax.jms.Connection;
021    import javax.jms.Destination;
022    import javax.jms.JMSException;
023    import javax.jms.Message;
024    import javax.jms.MessageConsumer;
025    import javax.jms.MessageListener;
026    import javax.jms.MessageProducer;
027    import javax.naming.NamingException;
028    import org.apache.activemq.Service;
029    import org.slf4j.Logger;
030    import org.slf4j.LoggerFactory;
031    
032    /**
033     * A Destination bridge is used to bridge between to different JMS systems
034     * 
035     * 
036     */
037    public abstract class DestinationBridge implements Service, MessageListener {
038        private static final Logger LOG = LoggerFactory.getLogger(DestinationBridge.class);
039        protected MessageConsumer consumer;
040        protected AtomicBoolean started = new AtomicBoolean(false);
041        protected JmsMesageConvertor jmsMessageConvertor;
042        protected boolean doHandleReplyTo = true;
043        protected JmsConnector jmsConnector;
044        private int maximumRetries = 10;
045    
046        /**
047         * @return Returns the consumer.
048         */
049        public MessageConsumer getConsumer() {
050            return consumer;
051        }
052    
053        /**
054         * @param consumer The consumer to set.
055         */
056        public void setConsumer(MessageConsumer consumer) {
057            this.consumer = consumer;
058        }
059    
060        /**
061         * @param connector
062         */
063        public void setJmsConnector(JmsConnector connector) {
064            this.jmsConnector = connector;
065        }
066    
067        /**
068         * @return Returns the inboundMessageConvertor.
069         */
070        public JmsMesageConvertor getJmsMessageConvertor() {
071            return jmsMessageConvertor;
072        }
073    
074        /**
075         * @param jmsMessageConvertor
076         */
077        public void setJmsMessageConvertor(JmsMesageConvertor jmsMessageConvertor) {
078            this.jmsMessageConvertor = jmsMessageConvertor;
079        }
080    
081        public int getMaximumRetries() {
082            return maximumRetries;
083        }
084    
085        /**
086         * Sets the maximum number of retries if a send fails before closing the
087         * bridge
088         */
089        public void setMaximumRetries(int maximumRetries) {
090            this.maximumRetries = maximumRetries;
091        }
092    
093        protected Destination processReplyToDestination(Destination destination) {
094            return jmsConnector.createReplyToBridge(destination, getConnnectionForConsumer(), getConnectionForProducer());
095        }
096    
097        public void start() throws Exception {
098            if (started.compareAndSet(false, true)) {
099                MessageConsumer consumer = createConsumer();
100                consumer.setMessageListener(this);
101                createProducer();
102            }
103        }
104    
105        public void stop() throws Exception {
106            started.set(false);
107        }
108    
109        public void onMessage(Message message) {
110            int attempt = 0;
111            while (started.get() && message != null) {
112               
113                try {
114                    if (attempt > 0) {
115                        restartProducer();
116                    }
117                    Message converted;
118                    if (doHandleReplyTo) {
119                        Destination replyTo = message.getJMSReplyTo();
120                        if (replyTo != null) {
121                            converted = jmsMessageConvertor.convert(message, processReplyToDestination(replyTo));
122                        } else {
123                            converted = jmsMessageConvertor.convert(message);
124                        }
125                    } else {
126                        message.setJMSReplyTo(null);
127                        converted = jmsMessageConvertor.convert(message);
128                    }
129                    sendMessage(converted);
130                    message.acknowledge();
131                    return;
132                } catch (Exception e) {
133                    LOG.error("failed to forward message on attempt: " + (++attempt) + " reason: " + e + " message: " + message, e);
134                    if (maximumRetries > 0 && attempt >= maximumRetries) {
135                        try {
136                            stop();
137                        } catch (Exception e1) {
138                            LOG.warn("Failed to stop cleanly", e1);
139                        }
140                    }
141                }
142            }
143        }
144    
145        /**
146         * @return Returns the doHandleReplyTo.
147         */
148        protected boolean isDoHandleReplyTo() {
149            return doHandleReplyTo;
150        }
151    
152        /**
153         * @param doHandleReplyTo The doHandleReplyTo to set.
154         */
155        protected void setDoHandleReplyTo(boolean doHandleReplyTo) {
156            this.doHandleReplyTo = doHandleReplyTo;
157        }
158    
159        protected abstract MessageConsumer createConsumer() throws JMSException;
160    
161        protected abstract MessageProducer createProducer() throws JMSException;
162    
163        protected abstract void sendMessage(Message message) throws JMSException;
164    
165        protected abstract Connection getConnnectionForConsumer();
166    
167        protected abstract Connection getConnectionForProducer();
168    
169        protected void restartProducer() throws JMSException, NamingException {
170            try {
171                //don't reconnect immediately
172                Thread.sleep(1000);
173                getConnectionForProducer().close();
174            } catch (Exception e) {
175                LOG.debug("Ignoring failure to close producer connection: " + e, e);
176            }
177            jmsConnector.restartProducerConnection();
178            createProducer();
179        }
180    }