001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.stomp;
018    
019    import java.io.IOException;
020    import java.io.OutputStreamWriter;
021    import java.io.PrintWriter;
022    import java.util.HashMap;
023    import java.util.Iterator;
024    import java.util.Map;
025    import java.util.concurrent.ConcurrentHashMap;
026    import java.util.concurrent.atomic.AtomicBoolean;
027    
028    import javax.jms.JMSException;
029    
030    import org.apache.activemq.broker.BrokerContext;
031    import org.apache.activemq.broker.BrokerContextAware;
032    import org.apache.activemq.command.ActiveMQDestination;
033    import org.apache.activemq.command.ActiveMQMessage;
034    import org.apache.activemq.command.ActiveMQTempQueue;
035    import org.apache.activemq.command.ActiveMQTempTopic;
036    import org.apache.activemq.command.Command;
037    import org.apache.activemq.command.ConnectionError;
038    import org.apache.activemq.command.ConnectionId;
039    import org.apache.activemq.command.ConnectionInfo;
040    import org.apache.activemq.command.ConsumerId;
041    import org.apache.activemq.command.ConsumerInfo;
042    import org.apache.activemq.command.DestinationInfo;
043    import org.apache.activemq.command.ExceptionResponse;
044    import org.apache.activemq.command.LocalTransactionId;
045    import org.apache.activemq.command.MessageAck;
046    import org.apache.activemq.command.MessageDispatch;
047    import org.apache.activemq.command.MessageId;
048    import org.apache.activemq.command.ProducerId;
049    import org.apache.activemq.command.ProducerInfo;
050    import org.apache.activemq.command.RemoveSubscriptionInfo;
051    import org.apache.activemq.command.Response;
052    import org.apache.activemq.command.SessionId;
053    import org.apache.activemq.command.SessionInfo;
054    import org.apache.activemq.command.ShutdownInfo;
055    import org.apache.activemq.command.TransactionId;
056    import org.apache.activemq.command.TransactionInfo;
057    import org.apache.activemq.util.ByteArrayOutputStream;
058    import org.apache.activemq.util.FactoryFinder;
059    import org.apache.activemq.util.IOExceptionSupport;
060    import org.apache.activemq.util.IdGenerator;
061    import org.apache.activemq.util.IntrospectionSupport;
062    import org.apache.activemq.util.LongSequenceGenerator;
063    import org.slf4j.Logger;
064    import org.slf4j.LoggerFactory;
065    import org.springframework.context.ApplicationContextAware;
066    
067    /**
068     * @author <a href="http://hiramchirino.com">chirino</a>
069     */
070    public class ProtocolConverter {
071    
072        private static final Logger LOG = LoggerFactory.getLogger(ProtocolConverter.class);
073        
074        private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
075    
076        private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
077        private final SessionId sessionId = new SessionId(connectionId, -1);
078        private final ProducerId producerId = new ProducerId(sessionId, 1);
079    
080        private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
081        private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
082        private final LongSequenceGenerator transactionIdGenerator = new LongSequenceGenerator();
083        private final LongSequenceGenerator tempDestinationGenerator = new LongSequenceGenerator();
084    
085        private final ConcurrentHashMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
086        private final ConcurrentHashMap<ConsumerId, StompSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, StompSubscription>();
087        private final ConcurrentHashMap<String, ActiveMQDestination> tempDestinations = new ConcurrentHashMap<String, ActiveMQDestination>();
088        private final ConcurrentHashMap<String, String> tempDestinationAmqToStompMap = new ConcurrentHashMap<String, String>();
089        private final Map<String, LocalTransactionId> transactions = new ConcurrentHashMap<String, LocalTransactionId>();
090        private final StompTransport stompTransport;
091    
092        private final Object commnadIdMutex = new Object();
093        private int lastCommandId;
094        private final AtomicBoolean connected = new AtomicBoolean(false);
095        private final FrameTranslator frameTranslator;
096        private final FactoryFinder FRAME_TRANSLATOR_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/frametranslator/");
097        private final BrokerContext brokerContext;
098    
099        public ProtocolConverter(StompTransport stompTransport, FrameTranslator translator, BrokerContext brokerContext) {
100            this.stompTransport = stompTransport;
101            this.frameTranslator = translator;
102            this.brokerContext = brokerContext;
103        }
104    
105        protected int generateCommandId() {
106            synchronized (commnadIdMutex) {
107                return lastCommandId++;
108            }
109        }
110    
111        protected ResponseHandler createResponseHandler(final StompFrame command) {
112            final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
113            if (receiptId != null) {
114                return new ResponseHandler() {
115                    public void onResponse(ProtocolConverter converter, Response response) throws IOException {
116                        if (response.isException()) {
117                            // Generally a command can fail.. but that does not invalidate the connection.
118                            // We report back the failure but we don't close the connection.
119                            Throwable exception = ((ExceptionResponse)response).getException();
120                            handleException(exception, command);
121                        } else {
122                            StompFrame sc = new StompFrame();
123                            sc.setAction(Stomp.Responses.RECEIPT);
124                            sc.setHeaders(new HashMap<String, String>(1));
125                            sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
126                            stompTransport.sendToStomp(sc);
127                        }
128                    }
129                };
130            }
131            return null;
132        }
133    
134        protected void sendToActiveMQ(Command command, ResponseHandler handler) {
135            command.setCommandId(generateCommandId());
136            if (handler != null) {
137                command.setResponseRequired(true);
138                resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler);
139            }
140            stompTransport.sendToActiveMQ(command);
141        }
142    
143        protected void sendToStomp(StompFrame command) throws IOException {
144            stompTransport.sendToStomp(command);
145        }
146    
147        protected FrameTranslator findTranslator(String header) {
148            FrameTranslator translator = frameTranslator;
149            try {
150                if (header != null) {
151                    translator = (FrameTranslator) FRAME_TRANSLATOR_FINDER
152                            .newInstance(header);
153                    if (translator instanceof BrokerContextAware) {
154                        ((BrokerContextAware)translator).setBrokerContext(brokerContext);
155                    }
156                }
157            } catch (Exception ignore) {
158                // if anything goes wrong use the default translator
159            }
160    
161            return translator;
162        }
163    
164        /**
165         * Convert a stomp command
166         *
167         * @param command
168         */
169        public void onStompCommand(StompFrame command) throws IOException, JMSException {
170            try {
171    
172                if (command.getClass() == StompFrameError.class) {
173                    throw ((StompFrameError)command).getException();
174                }
175    
176                String action = command.getAction();
177                if (action.startsWith(Stomp.Commands.SEND)) {
178                    onStompSend(command);
179                } else if (action.startsWith(Stomp.Commands.ACK)) {
180                    onStompAck(command);
181                } else if (action.startsWith(Stomp.Commands.BEGIN)) {
182                    onStompBegin(command);
183                } else if (action.startsWith(Stomp.Commands.COMMIT)) {
184                    onStompCommit(command);
185                } else if (action.startsWith(Stomp.Commands.ABORT)) {
186                    onStompAbort(command);
187                } else if (action.startsWith(Stomp.Commands.SUBSCRIBE)) {
188                    onStompSubscribe(command);
189                } else if (action.startsWith(Stomp.Commands.UNSUBSCRIBE)) {
190                    onStompUnsubscribe(command);
191                } else if (action.startsWith(Stomp.Commands.CONNECT)) {
192                    onStompConnect(command);
193                } else if (action.startsWith(Stomp.Commands.DISCONNECT)) {
194                    onStompDisconnect(command);
195                } else {
196                    throw new ProtocolException("Unknown STOMP action: " + action);
197                }
198    
199            } catch (ProtocolException e) {
200                handleException(e, command);
201                // Some protocol errors can cause the connection to get closed.
202                if( e.isFatal() ) {
203                   getStompTransport().onException(e);
204                }
205            }
206        }
207    
208        protected void handleException(Throwable exception, StompFrame command) throws IOException {
209            LOG.warn("Exception occurred processing: \n" + command + ": " + exception.toString());
210            if (LOG.isDebugEnabled()) {
211                LOG.debug("Exception detail", exception);
212            }
213    
214            // Let the stomp client know about any protocol errors.
215            ByteArrayOutputStream baos = new ByteArrayOutputStream();
216            PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8"));
217            exception.printStackTrace(stream);
218            stream.close();
219    
220            HashMap<String, String> headers = new HashMap<String, String>();
221            headers.put(Stomp.Headers.Error.MESSAGE, exception.getMessage());
222    
223            if (command != null) {
224                final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
225                if (receiptId != null) {
226                    headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
227                }
228            }
229    
230            StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray());
231            sendToStomp(errorMessage);
232        }
233    
234        protected void onStompSend(StompFrame command) throws IOException, JMSException {
235            checkConnected();
236    
237            Map<String, String> headers = command.getHeaders();
238            String stompTx = headers.get(Stomp.Headers.TRANSACTION);
239            headers.remove("transaction");
240    
241            ActiveMQMessage message = convertMessage(command);
242    
243            message.setProducerId(producerId);
244            MessageId id = new MessageId(producerId, messageIdGenerator.getNextSequenceId());
245            message.setMessageId(id);
246            message.setJMSTimestamp(System.currentTimeMillis());
247    
248            if (stompTx != null) {
249                TransactionId activemqTx = transactions.get(stompTx);
250                if (activemqTx == null) {
251                    throw new ProtocolException("Invalid transaction id: " + stompTx);
252                }
253                message.setTransactionId(activemqTx);
254            }
255    
256            message.onSend();
257            sendToActiveMQ(message, createResponseHandler(command));
258    
259        }
260    
261        protected void onStompAck(StompFrame command) throws ProtocolException {
262            checkConnected();
263    
264            // TODO: acking with just a message id is very bogus
265            // since the same message id could have been sent to 2 different
266            // subscriptions
267            // on the same stomp connection. For example, when 2 subs are created on
268            // the same topic.
269    
270            Map<String, String> headers = command.getHeaders();
271            String messageId = headers.get(Stomp.Headers.Ack.MESSAGE_ID);
272            if (messageId == null) {
273                throw new ProtocolException("ACK received without a message-id to acknowledge!");
274            }
275    
276            TransactionId activemqTx = null;
277            String stompTx = headers.get(Stomp.Headers.TRANSACTION);
278            if (stompTx != null) {
279                activemqTx = transactions.get(stompTx);
280                if (activemqTx == null) {
281                    throw new ProtocolException("Invalid transaction id: " + stompTx);
282                }
283            }
284    
285            boolean acked = false;
286            for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
287                StompSubscription sub = iter.next();
288                MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
289                if (ack != null) {
290                    ack.setTransactionId(activemqTx);
291                    sendToActiveMQ(ack, createResponseHandler(command));
292                    acked = true;
293                    break;
294                }
295            }
296    
297            if (!acked) {
298                throw new ProtocolException("Unexpected ACK received for message-id [" + messageId + "]");
299            }
300    
301        }
302    
303        protected void onStompBegin(StompFrame command) throws ProtocolException {
304            checkConnected();
305    
306            Map<String, String> headers = command.getHeaders();
307    
308            String stompTx = headers.get(Stomp.Headers.TRANSACTION);
309    
310            if (!headers.containsKey(Stomp.Headers.TRANSACTION)) {
311                throw new ProtocolException("Must specify the transaction you are beginning");
312            }
313    
314            if (transactions.get(stompTx) != null) {
315                throw new ProtocolException("The transaction was allready started: " + stompTx);
316            }
317    
318            LocalTransactionId activemqTx = new LocalTransactionId(connectionId, transactionIdGenerator.getNextSequenceId());
319            transactions.put(stompTx, activemqTx);
320    
321            TransactionInfo tx = new TransactionInfo();
322            tx.setConnectionId(connectionId);
323            tx.setTransactionId(activemqTx);
324            tx.setType(TransactionInfo.BEGIN);
325    
326            sendToActiveMQ(tx, createResponseHandler(command));
327    
328        }
329    
330        protected void onStompCommit(StompFrame command) throws ProtocolException {
331            checkConnected();
332    
333            Map<String, String> headers = command.getHeaders();
334    
335            String stompTx = headers.get(Stomp.Headers.TRANSACTION);
336            if (stompTx == null) {
337                throw new ProtocolException("Must specify the transaction you are committing");
338            }
339    
340            TransactionId activemqTx = transactions.remove(stompTx);
341            if (activemqTx == null) {
342                throw new ProtocolException("Invalid transaction id: " + stompTx);
343            }
344    
345            for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
346                StompSubscription sub = iter.next();
347                sub.onStompCommit(activemqTx);
348            }
349    
350            TransactionInfo tx = new TransactionInfo();
351            tx.setConnectionId(connectionId);
352            tx.setTransactionId(activemqTx);
353            tx.setType(TransactionInfo.COMMIT_ONE_PHASE);
354    
355            sendToActiveMQ(tx, createResponseHandler(command));
356    
357        }
358    
359        protected void onStompAbort(StompFrame command) throws ProtocolException {
360            checkConnected();
361            Map<String, String> headers = command.getHeaders();
362    
363            String stompTx = headers.get(Stomp.Headers.TRANSACTION);
364            if (stompTx == null) {
365                throw new ProtocolException("Must specify the transaction you are committing");
366            }
367    
368            TransactionId activemqTx = transactions.remove(stompTx);
369            if (activemqTx == null) {
370                throw new ProtocolException("Invalid transaction id: " + stompTx);
371            }
372            for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
373                StompSubscription sub = iter.next();
374                try {
375                    sub.onStompAbort(activemqTx);
376                } catch (Exception e) {
377                    throw new ProtocolException("Transaction abort failed", false, e);
378                }
379            }
380    
381            TransactionInfo tx = new TransactionInfo();
382            tx.setConnectionId(connectionId);
383            tx.setTransactionId(activemqTx);
384            tx.setType(TransactionInfo.ROLLBACK);
385    
386            sendToActiveMQ(tx, createResponseHandler(command));
387    
388        }
389    
390        protected void onStompSubscribe(StompFrame command) throws ProtocolException {
391            checkConnected();
392            FrameTranslator translator = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION));
393            Map<String, String> headers = command.getHeaders();
394    
395            String subscriptionId = headers.get(Stomp.Headers.Subscribe.ID);
396            String destination = headers.get(Stomp.Headers.Subscribe.DESTINATION);
397    
398            ActiveMQDestination actualDest = translator.convertDestination(this, destination);
399    
400            if (actualDest == null) {
401                throw new ProtocolException("Invalid Destination.");
402            }
403    
404            ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
405            ConsumerInfo consumerInfo = new ConsumerInfo(id);
406            consumerInfo.setPrefetchSize(1000);
407            consumerInfo.setDispatchAsync(true);
408    
409            String selector = headers.remove(Stomp.Headers.Subscribe.SELECTOR);
410            consumerInfo.setSelector(selector);
411    
412            IntrospectionSupport.setProperties(consumerInfo, headers, "activemq.");
413    
414            consumerInfo.setDestination(translator.convertDestination(this, destination));
415    
416            StompSubscription stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION));
417            stompSubscription.setDestination(actualDest);
418    
419            String ackMode = headers.get(Stomp.Headers.Subscribe.ACK_MODE);
420            if (Stomp.Headers.Subscribe.AckModeValues.CLIENT.equals(ackMode)) {
421                stompSubscription.setAckMode(StompSubscription.CLIENT_ACK);
422            } else if (Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL.equals(ackMode)) {
423                stompSubscription.setAckMode(StompSubscription.INDIVIDUAL_ACK);
424            } else {
425                stompSubscription.setAckMode(StompSubscription.AUTO_ACK);
426            }
427    
428            subscriptionsByConsumerId.put(id, stompSubscription);
429            sendToActiveMQ(consumerInfo, createResponseHandler(command));
430    
431        }
432    
433        protected void onStompUnsubscribe(StompFrame command) throws ProtocolException {
434            checkConnected();
435            Map<String, String> headers = command.getHeaders();
436    
437            ActiveMQDestination destination = null;
438            Object o = headers.get(Stomp.Headers.Unsubscribe.DESTINATION);
439            if (o != null) {
440                destination = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)).convertDestination(this, (String)o);
441            }
442    
443            String subscriptionId = headers.get(Stomp.Headers.Unsubscribe.ID);
444    
445            if (subscriptionId == null && destination == null) {
446                throw new ProtocolException("Must specify the subscriptionId or the destination you are unsubscribing from");
447            }
448    
449            // check if it is a durable subscription
450            String durable = command.getHeaders().get("activemq.subscriptionName");
451            if (durable != null) {
452                RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
453                info.setClientId(durable);
454                info.setSubscriptionName(durable);
455                info.setConnectionId(connectionId);
456                sendToActiveMQ(info, createResponseHandler(command));
457                return;
458            }
459    
460            // TODO: Unsubscribing using a destination is a bit wierd if multiple
461            // subscriptions
462            // are created with the same destination. Perhaps this should be
463            // removed.
464            //
465            for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
466                StompSubscription sub = iter.next();
467                if ((subscriptionId != null && subscriptionId.equals(sub.getSubscriptionId())) || (destination != null && destination.equals(sub.getDestination()))) {
468                    sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
469                    iter.remove();
470                    return;
471                }
472            }
473    
474            throw new ProtocolException("No subscription matched.");
475        }
476    
477        ConnectionInfo connectionInfo = new ConnectionInfo();
478    
479        protected void onStompConnect(final StompFrame command) throws ProtocolException {
480    
481            if (connected.get()) {
482                throw new ProtocolException("Allready connected.");
483            }
484    
485            final Map<String, String> headers = command.getHeaders();
486    
487            // allow anyone to login for now
488            String login = headers.get(Stomp.Headers.Connect.LOGIN);
489            String passcode = headers.get(Stomp.Headers.Connect.PASSCODE);
490            String clientId = headers.get(Stomp.Headers.Connect.CLIENT_ID);
491    
492    
493            IntrospectionSupport.setProperties(connectionInfo, headers, "activemq.");
494    
495            connectionInfo.setConnectionId(connectionId);
496            if (clientId != null) {
497                connectionInfo.setClientId(clientId);
498            } else {
499                connectionInfo.setClientId("" + connectionInfo.getConnectionId().toString());
500            }
501    
502            connectionInfo.setResponseRequired(true);
503            connectionInfo.setUserName(login);
504            connectionInfo.setPassword(passcode);
505            connectionInfo.setTransportContext(stompTransport.getPeerCertificates());
506    
507            sendToActiveMQ(connectionInfo, new ResponseHandler() {
508                public void onResponse(ProtocolConverter converter, Response response) throws IOException {
509    
510                    if (response.isException()) {
511                        // If the connection attempt fails we close the socket.
512                        Throwable exception = ((ExceptionResponse)response).getException();
513                        handleException(exception, command);
514                        getStompTransport().onException(IOExceptionSupport.create(exception));
515                        return;
516                    }
517    
518                    final SessionInfo sessionInfo = new SessionInfo(sessionId);
519                    sendToActiveMQ(sessionInfo, null);
520    
521                    final ProducerInfo producerInfo = new ProducerInfo(producerId);
522                    sendToActiveMQ(producerInfo, new ResponseHandler() {
523                        public void onResponse(ProtocolConverter converter, Response response) throws IOException {
524    
525                            if (response.isException()) {
526                                // If the connection attempt fails we close the socket.
527                                Throwable exception = ((ExceptionResponse)response).getException();
528                                handleException(exception, command);
529                                getStompTransport().onException(IOExceptionSupport.create(exception));
530                            }
531    
532                            connected.set(true);
533                            HashMap<String, String> responseHeaders = new HashMap<String, String>();
534    
535                            responseHeaders.put(Stomp.Headers.Connected.SESSION, connectionInfo.getClientId());
536                            String requestId = headers.get(Stomp.Headers.Connect.REQUEST_ID);
537                            if (requestId == null) {
538                                // TODO legacy
539                                requestId = headers.get(Stomp.Headers.RECEIPT_REQUESTED);
540                            }
541                            if (requestId != null) {
542                                // TODO legacy
543                                responseHeaders.put(Stomp.Headers.Connected.RESPONSE_ID, requestId);
544                                responseHeaders.put(Stomp.Headers.Response.RECEIPT_ID, requestId);
545                            }
546    
547                            StompFrame sc = new StompFrame();
548                            sc.setAction(Stomp.Responses.CONNECTED);
549                            sc.setHeaders(responseHeaders);
550                            sendToStomp(sc);
551                        }
552                    });
553    
554                }
555            });
556        }
557    
558        protected void onStompDisconnect(StompFrame command) throws ProtocolException {
559            checkConnected();
560            sendToActiveMQ(connectionInfo.createRemoveCommand(), createResponseHandler(command));
561            sendToActiveMQ(new ShutdownInfo(), createResponseHandler(command));
562            connected.set(false);
563        }
564    
565        protected void checkConnected() throws ProtocolException {
566            if (!connected.get()) {
567                throw new ProtocolException("Not connected.");
568            }
569        }
570    
571        /**
572         * Dispatch a ActiveMQ command
573         *
574         * @param command
575         * @throws IOException
576         */
577        public void onActiveMQCommand(Command command) throws IOException, JMSException {
578            if (command.isResponse()) {
579    
580                Response response = (Response)command;
581                ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId()));
582                if (rh != null) {
583                    rh.onResponse(this, response);
584                } else {
585                    // Pass down any unexpected errors. Should this close the connection?
586                    if (response.isException()) {
587                        Throwable exception = ((ExceptionResponse)response).getException();
588                        handleException(exception, null);
589                    }
590                }
591            } else if (command.isMessageDispatch()) {
592    
593                MessageDispatch md = (MessageDispatch)command;
594                StompSubscription sub = subscriptionsByConsumerId.get(md.getConsumerId());
595                if (sub != null) {
596                    sub.onMessageDispatch(md);
597                }
598            } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
599                // Pass down any unexpected async errors. Should this close the connection?
600                Throwable exception = ((ConnectionError)command).getException();
601                handleException(exception, null);
602            }
603        }
604    
605        public ActiveMQMessage convertMessage(StompFrame command) throws IOException, JMSException {
606            ActiveMQMessage msg = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)).convertFrame(this, command);
607            return msg;
608        }
609    
610        public StompFrame convertMessage(ActiveMQMessage message, boolean ignoreTransformation) throws IOException, JMSException {
611            if (ignoreTransformation == true) {
612                return frameTranslator.convertMessage(this, message);
613            } else {
614                return findTranslator(message.getStringProperty(Stomp.Headers.TRANSFORMATION)).convertMessage(this, message);
615            }
616        }
617    
618        public StompTransport getStompTransport() {
619            return stompTransport;
620        }
621    
622        public ActiveMQDestination createTempQueue(String name) {
623            ActiveMQDestination rc = tempDestinations.get(name);
624            if( rc == null ) {
625                rc = new ActiveMQTempQueue(connectionId, tempDestinationGenerator.getNextSequenceId());
626                sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null);
627                tempDestinations.put(name, rc);
628            }
629            return rc;
630        }
631    
632        public ActiveMQDestination createTempTopic(String name) {
633            ActiveMQDestination rc = tempDestinations.get(name);
634            if( rc == null ) {
635                rc = new ActiveMQTempTopic(connectionId, tempDestinationGenerator.getNextSequenceId());
636                sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null);
637                tempDestinations.put(name, rc);
638                tempDestinationAmqToStompMap.put(rc.getQualifiedName(), name);
639            }
640            return rc;
641        }
642    
643        public String getCreatedTempDestinationName(ActiveMQDestination destination) {
644            return tempDestinationAmqToStompMap.get(destination.getQualifiedName());
645        }
646    }