001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.pool;
018    
019    import java.util.HashMap;
020    import java.util.Iterator;
021    import java.util.LinkedList;
022    import java.util.Map;
023    import java.util.concurrent.atomic.AtomicBoolean;
024    import javax.jms.Connection;
025    import javax.jms.ConnectionFactory;
026    import javax.jms.JMSException;
027    import org.apache.activemq.ActiveMQConnection;
028    import org.apache.activemq.ActiveMQConnectionFactory;
029    import org.apache.activemq.Service;
030    import org.apache.activemq.util.IOExceptionSupport;
031    import org.slf4j.Logger;
032    import org.slf4j.LoggerFactory;
033    import org.apache.commons.pool.ObjectPoolFactory;
034    import org.apache.commons.pool.impl.GenericObjectPoolFactory;
035    
036    /**
037     * A JMS provider which pools Connection, Session and MessageProducer instances
038     * so it can be used with tools like <a href="http://camel.apache.org/activemq.html">Camel</a> and Spring's <a
039     * href="http://activemq.apache.org/spring-support.html">JmsTemplate and MessageListenerContainer</a>.
040     * Connections, sessions and producers are returned to a pool after use so that they can be reused later
041     * without having to undergo the cost of creating them again.
042     * 
043     * <b>NOTE:</b> while this implementation does allow the creation of a collection of active consumers,
044     * it does not 'pool' consumers. Pooling makes sense for connections, sessions and producers, which
045     * are expensive to create and can remain idle at a minimal cost. Consumers, on the other hand, are usually
046     * just created at startup and left active, handling incoming messages as they come. When a consumer is
047     * complete, it is best to close it rather than return it to a pool for later reuse: this is because, 
048     * even if a consumer is idle, ActiveMQ will keep delivering messages to the consumer's prefetch buffer,
049     * where they'll get held until the consumer is active again.
050     * 
051     * If you are creating a collection of consumers (for example, for multi-threaded message consumption), you
052     * might want to consider using a lower prefetch value for each consumer (e.g. 10 or 20), to ensure that 
053     * all messages don't end up going to just one of the consumers. See this FAQ entry for more detail: 
054     * http://activemq.apache.org/i-do-not-receive-messages-in-my-second-consumer.html
055     * 
056     * @org.apache.xbean.XBean element="pooledConnectionFactory"
057     * 
058     * 
059     */
060    public class PooledConnectionFactory implements ConnectionFactory, Service {
061        private static final transient Logger LOG = LoggerFactory.getLogger(PooledConnectionFactory.class);
062        private ConnectionFactory connectionFactory;
063        private Map<ConnectionKey, LinkedList<ConnectionPool>> cache = new HashMap<ConnectionKey, LinkedList<ConnectionPool>>();
064        private ObjectPoolFactory poolFactory;
065        private int maximumActive = 500;
066        private int maxConnections = 1;
067        private int idleTimeout = 30 * 1000;
068        private AtomicBoolean stopped = new AtomicBoolean(false);
069        private long expiryTimeout = 0l;
070    
071        public PooledConnectionFactory() {
072            this(new ActiveMQConnectionFactory());
073        }
074    
075        public PooledConnectionFactory(String brokerURL) {
076            this(new ActiveMQConnectionFactory(brokerURL));
077        }
078    
079        public PooledConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
080            this.connectionFactory = connectionFactory;
081        }
082    
083        public ConnectionFactory getConnectionFactory() {
084            return connectionFactory;
085        }
086    
087        public void setConnectionFactory(ConnectionFactory connectionFactory) {
088            this.connectionFactory = connectionFactory;
089        }
090    
091        public Connection createConnection() throws JMSException {
092            return createConnection(null, null);
093        }
094    
095        public synchronized Connection createConnection(String userName, String password) throws JMSException {
096            if (stopped.get()) {
097                LOG.debug("PooledConnectionFactory is stopped, skip create new connection.");
098                return null;
099            }
100            
101            ConnectionKey key = new ConnectionKey(userName, password);
102            LinkedList<ConnectionPool> pools = cache.get(key);
103    
104            if (pools == null) {
105                pools = new LinkedList<ConnectionPool>();
106                cache.put(key, pools);
107            }
108    
109            ConnectionPool connection = null;
110            if (pools.size() == maxConnections) {
111                connection = pools.removeFirst();
112            }
113    
114            // Now.. we might get a connection, but it might be that we need to
115            // dump it..
116            if (connection != null && connection.expiredCheck()) {
117                connection = null;
118            }
119    
120            if (connection == null) {
121                ActiveMQConnection delegate = createConnection(key);
122                connection = createConnectionPool(delegate);
123            }
124            pools.add(connection);
125            return new PooledConnection(connection);
126        }
127    
128        protected ConnectionPool createConnectionPool(ActiveMQConnection connection) {
129            ConnectionPool result =  new ConnectionPool(connection, getPoolFactory());
130            result.setIdleTimeout(getIdleTimeout());
131            result.setExpiryTimeout(getExpiryTimeout());
132            return result;
133        }
134    
135        protected ActiveMQConnection createConnection(ConnectionKey key) throws JMSException {
136            if (key.getUserName() == null && key.getPassword() == null) {
137                return (ActiveMQConnection)connectionFactory.createConnection();
138            } else {
139                return (ActiveMQConnection)connectionFactory.createConnection(key.getUserName(), key.getPassword());
140            }
141        }
142    
143        /**
144         * @see org.apache.activemq.service.Service#start()
145         */
146        public void start() {
147            try {
148                stopped.set(false);
149                createConnection();
150            } catch (JMSException e) {
151                LOG.warn("Create pooled connection during start failed.", e);
152                IOExceptionSupport.create(e);
153            }
154        }
155    
156        public void stop() {
157            LOG.debug("Stop the PooledConnectionFactory, number of connections in cache: "+cache.size());
158            stopped.set(true);
159            for (Iterator<LinkedList<ConnectionPool>> iter = cache.values().iterator(); iter.hasNext();) {
160                for (ConnectionPool connection : iter.next()) {
161                    try {
162                        connection.close();
163                    }catch(Exception e) {
164                        LOG.warn("Close connection failed",e);
165                    }
166                }
167            }
168            cache.clear();
169        }
170    
171        public ObjectPoolFactory getPoolFactory() {
172            if (poolFactory == null) {
173                poolFactory = createPoolFactory();
174            }
175            return poolFactory;
176        }
177    
178        /**
179         * Sets the object pool factory used to create individual session pools for
180         * each connection
181         */
182        public void setPoolFactory(ObjectPoolFactory poolFactory) {
183            this.poolFactory = poolFactory;
184        }
185    
186        public int getMaximumActive() {
187            return maximumActive;
188        }
189    
190        /**
191         * Sets the maximum number of active sessions per connection
192         */
193        public void setMaximumActive(int maximumActive) {
194            this.maximumActive = maximumActive;
195        }
196    
197        /**
198         * @return the maxConnections
199         */
200        public int getMaxConnections() {
201            return maxConnections;
202        }
203    
204        /**
205         * @param maxConnections the maxConnections to set
206         */
207        public void setMaxConnections(int maxConnections) {
208            this.maxConnections = maxConnections;
209        }
210    
211        protected ObjectPoolFactory createPoolFactory() {
212            return new GenericObjectPoolFactory(null, maximumActive);
213        }
214    
215        public int getIdleTimeout() {
216            return idleTimeout;
217        }
218    
219        public void setIdleTimeout(int idleTimeout) {
220            this.idleTimeout = idleTimeout;
221        }
222    
223        /**
224         * allow connections to expire, irrespective of load or idle time. This is useful with failover
225         * to force a reconnect from the pool, to reestablish load balancing or use of the master post recovery
226         * 
227         * @param expiryTimeout non zero in milliseconds
228         */
229        public void setExpiryTimeout(long expiryTimeout) {
230            this.expiryTimeout = expiryTimeout;   
231        }
232        
233        public long getExpiryTimeout() {
234            return expiryTimeout;
235        }
236    }