001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018 package org.apache.activemq.pool; 019 020 import java.io.IOException; 021 import java.util.HashMap; 022 import java.util.Iterator; 023 import java.util.Map; 024 import java.util.concurrent.atomic.AtomicBoolean; 025 026 import javax.jms.JMSException; 027 import javax.jms.Session; 028 029 import org.apache.activemq.ActiveMQConnection; 030 import org.apache.activemq.transport.TransportListener; 031 import org.apache.commons.pool.ObjectPoolFactory; 032 033 /** 034 * Holds a real JMS connection along with the session pools associated with it. 035 * 036 * 037 */ 038 public class ConnectionPool { 039 040 private ActiveMQConnection connection; 041 private Map<SessionKey, SessionPool> cache; 042 private AtomicBoolean started = new AtomicBoolean(false); 043 private int referenceCount; 044 private ObjectPoolFactory poolFactory; 045 private long lastUsed = System.currentTimeMillis(); 046 private long firstUsed = lastUsed; 047 private boolean hasFailed; 048 private boolean hasExpired; 049 private int idleTimeout = 30 * 1000; 050 private long expiryTimeout = 0l; 051 052 public ConnectionPool(ActiveMQConnection connection, ObjectPoolFactory poolFactory) { 053 this(connection, new HashMap<SessionKey, SessionPool>(), poolFactory); 054 // Add a transport Listener so that we can notice if this connection 055 // should be expired due to 056 // a connection failure. 057 connection.addTransportListener(new TransportListener() { 058 public void onCommand(Object command) { 059 } 060 061 public void onException(IOException error) { 062 synchronized (ConnectionPool.this) { 063 hasFailed = true; 064 } 065 } 066 067 public void transportInterupted() { 068 } 069 070 public void transportResumed() { 071 } 072 }); 073 // 074 // make sure that we set the hasFailed flag, in case the transport already failed 075 // prior to the addition of our new TransportListener 076 // 077 if(connection.isTransportFailed()) { 078 hasFailed = true; 079 } 080 } 081 082 public ConnectionPool(ActiveMQConnection connection, Map<SessionKey, SessionPool> cache, ObjectPoolFactory poolFactory) { 083 this.connection = connection; 084 this.cache = cache; 085 this.poolFactory = poolFactory; 086 } 087 088 public void start() throws JMSException { 089 if (started.compareAndSet(false, true)) { 090 try { 091 connection.start(); 092 } catch (JMSException e) { 093 started.set(false); 094 throw(e); 095 } 096 } 097 } 098 099 public synchronized ActiveMQConnection getConnection() { 100 return connection; 101 } 102 103 public Session createSession(boolean transacted, int ackMode) throws JMSException { 104 SessionKey key = new SessionKey(transacted, ackMode); 105 SessionPool pool = cache.get(key); 106 if (pool == null) { 107 pool = createSessionPool(key); 108 cache.put(key, pool); 109 } 110 PooledSession session = pool.borrowSession(); 111 return session; 112 } 113 114 public synchronized void close() { 115 if (connection != null) { 116 try { 117 Iterator<SessionPool> i = cache.values().iterator(); 118 while (i.hasNext()) { 119 SessionPool pool = i.next(); 120 i.remove(); 121 try { 122 pool.close(); 123 } catch (Exception e) { 124 } 125 } 126 } finally { 127 try { 128 connection.close(); 129 } catch (Exception e) { 130 } finally { 131 connection = null; 132 } 133 } 134 } 135 } 136 137 public synchronized void incrementReferenceCount() { 138 referenceCount++; 139 lastUsed = System.currentTimeMillis(); 140 } 141 142 public synchronized void decrementReferenceCount() { 143 referenceCount--; 144 lastUsed = System.currentTimeMillis(); 145 if (referenceCount == 0) { 146 expiredCheck(); 147 } 148 } 149 150 /** 151 * @return true if this connection has expired. 152 */ 153 public synchronized boolean expiredCheck() { 154 if (connection == null) { 155 return true; 156 } 157 if (hasExpired) { 158 if (referenceCount == 0) { 159 close(); 160 } 161 return true; 162 } 163 if (hasFailed 164 || (idleTimeout > 0 && System.currentTimeMillis() > lastUsed + idleTimeout) 165 || expiryTimeout > 0 && System.currentTimeMillis() > firstUsed + expiryTimeout) { 166 hasExpired = true; 167 if (referenceCount == 0) { 168 close(); 169 } 170 return true; 171 } 172 return false; 173 } 174 175 public int getIdleTimeout() { 176 return idleTimeout; 177 } 178 179 public void setIdleTimeout(int idleTimeout) { 180 this.idleTimeout = idleTimeout; 181 } 182 183 protected SessionPool createSessionPool(SessionKey key) { 184 return new SessionPool(this, key, poolFactory.createPool()); 185 } 186 187 public void setExpiryTimeout(long expiryTimeout) { 188 this.expiryTimeout = expiryTimeout; 189 } 190 191 public long getExpiryTimeout() { 192 return expiryTimeout; 193 } 194 195 }