001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.activemq.pool;
019    
020    import java.io.IOException;
021    import java.util.HashMap;
022    import java.util.Iterator;
023    import java.util.Map;
024    import java.util.concurrent.atomic.AtomicBoolean;
025    
026    import javax.jms.JMSException;
027    import javax.jms.Session;
028    
029    import org.apache.activemq.ActiveMQConnection;
030    import org.apache.activemq.transport.TransportListener;
031    import org.apache.commons.pool.ObjectPoolFactory;
032    
033    /**
034     * Holds a real JMS connection along with the session pools associated with it.
035     * 
036     * 
037     */
038    public class ConnectionPool {
039    
040        private ActiveMQConnection connection;
041        private Map<SessionKey, SessionPool> cache;
042        private AtomicBoolean started = new AtomicBoolean(false);
043        private int referenceCount;
044        private ObjectPoolFactory poolFactory;
045        private long lastUsed = System.currentTimeMillis();
046        private long firstUsed = lastUsed;
047        private boolean hasFailed;
048        private boolean hasExpired;
049        private int idleTimeout = 30 * 1000;
050        private long expiryTimeout = 0l;
051    
052        public ConnectionPool(ActiveMQConnection connection, ObjectPoolFactory poolFactory) {
053            this(connection, new HashMap<SessionKey, SessionPool>(), poolFactory);
054            // Add a transport Listener so that we can notice if this connection
055            // should be expired due to
056            // a connection failure.
057            connection.addTransportListener(new TransportListener() {
058                public void onCommand(Object command) {
059                }
060    
061                public void onException(IOException error) {
062                    synchronized (ConnectionPool.this) {
063                        hasFailed = true;
064                    }
065                }
066    
067                public void transportInterupted() {
068                }
069    
070                public void transportResumed() {
071                }
072            });       
073            //
074            // make sure that we set the hasFailed flag, in case the transport already failed
075            // prior to the addition of our new TransportListener
076            //
077            if(connection.isTransportFailed()) {
078                hasFailed = true;
079            }
080        }
081    
082        public ConnectionPool(ActiveMQConnection connection, Map<SessionKey, SessionPool> cache, ObjectPoolFactory poolFactory) {
083            this.connection = connection;
084            this.cache = cache;
085            this.poolFactory = poolFactory;
086        }
087    
088        public void start() throws JMSException {
089            if (started.compareAndSet(false, true)) {
090                    try {
091                            connection.start();
092                    } catch (JMSException e) {
093                            started.set(false);
094                            throw(e);
095                    }
096            }
097        }
098    
099        public synchronized ActiveMQConnection getConnection() {
100            return connection;
101        }
102    
103        public Session createSession(boolean transacted, int ackMode) throws JMSException {
104            SessionKey key = new SessionKey(transacted, ackMode);
105            SessionPool pool = cache.get(key);
106            if (pool == null) {
107                pool = createSessionPool(key);
108                cache.put(key, pool);
109            }
110            PooledSession session = pool.borrowSession();
111            return session;
112        }
113    
114        public synchronized void close() {
115            if (connection != null) {
116                try {
117                    Iterator<SessionPool> i = cache.values().iterator();
118                    while (i.hasNext()) {
119                        SessionPool pool = i.next();
120                        i.remove();
121                        try {
122                            pool.close();
123                        } catch (Exception e) {
124                        }
125                    }
126                } finally {
127                    try {
128                        connection.close();
129                    } catch (Exception e) {
130                    } finally {
131                        connection = null;
132                    }
133                }
134            }
135        }
136    
137        public synchronized void incrementReferenceCount() {
138            referenceCount++;
139            lastUsed = System.currentTimeMillis();
140        }
141    
142        public synchronized void decrementReferenceCount() {
143            referenceCount--;
144            lastUsed = System.currentTimeMillis();
145            if (referenceCount == 0) {
146                expiredCheck();
147            }
148        }
149    
150        /**
151         * @return true if this connection has expired.
152         */
153        public synchronized boolean expiredCheck() {
154            if (connection == null) {
155                return true;
156            }
157            if (hasExpired) {
158                if (referenceCount == 0) {
159                    close();
160                }
161                return true;
162            }
163            if (hasFailed 
164                    || (idleTimeout > 0 && System.currentTimeMillis() > lastUsed + idleTimeout)
165                    || expiryTimeout > 0 && System.currentTimeMillis() > firstUsed + expiryTimeout) {
166                hasExpired = true;
167                if (referenceCount == 0) {
168                    close();
169                }
170                return true;
171            }
172            return false;
173        }
174    
175        public int getIdleTimeout() {
176            return idleTimeout;
177        }
178    
179        public void setIdleTimeout(int idleTimeout) {
180            this.idleTimeout = idleTimeout;
181        }
182    
183        protected SessionPool createSessionPool(SessionKey key) {
184            return new SessionPool(this, key, poolFactory.createPool());
185        }
186    
187        public void setExpiryTimeout(long expiryTimeout) {
188            this.expiryTimeout  = expiryTimeout;
189        }
190        
191        public long getExpiryTimeout() {
192            return expiryTimeout;
193        }
194    
195    }