001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.tcp;
018    
019    import java.io.DataInputStream;
020    import java.io.DataOutputStream;
021    import java.io.IOException;
022    import java.io.InterruptedIOException;
023    import java.net.InetAddress;
024    import java.net.InetSocketAddress;
025    import java.net.Socket;
026    import java.net.SocketException;
027    import java.net.SocketTimeoutException;
028    import java.net.URI;
029    import java.net.UnknownHostException;
030    import java.util.HashMap;
031    import java.util.Map;
032    import java.util.concurrent.CountDownLatch;
033    import java.util.concurrent.SynchronousQueue;
034    import java.util.concurrent.ThreadFactory;
035    import java.util.concurrent.ThreadPoolExecutor;
036    import java.util.concurrent.TimeUnit;
037    import java.util.concurrent.atomic.AtomicReference;
038    import javax.net.SocketFactory;
039    import org.apache.activemq.Service;
040    import org.apache.activemq.thread.DefaultThreadPools;
041    import org.apache.activemq.transport.Transport;
042    import org.apache.activemq.transport.TransportLoggerFactory;
043    import org.apache.activemq.transport.TransportThreadSupport;
044    import org.apache.activemq.util.InetAddressUtil;
045    import org.apache.activemq.util.IntrospectionSupport;
046    import org.apache.activemq.util.ServiceStopper;
047    import org.apache.activemq.wireformat.WireFormat;
048    import org.slf4j.Logger;
049    import org.slf4j.LoggerFactory;
050    
051    /**
052     * An implementation of the {@link Transport} interface using raw tcp/ip
053     * 
054     * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
055     * 
056     */
057    public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
058        private static final Logger LOG = LoggerFactory.getLogger(TcpTransport.class);
059        protected final URI remoteLocation;
060        protected final URI localLocation;
061        protected final WireFormat wireFormat;
062    
063        protected int connectionTimeout = 30000;
064        protected int soTimeout;
065        protected int socketBufferSize = 64 * 1024;
066        protected int ioBufferSize = 8 * 1024;
067        protected boolean closeAsync=true;
068        protected Socket socket;
069        protected DataOutputStream dataOut;
070        protected DataInputStream dataIn;
071        protected TimeStampStream buffOut = null;
072        /**
073         * The Traffic Class to be set on the socket.
074         */
075        protected int trafficClass = 0;
076        /**
077         * Keeps track of attempts to set the Traffic Class on the socket.
078         */
079        private boolean trafficClassSet = false;
080        /**
081         * Prevents setting both the Differentiated Services and Type of Service
082         * transport options at the same time, since they share the same spot in
083         * the TCP/IP packet headers.
084         */
085        protected boolean diffServChosen = false;
086        protected boolean typeOfServiceChosen = false;
087        /**
088         * trace=true -> the Transport stack where this TcpTransport
089         * object will be, will have a TransportLogger layer
090         * trace=false -> the Transport stack where this TcpTransport
091         * object will be, will NOT have a TransportLogger layer, and therefore
092         * will never be able to print logging messages.
093         * This parameter is most probably set in Connection or TransportConnector URIs.
094         */
095        protected boolean trace = false;
096        /**
097         * Name of the LogWriter implementation to use.
098         * Names are mapped to classes in the resources/META-INF/services/org/apache/activemq/transport/logwriters directory.
099         * This parameter is most probably set in Connection or TransportConnector URIs.
100         */
101        protected String logWriterName = TransportLoggerFactory.defaultLogWriterName;
102        /**
103         * Specifies if the TransportLogger will be manageable by JMX or not.
104         * Also, as long as there is at least 1 TransportLogger which is manageable,
105         * a TransportLoggerControl MBean will me created.
106         */
107        protected boolean dynamicManagement = false;
108        /**
109         * startLogging=true -> the TransportLogger object of the Transport stack
110         * will initially write messages to the log.
111         * startLogging=false -> the TransportLogger object of the Transport stack
112         * will initially NOT write messages to the log.
113         * This parameter only has an effect if trace == true.
114         * This parameter is most probably set in Connection or TransportConnector URIs.
115         */
116        protected boolean startLogging = true;
117        /**
118         * Specifies the port that will be used by the JMX server to manage
119         * the TransportLoggers.
120         * This should only be set in an URI by a client (producer or consumer) since
121         * a broker will already create a JMX server.
122         * It is useful for people who test a broker and clients in the same machine
123         * and want to control both via JMX; a different port will be needed.
124         */
125        protected int jmxPort = 1099;
126        protected boolean useLocalHost = false;
127        protected int minmumWireFormatVersion;
128        protected SocketFactory socketFactory;
129        protected final AtomicReference<CountDownLatch> stoppedLatch = new AtomicReference<CountDownLatch>();
130    
131        private Map<String, Object> socketOptions;
132        private Boolean keepAlive;
133        private Boolean tcpNoDelay;
134        private Thread runnerThread;
135        private volatile int receiveCounter;
136    
137        /**
138         * Connect to a remote Node - e.g. a Broker
139         * 
140         * @param wireFormat
141         * @param socketFactory
142         * @param remoteLocation
143         * @param localLocation - e.g. local InetAddress and local port
144         * @throws IOException
145         * @throws UnknownHostException
146         */
147        public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation,
148                            URI localLocation) throws UnknownHostException, IOException {
149            this.wireFormat = wireFormat;
150            this.socketFactory = socketFactory;
151            try {
152                this.socket = socketFactory.createSocket();
153            } catch (SocketException e) {
154                this.socket = null;
155            }
156            this.remoteLocation = remoteLocation;
157            this.localLocation = localLocation;
158            setDaemon(false);
159        }
160    
161        /**
162         * Initialize from a server Socket
163         * 
164         * @param wireFormat
165         * @param socket
166         * @throws IOException
167         */
168        public TcpTransport(WireFormat wireFormat, Socket socket) throws IOException {
169            this.wireFormat = wireFormat;
170            this.socket = socket;
171            this.remoteLocation = null;
172            this.localLocation = null;
173            setDaemon(true);
174        }
175    
176        /**
177         * A one way asynchronous send
178         */
179        public void oneway(Object command) throws IOException {
180            checkStarted();
181            wireFormat.marshal(command, dataOut);
182            dataOut.flush();
183        }
184    
185        /**
186         * @return pretty print of 'this'
187         */
188        @Override
189        public String toString() {
190            return "" + (socket.isConnected() ? "tcp://" + socket.getInetAddress() + ":" + socket.getPort()
191                    : (localLocation != null ? localLocation : remoteLocation)) ;
192        }
193    
194        /**
195         * reads packets from a Socket
196         */
197        public void run() {
198            LOG.trace("TCP consumer thread for " + this + " starting");
199            this.runnerThread=Thread.currentThread();
200            try {
201                while (!isStopped()) {
202                    doRun();
203                }
204            } catch (IOException e) {
205                stoppedLatch.get().countDown();
206                onException(e);
207            } catch (Throwable e){
208                stoppedLatch.get().countDown();
209                IOException ioe=new IOException("Unexpected error occured");
210                ioe.initCause(e);
211                onException(ioe);
212            }finally {
213                stoppedLatch.get().countDown();
214            }
215        }
216    
217        protected void doRun() throws IOException {
218            try {
219                Object command = readCommand();
220                doConsume(command);
221            } catch (SocketTimeoutException e) {
222            } catch (InterruptedIOException e) {
223            }
224        }
225    
226        protected Object readCommand() throws IOException {
227            return wireFormat.unmarshal(dataIn);
228        }
229    
230        // Properties
231        // -------------------------------------------------------------------------
232        public String getDiffServ() {
233            // This is the value requested by the user by setting the Tcp Transport
234            // options. If the socket hasn't been created, then this value may not
235            // reflect the value returned by Socket.getTrafficClass().
236            return Integer.toString(this.trafficClass);
237        }
238    
239        public void setDiffServ(String diffServ) throws IllegalArgumentException {
240            this.trafficClass = QualityOfServiceUtils.getDSCP(diffServ);
241            this.diffServChosen = true;
242        }
243    
244        public int getTypeOfService() {
245            // This is the value requested by the user by setting the Tcp Transport
246            // options. If the socket hasn't been created, then this value may not
247            // reflect the value returned by Socket.getTrafficClass().
248            return this.trafficClass;
249        }
250      
251        public void setTypeOfService(int typeOfService) {
252            this.trafficClass = QualityOfServiceUtils.getToS(typeOfService);
253            this.typeOfServiceChosen = true;
254        }
255    
256        public boolean isTrace() {
257            return trace;
258        }
259    
260        public void setTrace(boolean trace) {
261            this.trace = trace;
262        }
263        
264        public String getLogWriterName() {
265            return logWriterName;
266        }
267    
268        public void setLogWriterName(String logFormat) {
269            this.logWriterName = logFormat;
270        }
271    
272        public boolean isDynamicManagement() {
273            return dynamicManagement;
274        }
275    
276        public void setDynamicManagement(boolean useJmx) {
277            this.dynamicManagement = useJmx;
278        }
279    
280        public boolean isStartLogging() {
281            return startLogging;
282        }
283    
284        public void setStartLogging(boolean startLogging) {
285            this.startLogging = startLogging;
286        }
287    
288        public int getJmxPort() {
289            return jmxPort;
290        }
291    
292        public void setJmxPort(int jmxPort) {
293            this.jmxPort = jmxPort;
294        }
295        
296        public int getMinmumWireFormatVersion() {
297            return minmumWireFormatVersion;
298        }
299    
300        public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
301            this.minmumWireFormatVersion = minmumWireFormatVersion;
302        }
303    
304        public boolean isUseLocalHost() {
305            return useLocalHost;
306        }
307    
308        /**
309         * Sets whether 'localhost' or the actual local host name should be used to
310         * make local connections. On some operating systems such as Macs its not
311         * possible to connect as the local host name so localhost is better.
312         */
313        public void setUseLocalHost(boolean useLocalHost) {
314            this.useLocalHost = useLocalHost;
315        }
316    
317        public int getSocketBufferSize() {
318            return socketBufferSize;
319        }
320    
321        /**
322         * Sets the buffer size to use on the socket
323         */
324        public void setSocketBufferSize(int socketBufferSize) {
325            this.socketBufferSize = socketBufferSize;
326        }
327    
328        public int getSoTimeout() {
329            return soTimeout;
330        }
331    
332        /**
333         * Sets the socket timeout
334         */
335        public void setSoTimeout(int soTimeout) {
336            this.soTimeout = soTimeout;
337        }
338    
339        public int getConnectionTimeout() {
340            return connectionTimeout;
341        }
342    
343        /**
344         * Sets the timeout used to connect to the socket
345         */
346        public void setConnectionTimeout(int connectionTimeout) {
347            this.connectionTimeout = connectionTimeout;
348        }
349    
350        public Boolean getKeepAlive() {
351            return keepAlive;
352        }
353    
354        /**
355         * Enable/disable TCP KEEP_ALIVE mode
356         */
357        public void setKeepAlive(Boolean keepAlive) {
358            this.keepAlive = keepAlive;
359        }
360    
361        public Boolean getTcpNoDelay() {
362            return tcpNoDelay;
363        }
364    
365        /**
366         * Enable/disable the TCP_NODELAY option on the socket
367         */
368        public void setTcpNoDelay(Boolean tcpNoDelay) {
369            this.tcpNoDelay = tcpNoDelay;
370        }
371    
372        /**
373         * @return the ioBufferSize
374         */
375        public int getIoBufferSize() {
376            return this.ioBufferSize;
377        }
378    
379        /**
380         * @param ioBufferSize the ioBufferSize to set
381         */
382        public void setIoBufferSize(int ioBufferSize) {
383            this.ioBufferSize = ioBufferSize;
384        }
385        
386        /**
387         * @return the closeAsync
388         */
389        public boolean isCloseAsync() {
390            return closeAsync;
391        }
392    
393        /**
394         * @param closeAsync the closeAsync to set
395         */
396        public void setCloseAsync(boolean closeAsync) {
397            this.closeAsync = closeAsync;
398        }
399    
400        // Implementation methods
401        // -------------------------------------------------------------------------
402        protected String resolveHostName(String host) throws UnknownHostException {
403            if (isUseLocalHost()) {
404                String localName = InetAddressUtil.getLocalHostName();
405                if (localName != null && localName.equals(host)) {
406                    return "localhost";
407                }
408            }
409            return host;
410        }
411    
412        /**
413         * Configures the socket for use
414         * 
415         * @param sock
416         * @throws SocketException, IllegalArgumentException if setting the options
417         *         on the socket failed.
418         */
419        protected void initialiseSocket(Socket sock) throws SocketException,
420                IllegalArgumentException {
421            if (socketOptions != null) {
422                IntrospectionSupport.setProperties(socket, socketOptions);
423            }
424    
425            try {
426                sock.setReceiveBufferSize(socketBufferSize);
427                sock.setSendBufferSize(socketBufferSize);
428            } catch (SocketException se) {
429                LOG.warn("Cannot set socket buffer size = " + socketBufferSize);
430                LOG.debug("Cannot set socket buffer size. Reason: " + se, se);
431            }
432            sock.setSoTimeout(soTimeout);
433    
434            if (keepAlive != null) {
435                sock.setKeepAlive(keepAlive.booleanValue());
436            }
437            if (tcpNoDelay != null) {
438                sock.setTcpNoDelay(tcpNoDelay.booleanValue());
439            }
440            if (!this.trafficClassSet) {
441                this.trafficClassSet = setTrafficClass(sock);
442            }
443        }
444    
445        @Override
446        protected void doStart() throws Exception {
447            connect();
448            stoppedLatch.set(new CountDownLatch(1));
449            super.doStart();
450        }
451    
452        protected void connect() throws Exception {
453    
454            if (socket == null && socketFactory == null) {
455                throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set");
456            }
457    
458            InetSocketAddress localAddress = null;
459            InetSocketAddress remoteAddress = null;
460    
461            if (localLocation != null) {
462                localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()),
463                                                     localLocation.getPort());
464            }
465    
466            if (remoteLocation != null) {
467                String host = resolveHostName(remoteLocation.getHost());
468                remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
469            }
470            // Set the traffic class before the socket is connected when possible so
471            // that the connection packets are given the correct traffic class.
472            this.trafficClassSet = setTrafficClass(socket);
473    
474            if (socket != null) {
475    
476                if (localAddress != null) {
477                    socket.bind(localAddress);
478                }
479    
480                // If it's a server accepted socket.. we don't need to connect it
481                // to a remote address.
482                if (remoteAddress != null) {
483                    if (connectionTimeout >= 0) {
484                        socket.connect(remoteAddress, connectionTimeout);
485                    } else {
486                        socket.connect(remoteAddress);
487                    }
488                }
489    
490            } else {
491                // For SSL sockets.. you can't create an unconnected socket :(
492                // This means the timout option are not supported either.
493                if (localAddress != null) {
494                    socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort(),
495                                                        localAddress.getAddress(), localAddress.getPort());
496                } else {
497                    socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort());
498                }
499            }
500    
501            initialiseSocket(socket);
502            initializeStreams();
503        }
504    
505        @Override
506        protected void doStop(ServiceStopper stopper) throws Exception {
507            if (LOG.isDebugEnabled()) {
508                LOG.debug("Stopping transport " + this);
509            }
510    
511            // Closing the streams flush the sockets before closing.. if the socket
512            // is hung.. then this hangs the close.
513            // closeStreams();
514            if (socket != null) {
515                if (closeAsync) {
516                    //closing the socket can hang also 
517                    final CountDownLatch latch = new CountDownLatch(1);
518                    
519                    DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() {
520        
521                        public void run() {
522                            try {
523                                socket.close();
524                            } catch (IOException e) {
525                                LOG.debug("Caught exception closing socket",e);
526                            }finally {
527                                latch.countDown();
528                            }
529                        }
530                        
531                    });
532                    latch.await(1,TimeUnit.SECONDS);
533                }else {
534                    try {
535                        socket.close();
536                    } catch (IOException e) {
537                        LOG.debug("Caught exception closing socket",e);
538                    }
539                }
540               
541            }
542        }
543    
544        /**
545         * Override so that stop() blocks until the run thread is no longer running.
546         */
547        @Override
548        public void stop() throws Exception {
549            super.stop();
550            CountDownLatch countDownLatch = stoppedLatch.get();
551            if (countDownLatch != null && Thread.currentThread() != this.runnerThread) {
552                countDownLatch.await(1,TimeUnit.SECONDS);
553            }
554        }
555    
556        protected void initializeStreams() throws Exception {
557            TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize) {
558                @Override
559                public int read() throws IOException {
560                    receiveCounter++;
561                    return super.read();
562                }
563                @Override
564                public int read(byte[] b, int off, int len) throws IOException {
565                    receiveCounter++;
566                    return super.read(b, off, len);
567                }
568                @Override
569                public long skip(long n) throws IOException {
570                    receiveCounter++;
571                    return super.skip(n);
572                }
573                @Override
574                protected void fill() throws IOException {
575                    receiveCounter++;
576                    super.fill();
577                }
578            };
579            this.dataIn = new DataInputStream(buffIn);
580            TcpBufferedOutputStream outputStream = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
581            this.dataOut = new DataOutputStream(outputStream);
582            this.buffOut = outputStream;
583        }
584    
585        protected void closeStreams() throws IOException {
586            if (dataOut != null) {
587                dataOut.close();
588            }
589            if (dataIn != null) {
590                dataIn.close();
591            }
592        }
593    
594        public void setSocketOptions(Map<String, Object> socketOptions) {
595            this.socketOptions = new HashMap<String, Object>(socketOptions);
596        }
597    
598        public String getRemoteAddress() {
599            if (socket != null) {
600                return "" + socket.getRemoteSocketAddress();
601            }
602            return null;
603        }
604        
605        @Override
606        public <T> T narrow(Class<T> target) {
607            if (target == Socket.class) {
608                return target.cast(socket);
609            } else if ( target == TimeStampStream.class) {
610                return target.cast(buffOut);
611            }
612            return super.narrow(target);
613        }
614        
615        public int getReceiveCounter() {
616            return receiveCounter;
617        }
618        
619    
620        /**
621         * @param sock The socket on which to set the Traffic Class.
622         * @return Whether or not the Traffic Class was set on the given socket.
623         * @throws SocketException if the system does not support setting the
624         *         Traffic Class.
625         * @throws IllegalArgumentException if both the Differentiated Services and
626         *         Type of Services transport options have been set on the same
627         *         connection.
628         */
629        private boolean setTrafficClass(Socket sock) throws SocketException,
630                IllegalArgumentException {
631            if (sock == null
632                || (!this.diffServChosen && !this.typeOfServiceChosen)) {
633                return false;
634            }
635            if (this.diffServChosen && this.typeOfServiceChosen) {
636                throw new IllegalArgumentException("Cannot set both the "
637                    + " Differentiated Services and Type of Services transport "
638                    + " options on the same connection.");
639            }
640    
641            sock.setTrafficClass(this.trafficClass);
642    
643            int resultTrafficClass = sock.getTrafficClass();
644            if (this.trafficClass != resultTrafficClass) {
645                // In the case where the user has specified the ECN bits (e.g. in
646                // Type of Service) but the system won't allow the ECN bits to be
647                // set or in the case where setting the traffic class failed for
648                // other reasons, emit a warning.
649                if ((this.trafficClass >> 2) == (resultTrafficClass >> 2)
650                        && (this.trafficClass & 3) != (resultTrafficClass & 3)) {
651                    LOG.warn("Attempted to set the Traffic Class to "
652                        + this.trafficClass + " but the result Traffic Class was "
653                        + resultTrafficClass + ". Please check that your system "
654                        + "allows you to set the ECN bits (the first two bits).");
655                } else {
656                    LOG.warn("Attempted to set the Traffic Class to "
657                        + this.trafficClass + " but the result Traffic Class was "
658                        + resultTrafficClass + ". Please check that your system "
659                             + "supports java.net.setTrafficClass.");
660                }
661                return false;
662            }
663            // Reset the guards that prevent both the Differentiated Services
664            // option and the Type of Service option from being set on the same
665            // connection.
666            this.diffServChosen = false;
667            this.typeOfServiceChosen = false;
668            return true;
669        }
670    }