001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.stomp;
018    
019    import java.io.ByteArrayInputStream;
020    import java.io.DataOutputStream;
021    import java.io.EOFException;
022    import java.io.IOException;
023    import java.net.Socket;
024    import java.net.URI;
025    import java.net.UnknownHostException;
026    import java.nio.ByteBuffer;
027    import java.nio.channels.SelectionKey;
028    import java.nio.channels.SocketChannel;
029    import java.util.HashMap;
030    
031    import javax.net.SocketFactory;
032    
033    import org.apache.activemq.command.Command;
034    import org.apache.activemq.transport.Transport;
035    import org.apache.activemq.transport.nio.NIOOutputStream;
036    import org.apache.activemq.transport.nio.SelectorManager;
037    import org.apache.activemq.transport.nio.SelectorSelection;
038    import org.apache.activemq.transport.tcp.TcpTransport;
039    import org.apache.activemq.util.ByteArrayOutputStream;
040    import org.apache.activemq.util.ByteSequence;
041    import org.apache.activemq.util.DataByteArrayInputStream;
042    import org.apache.activemq.util.IOExceptionSupport;
043    import org.apache.activemq.util.ServiceStopper;
044    import org.apache.activemq.wireformat.WireFormat;
045    
046    /**
047     * An implementation of the {@link Transport} interface for using Stomp over NIO
048     *
049     * 
050     */
051    public class StompNIOTransport extends TcpTransport {
052    
053        private SocketChannel channel;
054        private SelectorSelection selection;
055    
056        private ByteBuffer inputBuffer;
057        ByteArrayOutputStream currentCommand = new ByteArrayOutputStream();
058        boolean processedHeaders = false;
059        String action;
060        HashMap<String, String> headers;
061        int contentLength = -1;
062        int readLength = 0;
063        int previousByte = -1;
064    
065        public StompNIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
066            super(wireFormat, socketFactory, remoteLocation, localLocation);
067        }
068    
069        public StompNIOTransport(WireFormat wireFormat, Socket socket) throws IOException {
070            super(wireFormat, socket);
071        }
072    
073        protected void initializeStreams() throws IOException {
074            channel = socket.getChannel();
075            channel.configureBlocking(false);
076    
077            // listen for events telling us when the socket is readable.
078            selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener() {
079                public void onSelect(SelectorSelection selection) {
080                    serviceRead();
081                }
082    
083                public void onError(SelectorSelection selection, Throwable error) {
084                    if (error instanceof IOException) {
085                        onException((IOException)error);
086                    } else {
087                        onException(IOExceptionSupport.create(error));
088                    }
089                }
090            });
091    
092            inputBuffer = ByteBuffer.allocate(8 * 1024);
093            NIOOutputStream outPutStream = new NIOOutputStream(channel, 8 * 1024);
094            this.dataOut = new DataOutputStream(outPutStream);
095            this.buffOut = outPutStream;
096        }
097    
098        private void serviceRead() {
099            try {
100    
101               while (true) {
102                   // read channel
103                   int readSize = channel.read(inputBuffer);
104                   // channel is closed, cleanup
105                   if (readSize == -1) {
106                       onException(new EOFException());
107                       selection.close();
108                       break;
109                   }
110                   // nothing more to read, break
111                   if (readSize == 0) {
112                       break;
113                   }
114    
115                   inputBuffer.flip();
116    
117                   int b;
118                   ByteArrayInputStream input = new ByteArrayInputStream(inputBuffer.array());
119    
120                   int i = 0;
121                   while(i++ < readSize) {
122                       b = input.read();
123                       // skip repeating nulls
124                       if (!processedHeaders && previousByte == 0 && b == 0) {
125                           continue;
126                       }
127    
128                       if (!processedHeaders) {
129                           currentCommand.write(b);
130                           // end of headers section, parse action and header
131                           if (previousByte == '\n' && b == '\n') {
132                               if (wireFormat instanceof StompWireFormat) {
133                                   DataByteArrayInputStream data = new DataByteArrayInputStream(currentCommand.toByteArray());
134                                   action = ((StompWireFormat)wireFormat).parseAction(data);
135                                   headers = ((StompWireFormat)wireFormat).parseHeaders(data);
136                                   String contentLengthHeader = headers.get(Stomp.Headers.CONTENT_LENGTH);
137                                   if (contentLengthHeader != null) {
138                                       contentLength = ((StompWireFormat)wireFormat).parseContentLength(contentLengthHeader);
139                                   } else {
140                                       contentLength = -1;
141                                   }
142                               }
143                               processedHeaders = true;
144                               currentCommand.reset();
145                           }
146                       } else {
147    
148                           if (contentLength == -1) {
149                               // end of command reached, unmarshal
150                               if (b == 0) {
151                                   processCommand();
152                               } else {
153                                   currentCommand.write(b);
154                               }
155                           } else {
156                               // read desired content length
157                               if (readLength++ == contentLength) {
158                                   processCommand();
159                                   readLength = 0;
160                               } else {
161                                   currentCommand.write(b);
162                               }
163                           }
164                       }
165    
166                       previousByte = b;
167                   }
168                   // clear the buffer
169                   inputBuffer.clear();
170    
171               }
172            } catch (IOException e) {
173                onException(e);
174            } catch (Throwable e) {
175                onException(IOExceptionSupport.create(e));
176            }
177        }
178    
179        private void processCommand() throws Exception {
180            StompFrame frame = new StompFrame(action, headers, currentCommand.toByteArray());
181            doConsume(frame);
182            processedHeaders = false;
183            currentCommand.reset();
184            contentLength = -1;
185        }
186    
187        protected void doStart() throws Exception {
188            connect();
189            selection.setInterestOps(SelectionKey.OP_READ);
190            selection.enable();
191        }
192    
193        protected void doStop(ServiceStopper stopper) throws Exception {
194            try {
195                selection.close();
196            } catch (Exception e) {
197                    e.printStackTrace();
198            }
199            super.doStop(stopper);
200        }
201    }