001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq; 018 019 import java.io.IOException; 020 import java.io.InputStream; 021 import java.io.OutputStream; 022 import java.net.URI; 023 import java.net.URISyntaxException; 024 import java.util.HashMap; 025 import java.util.Iterator; 026 import java.util.Map; 027 import java.util.concurrent.ConcurrentHashMap; 028 import java.util.concurrent.CopyOnWriteArrayList; 029 import java.util.concurrent.CountDownLatch; 030 import java.util.concurrent.LinkedBlockingQueue; 031 import java.util.concurrent.ThreadFactory; 032 import java.util.concurrent.ThreadPoolExecutor; 033 import java.util.concurrent.TimeUnit; 034 import java.util.concurrent.atomic.AtomicBoolean; 035 import java.util.concurrent.atomic.AtomicInteger; 036 import javax.jms.Connection; 037 import javax.jms.ConnectionConsumer; 038 import javax.jms.ConnectionMetaData; 039 import javax.jms.DeliveryMode; 040 import javax.jms.Destination; 041 import javax.jms.ExceptionListener; 042 import javax.jms.IllegalStateException; 043 import javax.jms.InvalidDestinationException; 044 import javax.jms.JMSException; 045 import javax.jms.Queue; 046 import javax.jms.QueueConnection; 047 import javax.jms.QueueSession; 048 import javax.jms.ServerSessionPool; 049 import javax.jms.Session; 050 import javax.jms.Topic; 051 import javax.jms.TopicConnection; 052 import javax.jms.TopicSession; 053 import javax.jms.XAConnection; 054 import org.apache.activemq.advisory.DestinationSource; 055 import org.apache.activemq.blob.BlobTransferPolicy; 056 import org.apache.activemq.command.ActiveMQDestination; 057 import org.apache.activemq.command.ActiveMQMessage; 058 import org.apache.activemq.command.ActiveMQTempDestination; 059 import org.apache.activemq.command.ActiveMQTempQueue; 060 import org.apache.activemq.command.ActiveMQTempTopic; 061 import org.apache.activemq.command.BrokerInfo; 062 import org.apache.activemq.command.Command; 063 import org.apache.activemq.command.CommandTypes; 064 import org.apache.activemq.command.ConnectionControl; 065 import org.apache.activemq.command.ConnectionError; 066 import org.apache.activemq.command.ConnectionId; 067 import org.apache.activemq.command.ConnectionInfo; 068 import org.apache.activemq.command.ConsumerControl; 069 import org.apache.activemq.command.ConsumerId; 070 import org.apache.activemq.command.ConsumerInfo; 071 import org.apache.activemq.command.ControlCommand; 072 import org.apache.activemq.command.DestinationInfo; 073 import org.apache.activemq.command.ExceptionResponse; 074 import org.apache.activemq.command.Message; 075 import org.apache.activemq.command.MessageDispatch; 076 import org.apache.activemq.command.MessageId; 077 import org.apache.activemq.command.ProducerAck; 078 import org.apache.activemq.command.ProducerId; 079 import org.apache.activemq.command.RemoveInfo; 080 import org.apache.activemq.command.RemoveSubscriptionInfo; 081 import org.apache.activemq.command.Response; 082 import org.apache.activemq.command.SessionId; 083 import org.apache.activemq.command.ShutdownInfo; 084 import org.apache.activemq.command.WireFormatInfo; 085 import org.apache.activemq.management.JMSConnectionStatsImpl; 086 import org.apache.activemq.management.JMSStatsImpl; 087 import org.apache.activemq.management.StatsCapable; 088 import org.apache.activemq.management.StatsImpl; 089 import org.apache.activemq.state.CommandVisitorAdapter; 090 import org.apache.activemq.thread.Scheduler; 091 import org.apache.activemq.thread.TaskRunnerFactory; 092 import org.apache.activemq.transport.Transport; 093 import org.apache.activemq.transport.TransportListener; 094 import org.apache.activemq.transport.failover.FailoverTransport; 095 import org.apache.activemq.util.IdGenerator; 096 import org.apache.activemq.util.IntrospectionSupport; 097 import org.apache.activemq.util.JMSExceptionSupport; 098 import org.apache.activemq.util.LongSequenceGenerator; 099 import org.apache.activemq.util.ServiceSupport; 100 import org.slf4j.Logger; 101 import org.slf4j.LoggerFactory; 102 103 public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, StreamConnection, TransportListener, EnhancedConnection { 104 105 public static final String DEFAULT_USER = ActiveMQConnectionFactory.DEFAULT_USER; 106 public static final String DEFAULT_PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD; 107 public static final String DEFAULT_BROKER_URL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL; 108 109 private static final Logger LOG = LoggerFactory.getLogger(ActiveMQConnection.class); 110 private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator(); 111 112 public final ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination> activeTempDestinations = new ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination>(); 113 114 protected boolean dispatchAsync=true; 115 protected boolean alwaysSessionAsync = true; 116 117 private TaskRunnerFactory sessionTaskRunner; 118 private final ThreadPoolExecutor executor; 119 120 // Connection state variables 121 private final ConnectionInfo info; 122 private ExceptionListener exceptionListener; 123 private ClientInternalExceptionListener clientInternalExceptionListener; 124 private boolean clientIDSet; 125 private boolean isConnectionInfoSentToBroker; 126 private boolean userSpecifiedClientID; 127 128 // Configuration options variables 129 private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); 130 private BlobTransferPolicy blobTransferPolicy; 131 private RedeliveryPolicy redeliveryPolicy; 132 private MessageTransformer transformer; 133 134 private boolean disableTimeStampsByDefault; 135 private boolean optimizedMessageDispatch = true; 136 private boolean copyMessageOnSend = true; 137 private boolean useCompression; 138 private boolean objectMessageSerializationDefered; 139 private boolean useAsyncSend; 140 private boolean optimizeAcknowledge; 141 private boolean nestedMapAndListEnabled = true; 142 private boolean useRetroactiveConsumer; 143 private boolean exclusiveConsumer; 144 private boolean alwaysSyncSend; 145 private int closeTimeout = 15000; 146 private boolean watchTopicAdvisories = true; 147 private long warnAboutUnstartedConnectionTimeout = 500L; 148 private int sendTimeout =0; 149 private boolean sendAcksAsync=true; 150 private boolean checkForDuplicates = true; 151 152 private final Transport transport; 153 private final IdGenerator clientIdGenerator; 154 private final JMSStatsImpl factoryStats; 155 private final JMSConnectionStatsImpl stats; 156 157 private final AtomicBoolean started = new AtomicBoolean(false); 158 private final AtomicBoolean closing = new AtomicBoolean(false); 159 private final AtomicBoolean closed = new AtomicBoolean(false); 160 private final AtomicBoolean transportFailed = new AtomicBoolean(false); 161 private final CopyOnWriteArrayList<ActiveMQSession> sessions = new CopyOnWriteArrayList<ActiveMQSession>(); 162 private final CopyOnWriteArrayList<ActiveMQConnectionConsumer> connectionConsumers = new CopyOnWriteArrayList<ActiveMQConnectionConsumer>(); 163 private final CopyOnWriteArrayList<ActiveMQInputStream> inputStreams = new CopyOnWriteArrayList<ActiveMQInputStream>(); 164 private final CopyOnWriteArrayList<ActiveMQOutputStream> outputStreams = new CopyOnWriteArrayList<ActiveMQOutputStream>(); 165 private final CopyOnWriteArrayList<TransportListener> transportListeners = new CopyOnWriteArrayList<TransportListener>(); 166 167 // Maps ConsumerIds to ActiveMQConsumer objects 168 private final ConcurrentHashMap<ConsumerId, ActiveMQDispatcher> dispatchers = new ConcurrentHashMap<ConsumerId, ActiveMQDispatcher>(); 169 private final ConcurrentHashMap<ProducerId, ActiveMQMessageProducer> producers = new ConcurrentHashMap<ProducerId, ActiveMQMessageProducer>(); 170 private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator(); 171 private final SessionId connectionSessionId; 172 private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); 173 private final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator(); 174 private final LongSequenceGenerator tempDestinationIdGenerator = new LongSequenceGenerator(); 175 private final LongSequenceGenerator localTransactionIdGenerator = new LongSequenceGenerator(); 176 177 private AdvisoryConsumer advisoryConsumer; 178 private final CountDownLatch brokerInfoReceived = new CountDownLatch(1); 179 private BrokerInfo brokerInfo; 180 private IOException firstFailureError; 181 private int producerWindowSize = ActiveMQConnectionFactory.DEFAULT_PRODUCER_WINDOW_SIZE; 182 183 // Assume that protocol is the latest. Change to the actual protocol 184 // version when a WireFormatInfo is received. 185 private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION); 186 private final long timeCreated; 187 private final ConnectionAudit connectionAudit = new ConnectionAudit(); 188 private DestinationSource destinationSource; 189 private final Object ensureConnectionInfoSentMutex = new Object(); 190 private boolean useDedicatedTaskRunner; 191 protected volatile CountDownLatch transportInterruptionProcessingComplete; 192 private long consumerFailoverRedeliveryWaitPeriod; 193 private final Scheduler scheduler; 194 private boolean messagePrioritySupported=true; 195 196 /** 197 * Construct an <code>ActiveMQConnection</code> 198 * 199 * @param transport 200 * @param factoryStats 201 * @throws Exception 202 */ 203 protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, JMSStatsImpl factoryStats) throws Exception { 204 205 this.transport = transport; 206 this.clientIdGenerator = clientIdGenerator; 207 this.factoryStats = factoryStats; 208 209 // Configure a single threaded executor who's core thread can timeout if 210 // idle 211 executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() { 212 public Thread newThread(Runnable r) { 213 Thread thread = new Thread(r, "ActiveMQ Connection Executor: " + transport); 214 thread.setDaemon(true); 215 return thread; 216 } 217 }); 218 // asyncConnectionThread.allowCoreThreadTimeOut(true); 219 String uniqueId = CONNECTION_ID_GENERATOR.generateId(); 220 this.info = new ConnectionInfo(new ConnectionId(uniqueId)); 221 this.info.setManageable(true); 222 this.info.setFaultTolerant(transport.isFaultTolerant()); 223 this.connectionSessionId = new SessionId(info.getConnectionId(), -1); 224 225 this.transport.setTransportListener(this); 226 227 this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection); 228 this.factoryStats.addConnection(this); 229 this.timeCreated = System.currentTimeMillis(); 230 this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant()); 231 this.scheduler = new Scheduler("ActiveMQConnection["+uniqueId+"] Scheduler"); 232 this.scheduler.start(); 233 } 234 235 protected void setUserName(String userName) { 236 this.info.setUserName(userName); 237 } 238 239 protected void setPassword(String password) { 240 this.info.setPassword(password); 241 } 242 243 /** 244 * A static helper method to create a new connection 245 * 246 * @return an ActiveMQConnection 247 * @throws JMSException 248 */ 249 public static ActiveMQConnection makeConnection() throws JMSException { 250 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); 251 return (ActiveMQConnection)factory.createConnection(); 252 } 253 254 /** 255 * A static helper method to create a new connection 256 * 257 * @param uri 258 * @return and ActiveMQConnection 259 * @throws JMSException 260 */ 261 public static ActiveMQConnection makeConnection(String uri) throws JMSException, URISyntaxException { 262 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri); 263 return (ActiveMQConnection)factory.createConnection(); 264 } 265 266 /** 267 * A static helper method to create a new connection 268 * 269 * @param user 270 * @param password 271 * @param uri 272 * @return an ActiveMQConnection 273 * @throws JMSException 274 */ 275 public static ActiveMQConnection makeConnection(String user, String password, String uri) throws JMSException, URISyntaxException { 276 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user, password, new URI(uri)); 277 return (ActiveMQConnection)factory.createConnection(); 278 } 279 280 /** 281 * @return a number unique for this connection 282 */ 283 public JMSConnectionStatsImpl getConnectionStats() { 284 return stats; 285 } 286 287 /** 288 * Creates a <CODE>Session</CODE> object. 289 * 290 * @param transacted indicates whether the session is transacted 291 * @param acknowledgeMode indicates whether the consumer or the client will 292 * acknowledge any messages it receives; ignored if the 293 * session is transacted. Legal values are 294 * <code>Session.AUTO_ACKNOWLEDGE</code>, 295 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and 296 * <code>Session.DUPS_OK_ACKNOWLEDGE</code>. 297 * @return a newly created session 298 * @throws JMSException if the <CODE>Connection</CODE> object fails to 299 * create a session due to some internal error or lack of 300 * support for the specific transaction and acknowledgement 301 * mode. 302 * @see Session#AUTO_ACKNOWLEDGE 303 * @see Session#CLIENT_ACKNOWLEDGE 304 * @see Session#DUPS_OK_ACKNOWLEDGE 305 * @since 1.1 306 */ 307 public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException { 308 checkClosedOrFailed(); 309 ensureConnectionInfoSent(); 310 if(!transacted) { 311 if (acknowledgeMode==Session.SESSION_TRANSACTED) { 312 throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session"); 313 } else if (acknowledgeMode < Session.SESSION_TRANSACTED || acknowledgeMode > ActiveMQSession.MAX_ACK_CONSTANT) { 314 throw new JMSException("invalid acknowledgeMode: " + acknowledgeMode + ". Valid values are Session.AUTO_ACKNOWLEDGE (1), " + 315 "Session.CLIENT_ACKNOWLEDGE (2), Session.DUPS_OK_ACKNOWLEDGE (3), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions Session.SESSION_TRANSACTED (0)"); 316 } 317 } 318 return new ActiveMQSession(this, getNextSessionId(), transacted ? Session.SESSION_TRANSACTED : (acknowledgeMode == Session.SESSION_TRANSACTED 319 ? Session.AUTO_ACKNOWLEDGE : acknowledgeMode), isDispatchAsync(), isAlwaysSessionAsync()); 320 } 321 322 /** 323 * @return sessionId 324 */ 325 protected SessionId getNextSessionId() { 326 return new SessionId(info.getConnectionId(), sessionIdGenerator.getNextSequenceId()); 327 } 328 329 /** 330 * Gets the client identifier for this connection. 331 * <P> 332 * This value is specific to the JMS provider. It is either preconfigured by 333 * an administrator in a <CODE> ConnectionFactory</CODE> object or assigned 334 * dynamically by the application by calling the <code>setClientID</code> 335 * method. 336 * 337 * @return the unique client identifier 338 * @throws JMSException if the JMS provider fails to return the client ID 339 * for this connection due to some internal error. 340 */ 341 public String getClientID() throws JMSException { 342 checkClosedOrFailed(); 343 return this.info.getClientId(); 344 } 345 346 /** 347 * Sets the client identifier for this connection. 348 * <P> 349 * The preferred way to assign a JMS client's client identifier is for it to 350 * be configured in a client-specific <CODE>ConnectionFactory</CODE> 351 * object and transparently assigned to the <CODE>Connection</CODE> object 352 * it creates. 353 * <P> 354 * Alternatively, a client can set a connection's client identifier using a 355 * provider-specific value. The facility to set a connection's client 356 * identifier explicitly is not a mechanism for overriding the identifier 357 * that has been administratively configured. It is provided for the case 358 * where no administratively specified identifier exists. If one does exist, 359 * an attempt to change it by setting it must throw an 360 * <CODE>IllegalStateException</CODE>. If a client sets the client 361 * identifier explicitly, it must do so immediately after it creates the 362 * connection and before any other action on the connection is taken. After 363 * this point, setting the client identifier is a programming error that 364 * should throw an <CODE>IllegalStateException</CODE>. 365 * <P> 366 * The purpose of the client identifier is to associate a connection and its 367 * objects with a state maintained on behalf of the client by a provider. 368 * The only such state identified by the JMS API is that required to support 369 * durable subscriptions. 370 * <P> 371 * If another connection with the same <code>clientID</code> is already 372 * running when this method is called, the JMS provider should detect the 373 * duplicate ID and throw an <CODE>InvalidClientIDException</CODE>. 374 * 375 * @param newClientID the unique client identifier 376 * @throws JMSException if the JMS provider fails to set the client ID for 377 * this connection due to some internal error. 378 * @throws javax.jms.InvalidClientIDException if the JMS client specifies an 379 * invalid or duplicate client ID. 380 * @throws javax.jms.IllegalStateException if the JMS client attempts to set 381 * a connection's client ID at the wrong time or when it has 382 * been administratively configured. 383 */ 384 public void setClientID(String newClientID) throws JMSException { 385 checkClosedOrFailed(); 386 387 if (this.clientIDSet) { 388 throw new IllegalStateException("The clientID has already been set"); 389 } 390 391 if (this.isConnectionInfoSentToBroker) { 392 throw new IllegalStateException("Setting clientID on a used Connection is not allowed"); 393 } 394 395 this.info.setClientId(newClientID); 396 this.userSpecifiedClientID = true; 397 ensureConnectionInfoSent(); 398 } 399 400 /** 401 * Sets the default client id that the connection will use if explicitly not 402 * set with the setClientId() call. 403 */ 404 public void setDefaultClientID(String clientID) throws JMSException { 405 this.info.setClientId(clientID); 406 this.userSpecifiedClientID = true; 407 } 408 409 /** 410 * Gets the metadata for this connection. 411 * 412 * @return the connection metadata 413 * @throws JMSException if the JMS provider fails to get the connection 414 * metadata for this connection. 415 * @see javax.jms.ConnectionMetaData 416 */ 417 public ConnectionMetaData getMetaData() throws JMSException { 418 checkClosedOrFailed(); 419 return ActiveMQConnectionMetaData.INSTANCE; 420 } 421 422 /** 423 * Gets the <CODE>ExceptionListener</CODE> object for this connection. Not 424 * every <CODE>Connection</CODE> has an <CODE>ExceptionListener</CODE> 425 * associated with it. 426 * 427 * @return the <CODE>ExceptionListener</CODE> for this connection, or 428 * null, if no <CODE>ExceptionListener</CODE> is associated with 429 * this connection. 430 * @throws JMSException if the JMS provider fails to get the 431 * <CODE>ExceptionListener</CODE> for this connection. 432 * @see javax.jms.Connection#setExceptionListener(ExceptionListener) 433 */ 434 public ExceptionListener getExceptionListener() throws JMSException { 435 checkClosedOrFailed(); 436 return this.exceptionListener; 437 } 438 439 /** 440 * Sets an exception listener for this connection. 441 * <P> 442 * If a JMS provider detects a serious problem with a connection, it informs 443 * the connection's <CODE> ExceptionListener</CODE>, if one has been 444 * registered. It does this by calling the listener's <CODE>onException 445 * </CODE> 446 * method, passing it a <CODE>JMSException</CODE> object describing the 447 * problem. 448 * <P> 449 * An exception listener allows a client to be notified of a problem 450 * asynchronously. Some connections only consume messages, so they would 451 * have no other way to learn their connection has failed. 452 * <P> 453 * A connection serializes execution of its <CODE>ExceptionListener</CODE>. 454 * <P> 455 * A JMS provider should attempt to resolve connection problems itself 456 * before it notifies the client of them. 457 * 458 * @param listener the exception listener 459 * @throws JMSException if the JMS provider fails to set the exception 460 * listener for this connection. 461 */ 462 public void setExceptionListener(ExceptionListener listener) throws JMSException { 463 checkClosedOrFailed(); 464 this.exceptionListener = listener; 465 } 466 467 /** 468 * Gets the <code>ClientInternalExceptionListener</code> object for this connection. 469 * Not every <CODE>ActiveMQConnectionn</CODE> has a <CODE>ClientInternalExceptionListener</CODE> 470 * associated with it. 471 * 472 * @return the listener or <code>null</code> if no listener is registered with the connection. 473 */ 474 public ClientInternalExceptionListener getClientInternalExceptionListener() 475 { 476 return clientInternalExceptionListener; 477 } 478 479 /** 480 * Sets a client internal exception listener for this connection. 481 * The connection will notify the listener, if one has been registered, of exceptions thrown by container components 482 * (e.g. an EJB container in case of Message Driven Beans) during asynchronous processing of a message. 483 * It does this by calling the listener's <code>onException()</code> method passing it a <code>Throwable</code> 484 * describing the problem. 485 * 486 * @param listener the exception listener 487 */ 488 public void setClientInternalExceptionListener(ClientInternalExceptionListener listener) 489 { 490 this.clientInternalExceptionListener = listener; 491 } 492 493 /** 494 * Starts (or restarts) a connection's delivery of incoming messages. A call 495 * to <CODE>start</CODE> on a connection that has already been started is 496 * ignored. 497 * 498 * @throws JMSException if the JMS provider fails to start message delivery 499 * due to some internal error. 500 * @see javax.jms.Connection#stop() 501 */ 502 public void start() throws JMSException { 503 checkClosedOrFailed(); 504 ensureConnectionInfoSent(); 505 if (started.compareAndSet(false, true)) { 506 for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) { 507 ActiveMQSession session = i.next(); 508 session.start(); 509 } 510 } 511 } 512 513 /** 514 * Temporarily stops a connection's delivery of incoming messages. Delivery 515 * can be restarted using the connection's <CODE>start</CODE> method. When 516 * the connection is stopped, delivery to all the connection's message 517 * consumers is inhibited: synchronous receives block, and messages are not 518 * delivered to message listeners. 519 * <P> 520 * This call blocks until receives and/or message listeners in progress have 521 * completed. 522 * <P> 523 * Stopping a connection has no effect on its ability to send messages. A 524 * call to <CODE>stop</CODE> on a connection that has already been stopped 525 * is ignored. 526 * <P> 527 * A call to <CODE>stop</CODE> must not return until delivery of messages 528 * has paused. This means that a client can rely on the fact that none of 529 * its message listeners will be called and that all threads of control 530 * waiting for <CODE>receive</CODE> calls to return will not return with a 531 * message until the connection is restarted. The receive timers for a 532 * stopped connection continue to advance, so receives may time out while 533 * the connection is stopped. 534 * <P> 535 * If message listeners are running when <CODE>stop</CODE> is invoked, the 536 * <CODE>stop</CODE> call must wait until all of them have returned before 537 * it may return. While these message listeners are completing, they must 538 * have the full services of the connection available to them. 539 * 540 * @throws JMSException if the JMS provider fails to stop message delivery 541 * due to some internal error. 542 * @see javax.jms.Connection#start() 543 */ 544 public void stop() throws JMSException { 545 checkClosedOrFailed(); 546 if (started.compareAndSet(true, false)) { 547 synchronized(sessions) { 548 for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) { 549 ActiveMQSession s = i.next(); 550 s.stop(); 551 } 552 } 553 } 554 } 555 556 /** 557 * Closes the connection. 558 * <P> 559 * Since a provider typically allocates significant resources outside the 560 * JVM on behalf of a connection, clients should close these resources when 561 * they are not needed. Relying on garbage collection to eventually reclaim 562 * these resources may not be timely enough. 563 * <P> 564 * There is no need to close the sessions, producers, and consumers of a 565 * closed connection. 566 * <P> 567 * Closing a connection causes all temporary destinations to be deleted. 568 * <P> 569 * When this method is invoked, it should not return until message 570 * processing has been shut down in an orderly fashion. This means that all 571 * message listeners that may have been running have returned, and that all 572 * pending receives have returned. A close terminates all pending message 573 * receives on the connection's sessions' consumers. The receives may return 574 * with a message or with null, depending on whether there was a message 575 * available at the time of the close. If one or more of the connection's 576 * sessions' message listeners is processing a message at the time when 577 * connection <CODE>close</CODE> is invoked, all the facilities of the 578 * connection and its sessions must remain available to those listeners 579 * until they return control to the JMS provider. 580 * <P> 581 * Closing a connection causes any of its sessions' transactions in progress 582 * to be rolled back. In the case where a session's work is coordinated by 583 * an external transaction manager, a session's <CODE>commit</CODE> and 584 * <CODE> rollback</CODE> methods are not used and the result of a closed 585 * session's work is determined later by the transaction manager. Closing a 586 * connection does NOT force an acknowledgment of client-acknowledged 587 * sessions. 588 * <P> 589 * Invoking the <CODE>acknowledge</CODE> method of a received message from 590 * a closed connection's session must throw an 591 * <CODE>IllegalStateException</CODE>. Closing a closed connection must 592 * NOT throw an exception. 593 * 594 * @throws JMSException if the JMS provider fails to close the connection 595 * due to some internal error. For example, a failure to 596 * release resources or to close a socket connection can 597 * cause this exception to be thrown. 598 */ 599 public void close() throws JMSException { 600 try { 601 // If we were running, lets stop first. 602 if (!closed.get() && !transportFailed.get()) { 603 stop(); 604 } 605 606 synchronized (this) { 607 if (!closed.get()) { 608 closing.set(true); 609 610 if (destinationSource != null) { 611 destinationSource.stop(); 612 destinationSource = null; 613 } 614 if (advisoryConsumer != null) { 615 advisoryConsumer.dispose(); 616 advisoryConsumer = null; 617 } 618 if (this.scheduler != null) { 619 try { 620 this.scheduler.stop(); 621 } catch (Exception e) { 622 JMSException ex = JMSExceptionSupport.create(e); 623 throw ex; 624 } 625 } 626 627 long lastDeliveredSequenceId = 0; 628 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) { 629 ActiveMQSession s = i.next(); 630 s.dispose(); 631 lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, s.getLastDeliveredSequenceId()); 632 } 633 for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) { 634 ActiveMQConnectionConsumer c = i.next(); 635 c.dispose(); 636 } 637 for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) { 638 ActiveMQInputStream c = i.next(); 639 c.dispose(); 640 } 641 for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) { 642 ActiveMQOutputStream c = i.next(); 643 c.dispose(); 644 } 645 646 // As TemporaryQueue and TemporaryTopic instances are bound 647 // to a connection we should just delete them after the connection 648 // is closed to free up memory 649 for (Iterator<ActiveMQTempDestination> i = this.activeTempDestinations.values().iterator(); i.hasNext();) { 650 ActiveMQTempDestination c = i.next(); 651 c.delete(); 652 } 653 654 if (isConnectionInfoSentToBroker) { 655 // If we announced ourselfs to the broker.. Try to let 656 // the broker 657 // know that the connection is being shutdown. 658 RemoveInfo removeCommand = info.createRemoveCommand(); 659 removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId); 660 doSyncSendPacket(info.createRemoveCommand(), closeTimeout); 661 doAsyncSendPacket(new ShutdownInfo()); 662 } 663 664 ServiceSupport.dispose(this.transport); 665 666 started.set(false); 667 668 // TODO if we move the TaskRunnerFactory to the connection 669 // factory 670 // then we may need to call 671 // factory.onConnectionClose(this); 672 if (sessionTaskRunner != null) { 673 sessionTaskRunner.shutdown(); 674 } 675 closed.set(true); 676 closing.set(false); 677 } 678 } 679 } finally { 680 try { 681 if (executor != null){ 682 executor.shutdown(); 683 } 684 }catch(Throwable e) { 685 LOG.error("Error shutting down thread pool " + e,e); 686 } 687 factoryStats.removeConnection(this); 688 } 689 } 690 691 /** 692 * Tells the broker to terminate its VM. This can be used to cleanly 693 * terminate a broker running in a standalone java process. Server must have 694 * property enable.vm.shutdown=true defined to allow this to work. 695 */ 696 // TODO : org.apache.activemq.message.BrokerAdminCommand not yet 697 // implemented. 698 /* 699 * public void terminateBrokerVM() throws JMSException { BrokerAdminCommand 700 * command = new BrokerAdminCommand(); 701 * command.setCommand(BrokerAdminCommand.SHUTDOWN_SERVER_VM); 702 * asyncSendPacket(command); } 703 */ 704 705 /** 706 * Create a durable connection consumer for this connection (optional 707 * operation). This is an expert facility not used by regular JMS clients. 708 * 709 * @param topic topic to access 710 * @param subscriptionName durable subscription name 711 * @param messageSelector only messages with properties matching the message 712 * selector expression are delivered. A value of null or an 713 * empty string indicates that there is no message selector 714 * for the message consumer. 715 * @param sessionPool the server session pool to associate with this durable 716 * connection consumer 717 * @param maxMessages the maximum number of messages that can be assigned to 718 * a server session at one time 719 * @return the durable connection consumer 720 * @throws JMSException if the <CODE>Connection</CODE> object fails to 721 * create a connection consumer due to some internal error 722 * or invalid arguments for <CODE>sessionPool</CODE> and 723 * <CODE>messageSelector</CODE>. 724 * @throws javax.jms.InvalidDestinationException if an invalid destination 725 * is specified. 726 * @throws javax.jms.InvalidSelectorException if the message selector is 727 * invalid. 728 * @see javax.jms.ConnectionConsumer 729 * @since 1.1 730 */ 731 public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) 732 throws JMSException { 733 return this.createDurableConnectionConsumer(topic, subscriptionName, messageSelector, sessionPool, maxMessages, false); 734 } 735 736 /** 737 * Create a durable connection consumer for this connection (optional 738 * operation). This is an expert facility not used by regular JMS clients. 739 * 740 * @param topic topic to access 741 * @param subscriptionName durable subscription name 742 * @param messageSelector only messages with properties matching the message 743 * selector expression are delivered. A value of null or an 744 * empty string indicates that there is no message selector 745 * for the message consumer. 746 * @param sessionPool the server session pool to associate with this durable 747 * connection consumer 748 * @param maxMessages the maximum number of messages that can be assigned to 749 * a server session at one time 750 * @param noLocal set true if you want to filter out messages published 751 * locally 752 * @return the durable connection consumer 753 * @throws JMSException if the <CODE>Connection</CODE> object fails to 754 * create a connection consumer due to some internal error 755 * or invalid arguments for <CODE>sessionPool</CODE> and 756 * <CODE>messageSelector</CODE>. 757 * @throws javax.jms.InvalidDestinationException if an invalid destination 758 * is specified. 759 * @throws javax.jms.InvalidSelectorException if the message selector is 760 * invalid. 761 * @see javax.jms.ConnectionConsumer 762 * @since 1.1 763 */ 764 public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages, 765 boolean noLocal) throws JMSException { 766 checkClosedOrFailed(); 767 ensureConnectionInfoSent(); 768 SessionId sessionId = new SessionId(info.getConnectionId(), -1); 769 ConsumerInfo info = new ConsumerInfo(new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId())); 770 info.setDestination(ActiveMQMessageTransformation.transformDestination(topic)); 771 info.setSubscriptionName(subscriptionName); 772 info.setSelector(messageSelector); 773 info.setPrefetchSize(maxMessages); 774 info.setDispatchAsync(isDispatchAsync()); 775 776 // Allows the options on the destination to configure the consumerInfo 777 if (info.getDestination().getOptions() != null) { 778 Map<String, String> options = new HashMap<String, String>(info.getDestination().getOptions()); 779 IntrospectionSupport.setProperties(this.info, options, "consumer."); 780 } 781 782 return new ActiveMQConnectionConsumer(this, sessionPool, info); 783 } 784 785 // Properties 786 // ------------------------------------------------------------------------- 787 788 /** 789 * Returns true if this connection has been started 790 * 791 * @return true if this Connection is started 792 */ 793 public boolean isStarted() { 794 return started.get(); 795 } 796 797 /** 798 * Returns true if the connection is closed 799 */ 800 public boolean isClosed() { 801 return closed.get(); 802 } 803 804 /** 805 * Returns true if the connection is in the process of being closed 806 */ 807 public boolean isClosing() { 808 return closing.get(); 809 } 810 811 /** 812 * Returns true if the underlying transport has failed 813 */ 814 public boolean isTransportFailed() { 815 return transportFailed.get(); 816 } 817 818 /** 819 * @return Returns the prefetchPolicy. 820 */ 821 public ActiveMQPrefetchPolicy getPrefetchPolicy() { 822 return prefetchPolicy; 823 } 824 825 /** 826 * Sets the <a 827 * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch 828 * policy</a> for consumers created by this connection. 829 */ 830 public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) { 831 this.prefetchPolicy = prefetchPolicy; 832 } 833 834 /** 835 */ 836 public Transport getTransportChannel() { 837 return transport; 838 } 839 840 /** 841 * @return Returns the clientID of the connection, forcing one to be 842 * generated if one has not yet been configured. 843 */ 844 public String getInitializedClientID() throws JMSException { 845 ensureConnectionInfoSent(); 846 return info.getClientId(); 847 } 848 849 /** 850 * @return Returns the timeStampsDisableByDefault. 851 */ 852 public boolean isDisableTimeStampsByDefault() { 853 return disableTimeStampsByDefault; 854 } 855 856 /** 857 * Sets whether or not timestamps on messages should be disabled or not. If 858 * you disable them it adds a small performance boost. 859 */ 860 public void setDisableTimeStampsByDefault(boolean timeStampsDisableByDefault) { 861 this.disableTimeStampsByDefault = timeStampsDisableByDefault; 862 } 863 864 /** 865 * @return Returns the dispatchOptimizedMessage. 866 */ 867 public boolean isOptimizedMessageDispatch() { 868 return optimizedMessageDispatch; 869 } 870 871 /** 872 * If this flag is set then an larger prefetch limit is used - only 873 * applicable for durable topic subscribers. 874 */ 875 public void setOptimizedMessageDispatch(boolean dispatchOptimizedMessage) { 876 this.optimizedMessageDispatch = dispatchOptimizedMessage; 877 } 878 879 /** 880 * @return Returns the closeTimeout. 881 */ 882 public int getCloseTimeout() { 883 return closeTimeout; 884 } 885 886 /** 887 * Sets the timeout before a close is considered complete. Normally a 888 * close() on a connection waits for confirmation from the broker; this 889 * allows that operation to timeout to save the client hanging if there is 890 * no broker 891 */ 892 public void setCloseTimeout(int closeTimeout) { 893 this.closeTimeout = closeTimeout; 894 } 895 896 /** 897 * @return ConnectionInfo 898 */ 899 public ConnectionInfo getConnectionInfo() { 900 return this.info; 901 } 902 903 public boolean isUseRetroactiveConsumer() { 904 return useRetroactiveConsumer; 905 } 906 907 /** 908 * Sets whether or not retroactive consumers are enabled. Retroactive 909 * consumers allow non-durable topic subscribers to receive old messages 910 * that were published before the non-durable subscriber started. 911 */ 912 public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) { 913 this.useRetroactiveConsumer = useRetroactiveConsumer; 914 } 915 916 public boolean isNestedMapAndListEnabled() { 917 return nestedMapAndListEnabled; 918 } 919 920 /** 921 * Enables/disables whether or not Message properties and MapMessage entries 922 * support <a 923 * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested 924 * Structures</a> of Map and List objects 925 */ 926 public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) { 927 this.nestedMapAndListEnabled = structuredMapsEnabled; 928 } 929 930 public boolean isExclusiveConsumer() { 931 return exclusiveConsumer; 932 } 933 934 /** 935 * Enables or disables whether or not queue consumers should be exclusive or 936 * not for example to preserve ordering when not using <a 937 * href="http://activemq.apache.org/message-groups.html">Message Groups</a> 938 * 939 * @param exclusiveConsumer 940 */ 941 public void setExclusiveConsumer(boolean exclusiveConsumer) { 942 this.exclusiveConsumer = exclusiveConsumer; 943 } 944 945 /** 946 * Adds a transport listener so that a client can be notified of events in 947 * the underlying transport 948 */ 949 public void addTransportListener(TransportListener transportListener) { 950 transportListeners.add(transportListener); 951 } 952 953 public void removeTransportListener(TransportListener transportListener) { 954 transportListeners.remove(transportListener); 955 } 956 957 public boolean isUseDedicatedTaskRunner() { 958 return useDedicatedTaskRunner; 959 } 960 961 public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) { 962 this.useDedicatedTaskRunner = useDedicatedTaskRunner; 963 } 964 965 public TaskRunnerFactory getSessionTaskRunner() { 966 synchronized (this) { 967 if (sessionTaskRunner == null) { 968 sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000, isUseDedicatedTaskRunner()); 969 } 970 } 971 return sessionTaskRunner; 972 } 973 974 public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) { 975 this.sessionTaskRunner = sessionTaskRunner; 976 } 977 978 public MessageTransformer getTransformer() { 979 return transformer; 980 } 981 982 /** 983 * Sets the transformer used to transform messages before they are sent on 984 * to the JMS bus or when they are received from the bus but before they are 985 * delivered to the JMS client 986 */ 987 public void setTransformer(MessageTransformer transformer) { 988 this.transformer = transformer; 989 } 990 991 /** 992 * @return the statsEnabled 993 */ 994 public boolean isStatsEnabled() { 995 return this.stats.isEnabled(); 996 } 997 998 /** 999 * @param statsEnabled the statsEnabled to set 1000 */ 1001 public void setStatsEnabled(boolean statsEnabled) { 1002 this.stats.setEnabled(statsEnabled); 1003 } 1004 1005 /** 1006 * Returns the {@link DestinationSource} object which can be used to listen to destinations 1007 * being created or destroyed or to enquire about the current destinations available on the broker 1008 * 1009 * @return a lazily created destination source 1010 * @throws JMSException 1011 */ 1012 public DestinationSource getDestinationSource() throws JMSException { 1013 if (destinationSource == null) { 1014 destinationSource = new DestinationSource(this); 1015 destinationSource.start(); 1016 } 1017 return destinationSource; 1018 } 1019 1020 // Implementation methods 1021 // ------------------------------------------------------------------------- 1022 1023 /** 1024 * Used internally for adding Sessions to the Connection 1025 * 1026 * @param session 1027 * @throws JMSException 1028 * @throws JMSException 1029 */ 1030 protected void addSession(ActiveMQSession session) throws JMSException { 1031 this.sessions.add(session); 1032 if (sessions.size() > 1 || session.isTransacted()) { 1033 optimizedMessageDispatch = false; 1034 } 1035 } 1036 1037 /** 1038 * Used interanlly for removing Sessions from a Connection 1039 * 1040 * @param session 1041 */ 1042 protected void removeSession(ActiveMQSession session) { 1043 this.sessions.remove(session); 1044 this.removeDispatcher(session); 1045 } 1046 1047 /** 1048 * Add a ConnectionConsumer 1049 * 1050 * @param connectionConsumer 1051 * @throws JMSException 1052 */ 1053 protected void addConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) throws JMSException { 1054 this.connectionConsumers.add(connectionConsumer); 1055 } 1056 1057 /** 1058 * Remove a ConnectionConsumer 1059 * 1060 * @param connectionConsumer 1061 */ 1062 protected void removeConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) { 1063 this.connectionConsumers.remove(connectionConsumer); 1064 this.removeDispatcher(connectionConsumer); 1065 } 1066 1067 /** 1068 * Creates a <CODE>TopicSession</CODE> object. 1069 * 1070 * @param transacted indicates whether the session is transacted 1071 * @param acknowledgeMode indicates whether the consumer or the client will 1072 * acknowledge any messages it receives; ignored if the 1073 * session is transacted. Legal values are 1074 * <code>Session.AUTO_ACKNOWLEDGE</code>, 1075 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and 1076 * <code>Session.DUPS_OK_ACKNOWLEDGE</code>. 1077 * @return a newly created topic session 1078 * @throws JMSException if the <CODE>TopicConnection</CODE> object fails 1079 * to create a session due to some internal error or lack of 1080 * support for the specific transaction and acknowledgement 1081 * mode. 1082 * @see Session#AUTO_ACKNOWLEDGE 1083 * @see Session#CLIENT_ACKNOWLEDGE 1084 * @see Session#DUPS_OK_ACKNOWLEDGE 1085 */ 1086 public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException { 1087 return new ActiveMQTopicSession((ActiveMQSession)createSession(transacted, acknowledgeMode)); 1088 } 1089 1090 /** 1091 * Creates a connection consumer for this connection (optional operation). 1092 * This is an expert facility not used by regular JMS clients. 1093 * 1094 * @param topic the topic to access 1095 * @param messageSelector only messages with properties matching the message 1096 * selector expression are delivered. A value of null or an 1097 * empty string indicates that there is no message selector 1098 * for the message consumer. 1099 * @param sessionPool the server session pool to associate with this 1100 * connection consumer 1101 * @param maxMessages the maximum number of messages that can be assigned to 1102 * a server session at one time 1103 * @return the connection consumer 1104 * @throws JMSException if the <CODE>TopicConnection</CODE> object fails 1105 * to create a connection consumer due to some internal 1106 * error or invalid arguments for <CODE>sessionPool</CODE> 1107 * and <CODE>messageSelector</CODE>. 1108 * @throws javax.jms.InvalidDestinationException if an invalid topic is 1109 * specified. 1110 * @throws javax.jms.InvalidSelectorException if the message selector is 1111 * invalid. 1112 * @see javax.jms.ConnectionConsumer 1113 */ 1114 public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { 1115 return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, false); 1116 } 1117 1118 /** 1119 * Creates a connection consumer for this connection (optional operation). 1120 * This is an expert facility not used by regular JMS clients. 1121 * 1122 * @param queue the queue to access 1123 * @param messageSelector only messages with properties matching the message 1124 * selector expression are delivered. A value of null or an 1125 * empty string indicates that there is no message selector 1126 * for the message consumer. 1127 * @param sessionPool the server session pool to associate with this 1128 * connection consumer 1129 * @param maxMessages the maximum number of messages that can be assigned to 1130 * a server session at one time 1131 * @return the connection consumer 1132 * @throws JMSException if the <CODE>QueueConnection</CODE> object fails 1133 * to create a connection consumer due to some internal 1134 * error or invalid arguments for <CODE>sessionPool</CODE> 1135 * and <CODE>messageSelector</CODE>. 1136 * @throws javax.jms.InvalidDestinationException if an invalid queue is 1137 * specified. 1138 * @throws javax.jms.InvalidSelectorException if the message selector is 1139 * invalid. 1140 * @see javax.jms.ConnectionConsumer 1141 */ 1142 public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { 1143 return createConnectionConsumer(queue, messageSelector, sessionPool, maxMessages, false); 1144 } 1145 1146 /** 1147 * Creates a connection consumer for this connection (optional operation). 1148 * This is an expert facility not used by regular JMS clients. 1149 * 1150 * @param destination the destination to access 1151 * @param messageSelector only messages with properties matching the message 1152 * selector expression are delivered. A value of null or an 1153 * empty string indicates that there is no message selector 1154 * for the message consumer. 1155 * @param sessionPool the server session pool to associate with this 1156 * connection consumer 1157 * @param maxMessages the maximum number of messages that can be assigned to 1158 * a server session at one time 1159 * @return the connection consumer 1160 * @throws JMSException if the <CODE>Connection</CODE> object fails to 1161 * create a connection consumer due to some internal error 1162 * or invalid arguments for <CODE>sessionPool</CODE> and 1163 * <CODE>messageSelector</CODE>. 1164 * @throws javax.jms.InvalidDestinationException if an invalid destination 1165 * is specified. 1166 * @throws javax.jms.InvalidSelectorException if the message selector is 1167 * invalid. 1168 * @see javax.jms.ConnectionConsumer 1169 * @since 1.1 1170 */ 1171 public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { 1172 return createConnectionConsumer(destination, messageSelector, sessionPool, maxMessages, false); 1173 } 1174 1175 public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages, boolean noLocal) 1176 throws JMSException { 1177 1178 checkClosedOrFailed(); 1179 ensureConnectionInfoSent(); 1180 1181 ConsumerId consumerId = createConsumerId(); 1182 ConsumerInfo consumerInfo = new ConsumerInfo(consumerId); 1183 consumerInfo.setDestination(ActiveMQMessageTransformation.transformDestination(destination)); 1184 consumerInfo.setSelector(messageSelector); 1185 consumerInfo.setPrefetchSize(maxMessages); 1186 consumerInfo.setNoLocal(noLocal); 1187 consumerInfo.setDispatchAsync(isDispatchAsync()); 1188 1189 // Allows the options on the destination to configure the consumerInfo 1190 if (consumerInfo.getDestination().getOptions() != null) { 1191 Map<String, String> options = new HashMap<String, String>(consumerInfo.getDestination().getOptions()); 1192 IntrospectionSupport.setProperties(consumerInfo, options, "consumer."); 1193 } 1194 1195 return new ActiveMQConnectionConsumer(this, sessionPool, consumerInfo); 1196 } 1197 1198 /** 1199 * @return 1200 */ 1201 private ConsumerId createConsumerId() { 1202 return new ConsumerId(connectionSessionId, consumerIdGenerator.getNextSequenceId()); 1203 } 1204 1205 /** 1206 * @return 1207 */ 1208 private ProducerId createProducerId() { 1209 return new ProducerId(connectionSessionId, producerIdGenerator.getNextSequenceId()); 1210 } 1211 1212 /** 1213 * Creates a <CODE>QueueSession</CODE> object. 1214 * 1215 * @param transacted indicates whether the session is transacted 1216 * @param acknowledgeMode indicates whether the consumer or the client will 1217 * acknowledge any messages it receives; ignored if the 1218 * session is transacted. Legal values are 1219 * <code>Session.AUTO_ACKNOWLEDGE</code>, 1220 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and 1221 * <code>Session.DUPS_OK_ACKNOWLEDGE</code>. 1222 * @return a newly created queue session 1223 * @throws JMSException if the <CODE>QueueConnection</CODE> object fails 1224 * to create a session due to some internal error or lack of 1225 * support for the specific transaction and acknowledgement 1226 * mode. 1227 * @see Session#AUTO_ACKNOWLEDGE 1228 * @see Session#CLIENT_ACKNOWLEDGE 1229 * @see Session#DUPS_OK_ACKNOWLEDGE 1230 */ 1231 public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException { 1232 return new ActiveMQQueueSession((ActiveMQSession)createSession(transacted, acknowledgeMode)); 1233 } 1234 1235 /** 1236 * Ensures that the clientID was manually specified and not auto-generated. 1237 * If the clientID was not specified this method will throw an exception. 1238 * This method is used to ensure that the clientID + durableSubscriber name 1239 * are used correctly. 1240 * 1241 * @throws JMSException 1242 */ 1243 public void checkClientIDWasManuallySpecified() throws JMSException { 1244 if (!userSpecifiedClientID) { 1245 throw new JMSException("You cannot create a durable subscriber without specifying a unique clientID on a Connection"); 1246 } 1247 } 1248 1249 /** 1250 * send a Packet through the Connection - for internal use only 1251 * 1252 * @param command 1253 * @throws JMSException 1254 */ 1255 public void asyncSendPacket(Command command) throws JMSException { 1256 if (isClosed()) { 1257 throw new ConnectionClosedException(); 1258 } else { 1259 doAsyncSendPacket(command); 1260 } 1261 } 1262 1263 private void doAsyncSendPacket(Command command) throws JMSException { 1264 try { 1265 this.transport.oneway(command); 1266 } catch (IOException e) { 1267 throw JMSExceptionSupport.create(e); 1268 } 1269 } 1270 1271 /** 1272 * Send a packet through a Connection - for internal use only 1273 * 1274 * @param command 1275 * @return 1276 * @throws JMSException 1277 */ 1278 public Response syncSendPacket(Command command) throws JMSException { 1279 if (isClosed()) { 1280 throw new ConnectionClosedException(); 1281 } else { 1282 1283 try { 1284 Response response = (Response)this.transport.request(command); 1285 if (response.isException()) { 1286 ExceptionResponse er = (ExceptionResponse)response; 1287 if (er.getException() instanceof JMSException) { 1288 throw (JMSException)er.getException(); 1289 } else { 1290 if (isClosed()||closing.get()) { 1291 LOG.debug("Received an exception but connection is closing"); 1292 } 1293 JMSException jmsEx = null; 1294 try { 1295 jmsEx = JMSExceptionSupport.create(er.getException()); 1296 }catch(Throwable e) { 1297 LOG.error("Caught an exception trying to create a JMSException for " +er.getException(),e); 1298 } 1299 if(jmsEx !=null) { 1300 throw jmsEx; 1301 } 1302 } 1303 } 1304 return response; 1305 } catch (IOException e) { 1306 throw JMSExceptionSupport.create(e); 1307 } 1308 } 1309 } 1310 1311 /** 1312 * Send a packet through a Connection - for internal use only 1313 * 1314 * @param command 1315 * @return 1316 * @throws JMSException 1317 */ 1318 public Response syncSendPacket(Command command, int timeout) throws JMSException { 1319 if (isClosed() || closing.get()) { 1320 throw new ConnectionClosedException(); 1321 } else { 1322 return doSyncSendPacket(command, timeout); 1323 } 1324 } 1325 1326 private Response doSyncSendPacket(Command command, int timeout) 1327 throws JMSException { 1328 try { 1329 Response response = (Response) (timeout > 0 1330 ? this.transport.request(command, timeout) 1331 : this.transport.request(command)); 1332 if (response != null && response.isException()) { 1333 ExceptionResponse er = (ExceptionResponse)response; 1334 if (er.getException() instanceof JMSException) { 1335 throw (JMSException)er.getException(); 1336 } else { 1337 throw JMSExceptionSupport.create(er.getException()); 1338 } 1339 } 1340 return response; 1341 } catch (IOException e) { 1342 throw JMSExceptionSupport.create(e); 1343 } 1344 } 1345 1346 /** 1347 * @return statistics for this Connection 1348 */ 1349 public StatsImpl getStats() { 1350 return stats; 1351 } 1352 1353 /** 1354 * simply throws an exception if the Connection is already closed or the 1355 * Transport has failed 1356 * 1357 * @throws JMSException 1358 */ 1359 protected synchronized void checkClosedOrFailed() throws JMSException { 1360 checkClosed(); 1361 if (transportFailed.get()) { 1362 throw new ConnectionFailedException(firstFailureError); 1363 } 1364 } 1365 1366 /** 1367 * simply throws an exception if the Connection is already closed 1368 * 1369 * @throws JMSException 1370 */ 1371 protected synchronized void checkClosed() throws JMSException { 1372 if (closed.get()) { 1373 throw new ConnectionClosedException(); 1374 } 1375 } 1376 1377 /** 1378 * Send the ConnectionInfo to the Broker 1379 * 1380 * @throws JMSException 1381 */ 1382 protected void ensureConnectionInfoSent() throws JMSException { 1383 synchronized(this.ensureConnectionInfoSentMutex) { 1384 // Can we skip sending the ConnectionInfo packet?? 1385 if (isConnectionInfoSentToBroker || closed.get()) { 1386 return; 1387 } 1388 //TODO shouldn't this check be on userSpecifiedClientID rather than the value of clientID? 1389 if (info.getClientId() == null || info.getClientId().trim().length() == 0) { 1390 info.setClientId(clientIdGenerator.generateId()); 1391 } 1392 syncSendPacket(info.copy()); 1393 1394 this.isConnectionInfoSentToBroker = true; 1395 // Add a temp destination advisory consumer so that 1396 // We know what the valid temporary destinations are on the 1397 // broker without having to do an RPC to the broker. 1398 1399 ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1), consumerIdGenerator.getNextSequenceId()); 1400 if (watchTopicAdvisories) { 1401 advisoryConsumer = new AdvisoryConsumer(this, consumerId); 1402 } 1403 } 1404 } 1405 1406 public synchronized boolean isWatchTopicAdvisories() { 1407 return watchTopicAdvisories; 1408 } 1409 1410 public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) { 1411 this.watchTopicAdvisories = watchTopicAdvisories; 1412 } 1413 1414 /** 1415 * @return Returns the useAsyncSend. 1416 */ 1417 public boolean isUseAsyncSend() { 1418 return useAsyncSend; 1419 } 1420 1421 /** 1422 * Forces the use of <a 1423 * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which 1424 * adds a massive performance boost; but means that the send() method will 1425 * return immediately whether the message has been sent or not which could 1426 * lead to message loss. 1427 */ 1428 public void setUseAsyncSend(boolean useAsyncSend) { 1429 this.useAsyncSend = useAsyncSend; 1430 } 1431 1432 /** 1433 * @return true if always sync send messages 1434 */ 1435 public boolean isAlwaysSyncSend() { 1436 return this.alwaysSyncSend; 1437 } 1438 1439 /** 1440 * Set true if always require messages to be sync sent 1441 * 1442 * @param alwaysSyncSend 1443 */ 1444 public void setAlwaysSyncSend(boolean alwaysSyncSend) { 1445 this.alwaysSyncSend = alwaysSyncSend; 1446 } 1447 1448 /** 1449 * @return the messagePrioritySupported 1450 */ 1451 public boolean isMessagePrioritySupported() { 1452 return this.messagePrioritySupported; 1453 } 1454 1455 /** 1456 * @param messagePrioritySupported the messagePrioritySupported to set 1457 */ 1458 public void setMessagePrioritySupported(boolean messagePrioritySupported) { 1459 this.messagePrioritySupported = messagePrioritySupported; 1460 } 1461 1462 /** 1463 * Cleans up this connection so that it's state is as if the connection was 1464 * just created. This allows the Resource Adapter to clean up a connection 1465 * so that it can be reused without having to close and recreate the 1466 * connection. 1467 */ 1468 public void cleanup() throws JMSException { 1469 1470 if (advisoryConsumer != null && !isTransportFailed()) { 1471 advisoryConsumer.dispose(); 1472 advisoryConsumer = null; 1473 } 1474 1475 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) { 1476 ActiveMQSession s = i.next(); 1477 s.dispose(); 1478 } 1479 for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) { 1480 ActiveMQConnectionConsumer c = i.next(); 1481 c.dispose(); 1482 } 1483 for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) { 1484 ActiveMQInputStream c = i.next(); 1485 c.dispose(); 1486 } 1487 for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) { 1488 ActiveMQOutputStream c = i.next(); 1489 c.dispose(); 1490 } 1491 1492 if (isConnectionInfoSentToBroker) { 1493 if (!transportFailed.get() && !closing.get()) { 1494 syncSendPacket(info.createRemoveCommand()); 1495 } 1496 isConnectionInfoSentToBroker = false; 1497 } 1498 if (userSpecifiedClientID) { 1499 info.setClientId(null); 1500 userSpecifiedClientID = false; 1501 } 1502 clientIDSet = false; 1503 1504 started.set(false); 1505 } 1506 1507 /** 1508 * Changes the associated username/password that is associated with this 1509 * connection. If the connection has been used, you must called cleanup() 1510 * before calling this method. 1511 * 1512 * @throws IllegalStateException if the connection is in used. 1513 */ 1514 public void changeUserInfo(String userName, String password) throws JMSException { 1515 if (isConnectionInfoSentToBroker) { 1516 throw new IllegalStateException("changeUserInfo used Connection is not allowed"); 1517 } 1518 this.info.setUserName(userName); 1519 this.info.setPassword(password); 1520 } 1521 1522 /** 1523 * @return Returns the resourceManagerId. 1524 * @throws JMSException 1525 */ 1526 public String getResourceManagerId() throws JMSException { 1527 waitForBrokerInfo(); 1528 if (brokerInfo == null) { 1529 throw new JMSException("Connection failed before Broker info was received."); 1530 } 1531 return brokerInfo.getBrokerId().getValue(); 1532 } 1533 1534 /** 1535 * Returns the broker name if one is available or null if one is not 1536 * available yet. 1537 */ 1538 public String getBrokerName() { 1539 try { 1540 brokerInfoReceived.await(5, TimeUnit.SECONDS); 1541 if (brokerInfo == null) { 1542 return null; 1543 } 1544 return brokerInfo.getBrokerName(); 1545 } catch (InterruptedException e) { 1546 Thread.currentThread().interrupt(); 1547 return null; 1548 } 1549 } 1550 1551 /** 1552 * Returns the broker information if it is available or null if it is not 1553 * available yet. 1554 */ 1555 public BrokerInfo getBrokerInfo() { 1556 return brokerInfo; 1557 } 1558 1559 /** 1560 * @return Returns the RedeliveryPolicy. 1561 * @throws JMSException 1562 */ 1563 public RedeliveryPolicy getRedeliveryPolicy() throws JMSException { 1564 return redeliveryPolicy; 1565 } 1566 1567 /** 1568 * Sets the redelivery policy to be used when messages are rolled back 1569 */ 1570 public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) { 1571 this.redeliveryPolicy = redeliveryPolicy; 1572 } 1573 1574 public BlobTransferPolicy getBlobTransferPolicy() { 1575 if (blobTransferPolicy == null) { 1576 blobTransferPolicy = createBlobTransferPolicy(); 1577 } 1578 return blobTransferPolicy; 1579 } 1580 1581 /** 1582 * Sets the policy used to describe how out-of-band BLOBs (Binary Large 1583 * OBjects) are transferred from producers to brokers to consumers 1584 */ 1585 public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) { 1586 this.blobTransferPolicy = blobTransferPolicy; 1587 } 1588 1589 /** 1590 * @return Returns the alwaysSessionAsync. 1591 */ 1592 public boolean isAlwaysSessionAsync() { 1593 return alwaysSessionAsync; 1594 } 1595 1596 /** 1597 * If this flag is set then a separate thread is not used for dispatching 1598 * messages for each Session in the Connection. However, a separate thread 1599 * is always used if there is more than one session, or the session isn't in 1600 * auto acknowledge or duplicates ok mode 1601 */ 1602 public void setAlwaysSessionAsync(boolean alwaysSessionAsync) { 1603 this.alwaysSessionAsync = alwaysSessionAsync; 1604 } 1605 1606 /** 1607 * @return Returns the optimizeAcknowledge. 1608 */ 1609 public boolean isOptimizeAcknowledge() { 1610 return optimizeAcknowledge; 1611 } 1612 1613 /** 1614 * Enables an optimised acknowledgement mode where messages are acknowledged 1615 * in batches rather than individually 1616 * 1617 * @param optimizeAcknowledge The optimizeAcknowledge to set. 1618 */ 1619 public void setOptimizeAcknowledge(boolean optimizeAcknowledge) { 1620 this.optimizeAcknowledge = optimizeAcknowledge; 1621 } 1622 1623 public long getWarnAboutUnstartedConnectionTimeout() { 1624 return warnAboutUnstartedConnectionTimeout; 1625 } 1626 1627 /** 1628 * Enables the timeout from a connection creation to when a warning is 1629 * generated if the connection is not properly started via {@link #start()} 1630 * and a message is received by a consumer. It is a very common gotcha to 1631 * forget to <a 1632 * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start 1633 * the connection</a> so this option makes the default case to create a 1634 * warning if the user forgets. To disable the warning just set the value to < 1635 * 0 (say -1). 1636 */ 1637 public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) { 1638 this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout; 1639 } 1640 1641 /** 1642 * @return the sendTimeout 1643 */ 1644 public int getSendTimeout() { 1645 return sendTimeout; 1646 } 1647 1648 /** 1649 * @param sendTimeout the sendTimeout to set 1650 */ 1651 public void setSendTimeout(int sendTimeout) { 1652 this.sendTimeout = sendTimeout; 1653 } 1654 1655 /** 1656 * @return the sendAcksAsync 1657 */ 1658 public boolean isSendAcksAsync() { 1659 return sendAcksAsync; 1660 } 1661 1662 /** 1663 * @param sendAcksAsync the sendAcksAsync to set 1664 */ 1665 public void setSendAcksAsync(boolean sendAcksAsync) { 1666 this.sendAcksAsync = sendAcksAsync; 1667 } 1668 1669 1670 /** 1671 * Returns the time this connection was created 1672 */ 1673 public long getTimeCreated() { 1674 return timeCreated; 1675 } 1676 1677 private void waitForBrokerInfo() throws JMSException { 1678 try { 1679 brokerInfoReceived.await(); 1680 } catch (InterruptedException e) { 1681 Thread.currentThread().interrupt(); 1682 throw JMSExceptionSupport.create(e); 1683 } 1684 } 1685 1686 // Package protected so that it can be used in unit tests 1687 public Transport getTransport() { 1688 return transport; 1689 } 1690 1691 public void addProducer(ProducerId producerId, ActiveMQMessageProducer producer) { 1692 producers.put(producerId, producer); 1693 } 1694 1695 public void removeProducer(ProducerId producerId) { 1696 producers.remove(producerId); 1697 } 1698 1699 public void addDispatcher(ConsumerId consumerId, ActiveMQDispatcher dispatcher) { 1700 dispatchers.put(consumerId, dispatcher); 1701 } 1702 1703 public void removeDispatcher(ConsumerId consumerId) { 1704 dispatchers.remove(consumerId); 1705 } 1706 1707 /** 1708 * @param o - the command to consume 1709 */ 1710 public void onCommand(final Object o) { 1711 final Command command = (Command)o; 1712 if (!closed.get() && command != null) { 1713 try { 1714 command.visit(new CommandVisitorAdapter() { 1715 @Override 1716 public Response processMessageDispatch(MessageDispatch md) throws Exception { 1717 waitForTransportInterruptionProcessingToComplete(); 1718 ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId()); 1719 if (dispatcher != null) { 1720 // Copy in case a embedded broker is dispatching via 1721 // vm:// 1722 // md.getMessage() == null to signal end of queue 1723 // browse. 1724 Message msg = md.getMessage(); 1725 if (msg != null) { 1726 msg = msg.copy(); 1727 msg.setReadOnlyBody(true); 1728 msg.setReadOnlyProperties(true); 1729 msg.setRedeliveryCounter(md.getRedeliveryCounter()); 1730 msg.setConnection(ActiveMQConnection.this); 1731 md.setMessage(msg); 1732 } 1733 dispatcher.dispatch(md); 1734 } 1735 return null; 1736 } 1737 1738 @Override 1739 public Response processProducerAck(ProducerAck pa) throws Exception { 1740 if (pa != null && pa.getProducerId() != null) { 1741 ActiveMQMessageProducer producer = producers.get(pa.getProducerId()); 1742 if (producer != null) { 1743 producer.onProducerAck(pa); 1744 } 1745 } 1746 return null; 1747 } 1748 1749 @Override 1750 public Response processBrokerInfo(BrokerInfo info) throws Exception { 1751 brokerInfo = info; 1752 brokerInfoReceived.countDown(); 1753 optimizeAcknowledge &= !brokerInfo.isFaultTolerantConfiguration(); 1754 getBlobTransferPolicy().setBrokerUploadUrl(info.getBrokerUploadUrl()); 1755 return null; 1756 } 1757 1758 @Override 1759 public Response processConnectionError(final ConnectionError error) throws Exception { 1760 executor.execute(new Runnable() { 1761 public void run() { 1762 onAsyncException(error.getException()); 1763 } 1764 }); 1765 return null; 1766 } 1767 1768 @Override 1769 public Response processControlCommand(ControlCommand command) throws Exception { 1770 onControlCommand(command); 1771 return null; 1772 } 1773 1774 @Override 1775 public Response processConnectionControl(ConnectionControl control) throws Exception { 1776 onConnectionControl((ConnectionControl)command); 1777 return null; 1778 } 1779 1780 @Override 1781 public Response processConsumerControl(ConsumerControl control) throws Exception { 1782 onConsumerControl((ConsumerControl)command); 1783 return null; 1784 } 1785 1786 @Override 1787 public Response processWireFormat(WireFormatInfo info) throws Exception { 1788 onWireFormatInfo((WireFormatInfo)command); 1789 return null; 1790 } 1791 }); 1792 } catch (Exception e) { 1793 onClientInternalException(e); 1794 } 1795 1796 } 1797 for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) { 1798 TransportListener listener = iter.next(); 1799 listener.onCommand(command); 1800 } 1801 } 1802 1803 protected void onWireFormatInfo(WireFormatInfo info) { 1804 protocolVersion.set(info.getVersion()); 1805 } 1806 1807 /** 1808 * Handles async client internal exceptions. 1809 * A client internal exception is usually one that has been thrown 1810 * by a container runtime component during asynchronous processing of a 1811 * message that does not affect the connection itself. 1812 * This method notifies the <code>ClientInternalExceptionListener</code> by invoking 1813 * its <code>onException</code> method, if one has been registered with this connection. 1814 * 1815 * @param error the exception that the problem 1816 */ 1817 public void onClientInternalException(final Throwable error) { 1818 if ( !closed.get() && !closing.get() ) { 1819 if ( this.clientInternalExceptionListener != null ) { 1820 executor.execute(new Runnable() { 1821 public void run() { 1822 ActiveMQConnection.this.clientInternalExceptionListener.onException(error); 1823 } 1824 }); 1825 } else { 1826 LOG.debug("Async client internal exception occurred with no exception listener registered: " 1827 + error, error); 1828 } 1829 } 1830 } 1831 /** 1832 * Used for handling async exceptions 1833 * 1834 * @param error 1835 */ 1836 public void onAsyncException(Throwable error) { 1837 if (!closed.get() && !closing.get()) { 1838 if (this.exceptionListener != null) { 1839 1840 if (!(error instanceof JMSException)) { 1841 error = JMSExceptionSupport.create(error); 1842 } 1843 final JMSException e = (JMSException)error; 1844 1845 executor.execute(new Runnable() { 1846 public void run() { 1847 ActiveMQConnection.this.exceptionListener.onException(e); 1848 } 1849 }); 1850 1851 } else { 1852 LOG.debug("Async exception with no exception listener: " + error, error); 1853 } 1854 } 1855 } 1856 1857 public void onException(final IOException error) { 1858 onAsyncException(error); 1859 if (!closing.get() && !closed.get()) { 1860 executor.execute(new Runnable() { 1861 public void run() { 1862 transportFailed(error); 1863 ServiceSupport.dispose(ActiveMQConnection.this.transport); 1864 brokerInfoReceived.countDown(); 1865 try { 1866 cleanup(); 1867 } catch (JMSException e) { 1868 LOG.warn("Exception during connection cleanup, " + e, e); 1869 } 1870 for (Iterator<TransportListener> iter = transportListeners 1871 .iterator(); iter.hasNext();) { 1872 TransportListener listener = iter.next(); 1873 listener.onException(error); 1874 } 1875 } 1876 }); 1877 } 1878 } 1879 1880 public void transportInterupted() { 1881 this.transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.size() - (advisoryConsumer != null ? 1:0)); 1882 if (LOG.isDebugEnabled()) { 1883 LOG.debug("transport interrupted, dispatchers: " + transportInterruptionProcessingComplete.getCount()); 1884 } 1885 signalInterruptionProcessingNeeded(); 1886 1887 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) { 1888 ActiveMQSession s = i.next(); 1889 s.clearMessagesInProgress(); 1890 } 1891 1892 for (ActiveMQConnectionConsumer connectionConsumer : this.connectionConsumers) { 1893 connectionConsumer.clearMessagesInProgress(); 1894 } 1895 1896 for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) { 1897 TransportListener listener = iter.next(); 1898 listener.transportInterupted(); 1899 } 1900 } 1901 1902 public void transportResumed() { 1903 for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) { 1904 TransportListener listener = iter.next(); 1905 listener.transportResumed(); 1906 } 1907 } 1908 1909 /** 1910 * Create the DestinationInfo object for the temporary destination. 1911 * 1912 * @param topic - if its true topic, else queue. 1913 * @return DestinationInfo 1914 * @throws JMSException 1915 */ 1916 protected ActiveMQTempDestination createTempDestination(boolean topic) throws JMSException { 1917 1918 // Check if Destination info is of temporary type. 1919 ActiveMQTempDestination dest; 1920 if (topic) { 1921 dest = new ActiveMQTempTopic(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId()); 1922 } else { 1923 dest = new ActiveMQTempQueue(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId()); 1924 } 1925 1926 DestinationInfo info = new DestinationInfo(); 1927 info.setConnectionId(this.info.getConnectionId()); 1928 info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE); 1929 info.setDestination(dest); 1930 syncSendPacket(info); 1931 1932 dest.setConnection(this); 1933 activeTempDestinations.put(dest, dest); 1934 return dest; 1935 } 1936 1937 /** 1938 * @param destination 1939 * @throws JMSException 1940 */ 1941 public void deleteTempDestination(ActiveMQTempDestination destination) throws JMSException { 1942 1943 checkClosedOrFailed(); 1944 1945 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) { 1946 ActiveMQSession s = i.next(); 1947 if (s.isInUse(destination)) { 1948 throw new JMSException("A consumer is consuming from the temporary destination"); 1949 } 1950 } 1951 1952 activeTempDestinations.remove(destination); 1953 1954 DestinationInfo destInfo = new DestinationInfo(); 1955 destInfo.setConnectionId(this.info.getConnectionId()); 1956 destInfo.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE); 1957 destInfo.setDestination(destination); 1958 destInfo.setTimeout(0); 1959 syncSendPacket(destInfo); 1960 } 1961 1962 public boolean isDeleted(ActiveMQDestination dest) { 1963 1964 // If we are not watching the advisories.. then 1965 // we will assume that the temp destination does exist. 1966 if (advisoryConsumer == null) { 1967 return false; 1968 } 1969 1970 return !activeTempDestinations.contains(dest); 1971 } 1972 1973 public boolean isCopyMessageOnSend() { 1974 return copyMessageOnSend; 1975 } 1976 1977 public LongSequenceGenerator getLocalTransactionIdGenerator() { 1978 return localTransactionIdGenerator; 1979 } 1980 1981 public boolean isUseCompression() { 1982 return useCompression; 1983 } 1984 1985 /** 1986 * Enables the use of compression of the message bodies 1987 */ 1988 public void setUseCompression(boolean useCompression) { 1989 this.useCompression = useCompression; 1990 } 1991 1992 public void destroyDestination(ActiveMQDestination destination) throws JMSException { 1993 1994 checkClosedOrFailed(); 1995 ensureConnectionInfoSent(); 1996 1997 DestinationInfo info = new DestinationInfo(); 1998 info.setConnectionId(this.info.getConnectionId()); 1999 info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE); 2000 info.setDestination(destination); 2001 info.setTimeout(0); 2002 syncSendPacket(info); 2003 2004 } 2005 2006 public boolean isDispatchAsync() { 2007 return dispatchAsync; 2008 } 2009 2010 /** 2011 * Enables or disables the default setting of whether or not consumers have 2012 * their messages <a 2013 * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched 2014 * synchronously or asynchronously by the broker</a>. For non-durable 2015 * topics for example we typically dispatch synchronously by default to 2016 * minimize context switches which boost performance. However sometimes its 2017 * better to go slower to ensure that a single blocked consumer socket does 2018 * not block delivery to other consumers. 2019 * 2020 * @param asyncDispatch If true then consumers created on this connection 2021 * will default to having their messages dispatched 2022 * asynchronously. The default value is false. 2023 */ 2024 public void setDispatchAsync(boolean asyncDispatch) { 2025 this.dispatchAsync = asyncDispatch; 2026 } 2027 2028 public boolean isObjectMessageSerializationDefered() { 2029 return objectMessageSerializationDefered; 2030 } 2031 2032 /** 2033 * When an object is set on an ObjectMessage, the JMS spec requires the 2034 * object to be serialized by that set method. Enabling this flag causes the 2035 * object to not get serialized. The object may subsequently get serialized 2036 * if the message needs to be sent over a socket or stored to disk. 2037 */ 2038 public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) { 2039 this.objectMessageSerializationDefered = objectMessageSerializationDefered; 2040 } 2041 2042 public InputStream createInputStream(Destination dest) throws JMSException { 2043 return createInputStream(dest, null); 2044 } 2045 2046 public InputStream createInputStream(Destination dest, String messageSelector) throws JMSException { 2047 return createInputStream(dest, messageSelector, false); 2048 } 2049 2050 public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal) throws JMSException { 2051 return createInputStream(dest, messageSelector, noLocal, -1); 2052 } 2053 2054 2055 2056 public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal, long timeout) throws JMSException { 2057 return doCreateInputStream(dest, messageSelector, noLocal, null, timeout); 2058 } 2059 2060 public InputStream createDurableInputStream(Topic dest, String name) throws JMSException { 2061 return createInputStream(dest, null, false); 2062 } 2063 2064 public InputStream createDurableInputStream(Topic dest, String name, String messageSelector) throws JMSException { 2065 return createDurableInputStream(dest, name, messageSelector, false); 2066 } 2067 2068 public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal) throws JMSException { 2069 return createDurableInputStream(dest, name, messageSelector, noLocal, -1); 2070 } 2071 2072 public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal, long timeout) throws JMSException { 2073 return doCreateInputStream(dest, messageSelector, noLocal, name, timeout); 2074 } 2075 2076 private InputStream doCreateInputStream(Destination dest, String messageSelector, boolean noLocal, String subName, long timeout) throws JMSException { 2077 checkClosedOrFailed(); 2078 ensureConnectionInfoSent(); 2079 return new ActiveMQInputStream(this, createConsumerId(), ActiveMQDestination.transform(dest), messageSelector, noLocal, subName, prefetchPolicy.getInputStreamPrefetch(), timeout); 2080 } 2081 2082 /** 2083 * Creates a persistent output stream; individual messages will be written 2084 * to disk/database by the broker 2085 */ 2086 public OutputStream createOutputStream(Destination dest) throws JMSException { 2087 return createOutputStream(dest, null, ActiveMQMessage.DEFAULT_DELIVERY_MODE, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE); 2088 } 2089 2090 /** 2091 * Creates a non persistent output stream; messages will not be written to 2092 * disk 2093 */ 2094 public OutputStream createNonPersistentOutputStream(Destination dest) throws JMSException { 2095 return createOutputStream(dest, null, DeliveryMode.NON_PERSISTENT, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE); 2096 } 2097 2098 /** 2099 * Creates an output stream allowing full control over the delivery mode, 2100 * the priority and time to live of the messages and the properties added to 2101 * messages on the stream. 2102 * 2103 * @param streamProperties defines a map of key-value pairs where the keys 2104 * are strings and the values are primitive values (numbers 2105 * and strings) which are appended to the messages similarly 2106 * to using the 2107 * {@link javax.jms.Message#setObjectProperty(String, Object)} 2108 * method 2109 */ 2110 public OutputStream createOutputStream(Destination dest, Map<String, Object> streamProperties, int deliveryMode, int priority, long timeToLive) throws JMSException { 2111 checkClosedOrFailed(); 2112 ensureConnectionInfoSent(); 2113 return new ActiveMQOutputStream(this, createProducerId(), ActiveMQDestination.transform(dest), streamProperties, deliveryMode, priority, timeToLive); 2114 } 2115 2116 /** 2117 * Unsubscribes a durable subscription that has been created by a client. 2118 * <P> 2119 * This method deletes the state being maintained on behalf of the 2120 * subscriber by its provider. 2121 * <P> 2122 * It is erroneous for a client to delete a durable subscription while there 2123 * is an active <CODE>MessageConsumer </CODE> or 2124 * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed 2125 * message is part of a pending transaction or has not been acknowledged in 2126 * the session. 2127 * 2128 * @param name the name used to identify this subscription 2129 * @throws JMSException if the session fails to unsubscribe to the durable 2130 * subscription due to some internal error. 2131 * @throws InvalidDestinationException if an invalid subscription name is 2132 * specified. 2133 * @since 1.1 2134 */ 2135 public void unsubscribe(String name) throws InvalidDestinationException, JMSException { 2136 checkClosedOrFailed(); 2137 RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo(); 2138 rsi.setConnectionId(getConnectionInfo().getConnectionId()); 2139 rsi.setSubscriptionName(name); 2140 rsi.setClientId(getConnectionInfo().getClientId()); 2141 syncSendPacket(rsi); 2142 } 2143 2144 /** 2145 * Internal send method optimized: - It does not copy the message - It can 2146 * only handle ActiveMQ messages. - You can specify if the send is async or 2147 * sync - Does not allow you to send /w a transaction. 2148 */ 2149 void send(ActiveMQDestination destination, ActiveMQMessage msg, MessageId messageId, int deliveryMode, int priority, long timeToLive, boolean async) throws JMSException { 2150 checkClosedOrFailed(); 2151 2152 if (destination.isTemporary() && isDeleted(destination)) { 2153 throw new JMSException("Cannot publish to a deleted Destination: " + destination); 2154 } 2155 2156 msg.setJMSDestination(destination); 2157 msg.setJMSDeliveryMode(deliveryMode); 2158 long expiration = 0L; 2159 2160 if (!isDisableTimeStampsByDefault()) { 2161 long timeStamp = System.currentTimeMillis(); 2162 msg.setJMSTimestamp(timeStamp); 2163 if (timeToLive > 0) { 2164 expiration = timeToLive + timeStamp; 2165 } 2166 } 2167 2168 msg.setJMSExpiration(expiration); 2169 msg.setJMSPriority(priority); 2170 2171 msg.setJMSRedelivered(false); 2172 msg.setMessageId(messageId); 2173 2174 msg.onSend(); 2175 2176 msg.setProducerId(msg.getMessageId().getProducerId()); 2177 2178 if (LOG.isDebugEnabled()) { 2179 LOG.debug("Sending message: " + msg); 2180 } 2181 2182 if (async) { 2183 asyncSendPacket(msg); 2184 } else { 2185 syncSendPacket(msg); 2186 } 2187 2188 } 2189 2190 public void addOutputStream(ActiveMQOutputStream stream) { 2191 outputStreams.add(stream); 2192 } 2193 2194 public void removeOutputStream(ActiveMQOutputStream stream) { 2195 outputStreams.remove(stream); 2196 } 2197 2198 public void addInputStream(ActiveMQInputStream stream) { 2199 inputStreams.add(stream); 2200 } 2201 2202 public void removeInputStream(ActiveMQInputStream stream) { 2203 inputStreams.remove(stream); 2204 } 2205 2206 protected void onControlCommand(ControlCommand command) { 2207 String text = command.getCommand(); 2208 if (text != null) { 2209 if (text.equals("shutdown")) { 2210 LOG.info("JVM told to shutdown"); 2211 System.exit(0); 2212 } 2213 } 2214 } 2215 2216 protected void onConnectionControl(ConnectionControl command) { 2217 if (command.isFaultTolerant()) { 2218 this.optimizeAcknowledge = false; 2219 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) { 2220 ActiveMQSession s = i.next(); 2221 s.setOptimizeAcknowledge(false); 2222 } 2223 } 2224 } 2225 2226 protected void onConsumerControl(ConsumerControl command) { 2227 if (command.isClose()) { 2228 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) { 2229 ActiveMQSession s = i.next(); 2230 s.close(command.getConsumerId()); 2231 } 2232 } else { 2233 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) { 2234 ActiveMQSession s = i.next(); 2235 s.setPrefetchSize(command.getConsumerId(), command.getPrefetch()); 2236 } 2237 } 2238 } 2239 2240 protected void transportFailed(IOException error) { 2241 transportFailed.set(true); 2242 if (firstFailureError == null) { 2243 firstFailureError = error; 2244 } 2245 } 2246 2247 /** 2248 * Should a JMS message be copied to a new JMS Message object as part of the 2249 * send() method in JMS. This is enabled by default to be compliant with the 2250 * JMS specification. You can disable it if you do not mutate JMS messages 2251 * after they are sent for a performance boost 2252 */ 2253 public void setCopyMessageOnSend(boolean copyMessageOnSend) { 2254 this.copyMessageOnSend = copyMessageOnSend; 2255 } 2256 2257 @Override 2258 public String toString() { 2259 return "ActiveMQConnection {id=" + info.getConnectionId() + ",clientId=" + info.getClientId() + ",started=" + started.get() + "}"; 2260 } 2261 2262 protected BlobTransferPolicy createBlobTransferPolicy() { 2263 return new BlobTransferPolicy(); 2264 } 2265 2266 public int getProtocolVersion() { 2267 return protocolVersion.get(); 2268 } 2269 2270 public int getProducerWindowSize() { 2271 return producerWindowSize; 2272 } 2273 2274 public void setProducerWindowSize(int producerWindowSize) { 2275 this.producerWindowSize = producerWindowSize; 2276 } 2277 2278 public void setAuditDepth(int auditDepth) { 2279 connectionAudit.setAuditDepth(auditDepth); 2280 } 2281 2282 public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) { 2283 connectionAudit.setAuditMaximumProducerNumber(auditMaximumProducerNumber); 2284 } 2285 2286 protected void removeDispatcher(ActiveMQDispatcher dispatcher) { 2287 connectionAudit.removeDispatcher(dispatcher); 2288 } 2289 2290 protected boolean isDuplicate(ActiveMQDispatcher dispatcher, Message message) { 2291 return checkForDuplicates && connectionAudit.isDuplicate(dispatcher, message); 2292 } 2293 2294 protected void rollbackDuplicate(ActiveMQDispatcher dispatcher, Message message) { 2295 connectionAudit.rollbackDuplicate(dispatcher, message); 2296 } 2297 2298 public IOException getFirstFailureError() { 2299 return firstFailureError; 2300 } 2301 2302 protected void waitForTransportInterruptionProcessingToComplete() throws InterruptedException { 2303 CountDownLatch cdl = this.transportInterruptionProcessingComplete; 2304 if (cdl != null) { 2305 if (!closed.get() && !transportFailed.get() && cdl.getCount()>0) { 2306 LOG.warn("dispatch paused, waiting for outstanding dispatch interruption processing (" + cdl.getCount() + ") to complete.."); 2307 cdl.await(10, TimeUnit.SECONDS); 2308 } 2309 signalInterruptionProcessingComplete(); 2310 } 2311 } 2312 2313 protected void transportInterruptionProcessingComplete() { 2314 CountDownLatch cdl = this.transportInterruptionProcessingComplete; 2315 if (cdl != null) { 2316 cdl.countDown(); 2317 try { 2318 signalInterruptionProcessingComplete(); 2319 } catch (InterruptedException ignored) {} 2320 } 2321 } 2322 2323 private void signalInterruptionProcessingComplete() throws InterruptedException { 2324 CountDownLatch cdl = this.transportInterruptionProcessingComplete; 2325 if (cdl.getCount()==0) { 2326 if (LOG.isDebugEnabled()) { 2327 LOG.debug("transportInterruptionProcessingComplete for: " + this.getConnectionInfo().getConnectionId()); 2328 } 2329 this.transportInterruptionProcessingComplete = null; 2330 2331 FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class); 2332 if (failoverTransport != null) { 2333 failoverTransport.connectionInterruptProcessingComplete(this.getConnectionInfo().getConnectionId()); 2334 if (LOG.isDebugEnabled()) { 2335 LOG.debug("notified failover transport (" + failoverTransport 2336 + ") of interruption completion for: " + this.getConnectionInfo().getConnectionId()); 2337 } 2338 } 2339 2340 } 2341 } 2342 2343 private void signalInterruptionProcessingNeeded() { 2344 FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class); 2345 if (failoverTransport != null) { 2346 failoverTransport.getStateTracker().transportInterrupted(this.getConnectionInfo().getConnectionId()); 2347 if (LOG.isDebugEnabled()) { 2348 LOG.debug("notified failover transport (" + failoverTransport 2349 + ") of pending interruption processing for: " + this.getConnectionInfo().getConnectionId()); 2350 } 2351 } 2352 } 2353 2354 /* 2355 * specify the amount of time in milliseconds that a consumer with a transaction pending recovery 2356 * will wait to receive re dispatched messages. 2357 * default value is 0 so there is no wait by default. 2358 */ 2359 public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) { 2360 this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod; 2361 } 2362 2363 public long getConsumerFailoverRedeliveryWaitPeriod() { 2364 return consumerFailoverRedeliveryWaitPeriod; 2365 } 2366 2367 protected Scheduler getScheduler() { 2368 return this.scheduler; 2369 } 2370 2371 protected ThreadPoolExecutor getExecutor() { 2372 return this.executor; 2373 } 2374 2375 /** 2376 * @return the checkForDuplicates 2377 */ 2378 public boolean isCheckForDuplicates() { 2379 return this.checkForDuplicates; 2380 } 2381 2382 /** 2383 * @param checkForDuplicates the checkForDuplicates to set 2384 */ 2385 public void setCheckForDuplicates(boolean checkForDuplicates) { 2386 this.checkForDuplicates = checkForDuplicates; 2387 } 2388 2389 }