001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.vm;
018    
019    import java.io.IOException;
020    import java.io.InterruptedIOException;
021    import java.net.URI;
022    import java.util.concurrent.LinkedBlockingQueue;
023    import java.util.concurrent.atomic.AtomicBoolean;
024    import java.util.concurrent.atomic.AtomicLong;
025    import org.apache.activemq.command.ShutdownInfo;
026    import org.apache.activemq.thread.DefaultThreadPools;
027    import org.apache.activemq.thread.Task;
028    import org.apache.activemq.thread.TaskRunner;
029    import org.apache.activemq.thread.TaskRunnerFactory;
030    import org.apache.activemq.thread.Valve;
031    import org.apache.activemq.transport.FutureResponse;
032    import org.apache.activemq.transport.ResponseCallback;
033    import org.apache.activemq.transport.Transport;
034    import org.apache.activemq.transport.TransportDisposedIOException;
035    import org.apache.activemq.transport.TransportListener;
036    import org.apache.activemq.util.IOExceptionSupport;
037    
038    
039    /**
040     * A Transport implementation that uses direct method invocations.
041     * 
042     * 
043     */
044    public class VMTransport implements Transport, Task {
045    
046        private static final Object DISCONNECT = new Object();
047        private static final AtomicLong NEXT_ID = new AtomicLong(0);
048        protected VMTransport peer;
049        protected TransportListener transportListener;
050        protected boolean disposed;
051        protected boolean marshal;
052        protected boolean network;
053        protected boolean async = true;
054        protected int asyncQueueDepth = 2000;
055        protected LinkedBlockingQueue<Object> messageQueue;
056        protected boolean started;
057        protected final URI location;
058        protected final long id;
059        private TaskRunner taskRunner;
060        private final Object lazyInitMutext = new Object();
061        private final Valve enqueueValve = new Valve(true);
062        protected final AtomicBoolean stopping = new AtomicBoolean();
063        private volatile int receiveCounter;
064        
065        public VMTransport(URI location) {
066            this.location = location;
067            this.id = NEXT_ID.getAndIncrement();
068        }
069    
070        public void setPeer(VMTransport peer) {
071            this.peer = peer;
072        }
073    
074        public void oneway(Object command) throws IOException {
075            if (disposed) {
076                throw new TransportDisposedIOException("Transport disposed.");
077            }
078            if (peer == null) {
079                throw new IOException("Peer not connected.");
080            }
081    
082            
083            TransportListener transportListener=null;
084            try {
085                // Disable the peer from changing his state while we try to enqueue onto him.
086                peer.enqueueValve.increment();
087            
088                if (peer.disposed || peer.stopping.get()) {
089                    throw new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed.");
090                }
091                
092                if (peer.started) {
093                    if (peer.async) {
094                        peer.getMessageQueue().put(command);
095                        peer.wakeup();
096                    } else {
097                        transportListener = peer.transportListener;
098                    }
099                } else {
100                    peer.getMessageQueue().put(command);
101                }
102                
103            } catch (InterruptedException e) {
104                InterruptedIOException iioe = new InterruptedIOException(e.getMessage());
105                iioe.initCause(e);
106                throw iioe;
107            } finally {
108                // Allow the peer to change state again...
109                peer.enqueueValve.decrement();
110            }
111    
112            dispatch(peer, transportListener, command);
113        }
114        
115        public void dispatch(VMTransport transport, TransportListener transportListener, Object command) {
116            if( transportListener!=null ) {
117                if( command == DISCONNECT ) {
118                    transportListener.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."));
119                } else {
120                    transport.receiveCounter++;
121                    transportListener.onCommand(command);
122                }
123            }
124        }
125    
126        public void start() throws Exception {
127            if (transportListener == null) {
128                throw new IOException("TransportListener not set.");
129            }
130            try {
131                enqueueValve.turnOff();
132                if (messageQueue != null && !async) {
133                    Object command;
134                    while ((command = messageQueue.poll()) != null && !stopping.get() ) {
135                        receiveCounter++;
136                        dispatch(this, transportListener, command);
137                    }
138                }
139                started = true;
140                wakeup();
141            } finally {
142                enqueueValve.turnOn();
143            }
144            // If we get stopped while starting up, then do the actual stop now 
145            // that the enqueueValve is back on.
146            if( stopping.get() ) {
147                stop();
148            }
149        }
150    
151        public void stop() throws Exception {
152            stopping.set(true);
153            
154            // If stop() is called while being start()ed.. then we can't stop until we return to the start() method.
155            if( enqueueValve.isOn() ) {
156                    
157                // let the peer know that we are disconnecting..
158                try {
159                    peer.transportListener.onCommand(new ShutdownInfo());
160                } catch (Exception ignore) {
161                }
162                    
163                    
164                TaskRunner tr = null;
165                try {
166                    enqueueValve.turnOff();
167                    if (!disposed) {
168                        started = false;
169                        disposed = true;
170                        if (taskRunner != null) {
171                            tr = taskRunner;
172                            taskRunner = null;
173                        }
174                    }
175                } finally {
176                    stopping.set(false);
177                    enqueueValve.turnOn();
178                }
179                if (tr != null) {
180                    tr.shutdown(1000);
181                }
182                
183    
184            }
185            
186        }
187        
188        /**
189         * @see org.apache.activemq.thread.Task#iterate()
190         */
191        public boolean iterate() {
192            
193            final TransportListener tl;
194            try {
195                // Disable changing the state variables while we are running... 
196                enqueueValve.increment();
197                tl = transportListener;
198                if (!started || disposed || tl == null || stopping.get()) {
199                    if( stopping.get() ) {
200                        // drain the queue it since folks could be blocked putting on to
201                        // it and that would not allow the stop() method for finishing up.
202                        getMessageQueue().clear();  
203                    }
204                    return false;
205                }
206            } catch (InterruptedException e) {
207                return false;
208            } finally {
209                enqueueValve.decrement();
210            }
211    
212            LinkedBlockingQueue<Object> mq = getMessageQueue();
213            Object command = mq.poll();
214            if (command != null) {
215                if( command == DISCONNECT ) {
216                    tl.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."));
217                } else {
218                    tl.onCommand(command);
219                }
220                return !mq.isEmpty();
221            } else {
222                return false;
223            }
224            
225        }
226    
227        public void setTransportListener(TransportListener commandListener) {
228            try {
229                try {
230                    enqueueValve.turnOff();
231                    this.transportListener = commandListener;
232                    wakeup();
233                } finally {
234                    enqueueValve.turnOn();
235                }
236            } catch (InterruptedException e) {
237                throw new RuntimeException(e);
238            }
239        }
240    
241        private LinkedBlockingQueue<Object> getMessageQueue() {
242            synchronized (lazyInitMutext) {
243                if (messageQueue == null) {
244                    messageQueue = new LinkedBlockingQueue<Object>(this.asyncQueueDepth);
245                }
246                return messageQueue;
247            }
248        }
249    
250        public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
251            throw new AssertionError("Unsupported Method");
252        }
253    
254        public Object request(Object command) throws IOException {
255            throw new AssertionError("Unsupported Method");
256        }
257    
258        public Object request(Object command, int timeout) throws IOException {
259            throw new AssertionError("Unsupported Method");
260        }
261    
262        public TransportListener getTransportListener() {
263            return transportListener;
264        }
265    
266        public <T> T narrow(Class<T> target) {
267            if (target.isAssignableFrom(getClass())) {
268                return target.cast(this);
269            }
270            return null;
271        }
272    
273        public boolean isMarshal() {
274            return marshal;
275        }
276    
277        public void setMarshal(boolean marshal) {
278            this.marshal = marshal;
279        }
280    
281        public boolean isNetwork() {
282            return network;
283        }
284    
285        public void setNetwork(boolean network) {
286            this.network = network;
287        }
288    
289        @Override
290        public String toString() {
291            return location + "#" + id;
292        }
293    
294        public String getRemoteAddress() {
295            if (peer != null) {
296                return peer.toString();
297            }
298            return null;
299        }
300    
301        /**
302         * @return the async
303         */
304        public boolean isAsync() {
305            return async;
306        }
307    
308        /**
309         * @param async the async to set
310         */
311        public void setAsync(boolean async) {
312            this.async = async;
313        }
314    
315        /**
316         * @return the asyncQueueDepth
317         */
318        public int getAsyncQueueDepth() {
319            return asyncQueueDepth;
320        }
321    
322        /**
323         * @param asyncQueueDepth the asyncQueueDepth to set
324         */
325        public void setAsyncQueueDepth(int asyncQueueDepth) {
326            this.asyncQueueDepth = asyncQueueDepth;
327        }
328    
329        protected void wakeup() {
330            if (async) {
331                synchronized (lazyInitMutext) {
332                    if (taskRunner == null) {
333                        taskRunner = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(this, "VMTransport: " + toString());
334                    }
335                }
336                try {
337                    taskRunner.wakeup();
338                } catch (InterruptedException e) {
339                    Thread.currentThread().interrupt();
340                }
341            }
342        }
343    
344        public boolean isFaultTolerant() {
345            return false;
346        }
347    
348            public boolean isDisposed() {
349                    return disposed;
350            }
351            
352            public boolean isConnected() {
353                return started;
354            }
355    
356            public void reconnect(URI uri) throws IOException {
357            throw new IOException("Not supported");
358        }
359    
360        public boolean isReconnectSupported() {
361            return false;
362        }
363    
364        public boolean isUpdateURIsSupported() {
365            return false;
366        }
367        public void updateURIs(boolean reblance,URI[] uris) throws IOException {
368            throw new IOException("Not supported");
369        }
370    
371        public int getReceiveCounter() {
372            return receiveCounter;
373        }
374    }