001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.transport.discovery; 018 019 import java.net.URI; 020 import java.net.URISyntaxException; 021 import java.util.Map; 022 import java.util.concurrent.ConcurrentHashMap; 023 import org.apache.activemq.command.DiscoveryEvent; 024 import org.apache.activemq.transport.CompositeTransport; 025 import org.apache.activemq.transport.TransportFilter; 026 import org.apache.activemq.util.ServiceStopper; 027 import org.apache.activemq.util.URISupport; 028 import org.slf4j.Logger; 029 import org.slf4j.LoggerFactory; 030 031 /** 032 * A {@link ReliableTransportChannel} which uses a {@link DiscoveryAgent} to 033 * discover remote broker instances and dynamically connect to them. 034 * 035 * 036 */ 037 public class DiscoveryTransport extends TransportFilter implements DiscoveryListener { 038 039 private static final Logger LOG = LoggerFactory.getLogger(DiscoveryTransport.class); 040 041 private final CompositeTransport next; 042 private DiscoveryAgent discoveryAgent; 043 private final ConcurrentHashMap<String, URI> serviceURIs = new ConcurrentHashMap<String, URI>(); 044 045 private Map<String, String> parameters; 046 047 public DiscoveryTransport(CompositeTransport next) { 048 super(next); 049 this.next = next; 050 } 051 052 @Override 053 public void start() throws Exception { 054 if (discoveryAgent == null) { 055 throw new IllegalStateException("discoveryAgent not configured"); 056 } 057 058 // lets pass into the agent the broker name and connection details 059 discoveryAgent.setDiscoveryListener(this); 060 discoveryAgent.start(); 061 next.start(); 062 } 063 064 @Override 065 public void stop() throws Exception { 066 ServiceStopper ss = new ServiceStopper(); 067 ss.stop(discoveryAgent); 068 ss.stop(next); 069 ss.throwFirstException(); 070 } 071 072 public void onServiceAdd(DiscoveryEvent event) { 073 String url = event.getServiceName(); 074 if (url != null) { 075 try { 076 URI uri = new URI(url); 077 LOG.info("Adding new broker connection URL: " + uri); 078 uri = URISupport.applyParameters(uri, parameters, DISCOVERED_OPTION_PREFIX); 079 serviceURIs.put(event.getServiceName(), uri); 080 next.add(false,new URI[] {uri}); 081 } catch (URISyntaxException e) { 082 LOG.warn("Could not connect to remote URI: " + url + " due to bad URI syntax: " + e, e); 083 } 084 } 085 } 086 087 public void onServiceRemove(DiscoveryEvent event) { 088 URI uri = serviceURIs.get(event.getServiceName()); 089 if (uri != null) { 090 next.remove(false,new URI[] {uri}); 091 } 092 } 093 094 public DiscoveryAgent getDiscoveryAgent() { 095 return discoveryAgent; 096 } 097 098 public void setDiscoveryAgent(DiscoveryAgent discoveryAgent) { 099 this.discoveryAgent = discoveryAgent; 100 } 101 102 public void setParameters(Map<String, String> parameters) { 103 this.parameters = parameters; 104 } 105 106 }