001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.pool;
018    
019    import java.io.Serializable;
020    import java.util.Iterator;
021    import java.util.concurrent.CopyOnWriteArrayList;
022    
023    import javax.jms.BytesMessage;
024    import javax.jms.Destination;
025    import javax.jms.JMSException;
026    import javax.jms.MapMessage;
027    import javax.jms.Message;
028    import javax.jms.MessageConsumer;
029    import javax.jms.MessageListener;
030    import javax.jms.MessageProducer;
031    import javax.jms.ObjectMessage;
032    import javax.jms.Queue;
033    import javax.jms.QueueBrowser;
034    import javax.jms.QueueReceiver;
035    import javax.jms.QueueSender;
036    import javax.jms.QueueSession;
037    import javax.jms.StreamMessage;
038    import javax.jms.TemporaryQueue;
039    import javax.jms.TemporaryTopic;
040    import javax.jms.TextMessage;
041    import javax.jms.Topic;
042    import javax.jms.TopicPublisher;
043    import javax.jms.TopicSession;
044    import javax.jms.TopicSubscriber;
045    import javax.jms.XASession;
046    import javax.jms.Session;
047    import javax.transaction.xa.XAResource;
048    
049    import org.apache.activemq.ActiveMQMessageProducer;
050    import org.apache.activemq.ActiveMQQueueSender;
051    import org.apache.activemq.ActiveMQSession;
052    import org.apache.activemq.ActiveMQTopicPublisher;
053    import org.apache.activemq.AlreadyClosedException;
054    import org.slf4j.Logger;
055    import org.slf4j.LoggerFactory;
056    
057    /**
058     * 
059     */
060    public class PooledSession implements Session, TopicSession, QueueSession, XASession {
061        private static final transient Logger LOG = LoggerFactory.getLogger(PooledSession.class);
062    
063        private ActiveMQSession session;
064        private SessionPool sessionPool;
065        private ActiveMQMessageProducer messageProducer;
066        private ActiveMQQueueSender queueSender;
067        private ActiveMQTopicPublisher topicPublisher;
068        private boolean transactional = true;
069        private boolean ignoreClose;
070    
071        private final CopyOnWriteArrayList<MessageConsumer> consumers = new CopyOnWriteArrayList<MessageConsumer>();
072        private final CopyOnWriteArrayList<QueueBrowser> browsers = new CopyOnWriteArrayList<QueueBrowser>();
073    
074        public PooledSession(ActiveMQSession aSession, SessionPool sessionPool) {
075            this.session = aSession;
076            this.sessionPool = sessionPool;
077            this.transactional = session.isTransacted();
078        }
079    
080        protected boolean isIgnoreClose() {
081            return ignoreClose;
082        }
083    
084        protected void setIgnoreClose(boolean ignoreClose) {
085            this.ignoreClose = ignoreClose;
086        }
087    
088        public void close() throws JMSException {
089            if (!ignoreClose) {
090                // TODO a cleaner way to reset??
091    
092                // lets reset the session
093                getInternalSession().setMessageListener(null);
094    
095                // Close any consumers and browsers that may have been created.
096                for (Iterator<MessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
097                    MessageConsumer consumer = iter.next();
098                    consumer.close();
099                }
100                consumers.clear();
101    
102                for (Iterator<QueueBrowser> iter = browsers.iterator(); iter.hasNext();) {
103                    QueueBrowser browser = iter.next();
104                    browser.close();
105                }
106                browsers.clear();
107    
108                // maybe do a rollback?
109                if (transactional) {
110                    try {
111                        getInternalSession().rollback();
112                    } catch (JMSException e) {
113                        LOG.warn("Caught exception trying rollback() when putting session back into the pool: " + e, e);
114    
115                        // lets close the session and not put the session back into
116                        // the pool
117                        try {
118                            session.close();
119                        } catch (JMSException e1) {
120                            LOG.trace("Ignoring exception as discarding session: " + e1, e1);
121                        }
122                        session = null;
123                        sessionPool.invalidateSession(this);
124                        return;
125                    }
126                }
127    
128                sessionPool.returnSession(this);
129            }
130        }
131    
132        public void commit() throws JMSException {
133            getInternalSession().commit();
134        }
135    
136        public BytesMessage createBytesMessage() throws JMSException {
137            return getInternalSession().createBytesMessage();
138        }
139    
140        public MapMessage createMapMessage() throws JMSException {
141            return getInternalSession().createMapMessage();
142        }
143    
144        public Message createMessage() throws JMSException {
145            return getInternalSession().createMessage();
146        }
147    
148        public ObjectMessage createObjectMessage() throws JMSException {
149            return getInternalSession().createObjectMessage();
150        }
151    
152        public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException {
153            return getInternalSession().createObjectMessage(serializable);
154        }
155    
156        public Queue createQueue(String s) throws JMSException {
157            return getInternalSession().createQueue(s);
158        }
159    
160        public StreamMessage createStreamMessage() throws JMSException {
161            return getInternalSession().createStreamMessage();
162        }
163    
164        public TemporaryQueue createTemporaryQueue() throws JMSException {
165            return getInternalSession().createTemporaryQueue();
166        }
167    
168        public TemporaryTopic createTemporaryTopic() throws JMSException {
169            return getInternalSession().createTemporaryTopic();
170        }
171    
172        public void unsubscribe(String s) throws JMSException {
173            getInternalSession().unsubscribe(s);
174        }
175    
176        public TextMessage createTextMessage() throws JMSException {
177            return getInternalSession().createTextMessage();
178        }
179    
180        public TextMessage createTextMessage(String s) throws JMSException {
181            return getInternalSession().createTextMessage(s);
182        }
183    
184        public Topic createTopic(String s) throws JMSException {
185            return getInternalSession().createTopic(s);
186        }
187    
188        public int getAcknowledgeMode() throws JMSException {
189            return getInternalSession().getAcknowledgeMode();
190        }
191    
192        public boolean getTransacted() throws JMSException {
193            return getInternalSession().getTransacted();
194        }
195    
196        public void recover() throws JMSException {
197            getInternalSession().recover();
198        }
199    
200        public void rollback() throws JMSException {
201            getInternalSession().rollback();
202        }
203    
204        public XAResource getXAResource() {
205            if (session == null) {
206                throw new IllegalStateException("Session is closed");
207            }
208            return session.getTransactionContext();
209        }
210    
211        public Session getSession() {
212            return this;
213        }
214    
215        public void run() {
216            if (session != null) {
217                session.run();
218            }
219        }
220    
221        // Consumer related methods
222        // -------------------------------------------------------------------------
223        public QueueBrowser createBrowser(Queue queue) throws JMSException {
224            return addQueueBrowser(getInternalSession().createBrowser(queue));
225        }
226    
227        public QueueBrowser createBrowser(Queue queue, String selector) throws JMSException {
228            return addQueueBrowser(getInternalSession().createBrowser(queue, selector));
229        }
230    
231        public MessageConsumer createConsumer(Destination destination) throws JMSException {
232            return addConsumer(getInternalSession().createConsumer(destination));
233        }
234    
235        public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException {
236            return addConsumer(getInternalSession().createConsumer(destination, selector));
237        }
238    
239        public MessageConsumer createConsumer(Destination destination, String selector, boolean noLocal) throws JMSException {
240            return addConsumer(getInternalSession().createConsumer(destination, selector, noLocal));
241        }
242    
243        public TopicSubscriber createDurableSubscriber(Topic topic, String selector) throws JMSException {
244            return addTopicSubscriber(getInternalSession().createDurableSubscriber(topic, selector));
245        }
246    
247        public TopicSubscriber createDurableSubscriber(Topic topic, String name, String selector, boolean noLocal) throws JMSException {
248            return addTopicSubscriber(getInternalSession().createDurableSubscriber(topic, name, selector, noLocal));
249        }
250    
251        public MessageListener getMessageListener() throws JMSException {
252            return getInternalSession().getMessageListener();
253        }
254    
255        public void setMessageListener(MessageListener messageListener) throws JMSException {
256            getInternalSession().setMessageListener(messageListener);
257        }
258    
259        public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
260            return addTopicSubscriber(getInternalSession().createSubscriber(topic));
261        }
262    
263        public TopicSubscriber createSubscriber(Topic topic, String selector, boolean local) throws JMSException {
264            return addTopicSubscriber(getInternalSession().createSubscriber(topic, selector, local));
265        }
266    
267        public QueueReceiver createReceiver(Queue queue) throws JMSException {
268            return addQueueReceiver(getInternalSession().createReceiver(queue));
269        }
270    
271        public QueueReceiver createReceiver(Queue queue, String selector) throws JMSException {
272            return addQueueReceiver(getInternalSession().createReceiver(queue, selector));
273        }
274    
275        // Producer related methods
276        // -------------------------------------------------------------------------
277        public MessageProducer createProducer(Destination destination) throws JMSException {
278            return new PooledProducer(getMessageProducer(), destination);
279        }
280    
281        public QueueSender createSender(Queue queue) throws JMSException {
282            return new PooledQueueSender(getQueueSender(), queue);
283        }
284    
285        public TopicPublisher createPublisher(Topic topic) throws JMSException {
286            return new PooledTopicPublisher(getTopicPublisher(), topic);
287        }
288    
289        public ActiveMQSession getInternalSession() throws AlreadyClosedException {
290            if (session == null) {
291                throw new AlreadyClosedException("The session has already been closed");
292            }
293            return session;
294        }
295    
296        public ActiveMQMessageProducer getMessageProducer() throws JMSException {
297            if (messageProducer == null) {
298                messageProducer = (ActiveMQMessageProducer) getInternalSession().createProducer(null);
299            }
300            return messageProducer;
301        }
302    
303        public ActiveMQQueueSender getQueueSender() throws JMSException {
304            if (queueSender == null) {
305                queueSender = (ActiveMQQueueSender) getInternalSession().createSender(null);
306            }
307            return queueSender;
308        }
309    
310        public ActiveMQTopicPublisher getTopicPublisher() throws JMSException {
311            if (topicPublisher == null) {
312                topicPublisher = (ActiveMQTopicPublisher) getInternalSession().createPublisher(null);
313            }
314            return topicPublisher;
315        }
316    
317        private QueueBrowser addQueueBrowser(QueueBrowser browser) {
318            browsers.add(browser);
319            return browser;
320        }
321    
322        private MessageConsumer addConsumer(MessageConsumer consumer) {
323            consumers.add(consumer);
324            return consumer;
325        }
326    
327        private TopicSubscriber addTopicSubscriber(TopicSubscriber subscriber) {
328            consumers.add(subscriber);
329            return subscriber;
330        }
331    
332        private QueueReceiver addQueueReceiver(QueueReceiver receiver) {
333            consumers.add(receiver);
334            return receiver;
335        }
336    
337        public String toString() {
338            return "PooledSession { " + session + " }";
339        }
340    }