001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018 package org.apache.activemq; 019 020 import java.util.List; 021 import javax.jms.JMSException; 022 import org.apache.activemq.command.ConsumerId; 023 import org.apache.activemq.command.MessageDispatch; 024 import org.apache.activemq.thread.Task; 025 import org.apache.activemq.thread.TaskRunner; 026 import org.apache.activemq.util.JMSExceptionSupport; 027 import org.slf4j.Logger; 028 import org.slf4j.LoggerFactory; 029 030 /** 031 * A utility class used by the Session for dispatching messages asynchronously 032 * to consumers 033 * 034 * 035 * @see javax.jms.Session 036 */ 037 public class ActiveMQSessionExecutor implements Task { 038 private static final Logger LOG = LoggerFactory.getLogger(ActiveMQSessionExecutor.class); 039 040 private final ActiveMQSession session; 041 private final MessageDispatchChannel messageQueue; 042 private boolean dispatchedBySessionPool; 043 private volatile TaskRunner taskRunner; 044 private boolean startedOrWarnedThatNotStarted; 045 046 ActiveMQSessionExecutor(ActiveMQSession session) { 047 this.session = session; 048 if (this.session.connection != null && this.session.connection.isMessagePrioritySupported()) { 049 this.messageQueue = new SimplePriorityMessageDispatchChannel(); 050 }else { 051 this.messageQueue = new FifoMessageDispatchChannel(); 052 } 053 } 054 055 void setDispatchedBySessionPool(boolean value) { 056 dispatchedBySessionPool = value; 057 wakeup(); 058 } 059 060 void execute(MessageDispatch message) throws InterruptedException { 061 062 if (!startedOrWarnedThatNotStarted) { 063 064 ActiveMQConnection connection = session.connection; 065 long aboutUnstartedConnectionTimeout = connection.getWarnAboutUnstartedConnectionTimeout(); 066 if (connection.isStarted() || aboutUnstartedConnectionTimeout < 0L) { 067 startedOrWarnedThatNotStarted = true; 068 } else { 069 long elapsedTime = System.currentTimeMillis() - connection.getTimeCreated(); 070 071 // lets only warn when a significant amount of time has passed 072 // just in case its normal operation 073 if (elapsedTime > aboutUnstartedConnectionTimeout) { 074 LOG.warn("Received a message on a connection which is not yet started. Have you forgotten to call Connection.start()? Connection: " + connection 075 + " Received: " + message); 076 startedOrWarnedThatNotStarted = true; 077 } 078 } 079 } 080 081 if (!session.isSessionAsyncDispatch() && !dispatchedBySessionPool) { 082 dispatch(message); 083 } else { 084 messageQueue.enqueue(message); 085 wakeup(); 086 } 087 } 088 089 public void wakeup() { 090 if (!dispatchedBySessionPool) { 091 if (session.isSessionAsyncDispatch()) { 092 try { 093 TaskRunner taskRunner = this.taskRunner; 094 if (taskRunner == null) { 095 synchronized (this) { 096 if (this.taskRunner == null) { 097 if (!isRunning()) { 098 // stop has been called 099 return; 100 } 101 this.taskRunner = session.connection.getSessionTaskRunner().createTaskRunner(this, 102 "ActiveMQ Session: " + session.getSessionId()); 103 } 104 taskRunner = this.taskRunner; 105 } 106 } 107 taskRunner.wakeup(); 108 } catch (InterruptedException e) { 109 Thread.currentThread().interrupt(); 110 } 111 } else { 112 while (iterate()) { 113 } 114 } 115 } 116 } 117 118 void executeFirst(MessageDispatch message) { 119 messageQueue.enqueueFirst(message); 120 wakeup(); 121 } 122 123 public boolean hasUncomsumedMessages() { 124 return !messageQueue.isClosed() && messageQueue.isRunning() && !messageQueue.isEmpty(); 125 } 126 127 void dispatch(MessageDispatch message) { 128 129 // TODO - we should use a Map for this indexed by consumerId 130 131 for (ActiveMQMessageConsumer consumer : this.session.consumers) { 132 ConsumerId consumerId = message.getConsumerId(); 133 if (consumerId.equals(consumer.getConsumerId())) { 134 consumer.dispatch(message); 135 break; 136 } 137 } 138 } 139 140 synchronized void start() { 141 if (!messageQueue.isRunning()) { 142 messageQueue.start(); 143 if (hasUncomsumedMessages()) { 144 wakeup(); 145 } 146 } 147 } 148 149 void stop() throws JMSException { 150 try { 151 if (messageQueue.isRunning()) { 152 synchronized(this) { 153 messageQueue.stop(); 154 if (this.taskRunner != null) { 155 this.taskRunner.shutdown(); 156 this.taskRunner = null; 157 } 158 } 159 } 160 } catch (InterruptedException e) { 161 Thread.currentThread().interrupt(); 162 throw JMSExceptionSupport.create(e); 163 } 164 } 165 166 boolean isRunning() { 167 return messageQueue.isRunning(); 168 } 169 170 void close() { 171 messageQueue.close(); 172 } 173 174 void clear() { 175 messageQueue.clear(); 176 } 177 178 MessageDispatch dequeueNoWait() { 179 return messageQueue.dequeueNoWait(); 180 } 181 182 protected void clearMessagesInProgress() { 183 messageQueue.clear(); 184 } 185 186 public boolean isEmpty() { 187 return messageQueue.isEmpty(); 188 } 189 190 public boolean iterate() { 191 192 // Deliver any messages queued on the consumer to their listeners. 193 for (ActiveMQMessageConsumer consumer : this.session.consumers) { 194 if (consumer.iterate()) { 195 return true; 196 } 197 } 198 199 // No messages left queued on the listeners.. so now dispatch messages 200 // queued on the session 201 MessageDispatch message = messageQueue.dequeueNoWait(); 202 if (message == null) { 203 return false; 204 } else { 205 dispatch(message); 206 return !messageQueue.isEmpty(); 207 } 208 } 209 210 List getUnconsumedMessages() { 211 return messageQueue.removeAll(); 212 } 213 214 }