001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.fanout;
018    
019    import java.io.IOException;
020    import java.io.InterruptedIOException;
021    import java.net.URI;
022    import java.util.ArrayList;
023    import java.util.Iterator;
024    import java.util.concurrent.ConcurrentHashMap;
025    import java.util.concurrent.atomic.AtomicInteger;
026    
027    import org.apache.activemq.command.Command;
028    import org.apache.activemq.command.ConsumerInfo;
029    import org.apache.activemq.command.Message;
030    import org.apache.activemq.command.RemoveInfo;
031    import org.apache.activemq.command.Response;
032    import org.apache.activemq.state.ConnectionStateTracker;
033    import org.apache.activemq.thread.DefaultThreadPools;
034    import org.apache.activemq.thread.Task;
035    import org.apache.activemq.thread.TaskRunner;
036    import org.apache.activemq.transport.CompositeTransport;
037    import org.apache.activemq.transport.DefaultTransportListener;
038    import org.apache.activemq.transport.FutureResponse;
039    import org.apache.activemq.transport.ResponseCallback;
040    import org.apache.activemq.transport.Transport;
041    import org.apache.activemq.transport.TransportFactory;
042    import org.apache.activemq.transport.TransportListener;
043    import org.apache.activemq.util.IOExceptionSupport;
044    import org.apache.activemq.util.ServiceStopper;
045    import org.apache.activemq.util.ServiceSupport;
046    import org.slf4j.Logger;
047    import org.slf4j.LoggerFactory;
048    
049    /**
050     * A Transport that fans out a connection to multiple brokers.
051     * 
052     * 
053     */
054    public class FanoutTransport implements CompositeTransport {
055    
056        private static final Logger LOG = LoggerFactory.getLogger(FanoutTransport.class);
057    
058        private TransportListener transportListener;
059        private boolean disposed;
060        private boolean connected;
061    
062        private final Object reconnectMutex = new Object();
063        private final ConnectionStateTracker stateTracker = new ConnectionStateTracker();
064        private final ConcurrentHashMap<Integer, RequestCounter> requestMap = new ConcurrentHashMap<Integer, RequestCounter>();
065    
066        private final TaskRunner reconnectTask;
067        private boolean started;
068    
069        private final ArrayList<FanoutTransportHandler> transports = new ArrayList<FanoutTransportHandler>();
070        private int connectedCount;
071    
072        private int minAckCount = 2;
073    
074        private long initialReconnectDelay = 10;
075        private long maxReconnectDelay = 1000 * 30;
076        private long backOffMultiplier = 2;
077        private final boolean useExponentialBackOff = true;
078        private int maxReconnectAttempts;
079        private Exception connectionFailure;
080        private FanoutTransportHandler primary;
081        private boolean fanOutQueues = false;
082    
083        static class RequestCounter {
084    
085            final Command command;
086            final AtomicInteger ackCount;
087    
088            RequestCounter(Command command, int count) {
089                this.command = command;
090                this.ackCount = new AtomicInteger(count);
091            }
092    
093            @Override
094            public String toString() {
095                return command.getCommandId() + "=" + ackCount.get();
096            }
097        }
098    
099        class FanoutTransportHandler extends DefaultTransportListener {
100    
101            private final URI uri;
102            private Transport transport;
103    
104            private int connectFailures;
105            private long reconnectDelay = initialReconnectDelay;
106            private long reconnectDate;
107    
108            public FanoutTransportHandler(URI uri) {
109                this.uri = uri;
110            }
111    
112            @Override
113            public void onCommand(Object o) {
114                Command command = (Command)o;
115                if (command.isResponse()) {
116                    Integer id = new Integer(((Response)command).getCorrelationId());
117                    RequestCounter rc = requestMap.get(id);
118                    if (rc != null) {
119                        if (rc.ackCount.decrementAndGet() <= 0) {
120                            requestMap.remove(id);
121                            transportListenerOnCommand(command);
122                        }
123                    } else {
124                        transportListenerOnCommand(command);
125                    }
126                } else {
127                    transportListenerOnCommand(command);
128                }
129            }
130    
131            @Override
132            public void onException(IOException error) {
133                try {
134                    synchronized (reconnectMutex) {
135                        if (transport == null || !transport.isConnected()) {
136                            return;
137                        }
138    
139                        LOG.debug("Transport failed, starting up reconnect task", error);
140    
141                        ServiceSupport.dispose(transport);
142                        transport = null;
143                        connectedCount--;
144                        if (primary == this) {
145                            primary = null;
146                        }
147                        reconnectTask.wakeup();
148                    }
149                } catch (InterruptedException e) {
150                    Thread.currentThread().interrupt();
151                    if (transportListener != null) {
152                        transportListener.onException(new InterruptedIOException());
153                    }
154                }
155            }
156        }
157    
158        public FanoutTransport() throws InterruptedIOException {
159            // Setup a task that is used to reconnect the a connection async.
160            reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() {
161                public boolean iterate() {
162                    return doConnect();
163                }
164            }, "ActiveMQ Fanout Worker: " + System.identityHashCode(this));
165        }
166    
167        /**
168         * @return
169         */
170        private boolean doConnect() {
171            long closestReconnectDate = 0;
172            synchronized (reconnectMutex) {
173    
174                if (disposed || connectionFailure != null) {
175                    reconnectMutex.notifyAll();
176                }
177    
178                if (transports.size() == connectedCount || disposed || connectionFailure != null) {
179                    return false;
180                } else {
181    
182                    if (transports.isEmpty()) {
183                        // connectionFailure = new IOException("No uris available to
184                        // connect to.");
185                    } else {
186    
187                        // Try to connect them up.
188                        Iterator<FanoutTransportHandler> iter = transports.iterator();
189                        for (int i = 0; iter.hasNext() && !disposed; i++) {
190    
191                            long now = System.currentTimeMillis();
192    
193                            FanoutTransportHandler fanoutHandler = iter.next();
194                            if (fanoutHandler.transport != null) {
195                                continue;
196                            }
197    
198                            // Are we waiting a little to try to reconnect this one?
199                            if (fanoutHandler.reconnectDate != 0 && fanoutHandler.reconnectDate > now) {
200                                if (closestReconnectDate == 0 || fanoutHandler.reconnectDate < closestReconnectDate) {
201                                    closestReconnectDate = fanoutHandler.reconnectDate;
202                                }
203                                continue;
204                            }
205    
206                            URI uri = fanoutHandler.uri;
207                            try {
208                                LOG.debug("Stopped: " + this);
209                                LOG.debug("Attempting connect to: " + uri);
210                                Transport t = TransportFactory.compositeConnect(uri);
211                                fanoutHandler.transport = t;
212                                t.setTransportListener(fanoutHandler);
213                                if (started) {
214                                    restoreTransport(fanoutHandler);
215                                }
216                                LOG.debug("Connection established");
217                                fanoutHandler.reconnectDelay = initialReconnectDelay;
218                                fanoutHandler.connectFailures = 0;
219                                if (primary == null) {
220                                    primary = fanoutHandler;
221                                }
222                                connectedCount++;
223                            } catch (Exception e) {
224                                LOG.debug("Connect fail to: " + uri + ", reason: " + e);
225    
226                                if( fanoutHandler.transport !=null ) {
227                                    ServiceSupport.dispose(fanoutHandler.transport);
228                                    fanoutHandler.transport=null;
229                                }
230                                
231                                if (maxReconnectAttempts > 0 && ++fanoutHandler.connectFailures >= maxReconnectAttempts) {
232                                    LOG.error("Failed to connect to transport after: " + fanoutHandler.connectFailures + " attempt(s)");
233                                    connectionFailure = e;
234                                    reconnectMutex.notifyAll();
235                                    return false;
236                                } else {
237    
238                                    if (useExponentialBackOff) {
239                                        // Exponential increment of reconnect delay.
240                                        fanoutHandler.reconnectDelay *= backOffMultiplier;
241                                        if (fanoutHandler.reconnectDelay > maxReconnectDelay) {
242                                            fanoutHandler.reconnectDelay = maxReconnectDelay;
243                                        }
244                                    }
245    
246                                    fanoutHandler.reconnectDate = now + fanoutHandler.reconnectDelay;
247    
248                                    if (closestReconnectDate == 0 || fanoutHandler.reconnectDate < closestReconnectDate) {
249                                        closestReconnectDate = fanoutHandler.reconnectDate;
250                                    }
251                                }
252                            }
253                        }
254                        if (transports.size() == connectedCount || disposed) {
255                            reconnectMutex.notifyAll();
256                            return false;
257                        }
258    
259                    }
260                }
261    
262            }
263    
264            try {
265                long reconnectDelay = closestReconnectDate - System.currentTimeMillis();
266                if (reconnectDelay > 0) {
267                    LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
268                    Thread.sleep(reconnectDelay);
269                }
270            } catch (InterruptedException e1) {
271                Thread.currentThread().interrupt();
272            }
273            return true;
274        }
275    
276        public void start() throws Exception {
277            synchronized (reconnectMutex) {
278                LOG.debug("Started.");
279                if (started) {
280                    return;
281                }
282                started = true;
283                for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
284                    FanoutTransportHandler th = iter.next();
285                    if (th.transport != null) {
286                        restoreTransport(th);
287                    }
288                }
289                connected=true;
290            }
291        }
292    
293        public void stop() throws Exception {
294            synchronized (reconnectMutex) {
295                ServiceStopper ss = new ServiceStopper();
296    
297                if (!started) {
298                    return;
299                }
300                started = false;
301                disposed = true;
302                connected=false;
303    
304                for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
305                    FanoutTransportHandler th = iter.next();
306                    if (th.transport != null) {
307                        ss.stop(th.transport);
308                    }
309                }
310    
311                LOG.debug("Stopped: " + this);
312                ss.throwFirstException();
313            }
314            reconnectTask.shutdown();
315        }
316    
317            public int getMinAckCount() {
318                    return minAckCount;
319            }
320    
321            public void setMinAckCount(int minAckCount) {
322                    this.minAckCount = minAckCount;
323            }    
324        
325        public long getInitialReconnectDelay() {
326            return initialReconnectDelay;
327        }
328    
329        public void setInitialReconnectDelay(long initialReconnectDelay) {
330            this.initialReconnectDelay = initialReconnectDelay;
331        }
332    
333        public long getMaxReconnectDelay() {
334            return maxReconnectDelay;
335        }
336    
337        public void setMaxReconnectDelay(long maxReconnectDelay) {
338            this.maxReconnectDelay = maxReconnectDelay;
339        }
340    
341        public long getReconnectDelayExponent() {
342            return backOffMultiplier;
343        }
344    
345        public void setReconnectDelayExponent(long reconnectDelayExponent) {
346            this.backOffMultiplier = reconnectDelayExponent;
347        }
348    
349        public int getMaxReconnectAttempts() {
350            return maxReconnectAttempts;
351        }
352    
353        public void setMaxReconnectAttempts(int maxReconnectAttempts) {
354            this.maxReconnectAttempts = maxReconnectAttempts;
355        }
356    
357        public void oneway(Object o) throws IOException {
358            final Command command = (Command)o;
359            try {
360                synchronized (reconnectMutex) {
361    
362                    // Wait for transport to be connected.
363                    while (connectedCount < minAckCount && !disposed && connectionFailure == null) {
364                        LOG.debug("Waiting for at least " + minAckCount + " transports to be connected.");
365                        reconnectMutex.wait(1000);
366                    }
367    
368                    // Still not fully connected.
369                    if (connectedCount < minAckCount) {
370    
371                        Exception error;
372    
373                        // Throw the right kind of error..
374                        if (disposed) {
375                            error = new IOException("Transport disposed.");
376                        } else if (connectionFailure != null) {
377                            error = connectionFailure;
378                        } else {
379                            error = new IOException("Unexpected failure.");
380                        }
381    
382                        if (error instanceof IOException) {
383                            throw (IOException)error;
384                        }
385                        throw IOExceptionSupport.create(error);
386                    }
387    
388                    // If it was a request and it was not being tracked by
389                    // the state tracker,
390                    // then hold it in the requestMap so that we can replay
391                    // it later.
392                    boolean fanout = isFanoutCommand(command);
393                    if (stateTracker.track(command) == null && command.isResponseRequired()) {
394                        int size = fanout ? minAckCount : 1;
395                        requestMap.put(new Integer(command.getCommandId()), new RequestCounter(command, size));
396                    }
397                    
398                    // Send the message.
399                    if (fanout) {
400                        for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
401                            FanoutTransportHandler th = iter.next();
402                            if (th.transport != null) {
403                                try {
404                                    th.transport.oneway(command);
405                                } catch (IOException e) {
406                                    LOG.debug("Send attempt: failed.");
407                                    th.onException(e);
408                                }
409                            }
410                        }
411                    } else {
412                        try {
413                            primary.transport.oneway(command);
414                        } catch (IOException e) {
415                            LOG.debug("Send attempt: failed.");
416                            primary.onException(e);
417                        }
418                    }
419    
420                }
421            } catch (InterruptedException e) {
422                // Some one may be trying to stop our thread.
423                Thread.currentThread().interrupt();
424                throw new InterruptedIOException();
425            }
426        }
427    
428        /**
429         * @param command
430         * @return
431         */
432        private boolean isFanoutCommand(Command command) {
433            if (command.isMessage()) {
434                if( fanOutQueues ) {
435                    return true;
436                }
437                return ((Message)command).getDestination().isTopic();
438            }
439            if (command.getDataStructureType() == ConsumerInfo.DATA_STRUCTURE_TYPE ||
440                    command.getDataStructureType() == RemoveInfo.DATA_STRUCTURE_TYPE) {
441                return false;
442            }
443            return true;
444        }
445    
446        public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
447            throw new AssertionError("Unsupported Method");
448        }
449    
450        public Object request(Object command) throws IOException {
451            throw new AssertionError("Unsupported Method");
452        }
453    
454        public Object request(Object command, int timeout) throws IOException {
455            throw new AssertionError("Unsupported Method");
456        }
457    
458        public void reconnect() {
459            LOG.debug("Waking up reconnect task");
460            try {
461                reconnectTask.wakeup();
462            } catch (InterruptedException e) {
463                Thread.currentThread().interrupt();
464            }
465        }
466    
467        public TransportListener getTransportListener() {
468            return transportListener;
469        }
470    
471        public void setTransportListener(TransportListener commandListener) {
472            this.transportListener = commandListener;
473        }
474    
475        public <T> T narrow(Class<T> target) {
476    
477            if (target.isAssignableFrom(getClass())) {
478                return target.cast(this);
479            }
480    
481            synchronized (reconnectMutex) {
482                for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
483                    FanoutTransportHandler th = iter.next();
484                    if (th.transport != null) {
485                        T rc = th.transport.narrow(target);
486                        if (rc != null) {
487                            return rc;
488                        }
489                    }
490                }
491            }
492    
493            return null;
494    
495        }
496    
497        protected void restoreTransport(FanoutTransportHandler th) throws Exception, IOException {
498            th.transport.start();
499            stateTracker.setRestoreConsumers(th.transport == primary);
500            stateTracker.restore(th.transport);
501            for (Iterator<RequestCounter> iter2 = requestMap.values().iterator(); iter2.hasNext();) {
502                RequestCounter rc = iter2.next();
503                th.transport.oneway(rc.command);
504            }
505        }
506    
507        public void add(boolean reblance,URI uris[]) {
508    
509            synchronized (reconnectMutex) {
510                for (int i = 0; i < uris.length; i++) {
511                    URI uri = uris[i];
512    
513                    boolean match = false;
514                    for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
515                        FanoutTransportHandler th = iter.next();
516                        if (th.uri.equals(uri)) {
517                            match = true;
518                            break;
519                        }
520                    }
521                    if (!match) {
522                        FanoutTransportHandler th = new FanoutTransportHandler(uri);
523                        transports.add(th);
524                        reconnect();
525                    }
526                }
527            }
528    
529        }
530    
531        public void remove(boolean rebalance,URI uris[]) {
532    
533            synchronized (reconnectMutex) {
534                for (int i = 0; i < uris.length; i++) {
535                    URI uri = uris[i];
536    
537                    for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
538                        FanoutTransportHandler th = iter.next();
539                        if (th.uri.equals(uri)) {
540                            if (th.transport != null) {
541                                ServiceSupport.dispose(th.transport);
542                                connectedCount--;
543                            }
544                            iter.remove();
545                            break;
546                        }
547                    }
548                }
549            }
550    
551        }
552        
553        public void reconnect(URI uri) throws IOException {
554                    add(true,new URI[]{uri});
555                    
556            }
557        
558        public boolean isReconnectSupported() {
559            return true;
560        }
561    
562        public boolean isUpdateURIsSupported() {
563            return true;
564        }
565        public void updateURIs(boolean reblance,URI[] uris) throws IOException {
566            add(reblance,uris);
567        }
568    
569    
570        public String getRemoteAddress() {
571            if (primary != null) {
572                if (primary.transport != null) {
573                    return primary.transport.getRemoteAddress();
574                }
575            }
576            return null;
577        }
578    
579        protected void transportListenerOnCommand(Command command) {
580            if (transportListener != null) {
581                transportListener.onCommand(command);
582            }
583        }
584    
585        public boolean isFaultTolerant() {
586            return true;
587        }
588    
589        public boolean isFanOutQueues() {
590            return fanOutQueues;
591        }
592    
593        public void setFanOutQueues(boolean fanOutQueues) {
594            this.fanOutQueues = fanOutQueues;
595        }
596    
597            public boolean isDisposed() {
598                    return disposed;
599            }
600            
601    
602        public boolean isConnected() {
603            return connected;
604        }
605    
606        public int getReceiveCounter() {
607            int rc = 0;
608            synchronized (reconnectMutex) {
609                for (FanoutTransportHandler th : transports) {
610                    if (th.transport != null) {
611                        rc += th.transport.getReceiveCounter();
612                    }
613                }
614            }
615            return rc;
616        }
617    }