001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.network; 018 019 import java.io.IOException; 020 import java.util.ArrayList; 021 import java.util.Arrays; 022 import java.util.Iterator; 023 import java.util.List; 024 import org.apache.activemq.command.BrokerId; 025 import org.apache.activemq.command.ConsumerId; 026 import org.apache.activemq.command.ConsumerInfo; 027 import org.apache.activemq.filter.DestinationFilter; 028 import org.apache.activemq.transport.Transport; 029 import org.slf4j.Logger; 030 import org.slf4j.LoggerFactory; 031 032 /** 033 * Consolidates subscriptions 034 * 035 * 036 */ 037 public class ConduitBridge extends DemandForwardingBridge { 038 private static final Logger LOG = LoggerFactory.getLogger(ConduitBridge.class); 039 040 /** 041 * Constructor 042 * 043 * @param localBroker 044 * @param remoteBroker 045 */ 046 public ConduitBridge(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) { 047 super(configuration, localBroker, remoteBroker); 048 } 049 050 @Override 051 protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException { 052 if (addToAlreadyInterestedConsumers(info)) { 053 return null; // don't want this subscription added 054 } 055 //add our original id to ourselves 056 info.addNetworkConsumerId(info.getConsumerId()); 057 info.setSelector(null); 058 return doCreateDemandSubscription(info); 059 } 060 061 protected boolean checkPaths(BrokerId[] first, BrokerId[] second) { 062 if (first == null || second == null) 063 return true; 064 if (Arrays.equals(first, second)) 065 return true; 066 if (first[0].equals(second[0]) 067 && first[first.length - 1].equals(second[second.length - 1])) 068 return false; 069 else 070 return true; 071 } 072 073 protected boolean addToAlreadyInterestedConsumers(ConsumerInfo info) { 074 // search through existing subscriptions and see if we have a match 075 boolean matched = false; 076 for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext();) { 077 DemandSubscription ds = (DemandSubscription)i.next(); 078 DestinationFilter filter = DestinationFilter.parseFilter(ds.getLocalInfo().getDestination()); 079 if (filter.matches(info.getDestination())) { 080 if (LOG.isDebugEnabled()) { 081 LOG.debug(configuration.getBrokerName() + " matched (add interest) to exsting sub for: " + ds.getRemoteInfo() 082 + " with sub: " + info.getConsumerId()); 083 } 084 // add the interest in the subscription 085 // ds.add(ds.getRemoteInfo().getConsumerId()); 086 if (checkPaths(info.getBrokerPath(), ds.getRemoteInfo().getBrokerPath())) { 087 ds.add(info.getConsumerId()); 088 } 089 matched = true; 090 // continue - we want interest to any existing 091 // DemandSubscriptions 092 } 093 } 094 return matched; 095 } 096 097 @Override 098 protected void removeDemandSubscription(ConsumerId id) throws IOException { 099 List<DemandSubscription> tmpList = new ArrayList<DemandSubscription>(); 100 101 for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext();) { 102 DemandSubscription ds = (DemandSubscription)i.next(); 103 if (ds.remove(id)) { 104 if (LOG.isDebugEnabled()) { 105 LOG.debug(configuration.getBrokerName() + " removing interest in sub on " + localBroker + " from " + remoteBrokerName + " : sub: " + id + " existing matched sub: " + ds.getRemoteInfo()); 106 } 107 } 108 if (ds.isEmpty()) { 109 tmpList.add(ds); 110 } 111 } 112 for (Iterator<DemandSubscription> i = tmpList.iterator(); i.hasNext();) { 113 DemandSubscription ds = i.next(); 114 removeSubscription(ds); 115 if (LOG.isDebugEnabled()) { 116 LOG.debug(configuration.getBrokerName() + " removing sub on " + localBroker + " from " + remoteBrokerName + " : " + ds.getRemoteInfo()); 117 } 118 } 119 120 } 121 122 }