001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.network.jms;
018    
019    import javax.jms.Connection;
020    import javax.jms.Destination;
021    import javax.jms.JMSException;
022    import javax.jms.Queue;
023    import javax.jms.QueueConnection;
024    import javax.jms.QueueConnectionFactory;
025    import javax.jms.QueueSession;
026    import javax.jms.Session;
027    import javax.naming.NamingException;
028    
029    import org.slf4j.Logger;
030    import org.slf4j.LoggerFactory;
031    
032    /**
033     * A Bridge to other JMS Queue providers
034     * 
035     * @org.apache.xbean.XBean
036     * 
037     * 
038     */
039    public class JmsQueueConnector extends JmsConnector {
040        private static final Logger LOG = LoggerFactory.getLogger(JmsQueueConnector.class);
041        private String outboundQueueConnectionFactoryName;
042        private String localConnectionFactoryName;
043        private QueueConnectionFactory outboundQueueConnectionFactory;
044        private QueueConnectionFactory localQueueConnectionFactory;
045        private QueueConnection outboundQueueConnection;
046        private QueueConnection localQueueConnection;
047        private InboundQueueBridge[] inboundQueueBridges;
048        private OutboundQueueBridge[] outboundQueueBridges;
049    
050        public boolean init() {
051            boolean result = super.init();
052            if (result) {
053                try {
054                    initializeForeignQueueConnection();
055                    initializeLocalQueueConnection();
056                    initializeInboundJmsMessageConvertor();
057                    initializeOutboundJmsMessageConvertor();
058                    initializeInboundQueueBridges();
059                    initializeOutboundQueueBridges();
060                } catch (Exception e) {
061                    LOG.error("Failed to initialize the JMSConnector", e);
062                }
063            }
064            return result;
065        }
066    
067        /**
068         * @return Returns the inboundQueueBridges.
069         */
070        public InboundQueueBridge[] getInboundQueueBridges() {
071            return inboundQueueBridges;
072        }
073    
074        /**
075         * @param inboundQueueBridges The inboundQueueBridges to set.
076         */
077        public void setInboundQueueBridges(InboundQueueBridge[] inboundQueueBridges) {
078            this.inboundQueueBridges = inboundQueueBridges;
079        }
080    
081        /**
082         * @return Returns the outboundQueueBridges.
083         */
084        public OutboundQueueBridge[] getOutboundQueueBridges() {
085            return outboundQueueBridges;
086        }
087    
088        /**
089         * @param outboundQueueBridges The outboundQueueBridges to set.
090         */
091        public void setOutboundQueueBridges(OutboundQueueBridge[] outboundQueueBridges) {
092            this.outboundQueueBridges = outboundQueueBridges;
093        }
094    
095        /**
096         * @return Returns the localQueueConnectionFactory.
097         */
098        public QueueConnectionFactory getLocalQueueConnectionFactory() {
099            return localQueueConnectionFactory;
100        }
101    
102        /**
103         * @param localQueueConnectionFactory The localQueueConnectionFactory to
104         *                set.
105         */
106        public void setLocalQueueConnectionFactory(QueueConnectionFactory localConnectionFactory) {
107            this.localQueueConnectionFactory = localConnectionFactory;
108        }
109    
110        /**
111         * @return Returns the outboundQueueConnectionFactory.
112         */
113        public QueueConnectionFactory getOutboundQueueConnectionFactory() {
114            return outboundQueueConnectionFactory;
115        }
116    
117        /**
118         * @return Returns the outboundQueueConnectionFactoryName.
119         */
120        public String getOutboundQueueConnectionFactoryName() {
121            return outboundQueueConnectionFactoryName;
122        }
123    
124        /**
125         * @param outboundQueueConnectionFactoryName The
126         *                outboundQueueConnectionFactoryName to set.
127         */
128        public void setOutboundQueueConnectionFactoryName(String foreignQueueConnectionFactoryName) {
129            this.outboundQueueConnectionFactoryName = foreignQueueConnectionFactoryName;
130        }
131    
132        /**
133         * @return Returns the localConnectionFactoryName.
134         */
135        public String getLocalConnectionFactoryName() {
136            return localConnectionFactoryName;
137        }
138    
139        /**
140         * @param localConnectionFactoryName The localConnectionFactoryName to set.
141         */
142        public void setLocalConnectionFactoryName(String localConnectionFactoryName) {
143            this.localConnectionFactoryName = localConnectionFactoryName;
144        }
145    
146        /**
147         * @return Returns the localQueueConnection.
148         */
149        public QueueConnection getLocalQueueConnection() {
150            return localQueueConnection;
151        }
152    
153        /**
154         * @param localQueueConnection The localQueueConnection to set.
155         */
156        public void setLocalQueueConnection(QueueConnection localQueueConnection) {
157            this.localQueueConnection = localQueueConnection;
158        }
159    
160        /**
161         * @return Returns the outboundQueueConnection.
162         */
163        public QueueConnection getOutboundQueueConnection() {
164            return outboundQueueConnection;
165        }
166    
167        /**
168         * @param outboundQueueConnection The outboundQueueConnection to set.
169         */
170        public void setOutboundQueueConnection(QueueConnection foreignQueueConnection) {
171            this.outboundQueueConnection = foreignQueueConnection;
172        }
173    
174        /**
175         * @param outboundQueueConnectionFactory The outboundQueueConnectionFactory
176         *                to set.
177         */
178        public void setOutboundQueueConnectionFactory(QueueConnectionFactory foreignQueueConnectionFactory) {
179            this.outboundQueueConnectionFactory = foreignQueueConnectionFactory;
180        }
181    
182        public void restartProducerConnection() throws NamingException, JMSException {
183            outboundQueueConnection = null;
184            initializeForeignQueueConnection();
185    
186            // the outboundQueueConnection was reestablished - publish the new connection to the bridges
187            if (inboundQueueBridges != null) {
188                    for (int i = 0; i < inboundQueueBridges.length; i++) {
189                            InboundQueueBridge bridge = inboundQueueBridges[i];
190                            bridge.setConsumerConnection(outboundQueueConnection);
191                    }
192            }
193            if (outboundQueueBridges != null) {
194                    for (int i = 0; i < outboundQueueBridges.length; i++) {
195                            OutboundQueueBridge bridge = outboundQueueBridges[i];
196                            bridge.setProducerConnection(outboundQueueConnection);
197                    }
198            }
199        }
200    
201        protected void initializeForeignQueueConnection() throws NamingException, JMSException {
202            if (outboundQueueConnection == null) {
203                // get the connection factories
204                if (outboundQueueConnectionFactory == null) {
205                    // look it up from JNDI
206                    if (outboundQueueConnectionFactoryName != null) {
207                        outboundQueueConnectionFactory = (QueueConnectionFactory)jndiOutboundTemplate
208                            .lookup(outboundQueueConnectionFactoryName, QueueConnectionFactory.class);
209                        if (outboundUsername != null) {
210                            outboundQueueConnection = outboundQueueConnectionFactory
211                                .createQueueConnection(outboundUsername, outboundPassword);
212                        } else {
213                            outboundQueueConnection = outboundQueueConnectionFactory.createQueueConnection();
214                        }
215                    } else {
216                        throw new JMSException("Cannot create foreignConnection - no information");
217                    }
218                } else {
219                    if (outboundUsername != null) {
220                        outboundQueueConnection = outboundQueueConnectionFactory
221                            .createQueueConnection(outboundUsername, outboundPassword);
222                    } else {
223                        outboundQueueConnection = outboundQueueConnectionFactory.createQueueConnection();
224                    }
225                }
226            }
227            if (localClientId != null && localClientId.length() > 0) {
228                outboundQueueConnection.setClientID(getOutboundClientId());
229            }
230            outboundQueueConnection.start();
231        }
232    
233        protected void initializeLocalQueueConnection() throws NamingException, JMSException {
234            if (localQueueConnection == null) {
235                // get the connection factories
236                if (localQueueConnectionFactory == null) {
237                    if (embeddedConnectionFactory == null) {
238                        // look it up from JNDI
239                        if (localConnectionFactoryName != null) {
240                            localQueueConnectionFactory = (QueueConnectionFactory)jndiLocalTemplate
241                                .lookup(localConnectionFactoryName, QueueConnectionFactory.class);
242                            if (localUsername != null) {
243                                localQueueConnection = localQueueConnectionFactory
244                                    .createQueueConnection(localUsername, localPassword);
245                            } else {
246                                localQueueConnection = localQueueConnectionFactory.createQueueConnection();
247                            }
248                        } else {
249                            throw new JMSException("Cannot create localConnection - no information");
250                        }
251                    } else {
252                        localQueueConnection = embeddedConnectionFactory.createQueueConnection();
253                    }
254                } else {
255                    if (localUsername != null) {
256                        localQueueConnection = localQueueConnectionFactory.createQueueConnection(localUsername,
257                                                                                                 localPassword);
258                    } else {
259                        localQueueConnection = localQueueConnectionFactory.createQueueConnection();
260                    }
261                }
262            }
263            if (localClientId != null && localClientId.length() > 0) {
264                localQueueConnection.setClientID(getLocalClientId());
265            }
266            localQueueConnection.start();
267        }
268    
269        protected void initializeInboundJmsMessageConvertor() {
270            inboundMessageConvertor.setConnection(localQueueConnection);
271        }
272    
273        protected void initializeOutboundJmsMessageConvertor() {
274            outboundMessageConvertor.setConnection(outboundQueueConnection);
275        }
276    
277        protected void initializeInboundQueueBridges() throws JMSException {
278            if (inboundQueueBridges != null) {
279                QueueSession outboundSession = outboundQueueConnection
280                    .createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
281                QueueSession localSession = localQueueConnection.createQueueSession(false,
282                                                                                    Session.AUTO_ACKNOWLEDGE);
283                for (int i = 0; i < inboundQueueBridges.length; i++) {
284                    InboundQueueBridge bridge = inboundQueueBridges[i];
285                    String localQueueName = bridge.getLocalQueueName();
286                    Queue activemqQueue = createActiveMQQueue(localSession, localQueueName);
287                    String queueName = bridge.getInboundQueueName();
288                    Queue foreignQueue = createForeignQueue(outboundSession, queueName);
289                    bridge.setConsumerQueue(foreignQueue);
290                    bridge.setProducerQueue(activemqQueue);
291                    bridge.setProducerConnection(localQueueConnection);
292                    bridge.setConsumerConnection(outboundQueueConnection);
293                    if (bridge.getJmsMessageConvertor() == null) {
294                        bridge.setJmsMessageConvertor(getInboundMessageConvertor());
295                    }
296                    bridge.setJmsConnector(this);
297                    addInboundBridge(bridge);
298                }
299                outboundSession.close();
300                localSession.close();
301            }
302        }
303    
304        protected void initializeOutboundQueueBridges() throws JMSException {
305            if (outboundQueueBridges != null) {
306                QueueSession outboundSession = outboundQueueConnection
307                    .createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
308                QueueSession localSession = localQueueConnection.createQueueSession(false,
309                                                                                    Session.AUTO_ACKNOWLEDGE);
310                for (int i = 0; i < outboundQueueBridges.length; i++) {
311                    OutboundQueueBridge bridge = outboundQueueBridges[i];
312                    String localQueueName = bridge.getLocalQueueName();
313                    Queue activemqQueue = createActiveMQQueue(localSession, localQueueName);
314                    String queueName = bridge.getOutboundQueueName();
315                    Queue foreignQueue = createForeignQueue(outboundSession, queueName);
316                    bridge.setConsumerQueue(activemqQueue);
317                    bridge.setProducerQueue(foreignQueue);
318                    bridge.setProducerConnection(outboundQueueConnection);
319                    bridge.setConsumerConnection(localQueueConnection);
320                    if (bridge.getJmsMessageConvertor() == null) {
321                        bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
322                    }
323                    bridge.setJmsConnector(this);
324                    addOutboundBridge(bridge);
325                }
326                outboundSession.close();
327                localSession.close();
328            }
329        }
330    
331        protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection,
332                                                  Connection replyToConsumerConnection) {
333            Queue replyToProducerQueue = (Queue)destination;
334            boolean isInbound = replyToProducerConnection.equals(localQueueConnection);
335    
336            if (isInbound) {
337                InboundQueueBridge bridge = (InboundQueueBridge)replyToBridges.get(replyToProducerQueue);
338                if (bridge == null) {
339                    bridge = new InboundQueueBridge() {
340                        protected Destination processReplyToDestination(Destination destination) {
341                            return null;
342                        }
343                    };
344                    try {
345                        QueueSession replyToConsumerSession = ((QueueConnection)replyToConsumerConnection)
346                            .createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
347                        Queue replyToConsumerQueue = replyToConsumerSession.createTemporaryQueue();
348                        replyToConsumerSession.close();
349                        bridge.setConsumerQueue(replyToConsumerQueue);
350                        bridge.setProducerQueue(replyToProducerQueue);
351                        bridge.setProducerConnection((QueueConnection)replyToProducerConnection);
352                        bridge.setConsumerConnection((QueueConnection)replyToConsumerConnection);
353                        bridge.setDoHandleReplyTo(false);
354                        if (bridge.getJmsMessageConvertor() == null) {
355                            bridge.setJmsMessageConvertor(getInboundMessageConvertor());
356                        }
357                        bridge.setJmsConnector(this);
358                        bridge.start();
359                        LOG.info("Created replyTo bridge for " + replyToProducerQueue);
360                    } catch (Exception e) {
361                        LOG.error("Failed to create replyTo bridge for queue: " + replyToProducerQueue, e);
362                        return null;
363                    }
364                    replyToBridges.put(replyToProducerQueue, bridge);
365                }
366                return bridge.getConsumerQueue();
367            } else {
368                OutboundQueueBridge bridge = (OutboundQueueBridge)replyToBridges.get(replyToProducerQueue);
369                if (bridge == null) {
370                    bridge = new OutboundQueueBridge() {
371                        protected Destination processReplyToDestination(Destination destination) {
372                            return null;
373                        }
374                    };
375                    try {
376                        QueueSession replyToConsumerSession = ((QueueConnection)replyToConsumerConnection)
377                            .createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
378                        Queue replyToConsumerQueue = replyToConsumerSession.createTemporaryQueue();
379                        replyToConsumerSession.close();
380                        bridge.setConsumerQueue(replyToConsumerQueue);
381                        bridge.setProducerQueue(replyToProducerQueue);
382                        bridge.setProducerConnection((QueueConnection)replyToProducerConnection);
383                        bridge.setConsumerConnection((QueueConnection)replyToConsumerConnection);
384                        bridge.setDoHandleReplyTo(false);
385                        if (bridge.getJmsMessageConvertor() == null) {
386                            bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
387                        }
388                        bridge.setJmsConnector(this);
389                        bridge.start();
390                        LOG.info("Created replyTo bridge for " + replyToProducerQueue);
391                    } catch (Exception e) {
392                        LOG.error("Failed to create replyTo bridge for queue: " + replyToProducerQueue, e);
393                        return null;
394                    }
395                    replyToBridges.put(replyToProducerQueue, bridge);
396                }
397                return bridge.getConsumerQueue();
398            }
399        }
400    
401        protected Queue createActiveMQQueue(QueueSession session, String queueName) throws JMSException {
402            return session.createQueue(queueName);
403        }
404    
405        protected Queue createForeignQueue(QueueSession session, String queueName) throws JMSException {
406            Queue result = null;
407            try {
408                result = session.createQueue(queueName);
409            } catch (JMSException e) {
410                // look-up the Queue
411                try {
412                    result = (Queue)jndiOutboundTemplate.lookup(queueName, Queue.class);
413                } catch (NamingException e1) {
414                    String errStr = "Failed to look-up Queue for name: " + queueName;
415                    LOG.error(errStr, e);
416                    JMSException jmsEx = new JMSException(errStr);
417                    jmsEx.setLinkedException(e1);
418                    throw jmsEx;
419                }
420            }
421            return result;
422        }
423    
424    }