001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker; 018 019 import java.io.IOException; 020 import java.net.URI; 021 import java.util.HashMap; 022 import java.util.Iterator; 023 import java.util.LinkedList; 024 import java.util.List; 025 import java.util.Map; 026 import java.util.Properties; 027 import java.util.concurrent.ConcurrentHashMap; 028 import java.util.concurrent.CopyOnWriteArrayList; 029 import java.util.concurrent.CountDownLatch; 030 import java.util.concurrent.TimeUnit; 031 import java.util.concurrent.atomic.AtomicBoolean; 032 import java.util.concurrent.atomic.AtomicInteger; 033 import java.util.concurrent.atomic.AtomicReference; 034 import java.util.concurrent.locks.ReentrantReadWriteLock; 035 036 import javax.management.ObjectName; 037 import javax.transaction.xa.XAResource; 038 039 import org.apache.activemq.broker.ft.MasterBroker; 040 import org.apache.activemq.broker.region.ConnectionStatistics; 041 import org.apache.activemq.broker.region.RegionBroker; 042 import org.apache.activemq.command.BrokerId; 043 import org.apache.activemq.command.BrokerInfo; 044 import org.apache.activemq.command.Command; 045 import org.apache.activemq.command.CommandTypes; 046 import org.apache.activemq.command.ConnectionControl; 047 import org.apache.activemq.command.ConnectionError; 048 import org.apache.activemq.command.ConnectionId; 049 import org.apache.activemq.command.ConnectionInfo; 050 import org.apache.activemq.command.ConsumerControl; 051 import org.apache.activemq.command.ConsumerId; 052 import org.apache.activemq.command.ConsumerInfo; 053 import org.apache.activemq.command.ControlCommand; 054 import org.apache.activemq.command.DataArrayResponse; 055 import org.apache.activemq.command.DestinationInfo; 056 import org.apache.activemq.command.ExceptionResponse; 057 import org.apache.activemq.command.FlushCommand; 058 import org.apache.activemq.command.IntegerResponse; 059 import org.apache.activemq.command.KeepAliveInfo; 060 import org.apache.activemq.command.Message; 061 import org.apache.activemq.command.MessageAck; 062 import org.apache.activemq.command.MessageDispatch; 063 import org.apache.activemq.command.MessageDispatchNotification; 064 import org.apache.activemq.command.MessagePull; 065 import org.apache.activemq.command.ProducerAck; 066 import org.apache.activemq.command.ProducerId; 067 import org.apache.activemq.command.ProducerInfo; 068 import org.apache.activemq.command.RemoveSubscriptionInfo; 069 import org.apache.activemq.command.Response; 070 import org.apache.activemq.command.SessionId; 071 import org.apache.activemq.command.SessionInfo; 072 import org.apache.activemq.command.ShutdownInfo; 073 import org.apache.activemq.command.TransactionId; 074 import org.apache.activemq.command.TransactionInfo; 075 import org.apache.activemq.command.WireFormatInfo; 076 import org.apache.activemq.network.*; 077 import org.apache.activemq.security.MessageAuthorizationPolicy; 078 import org.apache.activemq.state.CommandVisitor; 079 import org.apache.activemq.state.ConnectionState; 080 import org.apache.activemq.state.ConsumerState; 081 import org.apache.activemq.state.ProducerState; 082 import org.apache.activemq.state.SessionState; 083 import org.apache.activemq.state.TransactionState; 084 import org.apache.activemq.thread.DefaultThreadPools; 085 import org.apache.activemq.thread.Task; 086 import org.apache.activemq.thread.TaskRunner; 087 import org.apache.activemq.thread.TaskRunnerFactory; 088 import org.apache.activemq.transaction.Transaction; 089 import org.apache.activemq.transport.DefaultTransportListener; 090 import org.apache.activemq.transport.ResponseCorrelator; 091 import org.apache.activemq.transport.Transport; 092 import org.apache.activemq.transport.TransportDisposedIOException; 093 import org.apache.activemq.transport.TransportFactory; 094 import org.apache.activemq.util.*; 095 import org.slf4j.Logger; 096 import org.slf4j.LoggerFactory; 097 import org.slf4j.MDC; 098 099 public class TransportConnection implements Connection, Task, CommandVisitor { 100 private static final Logger LOG = LoggerFactory.getLogger(TransportConnection.class); 101 private static final Logger TRANSPORTLOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Transport"); 102 private static final Logger SERVICELOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Service"); 103 // Keeps track of the broker and connector that created this connection. 104 protected final Broker broker; 105 protected final TransportConnector connector; 106 // Keeps track of the state of the connections. 107 // protected final ConcurrentHashMap localConnectionStates=new 108 // ConcurrentHashMap(); 109 protected final Map<ConnectionId, ConnectionState> brokerConnectionStates; 110 // The broker and wireformat info that was exchanged. 111 protected BrokerInfo brokerInfo; 112 protected final List<Command> dispatchQueue = new LinkedList<Command>(); 113 protected TaskRunner taskRunner; 114 protected final AtomicReference<IOException> transportException = new AtomicReference<IOException>(); 115 protected AtomicBoolean dispatchStopped = new AtomicBoolean(false); 116 private MasterBroker masterBroker; 117 private final Transport transport; 118 private MessageAuthorizationPolicy messageAuthorizationPolicy; 119 private WireFormatInfo wireFormatInfo; 120 // Used to do async dispatch.. this should perhaps be pushed down into the 121 // transport layer.. 122 private boolean inServiceException; 123 private final ConnectionStatistics statistics = new ConnectionStatistics(); 124 private boolean manageable; 125 private boolean slow; 126 private boolean markedCandidate; 127 private boolean blockedCandidate; 128 private boolean blocked; 129 private boolean connected; 130 private boolean active; 131 private boolean starting; 132 private boolean pendingStop; 133 private long timeStamp; 134 private final AtomicBoolean stopping = new AtomicBoolean(false); 135 private final CountDownLatch stopped = new CountDownLatch(1); 136 private final AtomicBoolean asyncException = new AtomicBoolean(false); 137 private final Map<ProducerId, ProducerBrokerExchange> producerExchanges = new HashMap<ProducerId, ProducerBrokerExchange>(); 138 private final Map<ConsumerId, ConsumerBrokerExchange> consumerExchanges = new HashMap<ConsumerId, ConsumerBrokerExchange>(); 139 private final CountDownLatch dispatchStoppedLatch = new CountDownLatch(1); 140 private ConnectionContext context; 141 private boolean networkConnection; 142 private boolean faultTolerantConnection; 143 private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION); 144 private DemandForwardingBridge duplexBridge; 145 private final TaskRunnerFactory taskRunnerFactory; 146 private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister(); 147 private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock(); 148 private String duplexNetworkConnectorId; 149 150 /** 151 * @param connector 152 * @param transport 153 * @param broker 154 * @param taskRunnerFactory 155 * - can be null if you want direct dispatch to the transport 156 * else commands are sent async. 157 */ 158 public TransportConnection(TransportConnector connector, final Transport transport, Broker broker, 159 TaskRunnerFactory taskRunnerFactory) { 160 this.connector = connector; 161 this.broker = broker; 162 this.messageAuthorizationPolicy = connector.getMessageAuthorizationPolicy(); 163 RegionBroker rb = (RegionBroker) broker.getAdaptor(RegionBroker.class); 164 brokerConnectionStates = rb.getConnectionStates(); 165 if (connector != null) { 166 this.statistics.setParent(connector.getStatistics()); 167 } 168 this.taskRunnerFactory = taskRunnerFactory; 169 this.transport = transport; 170 this.transport.setTransportListener(new DefaultTransportListener() { 171 @Override 172 public void onCommand(Object o) { 173 serviceLock.readLock().lock(); 174 try { 175 if (!(o instanceof Command)) { 176 throw new RuntimeException("Protocol violation - Command corrupted: " + o.toString()); 177 } 178 Command command = (Command) o; 179 Response response = service(command); 180 if (response != null) { 181 dispatchSync(response); 182 } 183 } finally { 184 serviceLock.readLock().unlock(); 185 } 186 } 187 188 @Override 189 public void onException(IOException exception) { 190 serviceLock.readLock().lock(); 191 try { 192 serviceTransportException(exception); 193 } finally { 194 serviceLock.readLock().unlock(); 195 } 196 } 197 }); 198 connected = true; 199 } 200 201 /** 202 * Returns the number of messages to be dispatched to this connection 203 * 204 * @return size of dispatch queue 205 */ 206 public int getDispatchQueueSize() { 207 synchronized (dispatchQueue) { 208 return dispatchQueue.size(); 209 } 210 } 211 212 public void serviceTransportException(IOException e) { 213 BrokerService bService = connector.getBrokerService(); 214 if (bService.isShutdownOnSlaveFailure()) { 215 if (brokerInfo != null) { 216 if (brokerInfo.isSlaveBroker()) { 217 LOG.error("Slave has exception: " + e.getMessage() + " shutting down master now.", e); 218 try { 219 doStop(); 220 bService.stop(); 221 } catch (Exception ex) { 222 LOG.warn("Failed to stop the master", ex); 223 } 224 } 225 } 226 } 227 if (!stopping.get()) { 228 transportException.set(e); 229 if (TRANSPORTLOG.isDebugEnabled()) { 230 TRANSPORTLOG.debug("Transport failed: " + e, e); 231 } else if (TRANSPORTLOG.isInfoEnabled()) { 232 TRANSPORTLOG.info("Transport failed: " + e); 233 } 234 stopAsync(); 235 } 236 } 237 238 /** 239 * Calls the serviceException method in an async thread. Since handling a 240 * service exception closes a socket, we should not tie up broker threads 241 * since client sockets may hang or cause deadlocks. 242 * 243 * @param e 244 */ 245 public void serviceExceptionAsync(final IOException e) { 246 if (asyncException.compareAndSet(false, true)) { 247 new Thread("Async Exception Handler") { 248 @Override 249 public void run() { 250 serviceException(e); 251 } 252 }.start(); 253 } 254 } 255 256 /** 257 * Closes a clients connection due to a detected error. Errors are ignored 258 * if: the client is closing or broker is closing. Otherwise, the connection 259 * error transmitted to the client before stopping it's transport. 260 */ 261 public void serviceException(Throwable e) { 262 // are we a transport exception such as not being able to dispatch 263 // synchronously to a transport 264 if (e instanceof IOException) { 265 serviceTransportException((IOException) e); 266 } else if (e.getClass() == BrokerStoppedException.class) { 267 // Handle the case where the broker is stopped 268 // But the client is still connected. 269 if (!stopping.get()) { 270 if (SERVICELOG.isDebugEnabled()) { 271 SERVICELOG.debug("Broker has been stopped. Notifying client and closing his connection."); 272 } 273 ConnectionError ce = new ConnectionError(); 274 ce.setException(e); 275 dispatchSync(ce); 276 // Wait a little bit to try to get the output buffer to flush 277 // the exption notification to the client. 278 try { 279 Thread.sleep(500); 280 } catch (InterruptedException ie) { 281 Thread.currentThread().interrupt(); 282 } 283 // Worst case is we just kill the connection before the 284 // notification gets to him. 285 stopAsync(); 286 } 287 } else if (!stopping.get() && !inServiceException) { 288 inServiceException = true; 289 try { 290 SERVICELOG.warn("Async error occurred: " + e, e); 291 ConnectionError ce = new ConnectionError(); 292 ce.setException(e); 293 dispatchAsync(ce); 294 } finally { 295 inServiceException = false; 296 } 297 } 298 } 299 300 public Response service(Command command) { 301 MDC.put("activemq.connector", connector.getUri().toString()); 302 Response response = null; 303 boolean responseRequired = command.isResponseRequired(); 304 int commandId = command.getCommandId(); 305 try { 306 response = command.visit(this); 307 } catch (Throwable e) { 308 if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) { 309 SERVICELOG.debug("Error occured while processing " + (responseRequired ? "sync" : "async") 310 + " command: " + command + ", exception: " + e, e); 311 } 312 if (responseRequired) { 313 response = new ExceptionResponse(e); 314 } else { 315 serviceException(e); 316 } 317 } 318 if (responseRequired) { 319 if (response == null) { 320 response = new Response(); 321 } 322 response.setCorrelationId(commandId); 323 } 324 // The context may have been flagged so that the response is not 325 // sent. 326 if (context != null) { 327 if (context.isDontSendReponse()) { 328 context.setDontSendReponse(false); 329 response = null; 330 } 331 context = null; 332 } 333 MDC.remove("activemq.connector"); 334 return response; 335 } 336 337 public Response processKeepAlive(KeepAliveInfo info) throws Exception { 338 return null; 339 } 340 341 public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception { 342 broker.removeSubscription(lookupConnectionState(info.getConnectionId()).getContext(), info); 343 return null; 344 } 345 346 public Response processWireFormat(WireFormatInfo info) throws Exception { 347 wireFormatInfo = info; 348 protocolVersion.set(info.getVersion()); 349 return null; 350 } 351 352 public Response processShutdown(ShutdownInfo info) throws Exception { 353 stopAsync(); 354 return null; 355 } 356 357 public Response processFlush(FlushCommand command) throws Exception { 358 return null; 359 } 360 361 public Response processBeginTransaction(TransactionInfo info) throws Exception { 362 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 363 context = null; 364 if (cs != null) { 365 context = cs.getContext(); 366 } 367 if (cs == null) { 368 throw new NullPointerException("Context is null"); 369 } 370 // Avoid replaying dup commands 371 if (cs.getTransactionState(info.getTransactionId()) == null) { 372 cs.addTransactionState(info.getTransactionId()); 373 broker.beginTransaction(context, info.getTransactionId()); 374 } 375 return null; 376 } 377 378 public Response processEndTransaction(TransactionInfo info) throws Exception { 379 // No need to do anything. This packet is just sent by the client 380 // make sure he is synced with the server as commit command could 381 // come from a different connection. 382 return null; 383 } 384 385 public Response processPrepareTransaction(TransactionInfo info) throws Exception { 386 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 387 context = null; 388 if (cs != null) { 389 context = cs.getContext(); 390 } 391 if (cs == null) { 392 throw new NullPointerException("Context is null"); 393 } 394 TransactionState transactionState = cs.getTransactionState(info.getTransactionId()); 395 if (transactionState == null) { 396 throw new IllegalStateException("Cannot prepare a transaction that had not been started or previously returned XA_RDONLY: " 397 + info.getTransactionId()); 398 } 399 // Avoid dups. 400 if (!transactionState.isPrepared()) { 401 transactionState.setPrepared(true); 402 int result = broker.prepareTransaction(context, info.getTransactionId()); 403 transactionState.setPreparedResult(result); 404 if (result == XAResource.XA_RDONLY) { 405 // we are done, no further rollback or commit from TM 406 cs.removeTransactionState(info.getTransactionId()); 407 } 408 IntegerResponse response = new IntegerResponse(result); 409 return response; 410 } else { 411 IntegerResponse response = new IntegerResponse(transactionState.getPreparedResult()); 412 return response; 413 } 414 } 415 416 public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception { 417 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 418 context = cs.getContext(); 419 cs.removeTransactionState(info.getTransactionId()); 420 broker.commitTransaction(context, info.getTransactionId(), true); 421 return null; 422 } 423 424 public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception { 425 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 426 context = cs.getContext(); 427 cs.removeTransactionState(info.getTransactionId()); 428 broker.commitTransaction(context, info.getTransactionId(), false); 429 return null; 430 } 431 432 public Response processRollbackTransaction(TransactionInfo info) throws Exception { 433 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 434 context = cs.getContext(); 435 cs.removeTransactionState(info.getTransactionId()); 436 broker.rollbackTransaction(context, info.getTransactionId()); 437 return null; 438 } 439 440 public Response processForgetTransaction(TransactionInfo info) throws Exception { 441 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 442 context = cs.getContext(); 443 broker.forgetTransaction(context, info.getTransactionId()); 444 return null; 445 } 446 447 public Response processRecoverTransactions(TransactionInfo info) throws Exception { 448 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 449 context = cs.getContext(); 450 TransactionId[] preparedTransactions = broker.getPreparedTransactions(context); 451 return new DataArrayResponse(preparedTransactions); 452 } 453 454 public Response processMessage(Message messageSend) throws Exception { 455 ProducerId producerId = messageSend.getProducerId(); 456 ProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId); 457 if (producerExchange.canDispatch(messageSend)) { 458 broker.send(producerExchange, messageSend); 459 } 460 return null; 461 } 462 463 public Response processMessageAck(MessageAck ack) throws Exception { 464 ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(ack.getConsumerId()); 465 broker.acknowledge(consumerExchange, ack); 466 return null; 467 } 468 469 public Response processMessagePull(MessagePull pull) throws Exception { 470 return broker.messagePull(lookupConnectionState(pull.getConsumerId()).getContext(), pull); 471 } 472 473 public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception { 474 broker.processDispatchNotification(notification); 475 return null; 476 } 477 478 public Response processAddDestination(DestinationInfo info) throws Exception { 479 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 480 broker.addDestinationInfo(cs.getContext(), info); 481 if (info.getDestination().isTemporary()) { 482 cs.addTempDestination(info); 483 } 484 return null; 485 } 486 487 public Response processRemoveDestination(DestinationInfo info) throws Exception { 488 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 489 broker.removeDestinationInfo(cs.getContext(), info); 490 if (info.getDestination().isTemporary()) { 491 cs.removeTempDestination(info.getDestination()); 492 } 493 return null; 494 } 495 496 public Response processAddProducer(ProducerInfo info) throws Exception { 497 SessionId sessionId = info.getProducerId().getParentId(); 498 ConnectionId connectionId = sessionId.getParentId(); 499 TransportConnectionState cs = lookupConnectionState(connectionId); 500 SessionState ss = cs.getSessionState(sessionId); 501 if (ss == null) { 502 throw new IllegalStateException("Cannot add a producer to a session that had not been registered: " 503 + sessionId); 504 } 505 // Avoid replaying dup commands 506 if (!ss.getProducerIds().contains(info.getProducerId())) { 507 broker.addProducer(cs.getContext(), info); 508 try { 509 ss.addProducer(info); 510 } catch (IllegalStateException e) { 511 broker.removeProducer(cs.getContext(), info); 512 } 513 } 514 return null; 515 } 516 517 public Response processRemoveProducer(ProducerId id) throws Exception { 518 SessionId sessionId = id.getParentId(); 519 ConnectionId connectionId = sessionId.getParentId(); 520 TransportConnectionState cs = lookupConnectionState(connectionId); 521 SessionState ss = cs.getSessionState(sessionId); 522 if (ss == null) { 523 throw new IllegalStateException("Cannot remove a producer from a session that had not been registered: " 524 + sessionId); 525 } 526 ProducerState ps = ss.removeProducer(id); 527 if (ps == null) { 528 throw new IllegalStateException("Cannot remove a producer that had not been registered: " + id); 529 } 530 removeProducerBrokerExchange(id); 531 broker.removeProducer(cs.getContext(), ps.getInfo()); 532 return null; 533 } 534 535 public Response processAddConsumer(ConsumerInfo info) throws Exception { 536 SessionId sessionId = info.getConsumerId().getParentId(); 537 ConnectionId connectionId = sessionId.getParentId(); 538 TransportConnectionState cs = lookupConnectionState(connectionId); 539 SessionState ss = cs.getSessionState(sessionId); 540 if (ss == null) { 541 throw new IllegalStateException(broker.getBrokerName() 542 + " Cannot add a consumer to a session that had not been registered: " + sessionId); 543 } 544 // Avoid replaying dup commands 545 if (!ss.getConsumerIds().contains(info.getConsumerId())) { 546 broker.addConsumer(cs.getContext(), info); 547 try { 548 ss.addConsumer(info); 549 } catch (IllegalStateException e) { 550 broker.removeConsumer(cs.getContext(), info); 551 } 552 } 553 return null; 554 } 555 556 public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) throws Exception { 557 SessionId sessionId = id.getParentId(); 558 ConnectionId connectionId = sessionId.getParentId(); 559 TransportConnectionState cs = lookupConnectionState(connectionId); 560 if (cs == null) { 561 throw new IllegalStateException("Cannot remove a consumer from a connection that had not been registered: " 562 + connectionId); 563 } 564 SessionState ss = cs.getSessionState(sessionId); 565 if (ss == null) { 566 throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: " 567 + sessionId); 568 } 569 ConsumerState consumerState = ss.removeConsumer(id); 570 if (consumerState == null) { 571 throw new IllegalStateException("Cannot remove a consumer that had not been registered: " + id); 572 } 573 ConsumerInfo info = consumerState.getInfo(); 574 info.setLastDeliveredSequenceId(lastDeliveredSequenceId); 575 broker.removeConsumer(cs.getContext(), consumerState.getInfo()); 576 removeConsumerBrokerExchange(id); 577 return null; 578 } 579 580 public Response processAddSession(SessionInfo info) throws Exception { 581 ConnectionId connectionId = info.getSessionId().getParentId(); 582 TransportConnectionState cs = lookupConnectionState(connectionId); 583 // Avoid replaying dup commands 584 if (cs != null && !cs.getSessionIds().contains(info.getSessionId())) { 585 broker.addSession(cs.getContext(), info); 586 try { 587 cs.addSession(info); 588 } catch (IllegalStateException e) { 589 e.printStackTrace(); 590 broker.removeSession(cs.getContext(), info); 591 } 592 } 593 return null; 594 } 595 596 public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) throws Exception { 597 ConnectionId connectionId = id.getParentId(); 598 TransportConnectionState cs = lookupConnectionState(connectionId); 599 if (cs == null) { 600 throw new IllegalStateException("Cannot remove session from connection that had not been registered: " + connectionId); 601 } 602 SessionState session = cs.getSessionState(id); 603 if (session == null) { 604 throw new IllegalStateException("Cannot remove session that had not been registered: " + id); 605 } 606 // Don't let new consumers or producers get added while we are closing 607 // this down. 608 session.shutdown(); 609 // Cascade the connection stop to the consumers and producers. 610 for (Iterator iter = session.getConsumerIds().iterator(); iter.hasNext();) { 611 ConsumerId consumerId = (ConsumerId) iter.next(); 612 try { 613 processRemoveConsumer(consumerId, lastDeliveredSequenceId); 614 } catch (Throwable e) { 615 LOG.warn("Failed to remove consumer: " + consumerId + ". Reason: " + e, e); 616 } 617 } 618 for (Iterator iter = session.getProducerIds().iterator(); iter.hasNext();) { 619 ProducerId producerId = (ProducerId) iter.next(); 620 try { 621 processRemoveProducer(producerId); 622 } catch (Throwable e) { 623 LOG.warn("Failed to remove producer: " + producerId + ". Reason: " + e, e); 624 } 625 } 626 cs.removeSession(id); 627 broker.removeSession(cs.getContext(), session.getInfo()); 628 return null; 629 } 630 631 public Response processAddConnection(ConnectionInfo info) throws Exception { 632 // if the broker service has slave attached, wait for the slave to be 633 // attached to allow client connection. slave connection is fine 634 if (!info.isBrokerMasterConnector() && connector.getBrokerService().isWaitForSlave() 635 && connector.getBrokerService().getSlaveStartSignal().getCount() == 1) { 636 ServiceSupport.dispose(transport); 637 return new ExceptionResponse(new Exception("Master's slave not attached yet.")); 638 } 639 // Older clients should have been defaulting this field to true.. but 640 // they were not. 641 if (wireFormatInfo != null && wireFormatInfo.getVersion() <= 2) { 642 info.setClientMaster(true); 643 } 644 TransportConnectionState state; 645 // Make sure 2 concurrent connections by the same ID only generate 1 646 // TransportConnectionState object. 647 synchronized (brokerConnectionStates) { 648 state = (TransportConnectionState) brokerConnectionStates.get(info.getConnectionId()); 649 if (state == null) { 650 state = new TransportConnectionState(info, this); 651 brokerConnectionStates.put(info.getConnectionId(), state); 652 } 653 state.incrementReference(); 654 } 655 // If there are 2 concurrent connections for the same connection id, 656 // then last one in wins, we need to sync here 657 // to figure out the winner. 658 synchronized (state.getConnectionMutex()) { 659 if (state.getConnection() != this) { 660 LOG.debug("Killing previous stale connection: " + state.getConnection().getRemoteAddress()); 661 state.getConnection().stop(); 662 LOG.debug("Connection " + getRemoteAddress() + " taking over previous connection: " 663 + state.getConnection().getRemoteAddress()); 664 state.setConnection(this); 665 state.reset(info); 666 } 667 } 668 registerConnectionState(info.getConnectionId(), state); 669 LOG.debug("Setting up new connection id: " + info.getConnectionId() + ", address: " + getRemoteAddress()); 670 this.faultTolerantConnection=info.isFaultTolerant(); 671 // Setup the context. 672 String clientId = info.getClientId(); 673 context = new ConnectionContext(); 674 context.setBroker(broker); 675 context.setClientId(clientId); 676 context.setClientMaster(info.isClientMaster()); 677 context.setConnection(this); 678 context.setConnectionId(info.getConnectionId()); 679 context.setConnector(connector); 680 context.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy()); 681 context.setNetworkConnection(networkConnection); 682 context.setFaultTolerant(faultTolerantConnection); 683 context.setTransactions(new ConcurrentHashMap<TransactionId, Transaction>()); 684 context.setUserName(info.getUserName()); 685 context.setWireFormatInfo(wireFormatInfo); 686 context.setReconnect(info.isFailoverReconnect()); 687 this.manageable = info.isManageable(); 688 state.setContext(context); 689 state.setConnection(this); 690 691 try { 692 broker.addConnection(context, info); 693 } catch (Exception e) { 694 synchronized (brokerConnectionStates) { 695 brokerConnectionStates.remove(info.getConnectionId()); 696 } 697 unregisterConnectionState(info.getConnectionId()); 698 LOG.warn("Failed to add Connection " + info.getConnectionId() + ", reason: " + e.toString()); 699 if (LOG.isDebugEnabled()) { 700 LOG.debug("Exception detail:", e); 701 } 702 throw e; 703 } 704 if (info.isManageable()) { 705 // send ConnectionCommand 706 ConnectionControl command = this.connector.getConnectionControl(); 707 command.setFaultTolerant(broker.isFaultTolerantConfiguration()); 708 dispatchAsync(command); 709 } 710 return null; 711 } 712 713 public synchronized Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) 714 throws InterruptedException { 715 LOG.debug("remove connection id: " + id); 716 TransportConnectionState cs = lookupConnectionState(id); 717 if (cs != null) { 718 // Don't allow things to be added to the connection state while we 719 // are 720 // shutting down. 721 cs.shutdown(); 722 // Cascade the connection stop to the sessions. 723 for (Iterator iter = cs.getSessionIds().iterator(); iter.hasNext();) { 724 SessionId sessionId = (SessionId) iter.next(); 725 try { 726 processRemoveSession(sessionId, lastDeliveredSequenceId); 727 } catch (Throwable e) { 728 SERVICELOG.warn("Failed to remove session " + sessionId, e); 729 } 730 } 731 // Cascade the connection stop to temp destinations. 732 for (Iterator iter = cs.getTempDestinations().iterator(); iter.hasNext();) { 733 DestinationInfo di = (DestinationInfo) iter.next(); 734 try { 735 broker.removeDestination(cs.getContext(), di.getDestination(), 0); 736 } catch (Throwable e) { 737 SERVICELOG.warn("Failed to remove tmp destination " + di.getDestination(), e); 738 } 739 iter.remove(); 740 } 741 try { 742 broker.removeConnection(cs.getContext(), cs.getInfo(), null); 743 } catch (Throwable e) { 744 SERVICELOG.warn("Failed to remove connection " + cs.getInfo() + ", reason: " + e.toString()); 745 if (LOG.isDebugEnabled()) { 746 SERVICELOG.debug("Exception detail:", e); 747 } 748 } 749 TransportConnectionState state = unregisterConnectionState(id); 750 if (state != null) { 751 synchronized (brokerConnectionStates) { 752 // If we are the last reference, we should remove the state 753 // from the broker. 754 if (state.decrementReference() == 0) { 755 brokerConnectionStates.remove(id); 756 } 757 } 758 } 759 } 760 return null; 761 } 762 763 public Response processProducerAck(ProducerAck ack) throws Exception { 764 // A broker should not get ProducerAck messages. 765 return null; 766 } 767 768 public Connector getConnector() { 769 return connector; 770 } 771 772 public void dispatchSync(Command message) { 773 // getStatistics().getEnqueues().increment(); 774 try { 775 processDispatch(message); 776 } catch (IOException e) { 777 serviceExceptionAsync(e); 778 } 779 } 780 781 public void dispatchAsync(Command message) { 782 if (!stopping.get()) { 783 // getStatistics().getEnqueues().increment(); 784 if (taskRunner == null) { 785 dispatchSync(message); 786 } else { 787 synchronized (dispatchQueue) { 788 dispatchQueue.add(message); 789 } 790 try { 791 taskRunner.wakeup(); 792 } catch (InterruptedException e) { 793 Thread.currentThread().interrupt(); 794 } 795 } 796 } else { 797 if (message.isMessageDispatch()) { 798 MessageDispatch md = (MessageDispatch) message; 799 Runnable sub = md.getTransmitCallback(); 800 broker.postProcessDispatch(md); 801 if (sub != null) { 802 sub.run(); 803 } 804 } 805 } 806 } 807 808 protected void processDispatch(Command command) throws IOException { 809 final MessageDispatch messageDispatch = (MessageDispatch) (command.isMessageDispatch() ? command : null); 810 try { 811 if (!stopping.get()) { 812 if (messageDispatch != null) { 813 broker.preProcessDispatch(messageDispatch); 814 } 815 dispatch(command); 816 } 817 } finally { 818 if (messageDispatch != null) { 819 Runnable sub = messageDispatch.getTransmitCallback(); 820 broker.postProcessDispatch(messageDispatch); 821 if (sub != null) { 822 sub.run(); 823 } 824 } 825 // getStatistics().getDequeues().increment(); 826 } 827 } 828 829 public boolean iterate() { 830 try { 831 if (stopping.get()) { 832 if (dispatchStopped.compareAndSet(false, true)) { 833 if (transportException.get() == null) { 834 try { 835 dispatch(new ShutdownInfo()); 836 } catch (Throwable ignore) { 837 } 838 } 839 dispatchStoppedLatch.countDown(); 840 } 841 return false; 842 } 843 if (!dispatchStopped.get()) { 844 Command command = null; 845 synchronized (dispatchQueue) { 846 if (dispatchQueue.isEmpty()) { 847 return false; 848 } 849 command = dispatchQueue.remove(0); 850 } 851 processDispatch(command); 852 return true; 853 } 854 return false; 855 } catch (IOException e) { 856 if (dispatchStopped.compareAndSet(false, true)) { 857 dispatchStoppedLatch.countDown(); 858 } 859 serviceExceptionAsync(e); 860 return false; 861 } 862 } 863 864 /** 865 * Returns the statistics for this connection 866 */ 867 public ConnectionStatistics getStatistics() { 868 return statistics; 869 } 870 871 public MessageAuthorizationPolicy getMessageAuthorizationPolicy() { 872 return messageAuthorizationPolicy; 873 } 874 875 public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) { 876 this.messageAuthorizationPolicy = messageAuthorizationPolicy; 877 } 878 879 public boolean isManageable() { 880 return manageable; 881 } 882 883 public void start() throws Exception { 884 starting = true; 885 try { 886 synchronized (this) { 887 if (taskRunnerFactory != null) { 888 taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: " 889 + getRemoteAddress()); 890 } else { 891 taskRunner = null; 892 } 893 transport.start(); 894 active = true; 895 BrokerInfo info = connector.getBrokerInfo().copy(); 896 if (connector.isUpdateClusterClients()) { 897 info.setPeerBrokerInfos(this.broker.getPeerBrokerInfos()); 898 } else { 899 info.setPeerBrokerInfos(null); 900 } 901 dispatchAsync(info); 902 903 connector.onStarted(this); 904 } 905 } catch (Exception e) { 906 // Force clean up on an error starting up. 907 stop(); 908 throw e; 909 } finally { 910 // stop() can be called from within the above block, 911 // but we want to be sure start() completes before 912 // stop() runs, so queue the stop until right now: 913 starting = false; 914 if (pendingStop) { 915 LOG.debug("Calling the delayed stop()"); 916 stop(); 917 } 918 } 919 } 920 921 public void stop() throws Exception { 922 synchronized (this) { 923 pendingStop = true; 924 if (starting) { 925 LOG.debug("stop() called in the middle of start(). Delaying..."); 926 return; 927 } 928 } 929 stopAsync(); 930 while (!stopped.await(5, TimeUnit.SECONDS)) { 931 LOG.info("The connection to '" + transport.getRemoteAddress() + "' is taking a long time to shutdown."); 932 } 933 } 934 935 public void stopAsync() { 936 // If we're in the middle of starting 937 // then go no further... for now. 938 if (stopping.compareAndSet(false, true)) { 939 // Let all the connection contexts know we are shutting down 940 // so that in progress operations can notice and unblock. 941 List<TransportConnectionState> connectionStates = listConnectionStates(); 942 for (TransportConnectionState cs : connectionStates) { 943 cs.getContext().getStopping().set(true); 944 } 945 try { 946 final Map context = MDCHelper.getCopyOfContextMap(); 947 DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable(){ 948 public void run() { 949 serviceLock.writeLock().lock(); 950 try { 951 MDCHelper.setContextMap(context); 952 doStop(); 953 } catch (Throwable e) { 954 LOG.debug("Error occured while shutting down a connection to '" + transport.getRemoteAddress() 955 + "': ", e); 956 } finally { 957 stopped.countDown(); 958 serviceLock.writeLock().unlock(); 959 } 960 } 961 }, "StopAsync:" + transport.getRemoteAddress()); 962 } catch (Throwable t) { 963 LOG.warn("cannot create async transport stopper thread.. not waiting for stop to complete, reason:", t); 964 stopped.countDown(); 965 } 966 } 967 } 968 969 @Override 970 public String toString() { 971 return "Transport Connection to: " + transport.getRemoteAddress(); 972 } 973 974 protected void doStop() throws Exception, InterruptedException { 975 LOG.debug("Stopping connection: " + transport.getRemoteAddress()); 976 connector.onStopped(this); 977 try { 978 synchronized (this) { 979 if (masterBroker != null) { 980 masterBroker.stop(); 981 } 982 if (duplexBridge != null) { 983 duplexBridge.stop(); 984 } 985 } 986 } catch (Exception ignore) { 987 LOG.trace("Exception caught stopping", ignore); 988 } 989 try { 990 transport.stop(); 991 LOG.debug("Stopped transport: " + transport.getRemoteAddress()); 992 } catch (Exception e) { 993 LOG.debug("Could not stop transport: " + e, e); 994 } 995 if (taskRunner != null) { 996 taskRunner.shutdown(1); 997 } 998 active = false; 999 // Run the MessageDispatch callbacks so that message references get 1000 // cleaned up. 1001 synchronized (dispatchQueue) { 1002 for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext();) { 1003 Command command = iter.next(); 1004 if (command.isMessageDispatch()) { 1005 MessageDispatch md = (MessageDispatch) command; 1006 Runnable sub = md.getTransmitCallback(); 1007 broker.postProcessDispatch(md); 1008 if (sub != null) { 1009 sub.run(); 1010 } 1011 } 1012 } 1013 dispatchQueue.clear(); 1014 } 1015 // 1016 // Remove all logical connection associated with this connection 1017 // from the broker. 1018 if (!broker.isStopped()) { 1019 List<TransportConnectionState> connectionStates = listConnectionStates(); 1020 connectionStates = listConnectionStates(); 1021 for (TransportConnectionState cs : connectionStates) { 1022 cs.getContext().getStopping().set(true); 1023 try { 1024 LOG.debug("Cleaning up connection resources: " + getRemoteAddress()); 1025 processRemoveConnection(cs.getInfo().getConnectionId(), 0l); 1026 } catch (Throwable ignore) { 1027 ignore.printStackTrace(); 1028 } 1029 } 1030 } 1031 LOG.debug("Connection Stopped: " + getRemoteAddress()); 1032 } 1033 1034 /** 1035 * @return Returns the blockedCandidate. 1036 */ 1037 public boolean isBlockedCandidate() { 1038 return blockedCandidate; 1039 } 1040 1041 /** 1042 * @param blockedCandidate 1043 * The blockedCandidate to set. 1044 */ 1045 public void setBlockedCandidate(boolean blockedCandidate) { 1046 this.blockedCandidate = blockedCandidate; 1047 } 1048 1049 /** 1050 * @return Returns the markedCandidate. 1051 */ 1052 public boolean isMarkedCandidate() { 1053 return markedCandidate; 1054 } 1055 1056 /** 1057 * @param markedCandidate 1058 * The markedCandidate to set. 1059 */ 1060 public void setMarkedCandidate(boolean markedCandidate) { 1061 this.markedCandidate = markedCandidate; 1062 if (!markedCandidate) { 1063 timeStamp = 0; 1064 blockedCandidate = false; 1065 } 1066 } 1067 1068 /** 1069 * @param slow 1070 * The slow to set. 1071 */ 1072 public void setSlow(boolean slow) { 1073 this.slow = slow; 1074 } 1075 1076 /** 1077 * @return true if the Connection is slow 1078 */ 1079 public boolean isSlow() { 1080 return slow; 1081 } 1082 1083 /** 1084 * @return true if the Connection is potentially blocked 1085 */ 1086 public boolean isMarkedBlockedCandidate() { 1087 return markedCandidate; 1088 } 1089 1090 /** 1091 * Mark the Connection, so we can deem if it's collectable on the next sweep 1092 */ 1093 public void doMark() { 1094 if (timeStamp == 0) { 1095 timeStamp = System.currentTimeMillis(); 1096 } 1097 } 1098 1099 /** 1100 * @return if after being marked, the Connection is still writing 1101 */ 1102 public boolean isBlocked() { 1103 return blocked; 1104 } 1105 1106 /** 1107 * @return true if the Connection is connected 1108 */ 1109 public boolean isConnected() { 1110 return connected; 1111 } 1112 1113 /** 1114 * @param blocked 1115 * The blocked to set. 1116 */ 1117 public void setBlocked(boolean blocked) { 1118 this.blocked = blocked; 1119 } 1120 1121 /** 1122 * @param connected 1123 * The connected to set. 1124 */ 1125 public void setConnected(boolean connected) { 1126 this.connected = connected; 1127 } 1128 1129 /** 1130 * @return true if the Connection is active 1131 */ 1132 public boolean isActive() { 1133 return active; 1134 } 1135 1136 /** 1137 * @param active 1138 * The active to set. 1139 */ 1140 public void setActive(boolean active) { 1141 this.active = active; 1142 } 1143 1144 /** 1145 * @return true if the Connection is starting 1146 */ 1147 public synchronized boolean isStarting() { 1148 return starting; 1149 } 1150 1151 public synchronized boolean isNetworkConnection() { 1152 return networkConnection; 1153 } 1154 1155 public boolean isFaultTolerantConnection() { 1156 return this.faultTolerantConnection; 1157 } 1158 1159 protected synchronized void setStarting(boolean starting) { 1160 this.starting = starting; 1161 } 1162 1163 /** 1164 * @return true if the Connection needs to stop 1165 */ 1166 public synchronized boolean isPendingStop() { 1167 return pendingStop; 1168 } 1169 1170 protected synchronized void setPendingStop(boolean pendingStop) { 1171 this.pendingStop = pendingStop; 1172 } 1173 1174 public Response processBrokerInfo(BrokerInfo info) { 1175 if (info.isSlaveBroker()) { 1176 BrokerService bService = connector.getBrokerService(); 1177 // Do we only support passive slaves - or does the slave want to be 1178 // passive ? 1179 boolean passive = bService.isPassiveSlave() || info.isPassiveSlave(); 1180 if (passive == false) { 1181 1182 // stream messages from this broker (the master) to 1183 // the slave 1184 MutableBrokerFilter parent = (MutableBrokerFilter) broker.getAdaptor(MutableBrokerFilter.class); 1185 masterBroker = new MasterBroker(parent, transport); 1186 masterBroker.startProcessing(); 1187 } 1188 LOG.info((passive?"Passive":"Active")+" Slave Broker " + info.getBrokerName() + " is attached"); 1189 bService.slaveConnectionEstablished(); 1190 } else if (info.isNetworkConnection() && info.isDuplexConnection()) { 1191 // so this TransportConnection is the rear end of a network bridge 1192 // We have been requested to create a two way pipe ... 1193 try { 1194 Properties properties = MarshallingSupport.stringToProperties(info.getNetworkProperties()); 1195 Map<String, String> props = createMap(properties); 1196 NetworkBridgeConfiguration config = new NetworkBridgeConfiguration(); 1197 IntrospectionSupport.setProperties(config, props, ""); 1198 config.setBrokerName(broker.getBrokerName()); 1199 1200 // check for existing duplex connection hanging about 1201 1202 // We first look if existing network connection already exists for the same broker Id and network connector name 1203 // It's possible in case of brief network fault to have this transport connector side of the connection always active 1204 // and the duplex network connector side wanting to open a new one 1205 // In this case, the old connection must be broken 1206 String duplexNetworkConnectorId = config.getName() + "@" + info.getBrokerId(); 1207 CopyOnWriteArrayList<TransportConnection> connections = this.connector.getConnections(); 1208 synchronized (connections) { 1209 for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext();) { 1210 TransportConnection c = iter.next(); 1211 if ((c != this) && (duplexNetworkConnectorId.equals(c.getDuplexNetworkConnectorId()))) { 1212 LOG.warn("Stopping an existing active duplex connection [" + c + "] for network connector (" + duplexNetworkConnectorId + ")."); 1213 c.stopAsync(); 1214 // better to wait for a bit rather than get connection id already in use and failure to start new bridge 1215 c.getStopped().await(1, TimeUnit.SECONDS); 1216 } 1217 } 1218 setDuplexNetworkConnectorId(duplexNetworkConnectorId); 1219 } 1220 URI uri = broker.getVmConnectorURI(); 1221 HashMap<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri)); 1222 map.put("network", "true"); 1223 map.put("async", "false"); 1224 uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map)); 1225 Transport localTransport = TransportFactory.connect(uri); 1226 Transport remoteBridgeTransport = new ResponseCorrelator(transport); 1227 String duplexName = localTransport.toString(); 1228 if (duplexName.contains("#")) { 1229 duplexName = duplexName.substring(duplexName.lastIndexOf("#")); 1230 } 1231 MBeanNetworkListener listener = new MBeanNetworkListener(broker.getBrokerService(), broker.getBrokerService().createDuplexNetworkConnectorObjectName(duplexName)); 1232 listener.setCreatedByDuplex(true); 1233 duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport, remoteBridgeTransport, listener); 1234 duplexBridge.setBrokerService(broker.getBrokerService()); 1235 // now turn duplex off this side 1236 info.setDuplexConnection(false); 1237 duplexBridge.setCreatedByDuplex(true); 1238 duplexBridge.duplexStart(this, brokerInfo, info); 1239 LOG.info("Started responder end of duplex bridge " + duplexNetworkConnectorId); 1240 return null; 1241 } catch (TransportDisposedIOException e) { 1242 LOG.warn("Duplex bridge " + duplexNetworkConnectorId + " was stopped before it was correctly started."); 1243 return null; 1244 } catch (Exception e) { 1245 LOG.error("Failed to create responder end of duplex network bridge " + duplexNetworkConnectorId , e); 1246 return null; 1247 } 1248 } 1249 // We only expect to get one broker info command per connection 1250 if (this.brokerInfo != null) { 1251 LOG.warn("Unexpected extra broker info command received: " + info); 1252 } 1253 this.brokerInfo = info; 1254 networkConnection = true; 1255 List<TransportConnectionState> connectionStates = listConnectionStates(); 1256 for (TransportConnectionState cs : connectionStates) { 1257 cs.getContext().setNetworkConnection(true); 1258 } 1259 return null; 1260 } 1261 1262 @SuppressWarnings("unchecked") 1263 private HashMap<String, String> createMap(Properties properties) { 1264 return new HashMap(properties); 1265 } 1266 1267 protected void dispatch(Command command) throws IOException { 1268 try { 1269 setMarkedCandidate(true); 1270 transport.oneway(command); 1271 } finally { 1272 setMarkedCandidate(false); 1273 } 1274 } 1275 1276 public String getRemoteAddress() { 1277 return transport.getRemoteAddress(); 1278 } 1279 1280 public String getConnectionId() { 1281 List<TransportConnectionState> connectionStates = listConnectionStates(); 1282 for (TransportConnectionState cs : connectionStates) { 1283 if (cs.getInfo().getClientId() != null) { 1284 return cs.getInfo().getClientId(); 1285 } 1286 return cs.getInfo().getConnectionId().toString(); 1287 } 1288 return null; 1289 } 1290 1291 public void updateClient(ConnectionControl control) { 1292 if (isActive() && isBlocked() == false && isFaultTolerantConnection() && this.wireFormatInfo != null 1293 && this.wireFormatInfo.getVersion() >= 6) { 1294 dispatchAsync(control); 1295 } 1296 } 1297 1298 private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id) throws IOException { 1299 ProducerBrokerExchange result = producerExchanges.get(id); 1300 if (result == null) { 1301 synchronized (producerExchanges) { 1302 result = new ProducerBrokerExchange(); 1303 TransportConnectionState state = lookupConnectionState(id); 1304 context = state.getContext(); 1305 if (context.isReconnect()) { 1306 result.setLastStoredSequenceId(broker.getBrokerService().getPersistenceAdapter().getLastProducerSequenceId(id)); 1307 } 1308 result.setConnectionContext(context); 1309 SessionState ss = state.getSessionState(id.getParentId()); 1310 if (ss != null) { 1311 result.setProducerState(ss.getProducerState(id)); 1312 ProducerState producerState = ss.getProducerState(id); 1313 if (producerState != null && producerState.getInfo() != null) { 1314 ProducerInfo info = producerState.getInfo(); 1315 result.setMutable(info.getDestination() == null || info.getDestination().isComposite()); 1316 } 1317 } 1318 producerExchanges.put(id, result); 1319 } 1320 } else { 1321 context = result.getConnectionContext(); 1322 } 1323 return result; 1324 } 1325 1326 private void removeProducerBrokerExchange(ProducerId id) { 1327 synchronized (producerExchanges) { 1328 producerExchanges.remove(id); 1329 } 1330 } 1331 1332 private ConsumerBrokerExchange getConsumerBrokerExchange(ConsumerId id) { 1333 ConsumerBrokerExchange result = consumerExchanges.get(id); 1334 if (result == null) { 1335 synchronized (consumerExchanges) { 1336 result = new ConsumerBrokerExchange(); 1337 TransportConnectionState state = lookupConnectionState(id); 1338 context = state.getContext(); 1339 result.setConnectionContext(context); 1340 SessionState ss = state.getSessionState(id.getParentId()); 1341 if (ss != null) { 1342 ConsumerState cs = ss.getConsumerState(id); 1343 if (cs != null) { 1344 ConsumerInfo info = cs.getInfo(); 1345 if (info != null) { 1346 if (info.getDestination() != null && info.getDestination().isPattern()) { 1347 result.setWildcard(true); 1348 } 1349 } 1350 } 1351 } 1352 consumerExchanges.put(id, result); 1353 } 1354 } 1355 return result; 1356 } 1357 1358 private void removeConsumerBrokerExchange(ConsumerId id) { 1359 synchronized (consumerExchanges) { 1360 consumerExchanges.remove(id); 1361 } 1362 } 1363 1364 public int getProtocolVersion() { 1365 return protocolVersion.get(); 1366 } 1367 1368 public Response processControlCommand(ControlCommand command) throws Exception { 1369 String control = command.getCommand(); 1370 if (control != null && control.equals("shutdown")) { 1371 System.exit(0); 1372 } 1373 return null; 1374 } 1375 1376 public Response processMessageDispatch(MessageDispatch dispatch) throws Exception { 1377 return null; 1378 } 1379 1380 public Response processConnectionControl(ConnectionControl control) throws Exception { 1381 if (control != null) { 1382 faultTolerantConnection = control.isFaultTolerant(); 1383 } 1384 return null; 1385 } 1386 1387 public Response processConnectionError(ConnectionError error) throws Exception { 1388 return null; 1389 } 1390 1391 public Response processConsumerControl(ConsumerControl control) throws Exception { 1392 ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(control.getConsumerId()); 1393 broker.processConsumerControl(consumerExchange, control); 1394 return null; 1395 } 1396 1397 protected synchronized TransportConnectionState registerConnectionState(ConnectionId connectionId, 1398 TransportConnectionState state) { 1399 TransportConnectionState cs = null; 1400 if (!connectionStateRegister.isEmpty() && !connectionStateRegister.doesHandleMultipleConnectionStates()) { 1401 // swap implementations 1402 TransportConnectionStateRegister newRegister = new MapTransportConnectionStateRegister(); 1403 newRegister.intialize(connectionStateRegister); 1404 connectionStateRegister = newRegister; 1405 } 1406 cs = connectionStateRegister.registerConnectionState(connectionId, state); 1407 return cs; 1408 } 1409 1410 protected synchronized TransportConnectionState unregisterConnectionState(ConnectionId connectionId) { 1411 return connectionStateRegister.unregisterConnectionState(connectionId); 1412 } 1413 1414 protected synchronized List<TransportConnectionState> listConnectionStates() { 1415 return connectionStateRegister.listConnectionStates(); 1416 } 1417 1418 protected synchronized TransportConnectionState lookupConnectionState(String connectionId) { 1419 return connectionStateRegister.lookupConnectionState(connectionId); 1420 } 1421 1422 protected synchronized TransportConnectionState lookupConnectionState(ConsumerId id) { 1423 return connectionStateRegister.lookupConnectionState(id); 1424 } 1425 1426 protected synchronized TransportConnectionState lookupConnectionState(ProducerId id) { 1427 return connectionStateRegister.lookupConnectionState(id); 1428 } 1429 1430 protected synchronized TransportConnectionState lookupConnectionState(SessionId id) { 1431 return connectionStateRegister.lookupConnectionState(id); 1432 } 1433 1434 protected synchronized TransportConnectionState lookupConnectionState(ConnectionId connectionId) { 1435 return connectionStateRegister.lookupConnectionState(connectionId); 1436 } 1437 1438 protected synchronized void setDuplexNetworkConnectorId(String duplexNetworkConnectorId) { 1439 this.duplexNetworkConnectorId = duplexNetworkConnectorId; 1440 } 1441 1442 protected synchronized String getDuplexNetworkConnectorId() { 1443 return this.duplexNetworkConnectorId; 1444 } 1445 1446 protected CountDownLatch getStopped() { 1447 return stopped; 1448 } 1449 }