001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.activemq.transport.failover;
019    
020    import java.io.BufferedReader;
021    import java.io.FileNotFoundException;
022    import java.io.FileReader;
023    import java.io.IOException;
024    import java.io.InputStreamReader;
025    import java.io.InterruptedIOException;
026    import java.net.InetAddress;
027    import java.net.MalformedURLException;
028    import java.net.URI;
029    import java.net.URL;
030    import java.util.ArrayList;
031    import java.util.HashSet;
032    import java.util.Iterator;
033    import java.util.LinkedHashMap;
034    import java.util.List;
035    import java.util.Map;
036    import java.util.Set;
037    import java.util.StringTokenizer;
038    import java.util.concurrent.CopyOnWriteArrayList;
039    import java.util.concurrent.atomic.AtomicReference;
040    import org.apache.activemq.broker.SslContext;
041    import org.apache.activemq.command.Command;
042    import org.apache.activemq.command.ConnectionControl;
043    import org.apache.activemq.command.ConnectionId;
044    import org.apache.activemq.command.RemoveInfo;
045    import org.apache.activemq.command.Response;
046    import org.apache.activemq.state.ConnectionStateTracker;
047    import org.apache.activemq.state.Tracked;
048    import org.apache.activemq.thread.DefaultThreadPools;
049    import org.apache.activemq.thread.Task;
050    import org.apache.activemq.thread.TaskRunner;
051    import org.apache.activemq.transport.CompositeTransport;
052    import org.apache.activemq.transport.DefaultTransportListener;
053    import org.apache.activemq.transport.FutureResponse;
054    import org.apache.activemq.transport.ResponseCallback;
055    import org.apache.activemq.transport.Transport;
056    import org.apache.activemq.transport.TransportFactory;
057    import org.apache.activemq.transport.TransportListener;
058    import org.apache.activemq.util.IOExceptionSupport;
059    import org.apache.activemq.util.ServiceSupport;
060    import org.slf4j.Logger;
061    import org.slf4j.LoggerFactory;
062    
063    
064    /**
065     * A Transport that is made reliable by being able to fail over to another
066     * transport when a transport failure is detected.
067     * 
068     * 
069     */
070    public class FailoverTransport implements CompositeTransport {
071    
072        private static final Logger LOG = LoggerFactory.getLogger(FailoverTransport.class);
073        private static final int DEFAULT_INITIAL_RECONNECT_DELAY = 10;
074        private TransportListener transportListener;
075        private boolean disposed;
076        private boolean connected;
077        private final CopyOnWriteArrayList<URI> uris = new CopyOnWriteArrayList<URI>();
078        private final CopyOnWriteArrayList<URI> updated = new CopyOnWriteArrayList<URI>();
079    
080        private final Object reconnectMutex = new Object();
081        private final Object backupMutex = new Object();
082        private final Object sleepMutex = new Object();
083        private final Object listenerMutex = new Object();
084        private final ConnectionStateTracker stateTracker = new ConnectionStateTracker();
085        private final Map<Integer, Command> requestMap = new LinkedHashMap<Integer, Command>();
086    
087        private URI connectedTransportURI;
088        private URI failedConnectTransportURI;
089        private final AtomicReference<Transport> connectedTransport = new AtomicReference<Transport>();
090        private final TaskRunner reconnectTask;
091        private boolean started;
092        private boolean initialized;
093        private long initialReconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY;
094        private long maxReconnectDelay = 1000 * 30;
095        private double backOffMultiplier = 2d;
096        private long timeout = -1;
097        private boolean useExponentialBackOff = true;
098        private boolean randomize = true;
099        private int maxReconnectAttempts;
100        private int startupMaxReconnectAttempts;
101        private int connectFailures;
102        private long reconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY;
103        private Exception connectionFailure;
104        private boolean firstConnection = true;
105        // optionally always have a backup created
106        private boolean backup = false;
107        private final List<BackupTransport> backups = new CopyOnWriteArrayList<BackupTransport>();
108        private int backupPoolSize = 1;
109        private boolean trackMessages = false;
110        private boolean trackTransactionProducers = true;
111        private int maxCacheSize = 128 * 1024;
112        private final TransportListener disposedListener = new DefaultTransportListener() {
113        };
114        //private boolean connectionInterruptProcessingComplete;
115    
116        private final TransportListener myTransportListener = createTransportListener();
117        private boolean updateURIsSupported=true;
118        private boolean reconnectSupported=true;
119        // remember for reconnect thread
120        private SslContext brokerSslContext;
121        private String updateURIsURL = null;
122        private boolean rebalanceUpdateURIs=true;
123        private boolean doRebalance = false;
124    
/**
 * Creates the transport. Remembers the value of
 * {@code SslContext.getCurrentSslContext()}, enables transaction tracking
 * on the state tracker and creates the task runner that performs
 * reconnects and backup building.
 *
 * @throws InterruptedIOException declared by the signature; nothing in
 *         this body throws it directly
 */
public FailoverTransport() throws InterruptedIOException {
    brokerSslContext = SslContext.getCurrentSslContext();
    stateTracker.setTrackTransactions(true);
    // Set up a task that is used to reconnect a connection asynchronously.
    reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() {
        /**
         * One unit of work: attempt a reconnect when there is no connected
         * transport (or a rebalance was requested), otherwise build
         * backups. Returns the result of doReconnect(), or false when no
         * reconnect was attempted.
         */
        public boolean iterate() {
            boolean result = false;
            boolean reconnectAttempted = false;
            synchronized (backupMutex) {
                if ((connectedTransport.get() == null || doRebalance) && !disposed) {
                    result = doReconnect();
                    reconnectAttempted = true;
                }
            }
            if (reconnectAttempted) {
                // build backups on the next iteration
                try {
                    reconnectTask.wakeup();
                } catch (InterruptedException e) {
                    LOG.debug("Reconnect task has been interrupted.", e);
                }
            } else {
                buildBackups();
            }
            return result;
        }

    }, "ActiveMQ Failover Worker: " + System.identityHashCode(this));
}
156    
157        TransportListener createTransportListener() {
158            return new TransportListener() {
159                public void onCommand(Object o) {
160                    Command command = (Command) o;
161                    if (command == null) {
162                        return;
163                    }
164                    if (command.isResponse()) {
165                        Object object = null;
166                        synchronized (requestMap) {
167                            object = requestMap.remove(Integer.valueOf(((Response) command).getCorrelationId()));
168                        }
169                        if (object != null && object.getClass() == Tracked.class) {
170                            ((Tracked) object).onResponses(command);
171                        }
172                    }
173                    if (!initialized) {      
174                        initialized = true;
175                    }
176                    
177                    if(command.isConnectionControl()) {
178                        handleConnectionControl((ConnectionControl) command);
179                    }
180                    if (transportListener != null) {
181                        transportListener.onCommand(command);
182                    }
183                }
184    
185                public void onException(IOException error) {
186                    try {
187                        handleTransportFailure(error);
188                    } catch (InterruptedException e) {
189                        Thread.currentThread().interrupt();
190                        transportListener.onException(new InterruptedIOException());
191                    }
192                }
193    
194                public void transportInterupted() {
195                    if (transportListener != null) {
196                        transportListener.transportInterupted();
197                    }
198                }
199    
200                public void transportResumed() {
201                    if (transportListener != null) {
202                        transportListener.transportResumed();
203                    }
204                }
205            };
206        }
207    
208        public final void disposeTransport(Transport transport) {
209            transport.setTransportListener(disposedListener);
210            ServiceSupport.dispose(transport);
211        }
212    
213        public final void handleTransportFailure(IOException e) throws InterruptedException {
214            if (LOG.isTraceEnabled()) {
215                LOG.trace(this + " handleTransportFailure: " + e);
216            }
217            Transport transport = connectedTransport.getAndSet(null);
218            if (transport == null) {
219                // sync with possible in progress reconnect
220                synchronized (reconnectMutex) {
221                    transport = connectedTransport.getAndSet(null);
222                }
223            }
224            if (transport != null) {
225    
226                disposeTransport(transport);
227    
228                boolean reconnectOk = false;
229                synchronized (reconnectMutex) {
230                    if (started) {
231                        LOG.warn("Transport (" + transport.getRemoteAddress() + ") failed to " + connectedTransportURI
232                                + " , attempting to automatically reconnect due to: " + e);
233                        LOG.debug("Transport failed with the following exception:", e);
234                        reconnectOk = true;
235                    }
236                    initialized = false;
237                    failedConnectTransportURI = connectedTransportURI;
238                    connectedTransportURI = null;
239                    connected = false;
240    
241                    // notify before any reconnect attempt so ack state can be
242                    // whacked
243                    if (transportListener != null) {
244                        transportListener.transportInterupted();
245                    }
246    
247                    if (reconnectOk) {
248                        reconnectTask.wakeup();
249                    }
250                }
251            }
252        }
253    
254        public final void handleConnectionControl(ConnectionControl control) {
255            String reconnectStr = control.getReconnectTo();
256            if (reconnectStr != null) {
257                reconnectStr = reconnectStr.trim();
258                if (reconnectStr.length() > 0) {
259                    try {
260                        URI uri = new URI(reconnectStr);
261                        if (isReconnectSupported()) {
262                            reconnect(uri);
263                            LOG.info("Reconnected to: " + uri);
264                        }
265                    } catch (Exception e) {
266                        LOG.error("Failed to handle ConnectionControl reconnect to " + reconnectStr, e);
267                    }
268                }
269            }
270            processNewTransports(control.isRebalanceConnection(), control.getConnectedBrokers());
271        }
272    
273        private final void processNewTransports(boolean rebalance, String newTransports) {
274            if (newTransports != null) {
275                newTransports = newTransports.trim();
276                if (newTransports.length() > 0 && isUpdateURIsSupported()) {
277                    List<URI> list = new ArrayList<URI>();
278                    StringTokenizer tokenizer = new StringTokenizer(newTransports, ",");
279                    while (tokenizer.hasMoreTokens()) {
280                        String str = tokenizer.nextToken();
281                        try {
282                            URI uri = new URI(str);
283                            list.add(uri);
284                        } catch (Exception e) {
285                            LOG.error("Failed to parse broker address: " + str, e);
286                        }
287                    }
288                    if (list.isEmpty() == false) {
289                        try {
290                            updateURIs(rebalance, list.toArray(new URI[list.size()]));
291                        } catch (IOException e) {
292                            LOG.error("Failed to update transport URI's from: " + newTransports, e);
293                        }
294                    }
295    
296                }
297            }
298        }
299    
300        public void start() throws Exception {
301            synchronized (reconnectMutex) {
302                LOG.debug("Started.");
303                if (started) {
304                    return;
305                }
306                started = true;
307                stateTracker.setMaxCacheSize(getMaxCacheSize());
308                stateTracker.setTrackMessages(isTrackMessages());
309                stateTracker.setTrackTransactionProducers(isTrackTransactionProducers());
310                if (connectedTransport.get() != null) {
311                    stateTracker.restore(connectedTransport.get());
312                } else {
313                    reconnect(false);
314                }
315            }
316        }
317    
318        public void stop() throws Exception {
319            Transport transportToStop = null;
320            synchronized (reconnectMutex) {
321                LOG.debug("Stopped.");
322                if (!started) {
323                    return;
324                }
325                started = false;
326                disposed = true;
327                connected = false;
328                for (BackupTransport t : backups) {
329                    t.setDisposed(true);
330                }
331                backups.clear();
332    
333                if (connectedTransport.get() != null) {
334                    transportToStop = connectedTransport.getAndSet(null);
335                }
336                reconnectMutex.notifyAll();
337            }
338            synchronized (sleepMutex) {
339                sleepMutex.notifyAll();
340            }
341            reconnectTask.shutdown();
342            if (transportToStop != null) {
343                transportToStop.stop();
344            }
345        }
346    
347        public long getInitialReconnectDelay() {
348            return initialReconnectDelay;
349        }
350    
351        public void setInitialReconnectDelay(long initialReconnectDelay) {
352            this.initialReconnectDelay = initialReconnectDelay;
353        }
354    
355        public long getMaxReconnectDelay() {
356            return maxReconnectDelay;
357        }
358    
359        public void setMaxReconnectDelay(long maxReconnectDelay) {
360            this.maxReconnectDelay = maxReconnectDelay;
361        }
362    
363        public long getReconnectDelay() {
364            return reconnectDelay;
365        }
366    
367        public void setReconnectDelay(long reconnectDelay) {
368            this.reconnectDelay = reconnectDelay;
369        }
370    
371        public double getReconnectDelayExponent() {
372            return backOffMultiplier;
373        }
374    
375        public void setReconnectDelayExponent(double reconnectDelayExponent) {
376            this.backOffMultiplier = reconnectDelayExponent;
377        }
378    
379        public Transport getConnectedTransport() {
380            return connectedTransport.get();
381        }
382    
383        public URI getConnectedTransportURI() {
384            return connectedTransportURI;
385        }
386    
387        public int getMaxReconnectAttempts() {
388            return maxReconnectAttempts;
389        }
390    
391        public void setMaxReconnectAttempts(int maxReconnectAttempts) {
392            this.maxReconnectAttempts = maxReconnectAttempts;
393        }
394    
395        public int getStartupMaxReconnectAttempts() {
396            return this.startupMaxReconnectAttempts;
397        }
398    
399        public void setStartupMaxReconnectAttempts(int startupMaxReconnectAttempts) {
400            this.startupMaxReconnectAttempts = startupMaxReconnectAttempts;
401        }
402    
403        public long getTimeout() {
404            return timeout;
405        }
406    
407        public void setTimeout(long timeout) {
408            this.timeout = timeout;
409        }
410    
411        /**
412         * @return Returns the randomize.
413         */
414        public boolean isRandomize() {
415            return randomize;
416        }
417    
418        /**
419         * @param randomize
420         *            The randomize to set.
421         */
422        public void setRandomize(boolean randomize) {
423            this.randomize = randomize;
424        }
425    
426        public boolean isBackup() {
427            return backup;
428        }
429    
430        public void setBackup(boolean backup) {
431            this.backup = backup;
432        }
433    
434        public int getBackupPoolSize() {
435            return backupPoolSize;
436        }
437    
438        public void setBackupPoolSize(int backupPoolSize) {
439            this.backupPoolSize = backupPoolSize;
440        }
441    
442        public boolean isTrackMessages() {
443            return trackMessages;
444        }
445    
446        public void setTrackMessages(boolean trackMessages) {
447            this.trackMessages = trackMessages;
448        }
449    
450        public boolean isTrackTransactionProducers() {
451            return this.trackTransactionProducers;
452        }
453    
454        public void setTrackTransactionProducers(boolean trackTransactionProducers) {
455            this.trackTransactionProducers = trackTransactionProducers;
456        }
457    
458        public int getMaxCacheSize() {
459            return maxCacheSize;
460        }
461    
462        public void setMaxCacheSize(int maxCacheSize) {
463            this.maxCacheSize = maxCacheSize;
464        }
465    
466        /**
467         * @return Returns true if the command is one sent when a connection is
468         *         being closed.
469         */
470        private boolean isShutdownCommand(Command command) {
471            return (command != null && (command.isShutdownInfo() || command instanceof RemoveInfo));
472        }
473    
474        public void oneway(Object o) throws IOException {
475    
476            Command command = (Command) o;
477            Exception error = null;
478            try {
479    
480                synchronized (reconnectMutex) {
481    
482                    if (isShutdownCommand(command) && connectedTransport.get() == null) {
483                        if (command.isShutdownInfo()) {
484                            // Skipping send of ShutdownInfo command when not
485                            // connected.
486                            return;
487                        }
488                        if (command instanceof RemoveInfo || command.isMessageAck()) {
489                            // Simulate response to RemoveInfo command or ack (as it
490                            // will be stale)
491                            stateTracker.track(command);
492                            Response response = new Response();
493                            response.setCorrelationId(command.getCommandId());
494                            myTransportListener.onCommand(response);
495                            return;
496                        }
497                    }
498                    // Keep trying until the message is sent.
499                    for (int i = 0; !disposed; i++) {
500                        try {
501    
502                            // Wait for transport to be connected.
503                            Transport transport = connectedTransport.get();
504                            long start = System.currentTimeMillis();
505                            boolean timedout = false;
506                            while (transport == null && !disposed && connectionFailure == null
507                                    && !Thread.currentThread().isInterrupted()) {
508                                LOG.trace("Waiting for transport to reconnect..: " + command);
509                                long end = System.currentTimeMillis();
510                                if (timeout > 0 && (end - start > timeout)) {
511                                    timedout = true;
512                                    LOG.info("Failover timed out after " + (end - start) + "ms");
513                                    break;
514                                }
515                                try {
516                                    reconnectMutex.wait(100);
517                                } catch (InterruptedException e) {
518                                    Thread.currentThread().interrupt();
519                                    LOG.debug("Interupted: " + e, e);
520                                }
521                                transport = connectedTransport.get();
522                            }
523    
524                            if (transport == null) {
525                                // Previous loop may have exited due to use being
526                                // disposed.
527                                if (disposed) {
528                                    error = new IOException("Transport disposed.");
529                                } else if (connectionFailure != null) {
530                                    error = connectionFailure;
531                                } else if (timedout == true) {
532                                    error = new IOException("Failover timeout of " + timeout + " ms reached.");
533                                } else {
534                                    error = new IOException("Unexpected failure.");
535                                }
536                                break;
537                            }
538    
539                            // If it was a request and it was not being tracked by
540                            // the state tracker,
541                            // then hold it in the requestMap so that we can replay
542                            // it later.
543                            Tracked tracked = stateTracker.track(command);
544                            synchronized (requestMap) {
545                                if (tracked != null && tracked.isWaitingForResponse()) {
546                                    requestMap.put(Integer.valueOf(command.getCommandId()), tracked);
547                                } else if (tracked == null && command.isResponseRequired()) {
548                                    requestMap.put(Integer.valueOf(command.getCommandId()), command);
549                                }
550                            }
551    
552                            // Send the message.
553                            try {
554                                transport.oneway(command);
555                                stateTracker.trackBack(command);
556                            } catch (IOException e) {
557    
558                                // If the command was not tracked.. we will retry in
559                                // this method
560                                if (tracked == null) {
561    
562                                    // since we will retry in this method.. take it
563                                    // out of the request
564                                    // map so that it is not sent 2 times on
565                                    // recovery
566                                    if (command.isResponseRequired()) {
567                                        requestMap.remove(Integer.valueOf(command.getCommandId()));
568                                    }
569    
570                                    // Rethrow the exception so it will handled by
571                                    // the outer catch
572                                    throw e;
573                                }
574    
575                            }
576    
577                            return;
578    
579                        } catch (IOException e) {
580                            if (LOG.isDebugEnabled()) {
581                                LOG.debug("Send oneway attempt: " + i + " failed for command:" + command);
582                            }
583                            handleTransportFailure(e);
584                        }
585                    }
586                }
587            } catch (InterruptedException e) {
588                // Some one may be trying to stop our thread.
589                Thread.currentThread().interrupt();
590                throw new InterruptedIOException();
591            }
592            if (!disposed) {
593                if (error != null) {
594                    if (error instanceof IOException) {
595                        throw (IOException) error;
596                    }
597                    throw IOExceptionSupport.create(error);
598                }
599            }
600        }
601    
602        public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
603            throw new AssertionError("Unsupported Method");
604        }
605    
606        public Object request(Object command) throws IOException {
607            throw new AssertionError("Unsupported Method");
608        }
609    
610        public Object request(Object command, int timeout) throws IOException {
611            throw new AssertionError("Unsupported Method");
612        }
613    
614        public void add(boolean rebalance, URI u[]) {
615            boolean newURI = false;
616            for (int i = 0; i < u.length; i++) {
617                if (!contains(u[i])) {
618                    uris.add(u[i]);
619                    newURI = true;
620                }
621            }
622            if (newURI) {
623                reconnect(rebalance);
624            }
625        }
626    
627        public void remove(boolean rebalance, URI u[]) {
628            for (int i = 0; i < u.length; i++) {
629                uris.remove(u[i]);
630            }
631            // rebalance is automatic if any connected to removed/stopped broker
632        }
633    
634        public void add(boolean rebalance, String u) {
635            try {
636                URI newURI = new URI(u);
637                if (contains(newURI)==false) {
638                    uris.add(newURI);
639                    reconnect(rebalance);
640                }
641           
642            } catch (Exception e) {
643                LOG.error("Failed to parse URI: " + u);
644            }
645        }
646    
647        public void reconnect(boolean rebalance) {
648            synchronized (reconnectMutex) {
649                if (started) {
650                    if (rebalance) {
651                        doRebalance = true;
652                    }
653                    LOG.debug("Waking up reconnect task");
654                    try {
655                        reconnectTask.wakeup();
656                    } catch (InterruptedException e) {
657                        Thread.currentThread().interrupt();
658                    }
659                } else {
660                    LOG.debug("Reconnect was triggered but transport is not started yet. Wait for start to connect the transport.");
661                }
662            }
663        }
664    
665        private List<URI> getConnectList() {
666            ArrayList<URI> l = new ArrayList<URI>(uris);
667            boolean removed = false;
668            if (failedConnectTransportURI != null) {
669                removed = l.remove(failedConnectTransportURI);
670            }
671            if (randomize) {
672                // Randomly, reorder the list by random swapping
673                for (int i = 0; i < l.size(); i++) {
674                    int p = (int) (Math.random() * 100 % l.size());
675                    URI t = l.get(p);
676                    l.set(p, l.get(i));
677                    l.set(i, t);
678                }
679            }
680            if (removed) {
681                l.add(failedConnectTransportURI);
682            }
683            LOG.debug("urlList connectionList:" + l + ", from: " + uris);
684            return l;
685        }
686    
687        public TransportListener getTransportListener() {
688            return transportListener;
689        }
690    
691        public void setTransportListener(TransportListener commandListener) {
692            synchronized (listenerMutex) {
693                this.transportListener = commandListener;
694                listenerMutex.notifyAll();
695            }
696        }
697    
698        public <T> T narrow(Class<T> target) {
699    
700            if (target.isAssignableFrom(getClass())) {
701                return target.cast(this);
702            }
703            Transport transport = connectedTransport.get();
704            if (transport != null) {
705                return transport.narrow(target);
706            }
707            return null;
708    
709        }
710    
711        protected void restoreTransport(Transport t) throws Exception, IOException {
712            t.start();
713            // send information to the broker - informing it we are an ft client
714            ConnectionControl cc = new ConnectionControl();
715            cc.setFaultTolerant(true);
716            t.oneway(cc);
717            stateTracker.restore(t);
718            Map tmpMap = null;
719            synchronized (requestMap) {
720                tmpMap = new LinkedHashMap<Integer, Command>(requestMap);
721            }
722            for (Iterator<Command> iter2 = tmpMap.values().iterator(); iter2.hasNext();) {
723                Command command = iter2.next();
724                if (LOG.isTraceEnabled()) {
725                    LOG.trace("restore requestMap, replay: " + command);
726                }
727                t.oneway(command);
728            }
729        }
730    
731        public boolean isUseExponentialBackOff() {
732            return useExponentialBackOff;
733        }
734    
735        public void setUseExponentialBackOff(boolean useExponentialBackOff) {
736            this.useExponentialBackOff = useExponentialBackOff;
737        }
738    
739        @Override
740        public String toString() {
741            return connectedTransportURI == null ? "unconnected" : connectedTransportURI.toString();
742        }
743    
744        public String getRemoteAddress() {
745            Transport transport = connectedTransport.get();
746            if (transport != null) {
747                return transport.getRemoteAddress();
748            }
749            return null;
750        }
751    
752        public boolean isFaultTolerant() {
753            return true;
754        }
755    
756        final boolean doReconnect() {
757            Exception failure = null;
758            synchronized (reconnectMutex) {
759    
760                // If updateURIsURL is specified, read the file and add any new
761                // transport URI's to this FailOverTransport. 
762                // Note: Could track file timestamp to avoid unnecessary reading.
763                String fileURL = getUpdateURIsURL();
764                if (fileURL != null) {
765                    BufferedReader in = null;
766                    String newUris = null;
767                    StringBuffer buffer = new StringBuffer();
768    
769                    try {
770                        in = new BufferedReader(getURLStream(fileURL));
771                        while (true) {
772                            String line = in.readLine();
773                            if (line == null) {
774                                break;
775                            }
776                            buffer.append(line);
777                        }
778                        newUris = buffer.toString();
779                    } catch (IOException ioe) {
780                        LOG.error("Failed to read updateURIsURL: " + fileURL, ioe);
781                    } finally {
782                        if (in != null) {
783                            try {
784                                in.close();
785                            } catch (IOException ioe) {
786                                // ignore
787                            }
788                        }
789                    }
790                    
791                    processNewTransports(isRebalanceUpdateURIs(), newUris);
792                }
793    
794                if (disposed || connectionFailure != null) {
795                    reconnectMutex.notifyAll();
796                }
797    
798                if ((connectedTransport.get() != null && !doRebalance) || disposed || connectionFailure != null) {
799                    return false;
800                } else {
801                    List<URI> connectList = getConnectList();
802                    if (connectList.isEmpty()) {
803                        failure = new IOException("No uris available to connect to.");
804                    } else {
805                        if (doRebalance) {
806                            if (connectList.get(0).equals(connectedTransportURI)) {
807                                // already connected to first in the list, no need to rebalance
808                                doRebalance = false;
809                                return false;
810                            } else {
811                                LOG.debug("Doing rebalance from: " + connectedTransportURI + " to " + connectList);
812                                try {
813                                    Transport transport = this.connectedTransport.getAndSet(null);
814                                    if (transport != null) {
815                                        disposeTransport(transport);
816                                    }
817                                } catch (Exception e) {
818                                    LOG.debug("Caught an exception stopping existing transport for rebalance", e);
819                                }
820                            }
821                            doRebalance = false;
822                        }
823                        if (!useExponentialBackOff || reconnectDelay == DEFAULT_INITIAL_RECONNECT_DELAY) {
824                            reconnectDelay = initialReconnectDelay;
825                        }
826                        synchronized (backupMutex) {
827                            if (backup && !backups.isEmpty()) {
828                                BackupTransport bt = backups.remove(0);
829                                Transport t = bt.getTransport();
830                                URI uri = bt.getUri();
831                                t.setTransportListener(myTransportListener);
832                                try {
833                                    if (started) {
834                                        restoreTransport(t);
835                                    }
836                                    reconnectDelay = initialReconnectDelay;
837                                    failedConnectTransportURI = null;
838                                    connectedTransportURI = uri;
839                                    connectedTransport.set(t);
840                                    reconnectMutex.notifyAll();
841                                    connectFailures = 0;
842                                    LOG.info("Successfully reconnected to backup " + uri);
843                                    return false;
844                                } catch (Exception e) {
845                                    LOG.debug("Backup transport failed", e);
846                                }
847                            }
848                        }
849    
850                        Iterator<URI> iter = connectList.iterator();
851                        while (iter.hasNext() && connectedTransport.get() == null && !disposed) {
852                            URI uri = iter.next();
853                            Transport t = null;
854                            try {
855                                LOG.debug("Attempting connect to: " + uri);
856                                SslContext.setCurrentSslContext(brokerSslContext);
857                                t = TransportFactory.compositeConnect(uri);
858                                t.setTransportListener(myTransportListener);
859                                t.start();
860    
861                                if (started) {
862                                    restoreTransport(t);
863                                }
864    
865                                LOG.debug("Connection established");
866                                reconnectDelay = initialReconnectDelay;
867                                connectedTransportURI = uri;
868                                connectedTransport.set(t);
869                                reconnectMutex.notifyAll();
870                                connectFailures = 0;
871                                // Make sure on initial startup, that the
872                                // transportListener
873                                // has been initialized for this instance.
874                                synchronized (listenerMutex) {
875                                    if (transportListener == null) {
876                                        try {
877                                            // if it isn't set after 2secs - it
878                                            // probably never will be
879                                            listenerMutex.wait(2000);
880                                        } catch (InterruptedException ex) {
881                                        }
882                                    }
883                                }
884                                if (transportListener != null) {
885                                    transportListener.transportResumed();
886                                } else {
887                                    LOG.debug("transport resumed by transport listener not set");
888                                }
889                                if (firstConnection) {
890                                    firstConnection = false;
891                                    LOG.info("Successfully connected to " + uri);
892                                } else {
893                                    LOG.info("Successfully reconnected to " + uri);
894                                }
895                                connected = true;
896                                return false;
897                            } catch (Exception e) {
898                                failure = e;
899                                LOG.debug("Connect fail to: " + uri + ", reason: " + e);
900                                if (t != null) {
901                                    try {
902                                        t.stop();
903                                    } catch (Exception ee) {
904                                        LOG.debug("Stop of failed transport: " + t + " failed with reason: " + ee);
905                                    }
906                                }
907                            } finally {
908                                SslContext.setCurrentSslContext(null);
909                            }
910                        }
911                    }
912                }
913                int reconnectAttempts = 0;
914                if (firstConnection) {
915                    if (this.startupMaxReconnectAttempts != 0) {
916                        reconnectAttempts = this.startupMaxReconnectAttempts;
917                    }
918                }
919                if (reconnectAttempts == 0) {
920                    reconnectAttempts = this.maxReconnectAttempts;
921                }
922                if (reconnectAttempts > 0 && ++connectFailures >= reconnectAttempts) {
923                    LOG.error("Failed to connect to transport after: " + connectFailures + " attempt(s)");
924                    connectionFailure = failure;
925    
926                    // Make sure on initial startup, that the transportListener has
927                    // been initialized
928                    // for this instance.
929                    synchronized (listenerMutex) {
930                        if (transportListener == null) {
931                            try {
932                                listenerMutex.wait(2000);
933                            } catch (InterruptedException ex) {
934                            }
935                        }
936                    }
937    
938                    if (transportListener != null) {
939                        if (connectionFailure instanceof IOException) {
940                            transportListener.onException((IOException) connectionFailure);
941                        } else {
942                            transportListener.onException(IOExceptionSupport.create(connectionFailure));
943                        }
944                    }
945                    reconnectMutex.notifyAll();
946                    return false;
947                }
948            }
949            if (!disposed) {
950    
951                LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
952                synchronized (sleepMutex) {
953                    try {
954                        sleepMutex.wait(reconnectDelay);
955                    } catch (InterruptedException e) {
956                        Thread.currentThread().interrupt();
957                    }
958                }
959    
960                if (useExponentialBackOff) {
961                    // Exponential increment of reconnect delay.
962                    reconnectDelay *= backOffMultiplier;
963                    if (reconnectDelay > maxReconnectDelay) {
964                        reconnectDelay = maxReconnectDelay;
965                    }
966                }
967            }
968            return !disposed;
969        }
970    
971        final boolean buildBackups() {
972            synchronized (backupMutex) {
973                if (!disposed && backup && backups.size() < backupPoolSize) {
974                    List<URI> connectList = getConnectList();
975                    // removed disposed backups
976                    List<BackupTransport> disposedList = new ArrayList<BackupTransport>();
977                    for (BackupTransport bt : backups) {
978                        if (bt.isDisposed()) {
979                            disposedList.add(bt);
980                        }
981                    }
982                    backups.removeAll(disposedList);
983                    disposedList.clear();
984                    for (Iterator<URI> iter = connectList.iterator(); iter.hasNext() && backups.size() < backupPoolSize;) {
985                        URI uri = iter.next();
986                        if (connectedTransportURI != null && !connectedTransportURI.equals(uri)) {
987                            try {
988                                SslContext.setCurrentSslContext(brokerSslContext);
989                                BackupTransport bt = new BackupTransport(this);
990                                bt.setUri(uri);
991                                if (!backups.contains(bt)) {
992                                    Transport t = TransportFactory.compositeConnect(uri);
993                                    t.setTransportListener(bt);
994                                    t.start();
995                                    bt.setTransport(t);
996                                    backups.add(bt);
997                                }
998                            } catch (Exception e) {
999                                LOG.debug("Failed to build backup ", e);
1000                            } finally {
1001                                SslContext.setCurrentSslContext(null);
1002                            }
1003                        }
1004                    }
1005                }
1006            }
1007            return false;
1008        }
1009    
1010        public boolean isDisposed() {
1011            return disposed;
1012        }
1013    
1014        public boolean isConnected() {
1015            return connected;
1016        }
1017    
1018        public void reconnect(URI uri) throws IOException {
1019            add(true, new URI[] { uri });
1020        }
1021    
1022        public boolean isReconnectSupported() {
1023            return this.reconnectSupported;
1024        }
1025        
1026        public void setReconnectSupported(boolean value) {
1027            this.reconnectSupported=value;
1028        }
1029       
1030        public boolean isUpdateURIsSupported() {
1031            return this.updateURIsSupported;
1032        }
1033        
1034        public void setUpdateURIsSupported(boolean value) {
1035            this.updateURIsSupported=value;
1036        }
1037    
1038        public void updateURIs(boolean rebalance, URI[] updatedURIs) throws IOException {
1039            if (isUpdateURIsSupported()) {
1040                List<URI> copy = new ArrayList<URI>(this.updated);
1041                List<URI> add = new ArrayList<URI>();
1042                if (updatedURIs != null && updatedURIs.length > 0) {
1043                    Set<URI> set = new HashSet<URI>();
1044                    for (int i = 0; i < updatedURIs.length; i++) {
1045                        URI uri = updatedURIs[i];
1046                        if (uri != null) {
1047                            set.add(uri);
1048                        }
1049                    }
1050                    for (URI uri : set) {
1051                        if (copy.remove(uri) == false) {
1052                            add.add(uri);
1053                        }
1054                    }
1055                    synchronized (reconnectMutex) {
1056                        this.updated.clear();
1057                        this.updated.addAll(add);
1058                        for (URI uri : copy) {
1059                            this.uris.remove(uri);
1060                        }
1061                        add(rebalance, add.toArray(new URI[add.size()]));
1062                    }
1063                }
1064            }
1065        }
1066        
1067        /**
1068         * @return the updateURIsURL
1069         */
1070        public String getUpdateURIsURL() {
1071            return this.updateURIsURL;
1072        }
1073    
1074        /**
1075         * @param updateURIsURL the updateURIsURL to set
1076         */
1077        public void setUpdateURIsURL(String updateURIsURL) {
1078            this.updateURIsURL = updateURIsURL;
1079        }
1080        
1081        /**
1082         * @return the rebalanceUpdateURIs
1083         */
1084        public boolean isRebalanceUpdateURIs() {
1085            return this.rebalanceUpdateURIs;
1086        }
1087    
1088        /**
1089         * @param rebalanceUpdateURIs the rebalanceUpdateURIs to set
1090         */
1091        public void setRebalanceUpdateURIs(boolean rebalanceUpdateURIs) {
1092            this.rebalanceUpdateURIs = rebalanceUpdateURIs;
1093        }
1094    
1095        public int getReceiveCounter() {
1096            Transport transport = connectedTransport.get();
1097            if (transport == null) {
1098                return 0;
1099            }
1100            return transport.getReceiveCounter();
1101        }
1102    
1103        public void connectionInterruptProcessingComplete(ConnectionId connectionId) {
1104            synchronized (reconnectMutex) {
1105                stateTracker.connectionInterruptProcessingComplete(this, connectionId);
1106            }
1107        }
1108        
1109        public ConnectionStateTracker getStateTracker() {
1110            return stateTracker;
1111        }
1112        
1113        private boolean contains(URI newURI) {
1114    
1115            boolean result = false;
1116            try {
1117            for (URI uri:uris) {
1118                if (newURI.getPort()==uri.getPort()) {
1119                    InetAddress newAddr = InetAddress.getByName(newURI.getHost());
1120                    InetAddress addr = InetAddress.getByName(uri.getHost());
1121                    if (addr.equals(newAddr)) {
1122                        result = true;
1123                        break;
1124                    }
1125                }
1126            }
1127            }catch(IOException e) {
1128                result = true;
1129                LOG.error("Failed to verify URI " + newURI + " already known: " + e);
1130            }
1131            return result;
1132        }
1133        
1134        private InputStreamReader getURLStream(String path) throws IOException {
1135            InputStreamReader result = null;
1136            URL url = null;
1137            try {
1138                url = new URL(path);
1139                result = new InputStreamReader(url.openStream());
1140            } catch (MalformedURLException e) {
1141                // ignore - it could be a path to a a local file
1142            }
1143            if (result == null) {
1144                result = new FileReader(path);
1145            }
1146            return result;
1147        }
1148    }