001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.network.jms;
018    
019    import javax.jms.Connection;
020    import javax.jms.Destination;
021    import javax.jms.JMSException;
022    import javax.jms.Session;
023    import javax.jms.Topic;
024    import javax.jms.TopicConnection;
025    import javax.jms.TopicConnectionFactory;
026    import javax.jms.TopicSession;
027    import javax.naming.NamingException;
028    
029    import org.slf4j.Logger;
030    import org.slf4j.LoggerFactory;
031    
032    /**
033     * A Bridge to other JMS Topic providers
034     * 
035     * @org.apache.xbean.XBean
036     * 
037     * 
038     */
039    public class JmsTopicConnector extends JmsConnector {
040        private static final Logger LOG = LoggerFactory.getLogger(JmsTopicConnector.class);
041        private String outboundTopicConnectionFactoryName;
042        private String localConnectionFactoryName;
043        private TopicConnectionFactory outboundTopicConnectionFactory;
044        private TopicConnectionFactory localTopicConnectionFactory;
045        private TopicConnection outboundTopicConnection;
046        private TopicConnection localTopicConnection;
047        private InboundTopicBridge[] inboundTopicBridges;
048        private OutboundTopicBridge[] outboundTopicBridges;
049    
050        public boolean init() {
051            boolean result = super.init();
052            if (result) {
053                try {
054                    initializeForeignTopicConnection();
055                    initializeLocalTopicConnection();
056                    initializeInboundJmsMessageConvertor();
057                    initializeOutboundJmsMessageConvertor();
058                    initializeInboundTopicBridges();
059                    initializeOutboundTopicBridges();
060                } catch (Exception e) {
061                    LOG.error("Failed to initialize the JMSConnector", e);
062                }
063            }
064            return result;
065        }
066    
067        /**
068         * @return Returns the inboundTopicBridges.
069         */
070        public InboundTopicBridge[] getInboundTopicBridges() {
071            return inboundTopicBridges;
072        }
073    
074        /**
075         * @param inboundTopicBridges The inboundTopicBridges to set.
076         */
077        public void setInboundTopicBridges(InboundTopicBridge[] inboundTopicBridges) {
078            this.inboundTopicBridges = inboundTopicBridges;
079        }
080    
081        /**
082         * @return Returns the outboundTopicBridges.
083         */
084        public OutboundTopicBridge[] getOutboundTopicBridges() {
085            return outboundTopicBridges;
086        }
087    
088        /**
089         * @param outboundTopicBridges The outboundTopicBridges to set.
090         */
091        public void setOutboundTopicBridges(OutboundTopicBridge[] outboundTopicBridges) {
092            this.outboundTopicBridges = outboundTopicBridges;
093        }
094    
095        /**
096         * @return Returns the localTopicConnectionFactory.
097         */
098        public TopicConnectionFactory getLocalTopicConnectionFactory() {
099            return localTopicConnectionFactory;
100        }
101    
102        /**
103         * @param localTopicConnectionFactory The localTopicConnectionFactory to
104         *                set.
105         */
106        public void setLocalTopicConnectionFactory(TopicConnectionFactory localConnectionFactory) {
107            this.localTopicConnectionFactory = localConnectionFactory;
108        }
109    
110        /**
111         * @return Returns the outboundTopicConnectionFactory.
112         */
113        public TopicConnectionFactory getOutboundTopicConnectionFactory() {
114            return outboundTopicConnectionFactory;
115        }
116    
117        /**
118         * @return Returns the outboundTopicConnectionFactoryName.
119         */
120        public String getOutboundTopicConnectionFactoryName() {
121            return outboundTopicConnectionFactoryName;
122        }
123    
124        /**
125         * @param outboundTopicConnectionFactoryName The
126         *                outboundTopicConnectionFactoryName to set.
127         */
128        public void setOutboundTopicConnectionFactoryName(String foreignTopicConnectionFactoryName) {
129            this.outboundTopicConnectionFactoryName = foreignTopicConnectionFactoryName;
130        }
131    
132        /**
133         * @return Returns the localConnectionFactoryName.
134         */
135        public String getLocalConnectionFactoryName() {
136            return localConnectionFactoryName;
137        }
138    
139        /**
140         * @param localConnectionFactoryName The localConnectionFactoryName to set.
141         */
142        public void setLocalConnectionFactoryName(String localConnectionFactoryName) {
143            this.localConnectionFactoryName = localConnectionFactoryName;
144        }
145    
146        /**
147         * @return Returns the localTopicConnection.
148         */
149        public TopicConnection getLocalTopicConnection() {
150            return localTopicConnection;
151        }
152    
153        /**
154         * @param localTopicConnection The localTopicConnection to set.
155         */
156        public void setLocalTopicConnection(TopicConnection localTopicConnection) {
157            this.localTopicConnection = localTopicConnection;
158        }
159    
160        /**
161         * @return Returns the outboundTopicConnection.
162         */
163        public TopicConnection getOutboundTopicConnection() {
164            return outboundTopicConnection;
165        }
166    
167        /**
168         * @param outboundTopicConnection The outboundTopicConnection to set.
169         */
170        public void setOutboundTopicConnection(TopicConnection foreignTopicConnection) {
171            this.outboundTopicConnection = foreignTopicConnection;
172        }
173    
174        /**
175         * @param outboundTopicConnectionFactory The outboundTopicConnectionFactory
176         *                to set.
177         */
178        public void setOutboundTopicConnectionFactory(TopicConnectionFactory foreignTopicConnectionFactory) {
179            this.outboundTopicConnectionFactory = foreignTopicConnectionFactory;
180        }
181    
182        public void restartProducerConnection() throws NamingException, JMSException {
183            outboundTopicConnection = null;
184            initializeForeignTopicConnection();
185        }
186    
187        protected void initializeForeignTopicConnection() throws NamingException, JMSException {
188            if (outboundTopicConnection == null) {
189                // get the connection factories
190                if (outboundTopicConnectionFactory == null) {
191                    // look it up from JNDI
192                    if (outboundTopicConnectionFactoryName != null) {
193                        outboundTopicConnectionFactory = (TopicConnectionFactory)jndiOutboundTemplate
194                            .lookup(outboundTopicConnectionFactoryName, TopicConnectionFactory.class);
195                        if (outboundUsername != null) {
196                            outboundTopicConnection = outboundTopicConnectionFactory
197                                .createTopicConnection(outboundUsername, outboundPassword);
198                        } else {
199                            outboundTopicConnection = outboundTopicConnectionFactory.createTopicConnection();
200                        }
201                    } else {
202                        throw new JMSException("Cannot create localConnection - no information");
203                    }
204                } else {
205                    if (outboundUsername != null) {
206                        outboundTopicConnection = outboundTopicConnectionFactory
207                            .createTopicConnection(outboundUsername, outboundPassword);
208                    } else {
209                        outboundTopicConnection = outboundTopicConnectionFactory.createTopicConnection();
210                    }
211                }
212            }
213            if (localClientId != null && localClientId.length() > 0) {
214                outboundTopicConnection.setClientID(getOutboundClientId());
215            }
216            outboundTopicConnection.start();
217        }
218    
219        protected void initializeLocalTopicConnection() throws NamingException, JMSException {
220            if (localTopicConnection == null) {
221                // get the connection factories
222                if (localTopicConnectionFactory == null) {
223                    if (embeddedConnectionFactory == null) {
224                        // look it up from JNDI
225                        if (localConnectionFactoryName != null) {
226                            localTopicConnectionFactory = (TopicConnectionFactory)jndiLocalTemplate
227                                .lookup(localConnectionFactoryName, TopicConnectionFactory.class);
228                            if (localUsername != null) {
229                                localTopicConnection = localTopicConnectionFactory
230                                    .createTopicConnection(localUsername, localPassword);
231                            } else {
232                                localTopicConnection = localTopicConnectionFactory.createTopicConnection();
233                            }
234                        } else {
235                            throw new JMSException("Cannot create localConnection - no information");
236                        }
237                    } else {
238                        localTopicConnection = embeddedConnectionFactory.createTopicConnection();
239                    }
240                } else {
241                    if (localUsername != null) {
242                        localTopicConnection = localTopicConnectionFactory.createTopicConnection(localUsername,
243                                                                                                 localPassword);
244                    } else {
245                        localTopicConnection = localTopicConnectionFactory.createTopicConnection();
246                    }
247                }
248            }
249            if (localClientId != null && localClientId.length() > 0) {
250                localTopicConnection.setClientID(getLocalClientId());
251            }
252            localTopicConnection.start();
253        }
254    
255        protected void initializeInboundJmsMessageConvertor() {
256            inboundMessageConvertor.setConnection(localTopicConnection);
257        }
258    
259        protected void initializeOutboundJmsMessageConvertor() {
260            outboundMessageConvertor.setConnection(outboundTopicConnection);
261        }
262    
263        protected void initializeInboundTopicBridges() throws JMSException {
264            if (inboundTopicBridges != null) {
265                TopicSession outboundSession = outboundTopicConnection
266                    .createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
267                TopicSession localSession = localTopicConnection.createTopicSession(false,
268                                                                                    Session.AUTO_ACKNOWLEDGE);
269                for (int i = 0; i < inboundTopicBridges.length; i++) {
270                    InboundTopicBridge bridge = inboundTopicBridges[i];
271                    String localTopicName = bridge.getLocalTopicName();
272                    Topic activemqTopic = createActiveMQTopic(localSession, localTopicName);
273                    String topicName = bridge.getInboundTopicName();
274                    Topic foreignTopic = createForeignTopic(outboundSession, topicName);
275                    bridge.setConsumerTopic(foreignTopic);
276                    bridge.setProducerTopic(activemqTopic);
277                    bridge.setProducerConnection(localTopicConnection);
278                    bridge.setConsumerConnection(outboundTopicConnection);
279                    if (bridge.getJmsMessageConvertor() == null) {
280                        bridge.setJmsMessageConvertor(getInboundMessageConvertor());
281                    }
282                    bridge.setJmsConnector(this);
283                    addInboundBridge(bridge);
284                }
285                outboundSession.close();
286                localSession.close();
287            }
288        }
289    
290        protected void initializeOutboundTopicBridges() throws JMSException {
291            if (outboundTopicBridges != null) {
292                TopicSession outboundSession = outboundTopicConnection
293                    .createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
294                TopicSession localSession = localTopicConnection.createTopicSession(false,
295                                                                                    Session.AUTO_ACKNOWLEDGE);
296                for (int i = 0; i < outboundTopicBridges.length; i++) {
297                    OutboundTopicBridge bridge = outboundTopicBridges[i];
298                    String localTopicName = bridge.getLocalTopicName();
299                    Topic activemqTopic = createActiveMQTopic(localSession, localTopicName);
300                    String topicName = bridge.getOutboundTopicName();
301                    Topic foreignTopic = createForeignTopic(outboundSession, topicName);
302                    bridge.setConsumerTopic(activemqTopic);
303                    bridge.setProducerTopic(foreignTopic);
304                    bridge.setProducerConnection(outboundTopicConnection);
305                    bridge.setConsumerConnection(localTopicConnection);
306                    if (bridge.getJmsMessageConvertor() == null) {
307                        bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
308                    }
309                    bridge.setJmsConnector(this);
310                    addOutboundBridge(bridge);
311                }
312                outboundSession.close();
313                localSession.close();
314            }
315        }
316    
317        protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection,
318                                                  Connection replyToConsumerConnection) {
319            Topic replyToProducerTopic = (Topic)destination;
320            boolean isInbound = replyToProducerConnection.equals(localTopicConnection);
321    
322            if (isInbound) {
323                InboundTopicBridge bridge = (InboundTopicBridge)replyToBridges.get(replyToProducerTopic);
324                if (bridge == null) {
325                    bridge = new InboundTopicBridge() {
326                        protected Destination processReplyToDestination(Destination destination) {
327                            return null;
328                        }
329                    };
330                    try {
331                        TopicSession replyToConsumerSession = ((TopicConnection)replyToConsumerConnection)
332                            .createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
333                        Topic replyToConsumerTopic = replyToConsumerSession.createTemporaryTopic();
334                        replyToConsumerSession.close();
335                        bridge.setConsumerTopic(replyToConsumerTopic);
336                        bridge.setProducerTopic(replyToProducerTopic);
337                        bridge.setProducerConnection((TopicConnection)replyToProducerConnection);
338                        bridge.setConsumerConnection((TopicConnection)replyToConsumerConnection);
339                        bridge.setDoHandleReplyTo(false);
340                        if (bridge.getJmsMessageConvertor() == null) {
341                            bridge.setJmsMessageConvertor(getInboundMessageConvertor());
342                        }
343                        bridge.setJmsConnector(this);
344                        bridge.start();
345                        LOG.info("Created replyTo bridge for " + replyToProducerTopic);
346                    } catch (Exception e) {
347                        LOG.error("Failed to create replyTo bridge for topic: " + replyToProducerTopic, e);
348                        return null;
349                    }
350                    replyToBridges.put(replyToProducerTopic, bridge);
351                }
352                return bridge.getConsumerTopic();
353            } else {
354                OutboundTopicBridge bridge = (OutboundTopicBridge)replyToBridges.get(replyToProducerTopic);
355                if (bridge == null) {
356                    bridge = new OutboundTopicBridge() {
357                        protected Destination processReplyToDestination(Destination destination) {
358                            return null;
359                        }
360                    };
361                    try {
362                        TopicSession replyToConsumerSession = ((TopicConnection)replyToConsumerConnection)
363                            .createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
364                        Topic replyToConsumerTopic = replyToConsumerSession.createTemporaryTopic();
365                        replyToConsumerSession.close();
366                        bridge.setConsumerTopic(replyToConsumerTopic);
367                        bridge.setProducerTopic(replyToProducerTopic);
368                        bridge.setProducerConnection((TopicConnection)replyToProducerConnection);
369                        bridge.setConsumerConnection((TopicConnection)replyToConsumerConnection);
370                        bridge.setDoHandleReplyTo(false);
371                        if (bridge.getJmsMessageConvertor() == null) {
372                            bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
373                        }
374                        bridge.setJmsConnector(this);
375                        bridge.start();
376                        LOG.info("Created replyTo bridge for " + replyToProducerTopic);
377                    } catch (Exception e) {
378                        LOG.error("Failed to create replyTo bridge for topic: " + replyToProducerTopic, e);
379                        return null;
380                    }
381                    replyToBridges.put(replyToProducerTopic, bridge);
382                }
383                return bridge.getConsumerTopic();
384            }
385        }
386    
387        protected Topic createActiveMQTopic(TopicSession session, String topicName) throws JMSException {
388            return session.createTopic(topicName);
389        }
390    
391        protected Topic createForeignTopic(TopicSession session, String topicName) throws JMSException {
392            Topic result = null;
393            try {
394                result = session.createTopic(topicName);
395            } catch (JMSException e) {
396                // look-up the Topic
397                try {
398                    result = (Topic)jndiOutboundTemplate.lookup(topicName, Topic.class);
399                } catch (NamingException e1) {
400                    String errStr = "Failed to look-up Topic for name: " + topicName;
401                    LOG.error(errStr, e);
402                    JMSException jmsEx = new JMSException(errStr);
403                    jmsEx.setLinkedException(e1);
404                    throw jmsEx;
405                }
406            }
407            return result;
408        }
409    
410    }