001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.nio;
018    
019    import java.io.DataInputStream;
020    import java.io.DataOutputStream;
021    import java.io.EOFException;
022    import java.io.IOException;
023    import java.net.Socket;
024    import java.net.URI;
025    import java.net.UnknownHostException;
026    import java.nio.ByteBuffer;
027    import java.nio.channels.SelectionKey;
028    import java.nio.channels.SocketChannel;
029    
030    import javax.net.SocketFactory;
031    
032    import org.apache.activemq.command.Command;
033    import org.apache.activemq.transport.Transport;
034    import org.apache.activemq.transport.tcp.TcpTransport;
035    import org.apache.activemq.util.IOExceptionSupport;
036    import org.apache.activemq.util.ServiceStopper;
037    import org.apache.activemq.wireformat.WireFormat;
038    
039    /**
040     * An implementation of the {@link Transport} interface using raw tcp/ip
041     * 
042     * 
043     */
044    public class NIOTransport extends TcpTransport {
045    
046        // private static final Logger log = LoggerFactory.getLogger(NIOTransport.class);
047        private SocketChannel channel;
048        private SelectorSelection selection;
049        private ByteBuffer inputBuffer;
050        private ByteBuffer currentBuffer;
051        private int nextFrameSize;
052    
053        public NIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
054            super(wireFormat, socketFactory, remoteLocation, localLocation);
055        }
056    
057        public NIOTransport(WireFormat wireFormat, Socket socket) throws IOException {
058            super(wireFormat, socket);
059        }
060    
061        protected void initializeStreams() throws IOException {
062            channel = socket.getChannel();
063            channel.configureBlocking(false);
064    
065            // listen for events telling us when the socket is readable.
066            selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener() {
067                public void onSelect(SelectorSelection selection) {
068                    serviceRead();
069                }
070    
071                public void onError(SelectorSelection selection, Throwable error) {
072                    if (error instanceof IOException) {
073                        onException((IOException)error);
074                    } else {
075                        onException(IOExceptionSupport.create(error));
076                    }
077                }
078            });
079    
080            // Send the data via the channel
081            // inputBuffer = ByteBuffer.allocateDirect(8*1024);
082            inputBuffer = ByteBuffer.allocate(8 * 1024);
083            currentBuffer = inputBuffer;
084            nextFrameSize = -1;
085            currentBuffer.limit(4);
086            NIOOutputStream outPutStream = new NIOOutputStream(channel, 16 * 1024);
087            this.dataOut = new DataOutputStream(outPutStream);
088            this.buffOut = outPutStream;
089        }
090    
091        private void serviceRead() {
092            try {
093                while (true) {
094    
095                    int readSize = channel.read(currentBuffer);
096                    if (readSize == -1) {
097                        onException(new EOFException());
098                        selection.close();
099                        break;
100                    }
101                    if (readSize == 0) {
102                        break;
103                    }
104    
105                    if (currentBuffer.hasRemaining()) {
106                        continue;
107                    }
108    
109                    // Are we trying to figure out the size of the next frame?
110                    if (nextFrameSize == -1) {
111                        assert inputBuffer == currentBuffer;
112    
113                        // If the frame is too big to fit in our direct byte buffer,
114                        // Then allocate a non direct byte buffer of the right size
115                        // for it.
116                        inputBuffer.flip();
117                        nextFrameSize = inputBuffer.getInt() + 4;
118                        if (nextFrameSize > inputBuffer.capacity()) {
119                            currentBuffer = ByteBuffer.allocate(nextFrameSize);
120                            currentBuffer.putInt(nextFrameSize);
121                        } else {
122                            inputBuffer.limit(nextFrameSize);
123                        }
124    
125                    } else {
126                        currentBuffer.flip();
127    
128                        Object command = wireFormat.unmarshal(new DataInputStream(new NIOInputStream(currentBuffer)));
129                        doConsume((Command)command);
130    
131                        nextFrameSize = -1;
132                        inputBuffer.clear();
133                        inputBuffer.limit(4);
134                        currentBuffer = inputBuffer;
135                    }
136    
137                }
138    
139            } catch (IOException e) {
140                onException(e);
141            } catch (Throwable e) {
142                onException(IOExceptionSupport.create(e));
143            }
144        }
145    
146        protected void doStart() throws Exception {
147            connect();
148            selection.setInterestOps(SelectionKey.OP_READ);
149            selection.enable();
150        }
151    
152        protected void doStop(ServiceStopper stopper) throws Exception {
153            if (selection != null) {
154                selection.close();
155            }
156            super.doStop(stopper);
157        }
158    }