001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.transport.tcp; 018 019 import java.io.DataInputStream; 020 import java.io.DataOutputStream; 021 import java.io.IOException; 022 import java.io.InterruptedIOException; 023 import java.net.InetAddress; 024 import java.net.InetSocketAddress; 025 import java.net.Socket; 026 import java.net.SocketException; 027 import java.net.SocketTimeoutException; 028 import java.net.URI; 029 import java.net.UnknownHostException; 030 import java.util.HashMap; 031 import java.util.Map; 032 import java.util.concurrent.CountDownLatch; 033 import java.util.concurrent.SynchronousQueue; 034 import java.util.concurrent.ThreadFactory; 035 import java.util.concurrent.ThreadPoolExecutor; 036 import java.util.concurrent.TimeUnit; 037 import java.util.concurrent.atomic.AtomicReference; 038 import javax.net.SocketFactory; 039 import org.apache.activemq.Service; 040 import org.apache.activemq.thread.DefaultThreadPools; 041 import org.apache.activemq.transport.Transport; 042 import org.apache.activemq.transport.TransportLoggerFactory; 043 import org.apache.activemq.transport.TransportThreadSupport; 044 import org.apache.activemq.util.InetAddressUtil; 045 import org.apache.activemq.util.IntrospectionSupport; 046 import org.apache.activemq.util.ServiceStopper; 047 import org.apache.activemq.wireformat.WireFormat; 048 import org.slf4j.Logger; 049 import org.slf4j.LoggerFactory; 050 051 /** 052 * An implementation of the {@link Transport} interface using raw tcp/ip 053 * 054 * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications) 055 * 056 */ 057 public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable { 058 private static final Logger LOG = LoggerFactory.getLogger(TcpTransport.class); 059 protected final URI remoteLocation; 060 protected final URI localLocation; 061 protected final WireFormat wireFormat; 062 063 protected int connectionTimeout = 30000; 064 protected int soTimeout; 065 protected int socketBufferSize = 64 * 1024; 066 protected int ioBufferSize = 8 * 1024; 067 protected boolean closeAsync=true; 068 protected Socket socket; 069 protected DataOutputStream dataOut; 070 protected DataInputStream dataIn; 071 protected TimeStampStream buffOut = null; 072 /** 073 * The Traffic Class to be set on the socket. 074 */ 075 protected int trafficClass = 0; 076 /** 077 * Keeps track of attempts to set the Traffic Class on the socket. 078 */ 079 private boolean trafficClassSet = false; 080 /** 081 * Prevents setting both the Differentiated Services and Type of Service 082 * transport options at the same time, since they share the same spot in 083 * the TCP/IP packet headers. 084 */ 085 protected boolean diffServChosen = false; 086 protected boolean typeOfServiceChosen = false; 087 /** 088 * trace=true -> the Transport stack where this TcpTransport 089 * object will be, will have a TransportLogger layer 090 * trace=false -> the Transport stack where this TcpTransport 091 * object will be, will NOT have a TransportLogger layer, and therefore 092 * will never be able to print logging messages. 093 * This parameter is most probably set in Connection or TransportConnector URIs. 094 */ 095 protected boolean trace = false; 096 /** 097 * Name of the LogWriter implementation to use. 098 * Names are mapped to classes in the resources/META-INF/services/org/apache/activemq/transport/logwriters directory. 099 * This parameter is most probably set in Connection or TransportConnector URIs. 100 */ 101 protected String logWriterName = TransportLoggerFactory.defaultLogWriterName; 102 /** 103 * Specifies if the TransportLogger will be manageable by JMX or not. 104 * Also, as long as there is at least 1 TransportLogger which is manageable, 105 * a TransportLoggerControl MBean will me created. 106 */ 107 protected boolean dynamicManagement = false; 108 /** 109 * startLogging=true -> the TransportLogger object of the Transport stack 110 * will initially write messages to the log. 111 * startLogging=false -> the TransportLogger object of the Transport stack 112 * will initially NOT write messages to the log. 113 * This parameter only has an effect if trace == true. 114 * This parameter is most probably set in Connection or TransportConnector URIs. 115 */ 116 protected boolean startLogging = true; 117 /** 118 * Specifies the port that will be used by the JMX server to manage 119 * the TransportLoggers. 120 * This should only be set in an URI by a client (producer or consumer) since 121 * a broker will already create a JMX server. 122 * It is useful for people who test a broker and clients in the same machine 123 * and want to control both via JMX; a different port will be needed. 124 */ 125 protected int jmxPort = 1099; 126 protected boolean useLocalHost = false; 127 protected int minmumWireFormatVersion; 128 protected SocketFactory socketFactory; 129 protected final AtomicReference<CountDownLatch> stoppedLatch = new AtomicReference<CountDownLatch>(); 130 131 private Map<String, Object> socketOptions; 132 private Boolean keepAlive; 133 private Boolean tcpNoDelay; 134 private Thread runnerThread; 135 private volatile int receiveCounter; 136 137 /** 138 * Connect to a remote Node - e.g. a Broker 139 * 140 * @param wireFormat 141 * @param socketFactory 142 * @param remoteLocation 143 * @param localLocation - e.g. local InetAddress and local port 144 * @throws IOException 145 * @throws UnknownHostException 146 */ 147 public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, 148 URI localLocation) throws UnknownHostException, IOException { 149 this.wireFormat = wireFormat; 150 this.socketFactory = socketFactory; 151 try { 152 this.socket = socketFactory.createSocket(); 153 } catch (SocketException e) { 154 this.socket = null; 155 } 156 this.remoteLocation = remoteLocation; 157 this.localLocation = localLocation; 158 setDaemon(false); 159 } 160 161 /** 162 * Initialize from a server Socket 163 * 164 * @param wireFormat 165 * @param socket 166 * @throws IOException 167 */ 168 public TcpTransport(WireFormat wireFormat, Socket socket) throws IOException { 169 this.wireFormat = wireFormat; 170 this.socket = socket; 171 this.remoteLocation = null; 172 this.localLocation = null; 173 setDaemon(true); 174 } 175 176 /** 177 * A one way asynchronous send 178 */ 179 public void oneway(Object command) throws IOException { 180 checkStarted(); 181 wireFormat.marshal(command, dataOut); 182 dataOut.flush(); 183 } 184 185 /** 186 * @return pretty print of 'this' 187 */ 188 @Override 189 public String toString() { 190 return "" + (socket.isConnected() ? "tcp://" + socket.getInetAddress() + ":" + socket.getPort() 191 : (localLocation != null ? localLocation : remoteLocation)) ; 192 } 193 194 /** 195 * reads packets from a Socket 196 */ 197 public void run() { 198 LOG.trace("TCP consumer thread for " + this + " starting"); 199 this.runnerThread=Thread.currentThread(); 200 try { 201 while (!isStopped()) { 202 doRun(); 203 } 204 } catch (IOException e) { 205 stoppedLatch.get().countDown(); 206 onException(e); 207 } catch (Throwable e){ 208 stoppedLatch.get().countDown(); 209 IOException ioe=new IOException("Unexpected error occured"); 210 ioe.initCause(e); 211 onException(ioe); 212 }finally { 213 stoppedLatch.get().countDown(); 214 } 215 } 216 217 protected void doRun() throws IOException { 218 try { 219 Object command = readCommand(); 220 doConsume(command); 221 } catch (SocketTimeoutException e) { 222 } catch (InterruptedIOException e) { 223 } 224 } 225 226 protected Object readCommand() throws IOException { 227 return wireFormat.unmarshal(dataIn); 228 } 229 230 // Properties 231 // ------------------------------------------------------------------------- 232 public String getDiffServ() { 233 // This is the value requested by the user by setting the Tcp Transport 234 // options. If the socket hasn't been created, then this value may not 235 // reflect the value returned by Socket.getTrafficClass(). 236 return Integer.toString(this.trafficClass); 237 } 238 239 public void setDiffServ(String diffServ) throws IllegalArgumentException { 240 this.trafficClass = QualityOfServiceUtils.getDSCP(diffServ); 241 this.diffServChosen = true; 242 } 243 244 public int getTypeOfService() { 245 // This is the value requested by the user by setting the Tcp Transport 246 // options. If the socket hasn't been created, then this value may not 247 // reflect the value returned by Socket.getTrafficClass(). 248 return this.trafficClass; 249 } 250 251 public void setTypeOfService(int typeOfService) { 252 this.trafficClass = QualityOfServiceUtils.getToS(typeOfService); 253 this.typeOfServiceChosen = true; 254 } 255 256 public boolean isTrace() { 257 return trace; 258 } 259 260 public void setTrace(boolean trace) { 261 this.trace = trace; 262 } 263 264 public String getLogWriterName() { 265 return logWriterName; 266 } 267 268 public void setLogWriterName(String logFormat) { 269 this.logWriterName = logFormat; 270 } 271 272 public boolean isDynamicManagement() { 273 return dynamicManagement; 274 } 275 276 public void setDynamicManagement(boolean useJmx) { 277 this.dynamicManagement = useJmx; 278 } 279 280 public boolean isStartLogging() { 281 return startLogging; 282 } 283 284 public void setStartLogging(boolean startLogging) { 285 this.startLogging = startLogging; 286 } 287 288 public int getJmxPort() { 289 return jmxPort; 290 } 291 292 public void setJmxPort(int jmxPort) { 293 this.jmxPort = jmxPort; 294 } 295 296 public int getMinmumWireFormatVersion() { 297 return minmumWireFormatVersion; 298 } 299 300 public void setMinmumWireFormatVersion(int minmumWireFormatVersion) { 301 this.minmumWireFormatVersion = minmumWireFormatVersion; 302 } 303 304 public boolean isUseLocalHost() { 305 return useLocalHost; 306 } 307 308 /** 309 * Sets whether 'localhost' or the actual local host name should be used to 310 * make local connections. On some operating systems such as Macs its not 311 * possible to connect as the local host name so localhost is better. 312 */ 313 public void setUseLocalHost(boolean useLocalHost) { 314 this.useLocalHost = useLocalHost; 315 } 316 317 public int getSocketBufferSize() { 318 return socketBufferSize; 319 } 320 321 /** 322 * Sets the buffer size to use on the socket 323 */ 324 public void setSocketBufferSize(int socketBufferSize) { 325 this.socketBufferSize = socketBufferSize; 326 } 327 328 public int getSoTimeout() { 329 return soTimeout; 330 } 331 332 /** 333 * Sets the socket timeout 334 */ 335 public void setSoTimeout(int soTimeout) { 336 this.soTimeout = soTimeout; 337 } 338 339 public int getConnectionTimeout() { 340 return connectionTimeout; 341 } 342 343 /** 344 * Sets the timeout used to connect to the socket 345 */ 346 public void setConnectionTimeout(int connectionTimeout) { 347 this.connectionTimeout = connectionTimeout; 348 } 349 350 public Boolean getKeepAlive() { 351 return keepAlive; 352 } 353 354 /** 355 * Enable/disable TCP KEEP_ALIVE mode 356 */ 357 public void setKeepAlive(Boolean keepAlive) { 358 this.keepAlive = keepAlive; 359 } 360 361 public Boolean getTcpNoDelay() { 362 return tcpNoDelay; 363 } 364 365 /** 366 * Enable/disable the TCP_NODELAY option on the socket 367 */ 368 public void setTcpNoDelay(Boolean tcpNoDelay) { 369 this.tcpNoDelay = tcpNoDelay; 370 } 371 372 /** 373 * @return the ioBufferSize 374 */ 375 public int getIoBufferSize() { 376 return this.ioBufferSize; 377 } 378 379 /** 380 * @param ioBufferSize the ioBufferSize to set 381 */ 382 public void setIoBufferSize(int ioBufferSize) { 383 this.ioBufferSize = ioBufferSize; 384 } 385 386 /** 387 * @return the closeAsync 388 */ 389 public boolean isCloseAsync() { 390 return closeAsync; 391 } 392 393 /** 394 * @param closeAsync the closeAsync to set 395 */ 396 public void setCloseAsync(boolean closeAsync) { 397 this.closeAsync = closeAsync; 398 } 399 400 // Implementation methods 401 // ------------------------------------------------------------------------- 402 protected String resolveHostName(String host) throws UnknownHostException { 403 if (isUseLocalHost()) { 404 String localName = InetAddressUtil.getLocalHostName(); 405 if (localName != null && localName.equals(host)) { 406 return "localhost"; 407 } 408 } 409 return host; 410 } 411 412 /** 413 * Configures the socket for use 414 * 415 * @param sock 416 * @throws SocketException, IllegalArgumentException if setting the options 417 * on the socket failed. 418 */ 419 protected void initialiseSocket(Socket sock) throws SocketException, 420 IllegalArgumentException { 421 if (socketOptions != null) { 422 IntrospectionSupport.setProperties(socket, socketOptions); 423 } 424 425 try { 426 sock.setReceiveBufferSize(socketBufferSize); 427 sock.setSendBufferSize(socketBufferSize); 428 } catch (SocketException se) { 429 LOG.warn("Cannot set socket buffer size = " + socketBufferSize); 430 LOG.debug("Cannot set socket buffer size. Reason: " + se, se); 431 } 432 sock.setSoTimeout(soTimeout); 433 434 if (keepAlive != null) { 435 sock.setKeepAlive(keepAlive.booleanValue()); 436 } 437 if (tcpNoDelay != null) { 438 sock.setTcpNoDelay(tcpNoDelay.booleanValue()); 439 } 440 if (!this.trafficClassSet) { 441 this.trafficClassSet = setTrafficClass(sock); 442 } 443 } 444 445 @Override 446 protected void doStart() throws Exception { 447 connect(); 448 stoppedLatch.set(new CountDownLatch(1)); 449 super.doStart(); 450 } 451 452 protected void connect() throws Exception { 453 454 if (socket == null && socketFactory == null) { 455 throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set"); 456 } 457 458 InetSocketAddress localAddress = null; 459 InetSocketAddress remoteAddress = null; 460 461 if (localLocation != null) { 462 localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), 463 localLocation.getPort()); 464 } 465 466 if (remoteLocation != null) { 467 String host = resolveHostName(remoteLocation.getHost()); 468 remoteAddress = new InetSocketAddress(host, remoteLocation.getPort()); 469 } 470 // Set the traffic class before the socket is connected when possible so 471 // that the connection packets are given the correct traffic class. 472 this.trafficClassSet = setTrafficClass(socket); 473 474 if (socket != null) { 475 476 if (localAddress != null) { 477 socket.bind(localAddress); 478 } 479 480 // If it's a server accepted socket.. we don't need to connect it 481 // to a remote address. 482 if (remoteAddress != null) { 483 if (connectionTimeout >= 0) { 484 socket.connect(remoteAddress, connectionTimeout); 485 } else { 486 socket.connect(remoteAddress); 487 } 488 } 489 490 } else { 491 // For SSL sockets.. you can't create an unconnected socket :( 492 // This means the timout option are not supported either. 493 if (localAddress != null) { 494 socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort(), 495 localAddress.getAddress(), localAddress.getPort()); 496 } else { 497 socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort()); 498 } 499 } 500 501 initialiseSocket(socket); 502 initializeStreams(); 503 } 504 505 @Override 506 protected void doStop(ServiceStopper stopper) throws Exception { 507 if (LOG.isDebugEnabled()) { 508 LOG.debug("Stopping transport " + this); 509 } 510 511 // Closing the streams flush the sockets before closing.. if the socket 512 // is hung.. then this hangs the close. 513 // closeStreams(); 514 if (socket != null) { 515 if (closeAsync) { 516 //closing the socket can hang also 517 final CountDownLatch latch = new CountDownLatch(1); 518 519 DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() { 520 521 public void run() { 522 try { 523 socket.close(); 524 } catch (IOException e) { 525 LOG.debug("Caught exception closing socket",e); 526 }finally { 527 latch.countDown(); 528 } 529 } 530 531 }); 532 latch.await(1,TimeUnit.SECONDS); 533 }else { 534 try { 535 socket.close(); 536 } catch (IOException e) { 537 LOG.debug("Caught exception closing socket",e); 538 } 539 } 540 541 } 542 } 543 544 /** 545 * Override so that stop() blocks until the run thread is no longer running. 546 */ 547 @Override 548 public void stop() throws Exception { 549 super.stop(); 550 CountDownLatch countDownLatch = stoppedLatch.get(); 551 if (countDownLatch != null && Thread.currentThread() != this.runnerThread) { 552 countDownLatch.await(1,TimeUnit.SECONDS); 553 } 554 } 555 556 protected void initializeStreams() throws Exception { 557 TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize) { 558 @Override 559 public int read() throws IOException { 560 receiveCounter++; 561 return super.read(); 562 } 563 @Override 564 public int read(byte[] b, int off, int len) throws IOException { 565 receiveCounter++; 566 return super.read(b, off, len); 567 } 568 @Override 569 public long skip(long n) throws IOException { 570 receiveCounter++; 571 return super.skip(n); 572 } 573 @Override 574 protected void fill() throws IOException { 575 receiveCounter++; 576 super.fill(); 577 } 578 }; 579 this.dataIn = new DataInputStream(buffIn); 580 TcpBufferedOutputStream outputStream = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize); 581 this.dataOut = new DataOutputStream(outputStream); 582 this.buffOut = outputStream; 583 } 584 585 protected void closeStreams() throws IOException { 586 if (dataOut != null) { 587 dataOut.close(); 588 } 589 if (dataIn != null) { 590 dataIn.close(); 591 } 592 } 593 594 public void setSocketOptions(Map<String, Object> socketOptions) { 595 this.socketOptions = new HashMap<String, Object>(socketOptions); 596 } 597 598 public String getRemoteAddress() { 599 if (socket != null) { 600 return "" + socket.getRemoteSocketAddress(); 601 } 602 return null; 603 } 604 605 @Override 606 public <T> T narrow(Class<T> target) { 607 if (target == Socket.class) { 608 return target.cast(socket); 609 } else if ( target == TimeStampStream.class) { 610 return target.cast(buffOut); 611 } 612 return super.narrow(target); 613 } 614 615 public int getReceiveCounter() { 616 return receiveCounter; 617 } 618 619 620 /** 621 * @param sock The socket on which to set the Traffic Class. 622 * @return Whether or not the Traffic Class was set on the given socket. 623 * @throws SocketException if the system does not support setting the 624 * Traffic Class. 625 * @throws IllegalArgumentException if both the Differentiated Services and 626 * Type of Services transport options have been set on the same 627 * connection. 628 */ 629 private boolean setTrafficClass(Socket sock) throws SocketException, 630 IllegalArgumentException { 631 if (sock == null 632 || (!this.diffServChosen && !this.typeOfServiceChosen)) { 633 return false; 634 } 635 if (this.diffServChosen && this.typeOfServiceChosen) { 636 throw new IllegalArgumentException("Cannot set both the " 637 + " Differentiated Services and Type of Services transport " 638 + " options on the same connection."); 639 } 640 641 sock.setTrafficClass(this.trafficClass); 642 643 int resultTrafficClass = sock.getTrafficClass(); 644 if (this.trafficClass != resultTrafficClass) { 645 // In the case where the user has specified the ECN bits (e.g. in 646 // Type of Service) but the system won't allow the ECN bits to be 647 // set or in the case where setting the traffic class failed for 648 // other reasons, emit a warning. 649 if ((this.trafficClass >> 2) == (resultTrafficClass >> 2) 650 && (this.trafficClass & 3) != (resultTrafficClass & 3)) { 651 LOG.warn("Attempted to set the Traffic Class to " 652 + this.trafficClass + " but the result Traffic Class was " 653 + resultTrafficClass + ". Please check that your system " 654 + "allows you to set the ECN bits (the first two bits)."); 655 } else { 656 LOG.warn("Attempted to set the Traffic Class to " 657 + this.trafficClass + " but the result Traffic Class was " 658 + resultTrafficClass + ". Please check that your system " 659 + "supports java.net.setTrafficClass."); 660 } 661 return false; 662 } 663 // Reset the guards that prevent both the Differentiated Services 664 // option and the Type of Service option from being set on the same 665 // connection. 666 this.diffServChosen = false; 667 this.typeOfServiceChosen = false; 668 return true; 669 } 670 }