001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.network; 018 019 import java.io.IOException; 020 import java.security.GeneralSecurityException; 021 import java.security.cert.X509Certificate; 022 import java.util.*; 023 import java.util.concurrent.ConcurrentHashMap; 024 import java.util.concurrent.CountDownLatch; 025 import java.util.concurrent.TimeUnit; 026 import java.util.concurrent.atomic.AtomicBoolean; 027 import java.util.concurrent.atomic.AtomicLong; 028 029 import org.apache.activemq.Service; 030 import org.apache.activemq.advisory.AdvisorySupport; 031 import org.apache.activemq.broker.BrokerService; 032 import org.apache.activemq.broker.BrokerServiceAware; 033 import org.apache.activemq.broker.TransportConnection; 034 import org.apache.activemq.broker.region.AbstractRegion; 035 import org.apache.activemq.broker.region.RegionBroker; 036 import org.apache.activemq.broker.region.Subscription; 037 import org.apache.activemq.command.ActiveMQDestination; 038 import org.apache.activemq.command.ActiveMQMessage; 039 import org.apache.activemq.command.ActiveMQTempDestination; 040 import org.apache.activemq.command.ActiveMQTopic; 041 import 
org.apache.activemq.command.BrokerId; 042 import org.apache.activemq.command.BrokerInfo; 043 import org.apache.activemq.command.Command; 044 import org.apache.activemq.command.ConnectionError; 045 import org.apache.activemq.command.ConnectionId; 046 import org.apache.activemq.command.ConnectionInfo; 047 import org.apache.activemq.command.ConsumerId; 048 import org.apache.activemq.command.ConsumerInfo; 049 import org.apache.activemq.command.DataStructure; 050 import org.apache.activemq.command.DestinationInfo; 051 import org.apache.activemq.command.ExceptionResponse; 052 import org.apache.activemq.command.KeepAliveInfo; 053 import org.apache.activemq.command.Message; 054 import org.apache.activemq.command.MessageAck; 055 import org.apache.activemq.command.MessageDispatch; 056 import org.apache.activemq.command.NetworkBridgeFilter; 057 import org.apache.activemq.command.ProducerInfo; 058 import org.apache.activemq.command.RemoveInfo; 059 import org.apache.activemq.command.Response; 060 import org.apache.activemq.command.SessionInfo; 061 import org.apache.activemq.command.ShutdownInfo; 062 import org.apache.activemq.command.WireFormatInfo; 063 import org.apache.activemq.filter.DestinationFilter; 064 import org.apache.activemq.filter.MessageEvaluationContext; 065 import org.apache.activemq.thread.DefaultThreadPools; 066 import org.apache.activemq.thread.TaskRunnerFactory; 067 import org.apache.activemq.transport.DefaultTransportListener; 068 import org.apache.activemq.transport.FutureResponse; 069 import org.apache.activemq.transport.ResponseCallback; 070 import org.apache.activemq.transport.Transport; 071 import org.apache.activemq.transport.TransportDisposedIOException; 072 import org.apache.activemq.transport.TransportFilter; 073 import org.apache.activemq.transport.TransportListener; 074 import org.apache.activemq.transport.tcp.SslTransport; 075 import org.apache.activemq.util.*; 076 import org.slf4j.Logger; 077 import org.slf4j.LoggerFactory; 078 import 
org.slf4j.MDC; 079 080 /** 081 * A useful base class for implementing demand forwarding bridges. 082 * 083 * 084 */ 085 public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware { 086 private static final Logger LOG = LoggerFactory.getLogger(DemandForwardingBridgeSupport.class); 087 private final TaskRunnerFactory asyncTaskRunner = DefaultThreadPools.getDefaultTaskRunnerFactory(); 088 protected static final String DURABLE_SUB_PREFIX = "NC-DS_"; 089 protected final Transport localBroker; 090 protected final Transport remoteBroker; 091 protected final IdGenerator idGenerator = new IdGenerator(); 092 protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); 093 protected ConnectionInfo localConnectionInfo; 094 protected ConnectionInfo remoteConnectionInfo; 095 protected SessionInfo localSessionInfo; 096 protected ProducerInfo producerInfo; 097 protected String remoteBrokerName = "Unknown"; 098 protected String localClientId; 099 protected ConsumerInfo demandConsumerInfo; 100 protected int demandConsumerDispatched; 101 protected final AtomicBoolean localBridgeStarted = new AtomicBoolean(false); 102 protected final AtomicBoolean remoteBridgeStarted = new AtomicBoolean(false); 103 protected AtomicBoolean disposed = new AtomicBoolean(); 104 protected BrokerId localBrokerId; 105 protected ActiveMQDestination[] excludedDestinations; 106 protected ActiveMQDestination[] dynamicallyIncludedDestinations; 107 protected ActiveMQDestination[] staticallyIncludedDestinations; 108 protected ActiveMQDestination[] durableDestinations; 109 protected final ConcurrentHashMap<ConsumerId, DemandSubscription> subscriptionMapByLocalId = new ConcurrentHashMap<ConsumerId, DemandSubscription>(); 110 protected final ConcurrentHashMap<ConsumerId, DemandSubscription> subscriptionMapByRemoteId = new ConcurrentHashMap<ConsumerId, DemandSubscription>(); 111 protected final BrokerId localBrokerPath[] = new BrokerId[] { null }; 112 
protected CountDownLatch startedLatch = new CountDownLatch(2); 113 protected CountDownLatch localStartedLatch = new CountDownLatch(1); 114 protected CountDownLatch remoteBrokerNameKnownLatch = new CountDownLatch(1); 115 protected CountDownLatch localBrokerIdKnownLatch = new CountDownLatch(1); 116 protected final AtomicBoolean remoteInterupted = new AtomicBoolean(false); 117 protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false); 118 protected NetworkBridgeConfiguration configuration; 119 120 final AtomicLong enqueueCounter = new AtomicLong(); 121 final AtomicLong dequeueCounter = new AtomicLong(); 122 123 private NetworkBridgeListener networkBridgeListener; 124 private boolean createdByDuplex; 125 private BrokerInfo localBrokerInfo; 126 private BrokerInfo remoteBrokerInfo; 127 128 private final AtomicBoolean started = new AtomicBoolean(); 129 private TransportConnection duplexInitiatingConnection; 130 private BrokerService brokerService = null; 131 132 public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) { 133 this.configuration = configuration; 134 this.localBroker = localBroker; 135 this.remoteBroker = remoteBroker; 136 } 137 138 public void duplexStart(TransportConnection connection, BrokerInfo localBrokerInfo, BrokerInfo remoteBrokerInfo) throws Exception { 139 this.localBrokerInfo = localBrokerInfo; 140 this.remoteBrokerInfo = remoteBrokerInfo; 141 this.duplexInitiatingConnection = connection; 142 start(); 143 serviceRemoteCommand(remoteBrokerInfo); 144 } 145 146 public void start() throws Exception { 147 if (started.compareAndSet(false, true)) { 148 localBroker.setTransportListener(new DefaultTransportListener() { 149 150 @Override 151 public void onCommand(Object o) { 152 Command command = (Command) o; 153 serviceLocalCommand(command); 154 } 155 156 @Override 157 public void onException(IOException error) { 158 serviceLocalException(error); 159 } 160 }); 161 
remoteBroker.setTransportListener(new TransportListener() { 162 163 public void onCommand(Object o) { 164 Command command = (Command) o; 165 serviceRemoteCommand(command); 166 } 167 168 public void onException(IOException error) { 169 serviceRemoteException(error); 170 } 171 172 public void transportInterupted() { 173 // clear any subscriptions - to try and prevent the bridge 174 // from stalling the broker 175 if (remoteInterupted.compareAndSet(false, true)) { 176 LOG.info("Outbound transport to " + remoteBrokerName + " interrupted."); 177 if (localBridgeStarted.get()) { 178 clearDownSubscriptions(); 179 synchronized (DemandForwardingBridgeSupport.this) { 180 try { 181 localBroker.oneway(localConnectionInfo.createRemoveCommand()); 182 } catch (TransportDisposedIOException td) { 183 LOG.debug("local broker is now disposed", td); 184 } catch (IOException e) { 185 LOG.warn("Caught exception from local start", e); 186 } 187 } 188 } 189 localBridgeStarted.set(false); 190 remoteBridgeStarted.set(false); 191 startedLatch = new CountDownLatch(2); 192 localStartedLatch = new CountDownLatch(1); 193 } 194 } 195 196 public void transportResumed() { 197 if (remoteInterupted.compareAndSet(true, false)) { 198 // We want to slow down false connects so that we don't 199 // get in a busy loop. 200 // False connects can occurr if you using SSH tunnels. 201 if (!lastConnectSucceeded.get()) { 202 try { 203 LOG.debug("Previous connection was never fully established. 
Sleeping for second to avoid busy loop."); 204 Thread.sleep(1000); 205 } catch (InterruptedException e) { 206 Thread.currentThread().interrupt(); 207 } 208 } 209 lastConnectSucceeded.set(false); 210 try { 211 startLocalBridge(); 212 remoteBridgeStarted.set(true); 213 startedLatch.countDown(); 214 LOG.info("Outbound transport to " + remoteBrokerName + " resumed"); 215 } catch (Throwable e) { 216 LOG.error("Caught exception from local start in resume transport", e); 217 serviceLocalException(e); 218 } 219 } 220 } 221 }); 222 223 localBroker.start(); 224 remoteBroker.start(); 225 if (!disposed.get()) { 226 try { 227 triggerRemoteStartBridge(); 228 } catch (IOException e) { 229 LOG.warn("Caught exception from remote start", e); 230 } 231 } else { 232 LOG.warn ("Bridge was disposed before the start() method was fully executed."); 233 throw new TransportDisposedIOException(); 234 } 235 } 236 } 237 238 protected void triggerLocalStartBridge() throws IOException { 239 final Map context = MDCHelper.getCopyOfContextMap(); 240 asyncTaskRunner.execute(new Runnable() { 241 public void run() { 242 MDCHelper.setContextMap(context); 243 final String originalName = Thread.currentThread().getName(); 244 Thread.currentThread().setName("StartLocalBridge: localBroker=" + localBroker); 245 try { 246 startLocalBridge(); 247 } catch (Throwable e) { 248 serviceLocalException(e); 249 } finally { 250 Thread.currentThread().setName(originalName); 251 } 252 } 253 }); 254 } 255 256 protected void triggerRemoteStartBridge() throws IOException { 257 final Map context = MDCHelper.getCopyOfContextMap(); 258 asyncTaskRunner.execute(new Runnable() { 259 public void run() { 260 MDCHelper.setContextMap(context); 261 final String originalName = Thread.currentThread().getName(); 262 Thread.currentThread().setName("StartRemotelBridge: localBroker=" + localBroker); 263 try { 264 startRemoteBridge(); 265 } catch (Exception e) { 266 serviceRemoteException(e); 267 } finally { 268 
Thread.currentThread().setName(originalName); 269 } 270 } 271 }); 272 } 273 274 protected void startLocalBridge() throws Throwable { 275 if (localBridgeStarted.compareAndSet(false, true)) { 276 synchronized (this) { 277 if (LOG.isTraceEnabled()) { 278 LOG.trace(configuration.getBrokerName() + " starting local Bridge, localBroker=" + localBroker); 279 } 280 remoteBrokerNameKnownLatch.await(); 281 282 if (!disposed.get()) { 283 localConnectionInfo = new ConnectionInfo(); 284 localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); 285 localClientId = configuration.getName() + "_" + remoteBrokerName + "_inbound_" + configuration.getBrokerName(); 286 localConnectionInfo.setClientId(localClientId); 287 localConnectionInfo.setUserName(configuration.getUserName()); 288 localConnectionInfo.setPassword(configuration.getPassword()); 289 Transport originalTransport = remoteBroker; 290 while (originalTransport instanceof TransportFilter) { 291 originalTransport = ((TransportFilter) originalTransport).getNext(); 292 } 293 if (originalTransport instanceof SslTransport) { 294 X509Certificate[] peerCerts = ((SslTransport) originalTransport).getPeerCertificates(); 295 localConnectionInfo.setTransportContext(peerCerts); 296 } 297 // sync requests that may fail 298 Object resp = localBroker.request(localConnectionInfo); 299 if (resp instanceof ExceptionResponse) { 300 throw ((ExceptionResponse)resp).getException(); 301 } 302 localSessionInfo = new SessionInfo(localConnectionInfo, 1); 303 localBroker.oneway(localSessionInfo); 304 305 brokerService.getBroker().networkBridgeStarted(remoteBrokerInfo, this.createdByDuplex); 306 NetworkBridgeListener l = this.networkBridgeListener; 307 if (l != null) { 308 l.onStart(this); 309 } 310 LOG.info("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName + ") has been established."); 311 312 } else { 313 LOG.warn ("Bridge was disposed before the startLocalBridge() method was fully 
executed."); 314 } 315 startedLatch.countDown(); 316 localStartedLatch.countDown(); 317 if (!disposed.get()) { 318 setupStaticDestinations(); 319 } else { 320 LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName + ") was interrupted during establishment."); 321 } 322 } 323 } 324 } 325 326 protected void startRemoteBridge() throws Exception { 327 if (remoteBridgeStarted.compareAndSet(false, true)) { 328 if (LOG.isTraceEnabled()) { 329 LOG.trace(configuration.getBrokerName() + " starting remote Bridge, localBroker=" + localBroker); 330 } 331 synchronized (this) { 332 if (!isCreatedByDuplex()) { 333 BrokerInfo brokerInfo = new BrokerInfo(); 334 brokerInfo.setBrokerName(configuration.getBrokerName()); 335 brokerInfo.setBrokerURL(configuration.getBrokerURL()); 336 brokerInfo.setNetworkConnection(true); 337 brokerInfo.setDuplexConnection(configuration.isDuplex()); 338 // set our properties 339 Properties props = new Properties(); 340 IntrospectionSupport.getProperties(configuration, props, null); 341 String str = MarshallingSupport.propertiesToString(props); 342 brokerInfo.setNetworkProperties(str); 343 brokerInfo.setBrokerId(this.localBrokerId); 344 remoteBroker.oneway(brokerInfo); 345 } 346 if (remoteConnectionInfo != null) { 347 remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand()); 348 } 349 remoteConnectionInfo = new ConnectionInfo(); 350 remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); 351 remoteConnectionInfo.setClientId(configuration.getName() + "_" + configuration.getBrokerName() + "_outbound"); 352 remoteConnectionInfo.setUserName(configuration.getUserName()); 353 remoteConnectionInfo.setPassword(configuration.getPassword()); 354 remoteBroker.oneway(remoteConnectionInfo); 355 356 SessionInfo remoteSessionInfo = new SessionInfo(remoteConnectionInfo, 1); 357 remoteBroker.oneway(remoteSessionInfo); 358 producerInfo = new ProducerInfo(remoteSessionInfo, 1); 359 
producerInfo.setResponseRequired(false); 360 remoteBroker.oneway(producerInfo); 361 // Listen to consumer advisory messages on the remote broker to 362 // determine demand. 363 demandConsumerInfo = new ConsumerInfo(remoteSessionInfo, 1); 364 demandConsumerInfo.setDispatchAsync(configuration.isDispatchAsync()); 365 String advisoryTopic = AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX + configuration.getDestinationFilter(); 366 if (configuration.isBridgeTempDestinations()) { 367 advisoryTopic += "," + AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC; 368 } 369 demandConsumerInfo.setDestination(new ActiveMQTopic(advisoryTopic)); 370 demandConsumerInfo.setPrefetchSize(configuration.getPrefetchSize()); 371 remoteBroker.oneway(demandConsumerInfo); 372 startedLatch.countDown(); 373 if (!disposed.get()) { 374 triggerLocalStartBridge(); 375 } 376 } 377 } 378 } 379 380 public void stop() throws Exception { 381 if (started.compareAndSet(true, false)) { 382 if (disposed.compareAndSet(false, true)) { 383 LOG.debug(" stopping " + configuration.getBrokerName() + " bridge to " + remoteBrokerName); 384 NetworkBridgeListener l = this.networkBridgeListener; 385 if (l != null) { 386 l.onStop(this); 387 } 388 try { 389 remoteBridgeStarted.set(false); 390 final CountDownLatch sendShutdown = new CountDownLatch(1); 391 final Map map = MDCHelper.getCopyOfContextMap(); 392 asyncTaskRunner.execute(new Runnable() { 393 public void run() { 394 try { 395 MDCHelper.setContextMap(map); 396 localBroker.oneway(new ShutdownInfo()); 397 sendShutdown.countDown(); 398 remoteBroker.oneway(new ShutdownInfo()); 399 } catch (Throwable e) { 400 LOG.debug("Caught exception sending shutdown", e); 401 } finally { 402 sendShutdown.countDown(); 403 } 404 405 } 406 }); 407 if (!sendShutdown.await(10, TimeUnit.SECONDS)) { 408 LOG.info("Network Could not shutdown in a timely manner"); 409 } 410 } finally { 411 ServiceStopper ss = new ServiceStopper(); 412 ss.stop(remoteBroker); 413 ss.stop(localBroker); 
414 // Release the started Latch since another thread could be 415 // stuck waiting for it to start up. 416 startedLatch.countDown(); 417 startedLatch.countDown(); 418 localStartedLatch.countDown(); 419 ss.throwFirstException(); 420 } 421 } 422 brokerService.getBroker().removeBroker(null, remoteBrokerInfo); 423 brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo); 424 LOG.info(configuration.getBrokerName() + " bridge to " + remoteBrokerName + " stopped"); 425 remoteBrokerNameKnownLatch.countDown(); 426 } 427 } 428 429 public void serviceRemoteException(Throwable error) { 430 if (!disposed.get()) { 431 if (error instanceof SecurityException || error instanceof GeneralSecurityException) { 432 LOG.error("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error); 433 } else { 434 LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error); 435 } 436 LOG.debug("The remote Exception was: " + error, error); 437 final Map map = MDCHelper.getCopyOfContextMap(); 438 asyncTaskRunner.execute(new Runnable() { 439 public void run() { 440 MDCHelper.setContextMap(map); 441 ServiceSupport.dispose(getControllingService()); 442 } 443 }); 444 fireBridgeFailed(); 445 } 446 } 447 448 protected void serviceRemoteCommand(Command command) { 449 if (!disposed.get()) { 450 try { 451 if (command.isMessageDispatch()) { 452 waitStarted(); 453 MessageDispatch md = (MessageDispatch) command; 454 serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure()); 455 demandConsumerDispatched++; 456 if (demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize() * .75)) { 457 remoteBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched)); 458 demandConsumerDispatched = 0; 459 } 460 } else if (command.isBrokerInfo()) { 461 lastConnectSucceeded.set(true); 462 remoteBrokerInfo = (BrokerInfo) command; 463 Properties props = 
MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties()); 464 try { 465 IntrospectionSupport.getProperties(configuration, props, null); 466 if (configuration.getExcludedDestinations() != null) { 467 excludedDestinations = configuration.getExcludedDestinations().toArray( 468 new ActiveMQDestination[configuration.getExcludedDestinations().size()]); 469 } 470 if (configuration.getStaticallyIncludedDestinations() != null) { 471 staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray( 472 new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]); 473 } 474 if (configuration.getDynamicallyIncludedDestinations() != null) { 475 dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations() 476 .toArray( 477 new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations() 478 .size()]); 479 } 480 } catch (Throwable t) { 481 LOG.error("Error mapping remote destinations", t); 482 } 483 serviceRemoteBrokerInfo(command); 484 // Let the local broker know the remote broker's ID. 
485 localBroker.oneway(command); 486 // new peer broker (a consumer can work with remote broker also) 487 brokerService.getBroker().addBroker(null, remoteBrokerInfo); 488 } else if (command.getClass() == ConnectionError.class) { 489 ConnectionError ce = (ConnectionError) command; 490 serviceRemoteException(ce.getException()); 491 } else { 492 if (isDuplex()) { 493 if (command.isMessage()) { 494 ActiveMQMessage message = (ActiveMQMessage) command; 495 if (AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination()) 496 || AdvisorySupport.isDestinationAdvisoryTopic(message.getDestination())) { 497 serviceRemoteConsumerAdvisory(message.getDataStructure()); 498 } else { 499 if (!isPermissableDestination(message.getDestination(), true)) { 500 return; 501 } 502 if (message.isResponseRequired()) { 503 Response reply = new Response(); 504 reply.setCorrelationId(message.getCommandId()); 505 localBroker.oneway(message); 506 remoteBroker.oneway(reply); 507 } else { 508 localBroker.oneway(message); 509 } 510 } 511 } else { 512 switch (command.getDataStructureType()) { 513 case ConnectionInfo.DATA_STRUCTURE_TYPE: 514 case SessionInfo.DATA_STRUCTURE_TYPE: 515 case ProducerInfo.DATA_STRUCTURE_TYPE: 516 localBroker.oneway(command); 517 break; 518 case ConsumerInfo.DATA_STRUCTURE_TYPE: 519 localStartedLatch.await(); 520 if (started.get()) { 521 if (!addConsumerInfo((ConsumerInfo) command)) { 522 if (LOG.isDebugEnabled()) { 523 LOG.debug("Ignoring ConsumerInfo: " + command); 524 } 525 } else { 526 if (LOG.isTraceEnabled()) { 527 LOG.trace("Adding ConsumerInfo: " + command); 528 } 529 } 530 } else { 531 // received a subscription whilst stopping 532 LOG.warn("Stopping - ignoring ConsumerInfo: " + command); 533 } 534 break; 535 case ShutdownInfo.DATA_STRUCTURE_TYPE: 536 // initiator is shutting down, controlled case 537 // abortive close dealt with by inactivity monitor 538 LOG.info("Stopping network bridge on shutdown of remote broker"); 539 serviceRemoteException(new 
IOException(command.toString())); 540 break; 541 default: 542 if (LOG.isDebugEnabled()) { 543 LOG.debug("Ignoring remote command: " + command); 544 } 545 } 546 } 547 } else { 548 switch (command.getDataStructureType()) { 549 case KeepAliveInfo.DATA_STRUCTURE_TYPE: 550 case WireFormatInfo.DATA_STRUCTURE_TYPE: 551 case ShutdownInfo.DATA_STRUCTURE_TYPE: 552 break; 553 default: 554 LOG.warn("Unexpected remote command: " + command); 555 } 556 } 557 } 558 } catch (Throwable e) { 559 if (LOG.isDebugEnabled()) { 560 LOG.debug("Exception processing remote command: " + command, e); 561 } 562 serviceRemoteException(e); 563 } 564 } 565 } 566 567 private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException { 568 final int networkTTL = configuration.getNetworkTTL(); 569 if (data.getClass() == ConsumerInfo.class) { 570 // Create a new local subscription 571 ConsumerInfo info = (ConsumerInfo) data; 572 BrokerId[] path = info.getBrokerPath(); 573 574 if (info.isBrowser()) { 575 if (LOG.isDebugEnabled()) { 576 LOG.info(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", browsers explicitly suppressed"); 577 } 578 return; 579 } 580 581 if (path != null && path.length >= networkTTL) { 582 if (LOG.isDebugEnabled()) { 583 LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", restricted to " + networkTTL + " network hops only : " + info); 584 } 585 return; 586 } 587 if (contains(path, localBrokerPath[0])) { 588 // Ignore this consumer as it's a consumer we locally sent to the broker. 
589 if (LOG.isDebugEnabled()) { 590 LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", already routed through this broker once : " + info); 591 } 592 return; 593 } 594 if (!isPermissableDestination(info.getDestination())) { 595 // ignore if not in the permitted or in the excluded list 596 if (LOG.isDebugEnabled()) { 597 LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", destination " + info.getDestination() + " is not permiited :" + info); 598 } 599 return; 600 } 601 602 // in a cyclic network there can be multiple bridges per broker that can propagate 603 // a network subscription so there is a need to synchronise on a shared entity 604 synchronized (brokerService.getVmConnectorURI()) { 605 if (addConsumerInfo(info)) { 606 if (LOG.isDebugEnabled()) { 607 LOG.debug(configuration.getBrokerName() + " bridging sub on " + localBroker + " from " + remoteBrokerName + " : " + info); 608 } 609 } else { 610 if (LOG.isDebugEnabled()) { 611 LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + " as already subscribed to matching destination : " + info); 612 } 613 } 614 } 615 } else if (data.getClass() == DestinationInfo.class) { 616 // It's a destination info - we want to pass up 617 // information about temporary destinations 618 DestinationInfo destInfo = (DestinationInfo) data; 619 BrokerId[] path = destInfo.getBrokerPath(); 620 if (path != null && path.length >= networkTTL) { 621 if (LOG.isDebugEnabled()) { 622 LOG.debug(configuration.getBrokerName() + " Ignoring destination " + destInfo + " restricted to " + networkTTL + " network hops only"); 623 } 624 return; 625 } 626 if (contains(destInfo.getBrokerPath(), localBrokerPath[0])) { 627 // Ignore this consumer as it's a consumer we locally sent to 628 // the broker. 
629 if (LOG.isDebugEnabled()) { 630 LOG.debug(configuration.getBrokerName() + " Ignoring destination " + destInfo + " already routed through this broker once"); 631 } 632 return; 633 } 634 destInfo.setConnectionId(localConnectionInfo.getConnectionId()); 635 if (destInfo.getDestination() instanceof ActiveMQTempDestination) { 636 // re-set connection id so comes from here 637 ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destInfo.getDestination(); 638 tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId()); 639 } 640 destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(), getRemoteBrokerPath())); 641 if (LOG.isTraceEnabled()) { 642 LOG.trace("bridging destination control command: " + destInfo); 643 } 644 localBroker.oneway(destInfo); 645 } else if (data.getClass() == RemoveInfo.class) { 646 ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId(); 647 removeDemandSubscription(id); 648 } 649 } 650 651 public void serviceLocalException(Throwable error) { 652 if (!disposed.get()) { 653 LOG.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a local error: " + error); 654 LOG.debug("The local Exception was:" + error, error); 655 final Map map = MDCHelper.getCopyOfContextMap(); 656 asyncTaskRunner.execute(new Runnable() { 657 public void run() { 658 MDCHelper.setContextMap(map); 659 ServiceSupport.dispose(getControllingService()); 660 } 661 }); 662 fireBridgeFailed(); 663 } 664 } 665 666 protected Service getControllingService() { 667 return duplexInitiatingConnection != null ? 
duplexInitiatingConnection : DemandForwardingBridgeSupport.this; 668 } 669 670 protected void addSubscription(DemandSubscription sub) throws IOException { 671 if (sub != null) { 672 localBroker.oneway(sub.getLocalInfo()); 673 } 674 } 675 676 protected void removeSubscription(final DemandSubscription sub) throws IOException { 677 if (sub != null) { 678 if (LOG.isDebugEnabled()) { 679 LOG.debug(configuration.getBrokerName() + " remove local subscription for remote " + sub.getRemoteInfo().getConsumerId()); 680 } 681 subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId()); 682 683 // continue removal in separate thread to free up this thread for outstanding responses 684 final Map map = MDCHelper.getCopyOfContextMap(); 685 asyncTaskRunner.execute(new Runnable() { 686 public void run() { 687 MDCHelper.setContextMap(map); 688 sub.waitForCompletion(); 689 try { 690 localBroker.oneway(sub.getLocalInfo().createRemoveCommand()); 691 } catch (IOException e) { 692 LOG.warn("failed to deliver remove command for local subscription, for remote " + sub.getRemoteInfo().getConsumerId(), e); 693 } 694 } 695 }); 696 } 697 } 698 699 protected Message configureMessage(MessageDispatch md) { 700 Message message = md.getMessage().copy(); 701 // Update the packet to show where it came from. 
message.setBrokerPath(appendToBrokerPath(message.getBrokerPath(), localBrokerPath));
        message.setProducerId(producerInfo.getProducerId());
        message.setDestination(md.getDestination());
        if (message.getOriginalTransactionId() == null) {
            message.setOriginalTransactionId(message.getTransactionId());
        }
        message.setTransactionId(null);
        return message;
    }

    /**
     * Handles one command received from the local broker side of the bridge.
     * Nothing is done once {@code disposed} has been set.
     * <ul>
     * <li>Message dispatch: the enqueue counter is incremented and the demand
     * subscription is looked up by the dispatch's consumer id. If there is a
     * subscription, the dispatch carries a message and
     * {@code incrementOutstandingResponses()} returns true, the message is
     * either suppressed (and only acked locally) or configured and sent to the
     * remote broker; the local ack and dequeue counter increment follow the
     * send. Otherwise a debug line is logged and the dispatch is dropped.</li>
     * <li>Broker info: remembered in {@code localBrokerInfo} and passed to
     * {@link #serviceLocalBrokerInfo(Command)}.</li>
     * <li>Shutdown info: the bridge is stopped unless {@code remoteInterupted}
     * is set.</li>
     * <li>{@code ConnectionError}: its exception goes to
     * {@code serviceLocalException}.</li>
     * <li>{@code WireFormatInfo} is ignored; anything else is logged at warn.</li>
     * </ul>
     * Any {@code Throwable} raised while processing is logged and handed to
     * {@code serviceLocalException}.
     *
     * @param command the command delivered by the local transport
     */
    protected void serviceLocalCommand(Command command) {
        if (!disposed.get()) {
            try {
                if (command.isMessageDispatch()) {
                    enqueueCounter.incrementAndGet();
                    final MessageDispatch md = (MessageDispatch) command;
                    final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId());
                    // Every successful incrementOutstandingResponses() below is paired with
                    // exactly one decrementOutstandingResponses() on each handled path.
                    if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses()) {

                        if (suppressMessageDispatch(md, sub)) {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug(configuration.getBrokerName() + " message not forwarded to " + remoteBrokerName + " because message came from there or fails networkTTL, brokerPath: " + Arrays.toString(md.getMessage().getBrokerPath()) + ", message: " + md.getMessage());
                            }
                            // still ack as it may be durable
                            try {
                                localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
                            } finally {
                                sub.decrementOutstandingResponses();
                            }
                            return;
                        }

                        Message message = configureMessage(md);
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("bridging (" + configuration.getBrokerName() + " -> " + remoteBrokerName + ", consumer: " + md.getConsumerId() + ", destination " + message.getDestination() + ", brokerPath: " + Arrays.toString(message.getBrokerPath()) + ", message: " + message);
                        }

                        if (!message.isResponseRequired()) {

                            // If the message was originally sent using async
                            // send, we will preserve that QOS
                            // by bridging it using an async send (small chance
                            // of message loss).
                            try {
                                remoteBroker.oneway(message);
                                // The local ack is sent without waiting for any remote confirmation.
                                localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
                                dequeueCounter.incrementAndGet();
                            } finally {
                                sub.decrementOutstandingResponses();
                            }

                        } else {

                            // The message was not sent using async send, so we
                            // should only ack the local
                            // broker when we get confirmation that the remote
                            // broker has received the message.
                            ResponseCallback callback = new ResponseCallback() {
                                public void onCompletion(FutureResponse future) {
                                    try {
                                        Response response = future.getResult();
                                        if (response.isException()) {
                                            // Remote side rejected the message: no local ack is sent.
                                            ExceptionResponse er = (ExceptionResponse) response;
                                            serviceLocalException(er.getException());
                                        } else {
                                            localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
                                            dequeueCounter.incrementAndGet();
                                        }
                                    } catch (IOException e) {
                                        serviceLocalException(e);
                                    } finally {
                                        sub.decrementOutstandingResponses();
                                    }
                                }
                            };

                            // The decrement for this path happens inside the callback above.
                            remoteBroker.asyncRequest(message, callback);

                        }
                    } else {
                        // Also reached when the dispatch has no message or when
                        // incrementOutstandingResponses() returned false.
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("No subscription registered with this network bridge for consumerId " + md.getConsumerId() + " for message: " + md.getMessage());
                        }
                    }
                } else if (command.isBrokerInfo()) {
                    localBrokerInfo = (BrokerInfo) command;
                    serviceLocalBrokerInfo(command);
                } else if (command.isShutdownInfo()) {
                    LOG.info(configuration.getBrokerName() + " Shutting down");
                    // Don't shut down the whole connector if the remote side
                    // was interrupted.
                    // the local transport is just shutting down temporarily
                    // until the remote side
                    // is restored.
                    if (!remoteInterupted.get()) {
                        stop();
                    }
                } else if (command.getClass() == ConnectionError.class) {
                    ConnectionError ce = (ConnectionError) command;
                    serviceLocalException(ce.getException());
                } else {
                    switch (command.getDataStructureType()) {
                    case WireFormatInfo.DATA_STRUCTURE_TYPE:
                        // Wire format negotiation needs no handling here.
                        break;
                    default:
                        LOG.warn("Unexpected local command: " + command);
                    }
                }
            } catch (Throwable e) {
                LOG.warn("Caught an exception processing local command", e);
                serviceLocalException(e);
            }
        }
    }

    /**
     * Decides whether a dispatched message must not be forwarded to the remote
     * broker.
     * <p>
     * A message is suppressed when its data structure is a {@link ConsumerInfo}
     * whose broker path contains the remote broker's id. For a durable local
     * subscription that is not already suppressed, the message is additionally
     * checked against a filter obtained from
     * {@code createNetworkBridgeFilter(null)} and suppressed when it does not
     * match.
     *
     * @param md the dispatch being considered
     * @param sub the demand subscription the dispatch belongs to
     * @return true if the message should be acked locally without forwarding
     * @throws Exception if creating or evaluating the filter fails
     */
    private boolean suppressMessageDispatch(MessageDispatch md, DemandSubscription sub) throws Exception {
        // See if this consumer's brokerPath tells us it came from the broker at the other end
        // of the bridge. I think we should be making this decision based on the message's
        // broker bread crumbs and not the consumer's? However, the message's broker bread
        // crumbs are null, which is another matter.
        boolean suppress = false;
        Object consumerInfo = md.getMessage().getDataStructure();
        if (consumerInfo != null && (consumerInfo instanceof ConsumerInfo)) {
            suppress = contains(((ConsumerInfo) consumerInfo).getBrokerPath(), remoteBrokerInfo.getBrokerId());
        }

        // for durable subs, suppression via filter leaves dangling acks so we need to
        // check here and allow the ack irrespective
        if (!suppress && sub.getLocalInfo().isDurable()) {
            MessageEvaluationContext messageEvalContext = new MessageEvaluationContext();
            messageEvalContext.setMessageReference(md.getMessage());
            suppress = !createNetworkBridgeFilter(null).matches(messageEvalContext);
        }
        return suppress;
    }

    /**
     * @return Returns the dynamicallyIncludedDestinations.
     */
    public ActiveMQDestination[] getDynamicallyIncludedDestinations() {
        return dynamicallyIncludedDestinations;
    }

    /**
     * @param dynamicallyIncludedDestinations The
     *            dynamicallyIncludedDestinations to set.
848 */ 849 public void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations) { 850 this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations; 851 } 852 853 /** 854 * @return Returns the excludedDestinations. 855 */ 856 public ActiveMQDestination[] getExcludedDestinations() { 857 return excludedDestinations; 858 } 859 860 /** 861 * @param excludedDestinations The excludedDestinations to set. 862 */ 863 public void setExcludedDestinations(ActiveMQDestination[] excludedDestinations) { 864 this.excludedDestinations = excludedDestinations; 865 } 866 867 /** 868 * @return Returns the staticallyIncludedDestinations. 869 */ 870 public ActiveMQDestination[] getStaticallyIncludedDestinations() { 871 return staticallyIncludedDestinations; 872 } 873 874 /** 875 * @param staticallyIncludedDestinations The staticallyIncludedDestinations 876 * to set. 877 */ 878 public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations) { 879 this.staticallyIncludedDestinations = staticallyIncludedDestinations; 880 } 881 882 /** 883 * @return Returns the durableDestinations. 884 */ 885 public ActiveMQDestination[] getDurableDestinations() { 886 return durableDestinations; 887 } 888 889 /** 890 * @param durableDestinations The durableDestinations to set. 891 */ 892 public void setDurableDestinations(ActiveMQDestination[] durableDestinations) { 893 this.durableDestinations = durableDestinations; 894 } 895 896 /** 897 * @return Returns the localBroker. 898 */ 899 public Transport getLocalBroker() { 900 return localBroker; 901 } 902 903 /** 904 * @return Returns the remoteBroker. 
*/
    public Transport getRemoteBroker() {
        return remoteBroker;
    }

    /**
     * @return the createdByDuplex
     */
    public boolean isCreatedByDuplex() {
        return this.createdByDuplex;
    }

    /**
     * @param createdByDuplex the createdByDuplex to set
     */
    public void setCreatedByDuplex(boolean createdByDuplex) {
        this.createdByDuplex = createdByDuplex;
    }

    /**
     * Tests whether a broker id occurs in a broker path.
     *
     * @param brokerPath the path to search; may be null
     * @param brokerId the id to look for; may be null
     * @return true if some element of {@code brokerPath} equals
     *         {@code brokerId}; false if either argument is null or no element
     *         matches
     */
    public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
        // A null id can never be "contained"; guarding here avoids an NPE on equals().
        if (brokerPath == null || brokerId == null) {
            return false;
        }
        for (BrokerId pathEntry : brokerPath) {
            if (brokerId.equals(pathEntry)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Concatenates two broker paths.
     *
     * @param brokerPath the existing path; may be null or empty
     * @param pathsToAppend the ids to add after the existing path; may be null
     * @return {@code pathsToAppend} itself when {@code brokerPath} is null or
     *         empty, {@code brokerPath} itself when {@code pathsToAppend} is
     *         null, otherwise a new array holding both in order
     */
    protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId[] pathsToAppend) {
        if (brokerPath == null || brokerPath.length == 0) {
            return pathsToAppend;
        }
        if (pathsToAppend == null) {
            // Nothing to append; avoid dereferencing a null array below.
            return brokerPath;
        }
        BrokerId[] rc = new BrokerId[brokerPath.length + pathsToAppend.length];
        System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
        System.arraycopy(pathsToAppend, 0, rc, brokerPath.length, pathsToAppend.length);
        return rc;
    }

    /**
     * Appends a single broker id to a broker path.
     *
     * @param brokerPath the existing path; may be null or empty
     * @param idToAppend the id to add as the last element
     * @return a new array ending in {@code idToAppend}
     */
    protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId idToAppend) {
        if (brokerPath == null || brokerPath.length == 0) {
            return new BrokerId[] { idToAppend };
        }
        BrokerId[] rc = new BrokerId[brokerPath.length + 1];
        System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
        rc[brokerPath.length] = idToAppend;
        return rc;
    }

    /**
     * Same as {@link #isPermissableDestination(ActiveMQDestination, boolean)}
     * with {@code allowTemporary} set to false.
     *
     * @param destination the destination to check
     * @return true if the destination may be bridged
     */
    protected boolean isPermissableDestination(ActiveMQDestination destination) {
        return isPermissableDestination(destination, false);
    }

    /**
     * Decides whether a destination may be bridged.
     * <p>
     * Temporary destinations are allowed when {@code allowTemporary} is true or
     * the configuration bridges temp destinations. Otherwise a destination is
     * rejected if it matches an excluded destination of the same destination
     * type. If dynamically included destinations are configured it must match
     * one of them (same type); if none are configured it is allowed.
     *
     * @param destination the destination to check
     * @param allowTemporary whether a temporary destination is accepted
     *            regardless of configuration
     * @return true if the destination may be bridged
     */
    protected boolean isPermissableDestination(ActiveMQDestination destination, boolean allowTemporary) {
        // Are we not bridging temp destinations?
        if (destination.isTemporary()) {
            if (allowTemporary) {
                return true;
            } else {
                return configuration.isBridgeTempDestinations();
            }
        }

        ActiveMQDestination[] dests = excludedDestinations;
        if (dests != null && dests.length > 0) {
            for (ActiveMQDestination match : dests) {
                // Skip null entries before parsing, so parseFilter is only
                // ever handed a real destination.
                if (match == null) {
                    continue;
                }
                DestinationFilter exclusionFilter = DestinationFilter.parseFilter(match);
                if (exclusionFilter.matches(destination) && match.getDestinationType() == destination.getDestinationType()) {
                    return false;
                }
            }
        }

        dests = dynamicallyIncludedDestinations;
        if (dests != null && dests.length > 0) {
            for (ActiveMQDestination match : dests) {
                if (match == null) {
                    continue;
                }
                DestinationFilter inclusionFilter = DestinationFilter.parseFilter(match);
                if (inclusionFilter.matches(destination) && match.getDestinationType() == destination.getDestinationType()) {
                    return true;
                }
            }

            // An inclusion list exists and nothing in it matched.
            return false;
        }
        return true;
    }

    /**
     * Subscriptions for these destinations are always created.
     * <p>
     * A destination whose demand subscription could not be created (the
     * factory returns null after logging the failure) is skipped.
     */
    protected void setupStaticDestinations() {
        ActiveMQDestination[] dests = staticallyIncludedDestinations;
        if (dests != null) {
            for (ActiveMQDestination dest : dests) {
                DemandSubscription sub = createDemandSubscription(dest);
                if (sub == null) {
                    // Creation failed and was already logged; do not pass null on.
                    continue;
                }
                try {
                    addSubscription(sub);
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("bridging messages for static destination: " + dest);
                    }
                } catch (IOException e) {
                    LOG.error("Failed to add static destination " + dest, e);
                }
            }
        }
    }

    /**
     * Creates and registers a demand subscription for a remote consumer, unless
     * it is suppressed as a duplicate.
     *
     * @param consumerInfo the remote consumer; a copy is used, the argument is
     *            not modified here
     * @return true if a subscription was added, false if none was created or
     *         it was suppressed
     * @throws IOException if creating or adding the subscription fails
     */
    protected boolean addConsumerInfo(final ConsumerInfo consumerInfo) throws IOException {
        boolean consumerAdded = false;
        ConsumerInfo info = consumerInfo.copy();
        addRemoteBrokerToBrokerPath(info);
        DemandSubscription sub = createDemandSubscription(info);
        if (sub != null) {
            if (duplicateSuppressionIsRequired(sub)) {
                // Creation already put the sub in both lookup maps; take it out again.
                undoMapRegistration(sub);
            } else {
                addSubscription(sub);
                consumerAdded = true;
            }
        }
        return consumerAdded;
    }

    /**
     * Removes a subscription from both the local-id and remote-id lookup maps.
     *
     * @param sub the subscription to unregister
     */
    private void undoMapRegistration(DemandSubscription sub) {
        subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
        subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
    }

    /*
     * check our existing subs networkConsumerIds against the list of network ids in this subscription
     * A match means a duplicate which we suppress for topics and maybe for queues
     */
    private boolean duplicateSuppressionIsRequired(DemandSubscription candidate) {
        final ConsumerInfo consumerInfo = candidate.getRemoteInfo();
        final ActiveMQDestination destination = consumerInfo.getDestination();

        // Suppression is switched per destination kind in the configuration.
        if ((destination.isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions())
                || (destination.isTopic() && !configuration.isSuppressDuplicateTopicSubscriptions())) {
            return false;
        }

        List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds();
        Collection<Subscription> currentSubs = getRegionSubscriptions(destination.isTopic());
        for (Subscription sub : currentSubs) {
            List<ConsumerId> networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds();
            if (!networkConsumers.isEmpty() && matchFound(candidateConsumers, networkConsumers)) {
                // Only the first matching existing subscription is considered.
                return hasLowerPriority(sub, candidate.getLocalInfo());
            }
        }
        return false;
    }

    /**
     * Compares a candidate against an existing duplicate subscription.
     * <p>
     * If the existing subscription's priority is equal or higher the candidate
     * is to be suppressed. Otherwise an attempt is made to remove the existing
     * subscription so the candidate can replace it; an {@code IOException}
     * from that removal is logged and the candidate is still not suppressed.
     *
     * @param existingSub the already registered duplicate
     * @param candidateInfo the local consumer info of the candidate
     * @return true if the candidate must be suppressed
     */
    private boolean hasLowerPriority(Subscription existingSub, ConsumerInfo candidateInfo) {
        boolean suppress = false;

        if (existingSub.getConsumerInfo().getPriority() >= candidateInfo.getPriority()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug(configuration.getBrokerName() + " Ignoring duplicate subscription from " + remoteBrokerName
                        + ", sub: " + candidateInfo + " is duplicated by network subscription with equal or higher network priority: "
                        + existingSub.getConsumerInfo() + ", networkComsumerIds: " + existingSub.getConsumerInfo().getNetworkConsumerIds());
            }
            suppress = true;
        } else {
            // remove the existing lower priority duplicate and allow this candidate
            try {
                removeDuplicateSubscription(existingSub);

                if (LOG.isDebugEnabled()) {
                    LOG.debug(configuration.getBrokerName() + " Replacing duplicate subscription " + existingSub.getConsumerInfo()
                            + " with sub from " + remoteBrokerName
                            + ", which has a higher priority, new sub: " + candidateInfo + ", networkComsumerIds: "
                            + candidateInfo.getNetworkConsumerIds());
                }
            } catch (IOException e) {
                LOG.error("Failed to remove duplicated sub as a result of sub with higher priority, sub: " + existingSub, e);
            }
        }
        return suppress;
    }

    /**
     * Asks each network connector of the broker service in turn to remove the
     * demand subscription with the existing subscription's consumer id,
     * stopping at the first connector that reports a removal.
     *
     * @param existingSub the subscription to remove
     * @throws IOException if a connector fails while removing
     */
    private void removeDuplicateSubscription(Subscription existingSub) throws IOException {
        for (NetworkConnector connector : brokerService.getNetworkConnectors()) {
            if (connector.removeDemandSubscription(existingSub.getConsumerInfo().getConsumerId())) {
                break;
            }
        }
    }

    /**
     * @param candidateConsumers network consumer ids of the candidate
     * @param networkConsumers network consumer ids of an existing subscription
     * @return true if the two lists share at least one consumer id
     */
    private boolean matchFound(List<ConsumerId> candidateConsumers, List<ConsumerId> networkConsumers) {
        for (ConsumerId aliasConsumer : networkConsumers) {
            if (candidateConsumers.contains(aliasConsumer)) {
                return true;
            }
        }
        return false;
    }

    /**
     * @param isTopic true to read the topic region, false for the queue region
     * @return the subscriptions currently held by that region of the broker
     */
    private final Collection<Subscription> getRegionSubscriptions(boolean isTopic) {
        RegionBroker region = (RegionBroker) brokerService.getRegionBroker();
        AbstractRegion abstractRegion = (AbstractRegion)
            (isTopic ? region.getTopicRegion() : region.getQueueRegion());
        return abstractRegion.getSubscriptions().values();
    }

    /**
     * Creates a demand subscription for a consumer after recording the
     * consumer's own id among its network consumer ids.
     *
     * @param info the consumer info; it is modified
     * @return the new subscription
     * @throws IOException if the subscription cannot be configured
     */
    protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
        //add our original id to ourselves
        info.addNetworkConsumerId(info.getConsumerId());
        return doCreateDemandSubscription(info);
    }

    /**
     * Builds the demand subscription: assigns a fresh local consumer id, points
     * a temporary destination at the local connection, optionally lowers the
     * consumer priority, then applies
     * {@link #configureDemandSubscription(ConsumerInfo, DemandSubscription)}.
     *
     * @param info the remote consumer info
     * @return the configured subscription
     * @throws IOException if configuring the subscription fails
     */
    protected DemandSubscription doCreateDemandSubscription(ConsumerInfo info) throws IOException {
        DemandSubscription result = new DemandSubscription(info);
        result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
        if (info.getDestination().isTemporary()) {
            // reset the local connection Id

            ActiveMQTempDestination dest = (ActiveMQTempDestination) result.getLocalInfo().getDestination();
            dest.setConnectionId(localConnectionInfo.getConnectionId().toString());
        }

        if (configuration.isDecreaseNetworkConsumerPriority()) {
            byte priority = ConsumerInfo.NETWORK_CONSUMER_PRIORITY;
            if (info.getBrokerPath() != null && info.getBrokerPath().length > 1) {
                // The longer the path to the consumer, the lower its consumer priority.
                priority -= info.getBrokerPath().length + 1;
            }
            result.getLocalInfo().setPriority(priority);
            if (LOG.isDebugEnabled()) {
                LOG.debug(configuration.getBrokerName() + " using priority :" + priority + " for subscription: " + info);
            }
        }
        configureDemandSubscription(info, result);
        return result;
    }

    /**
     * Creates a demand subscription for a destination rather than for an
     * existing remote consumer. The local priority is always set to
     * {@code ConsumerInfo.NETWORK_CONSUMER_PRIORITY} afterwards.
     *
     * @param destination the destination to subscribe to
     * @return the subscription, or null if creation failed (the failure is
     *         logged)
     */
    final protected DemandSubscription createDemandSubscription(ActiveMQDestination destination) {
        ConsumerInfo info = new ConsumerInfo();
        info.setDestination(destination);
        // the remote info held by the DemandSubscription holds the original
        // consumerId,
        // the local info gets overwritten

        info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
        DemandSubscription result = null;
        try {
            result = createDemandSubscription(info);
        } catch (IOException e) {
            LOG.error("Failed to create DemandSubscription ", e);
        }
        if (result != null) {
            result.getLocalInfo().setPriority(ConsumerInfo.NETWORK_CONSUMER_PRIORITY);
        }
        return result;
    }

    /**
     * Applies dispatch settings from the configuration, registers the
     * subscription in both lookup maps and, for non-durable consumers, installs
     * the network bridge filter as an additional predicate.
     *
     * @param info the remote consumer info
     * @param sub the subscription being configured
     * @throws IOException if the bridge filter cannot be created
     */
    protected void configureDemandSubscription(ConsumerInfo info, DemandSubscription sub) throws IOException {
        sub.getLocalInfo().setDispatchAsync(configuration.isDispatchAsync());
        sub.getLocalInfo().setPrefetchSize(configuration.getPrefetchSize());
        subscriptionMapByLocalId.put(sub.getLocalInfo().getConsumerId(), sub);
        subscriptionMapByRemoteId.put(sub.getRemoteInfo().getConsumerId(), sub);

        if (!info.isDurable()) {
            // This works for now since we use a VM connection to the local broker.
            // may need to change if we ever subscribe to a remote broker.
            sub.getLocalInfo().setAdditionalPredicate(createNetworkBridgeFilter(info));
        }
        // Durable consumers get no predicate: a message that is ignored must
        // still be acked, so the check is made just before sending instead.
        // see: suppressMessageDispatch()
    }

    /**
     * Removes the demand subscription registered under a remote consumer id,
     * if there is one.
     *
     * @param id the remote consumer id
     * @throws IOException if removing the subscription fails
     */
    protected void removeDemandSubscription(ConsumerId id) throws IOException {
        DemandSubscription sub = subscriptionMapByRemoteId.remove(id);
        if (LOG.isDebugEnabled()) {
            LOG.debug(configuration.getBrokerName() + " remove request on " + localBroker + " from " + remoteBrokerName + " , consumer id: " + id + ", matching sub: " + sub);
        }
        if (sub != null) {
            removeSubscription(sub);
            if (LOG.isDebugEnabled()) {
                LOG.debug(configuration.getBrokerName() + " removed sub on " + localBroker + " from " + remoteBrokerName + " : " + sub.getRemoteInfo());
            }
        }
    }

    /**
     * Removes the demand subscription registered under a local consumer id.
     *
     * @param consumerId the local consumer id
     * @return true if a subscription was found and removed without error;
     *         false if none was found or removal failed (logged at debug)
     */
    protected boolean removeDemandSubscriptionByLocalId(ConsumerId consumerId) {
        boolean removeDone = false;
        DemandSubscription sub = subscriptionMapByLocalId.get(consumerId);
        if (sub != null) {
            try {
                removeDemandSubscription(sub.getRemoteInfo().getConsumerId());
                removeDone = true;
            } catch (IOException e) {
                LOG.debug("removeDemandSubscriptionByLocalId failed for localId: " + consumerId, e);
            }
        }
        return removeDone;
    }

    /**
     * Blocks until {@code startedLatch} and then
     * {@code localBrokerIdKnownLatch} have been released.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    protected void waitStarted() throws InterruptedException {
        startedLatch.await();
        localBrokerIdKnownLatch.await();
    }

    /**
     * Empties both subscription lookup maps.
     */
    protected void clearDownSubscriptions() {
        subscriptionMapByLocalId.clear();
        subscriptionMapByRemoteId.clear();
    }

    protected abstract NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException;

    protected abstract void serviceLocalBrokerInfo(Command command) throws InterruptedException;

    protected abstract void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException;

    protected abstract void serviceRemoteBrokerInfo(Command command) throws IOException;

    protected abstract BrokerId[] getRemoteBrokerPath();

    /**
     * @param listener the listener to notify about bridge events; may be null
     */
    public void setNetworkBridgeListener(NetworkBridgeListener listener) {
        this.networkBridgeListener = listener;
    }

    /**
     * Notifies the registered listener, if any, that the bridge failed.
     */
    private void fireBridgeFailed() {
        // Read the field once so the null check and the call see the same listener.
        NetworkBridgeListener l = this.networkBridgeListener;
        if (l != null) {
            l.bridgeFailed();
        }
    }

    /**
     * @return the remote address reported by the remote broker transport
     */
    public String getRemoteAddress() {
        return remoteBroker.getRemoteAddress();
    }

    /**
     * @return the remote address reported by the local broker transport
     */
    public String getLocalAddress() {
        return localBroker.getRemoteAddress();
    }

    /**
     * @return the remote broker's name, or null while no remote broker info is
     *         known
     */
    public String getRemoteBrokerName() {
        return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName();
    }

    /**
     * @return the local broker's name, or null while no local broker info is
     *         known
     */
    public String getLocalBrokerName() {
        return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName();
    }

    /**
     * @return the current value of the dequeue counter
     */
    public long getDequeueCounter() {
        return dequeueCounter.get();
    }

    /**
     * @return the current value of the enqueue counter
     */
    public long getEnqueueCounter() {
        return enqueueCounter.get();
    }

    /**
     * @return true if the configuration is duplex or this bridge was created
     *         by a duplex connection
     */
    protected boolean isDuplex() {
        return configuration.isDuplex() || createdByDuplex;
    }

    /**
     * @param brokerService the broker service this bridge works with
     */
    public void setBrokerService(BrokerService brokerService) {
        this.brokerService = brokerService;
    }
}