001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.transport.nio; 018 019 import java.io.EOFException; 020 import java.io.IOException; 021 import java.io.InterruptedIOException; 022 import java.io.OutputStream; 023 import java.nio.ByteBuffer; 024 import java.nio.channels.WritableByteChannel; 025 026 import org.apache.activemq.transport.tcp.TimeStampStream; 027 028 /** 029 * An optimized buffered outputstream for Tcp 030 * 031 * 032 */ 033 034 public class NIOOutputStream extends OutputStream implements TimeStampStream { 035 036 private static final int BUFFER_SIZE = 8192; 037 038 private final WritableByteChannel out; 039 private final byte[] buffer; 040 private final ByteBuffer byteBuffer; 041 042 private int count; 043 private boolean closed; 044 private volatile long writeTimestamp = -1;//concurrent reads of this value 045 046 /** 047 * Constructor 048 * 049 * @param out 050 */ 051 public NIOOutputStream(WritableByteChannel out) { 052 this(out, BUFFER_SIZE); 053 } 054 055 /** 056 * Creates a new buffered output stream to write data to the specified 057 * underlying output stream with the specified buffer size. 058 * 059 * @param out the underlying output stream. 060 * @param size the buffer size. 061 * @throws IllegalArgumentException if size <= 0. 062 */ 063 public NIOOutputStream(WritableByteChannel out, int size) { 064 this.out = out; 065 if (size <= 0) { 066 throw new IllegalArgumentException("Buffer size <= 0"); 067 } 068 buffer = new byte[size]; 069 byteBuffer = ByteBuffer.wrap(buffer); 070 } 071 072 /** 073 * write a byte on to the stream 074 * 075 * @param b - byte to write 076 * @throws IOException 077 */ 078 public void write(int b) throws IOException { 079 checkClosed(); 080 if (availableBufferToWrite() < 1) { 081 flush(); 082 } 083 buffer[count++] = (byte)b; 084 } 085 086 /** 087 * write a byte array to the stream 088 * 089 * @param b the byte buffer 090 * @param off the offset into the buffer 091 * @param len the length of data to write 092 * @throws IOException 093 */ 094 public void write(byte b[], int off, int len) throws IOException { 095 checkClosed(); 096 if (availableBufferToWrite() < len) { 097 flush(); 098 } 099 if (buffer.length >= len) { 100 System.arraycopy(b, off, buffer, count, len); 101 count += len; 102 } else { 103 write(ByteBuffer.wrap(b, off, len)); 104 } 105 } 106 107 /** 108 * flush the data to the output stream This doesn't call flush on the 109 * underlying outputstream, because Tcp is particularly efficent at doing 110 * this itself .... 111 * 112 * @throws IOException 113 */ 114 public void flush() throws IOException { 115 if (count > 0 && out != null) { 116 byteBuffer.position(0); 117 byteBuffer.limit(count); 118 write(byteBuffer); 119 count = 0; 120 } 121 } 122 123 /** 124 * close this stream 125 * 126 * @throws IOException 127 */ 128 public void close() throws IOException { 129 super.close(); 130 closed = true; 131 } 132 133 /** 134 * Checks that the stream has not been closed 135 * 136 * @throws IOException 137 */ 138 protected void checkClosed() throws IOException { 139 if (closed) { 140 throw new EOFException("Cannot write to the stream any more it has already been closed"); 141 } 142 } 143 144 /** 145 * @return the amount free space in the buffer 146 */ 147 private int availableBufferToWrite() { 148 return buffer.length - count; 149 } 150 151 protected void write(ByteBuffer data) throws IOException { 152 int remaining = data.remaining(); 153 int lastRemaining = remaining - 1; 154 long delay = 1; 155 try { 156 writeTimestamp = System.currentTimeMillis(); 157 while (remaining > 0) { 158 159 // We may need to do a little bit of sleeping to avoid a busy loop. 160 // Slow down if no data was written out.. 161 if (remaining == lastRemaining) { 162 try { 163 // Use exponential rollback to increase sleep time. 164 Thread.sleep(delay); 165 delay *= 2; 166 if (delay > 1000) { 167 delay = 1000; 168 } 169 } catch (InterruptedException e) { 170 throw new InterruptedIOException(); 171 } 172 } else { 173 delay = 1; 174 } 175 lastRemaining = remaining; 176 177 // Since the write is non-blocking, all the data may not have been 178 // written. 179 out.write(data); 180 remaining = data.remaining(); 181 } 182 } finally { 183 writeTimestamp = -1; 184 } 185 } 186 187 188 /* (non-Javadoc) 189 * @see org.apache.activemq.transport.tcp.TimeStampStream#isWriting() 190 */ 191 public boolean isWriting() { 192 return writeTimestamp > 0; 193 } 194 195 /* (non-Javadoc) 196 * @see org.apache.activemq.transport.tcp.TimeStampStream#getWriteTimestamp() 197 */ 198 public long getWriteTimestamp() { 199 return writeTimestamp; 200 } 201 202 }