001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.transport.stomp; 018 019 import java.io.IOException; 020 import java.io.OutputStreamWriter; 021 import java.io.PrintWriter; 022 import java.util.HashMap; 023 import java.util.Iterator; 024 import java.util.Map; 025 import java.util.concurrent.ConcurrentHashMap; 026 import java.util.concurrent.atomic.AtomicBoolean; 027 028 import javax.jms.JMSException; 029 030 import org.apache.activemq.broker.BrokerContext; 031 import org.apache.activemq.broker.BrokerContextAware; 032 import org.apache.activemq.command.ActiveMQDestination; 033 import org.apache.activemq.command.ActiveMQMessage; 034 import org.apache.activemq.command.ActiveMQTempQueue; 035 import org.apache.activemq.command.ActiveMQTempTopic; 036 import org.apache.activemq.command.Command; 037 import org.apache.activemq.command.ConnectionError; 038 import org.apache.activemq.command.ConnectionId; 039 import org.apache.activemq.command.ConnectionInfo; 040 import org.apache.activemq.command.ConsumerId; 041 import org.apache.activemq.command.ConsumerInfo; 042 import org.apache.activemq.command.DestinationInfo; 043 import org.apache.activemq.command.ExceptionResponse; 044 import org.apache.activemq.command.LocalTransactionId; 045 import org.apache.activemq.command.MessageAck; 046 import org.apache.activemq.command.MessageDispatch; 047 import org.apache.activemq.command.MessageId; 048 import org.apache.activemq.command.ProducerId; 049 import org.apache.activemq.command.ProducerInfo; 050 import org.apache.activemq.command.RemoveSubscriptionInfo; 051 import org.apache.activemq.command.Response; 052 import org.apache.activemq.command.SessionId; 053 import org.apache.activemq.command.SessionInfo; 054 import org.apache.activemq.command.ShutdownInfo; 055 import org.apache.activemq.command.TransactionId; 056 import org.apache.activemq.command.TransactionInfo; 057 import org.apache.activemq.util.ByteArrayOutputStream; 058 import org.apache.activemq.util.FactoryFinder; 059 import org.apache.activemq.util.IOExceptionSupport; 060 import org.apache.activemq.util.IdGenerator; 061 import org.apache.activemq.util.IntrospectionSupport; 062 import org.apache.activemq.util.LongSequenceGenerator; 063 import org.slf4j.Logger; 064 import org.slf4j.LoggerFactory; 065 import org.springframework.context.ApplicationContextAware; 066 067 /** 068 * @author <a href="http://hiramchirino.com">chirino</a> 069 */ 070 public class ProtocolConverter { 071 072 private static final Logger LOG = LoggerFactory.getLogger(ProtocolConverter.class); 073 074 private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator(); 075 076 private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId()); 077 private final SessionId sessionId = new SessionId(connectionId, -1); 078 private final ProducerId producerId = new ProducerId(sessionId, 1); 079 080 private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); 081 private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); 082 private final LongSequenceGenerator transactionIdGenerator = new LongSequenceGenerator(); 083 private final LongSequenceGenerator tempDestinationGenerator = new LongSequenceGenerator(); 084 085 private final ConcurrentHashMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>(); 086 private final ConcurrentHashMap<ConsumerId, StompSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, StompSubscription>(); 087 private final ConcurrentHashMap<String, ActiveMQDestination> tempDestinations = new ConcurrentHashMap<String, ActiveMQDestination>(); 088 private final ConcurrentHashMap<String, String> tempDestinationAmqToStompMap = new ConcurrentHashMap<String, String>(); 089 private final Map<String, LocalTransactionId> transactions = new ConcurrentHashMap<String, LocalTransactionId>(); 090 private final StompTransport stompTransport; 091 092 private final Object commnadIdMutex = new Object(); 093 private int lastCommandId; 094 private final AtomicBoolean connected = new AtomicBoolean(false); 095 private final FrameTranslator frameTranslator; 096 private final FactoryFinder FRAME_TRANSLATOR_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/frametranslator/"); 097 private final BrokerContext brokerContext; 098 099 public ProtocolConverter(StompTransport stompTransport, FrameTranslator translator, BrokerContext brokerContext) { 100 this.stompTransport = stompTransport; 101 this.frameTranslator = translator; 102 this.brokerContext = brokerContext; 103 } 104 105 protected int generateCommandId() { 106 synchronized (commnadIdMutex) { 107 return lastCommandId++; 108 } 109 } 110 111 protected ResponseHandler createResponseHandler(final StompFrame command) { 112 final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED); 113 if (receiptId != null) { 114 return new ResponseHandler() { 115 public void onResponse(ProtocolConverter converter, Response response) throws IOException { 116 if (response.isException()) { 117 // Generally a command can fail.. but that does not invalidate the connection. 118 // We report back the failure but we don't close the connection. 119 Throwable exception = ((ExceptionResponse)response).getException(); 120 handleException(exception, command); 121 } else { 122 StompFrame sc = new StompFrame(); 123 sc.setAction(Stomp.Responses.RECEIPT); 124 sc.setHeaders(new HashMap<String, String>(1)); 125 sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId); 126 stompTransport.sendToStomp(sc); 127 } 128 } 129 }; 130 } 131 return null; 132 } 133 134 protected void sendToActiveMQ(Command command, ResponseHandler handler) { 135 command.setCommandId(generateCommandId()); 136 if (handler != null) { 137 command.setResponseRequired(true); 138 resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler); 139 } 140 stompTransport.sendToActiveMQ(command); 141 } 142 143 protected void sendToStomp(StompFrame command) throws IOException { 144 stompTransport.sendToStomp(command); 145 } 146 147 protected FrameTranslator findTranslator(String header) { 148 FrameTranslator translator = frameTranslator; 149 try { 150 if (header != null) { 151 translator = (FrameTranslator) FRAME_TRANSLATOR_FINDER 152 .newInstance(header); 153 if (translator instanceof BrokerContextAware) { 154 ((BrokerContextAware)translator).setBrokerContext(brokerContext); 155 } 156 } 157 } catch (Exception ignore) { 158 // if anything goes wrong use the default translator 159 } 160 161 return translator; 162 } 163 164 /** 165 * Convert a stomp command 166 * 167 * @param command 168 */ 169 public void onStompCommand(StompFrame command) throws IOException, JMSException { 170 try { 171 172 if (command.getClass() == StompFrameError.class) { 173 throw ((StompFrameError)command).getException(); 174 } 175 176 String action = command.getAction(); 177 if (action.startsWith(Stomp.Commands.SEND)) { 178 onStompSend(command); 179 } else if (action.startsWith(Stomp.Commands.ACK)) { 180 onStompAck(command); 181 } else if (action.startsWith(Stomp.Commands.BEGIN)) { 182 onStompBegin(command); 183 } else if (action.startsWith(Stomp.Commands.COMMIT)) { 184 onStompCommit(command); 185 } else if (action.startsWith(Stomp.Commands.ABORT)) { 186 onStompAbort(command); 187 } else if (action.startsWith(Stomp.Commands.SUBSCRIBE)) { 188 onStompSubscribe(command); 189 } else if (action.startsWith(Stomp.Commands.UNSUBSCRIBE)) { 190 onStompUnsubscribe(command); 191 } else if (action.startsWith(Stomp.Commands.CONNECT)) { 192 onStompConnect(command); 193 } else if (action.startsWith(Stomp.Commands.DISCONNECT)) { 194 onStompDisconnect(command); 195 } else { 196 throw new ProtocolException("Unknown STOMP action: " + action); 197 } 198 199 } catch (ProtocolException e) { 200 handleException(e, command); 201 // Some protocol errors can cause the connection to get closed. 202 if( e.isFatal() ) { 203 getStompTransport().onException(e); 204 } 205 } 206 } 207 208 protected void handleException(Throwable exception, StompFrame command) throws IOException { 209 LOG.warn("Exception occurred processing: \n" + command + ": " + exception.toString()); 210 if (LOG.isDebugEnabled()) { 211 LOG.debug("Exception detail", exception); 212 } 213 214 // Let the stomp client know about any protocol errors. 215 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 216 PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8")); 217 exception.printStackTrace(stream); 218 stream.close(); 219 220 HashMap<String, String> headers = new HashMap<String, String>(); 221 headers.put(Stomp.Headers.Error.MESSAGE, exception.getMessage()); 222 223 if (command != null) { 224 final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED); 225 if (receiptId != null) { 226 headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId); 227 } 228 } 229 230 StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray()); 231 sendToStomp(errorMessage); 232 } 233 234 protected void onStompSend(StompFrame command) throws IOException, JMSException { 235 checkConnected(); 236 237 Map<String, String> headers = command.getHeaders(); 238 String stompTx = headers.get(Stomp.Headers.TRANSACTION); 239 headers.remove("transaction"); 240 241 ActiveMQMessage message = convertMessage(command); 242 243 message.setProducerId(producerId); 244 MessageId id = new MessageId(producerId, messageIdGenerator.getNextSequenceId()); 245 message.setMessageId(id); 246 message.setJMSTimestamp(System.currentTimeMillis()); 247 248 if (stompTx != null) { 249 TransactionId activemqTx = transactions.get(stompTx); 250 if (activemqTx == null) { 251 throw new ProtocolException("Invalid transaction id: " + stompTx); 252 } 253 message.setTransactionId(activemqTx); 254 } 255 256 message.onSend(); 257 sendToActiveMQ(message, createResponseHandler(command)); 258 259 } 260 261 protected void onStompAck(StompFrame command) throws ProtocolException { 262 checkConnected(); 263 264 // TODO: acking with just a message id is very bogus 265 // since the same message id could have been sent to 2 different 266 // subscriptions 267 // on the same stomp connection. For example, when 2 subs are created on 268 // the same topic. 269 270 Map<String, String> headers = command.getHeaders(); 271 String messageId = headers.get(Stomp.Headers.Ack.MESSAGE_ID); 272 if (messageId == null) { 273 throw new ProtocolException("ACK received without a message-id to acknowledge!"); 274 } 275 276 TransactionId activemqTx = null; 277 String stompTx = headers.get(Stomp.Headers.TRANSACTION); 278 if (stompTx != null) { 279 activemqTx = transactions.get(stompTx); 280 if (activemqTx == null) { 281 throw new ProtocolException("Invalid transaction id: " + stompTx); 282 } 283 } 284 285 boolean acked = false; 286 for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) { 287 StompSubscription sub = iter.next(); 288 MessageAck ack = sub.onStompMessageAck(messageId, activemqTx); 289 if (ack != null) { 290 ack.setTransactionId(activemqTx); 291 sendToActiveMQ(ack, createResponseHandler(command)); 292 acked = true; 293 break; 294 } 295 } 296 297 if (!acked) { 298 throw new ProtocolException("Unexpected ACK received for message-id [" + messageId + "]"); 299 } 300 301 } 302 303 protected void onStompBegin(StompFrame command) throws ProtocolException { 304 checkConnected(); 305 306 Map<String, String> headers = command.getHeaders(); 307 308 String stompTx = headers.get(Stomp.Headers.TRANSACTION); 309 310 if (!headers.containsKey(Stomp.Headers.TRANSACTION)) { 311 throw new ProtocolException("Must specify the transaction you are beginning"); 312 } 313 314 if (transactions.get(stompTx) != null) { 315 throw new ProtocolException("The transaction was allready started: " + stompTx); 316 } 317 318 LocalTransactionId activemqTx = new LocalTransactionId(connectionId, transactionIdGenerator.getNextSequenceId()); 319 transactions.put(stompTx, activemqTx); 320 321 TransactionInfo tx = new TransactionInfo(); 322 tx.setConnectionId(connectionId); 323 tx.setTransactionId(activemqTx); 324 tx.setType(TransactionInfo.BEGIN); 325 326 sendToActiveMQ(tx, createResponseHandler(command)); 327 328 } 329 330 protected void onStompCommit(StompFrame command) throws ProtocolException { 331 checkConnected(); 332 333 Map<String, String> headers = command.getHeaders(); 334 335 String stompTx = headers.get(Stomp.Headers.TRANSACTION); 336 if (stompTx == null) { 337 throw new ProtocolException("Must specify the transaction you are committing"); 338 } 339 340 TransactionId activemqTx = transactions.remove(stompTx); 341 if (activemqTx == null) { 342 throw new ProtocolException("Invalid transaction id: " + stompTx); 343 } 344 345 for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) { 346 StompSubscription sub = iter.next(); 347 sub.onStompCommit(activemqTx); 348 } 349 350 TransactionInfo tx = new TransactionInfo(); 351 tx.setConnectionId(connectionId); 352 tx.setTransactionId(activemqTx); 353 tx.setType(TransactionInfo.COMMIT_ONE_PHASE); 354 355 sendToActiveMQ(tx, createResponseHandler(command)); 356 357 } 358 359 protected void onStompAbort(StompFrame command) throws ProtocolException { 360 checkConnected(); 361 Map<String, String> headers = command.getHeaders(); 362 363 String stompTx = headers.get(Stomp.Headers.TRANSACTION); 364 if (stompTx == null) { 365 throw new ProtocolException("Must specify the transaction you are committing"); 366 } 367 368 TransactionId activemqTx = transactions.remove(stompTx); 369 if (activemqTx == null) { 370 throw new ProtocolException("Invalid transaction id: " + stompTx); 371 } 372 for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) { 373 StompSubscription sub = iter.next(); 374 try { 375 sub.onStompAbort(activemqTx); 376 } catch (Exception e) { 377 throw new ProtocolException("Transaction abort failed", false, e); 378 } 379 } 380 381 TransactionInfo tx = new TransactionInfo(); 382 tx.setConnectionId(connectionId); 383 tx.setTransactionId(activemqTx); 384 tx.setType(TransactionInfo.ROLLBACK); 385 386 sendToActiveMQ(tx, createResponseHandler(command)); 387 388 } 389 390 protected void onStompSubscribe(StompFrame command) throws ProtocolException { 391 checkConnected(); 392 FrameTranslator translator = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)); 393 Map<String, String> headers = command.getHeaders(); 394 395 String subscriptionId = headers.get(Stomp.Headers.Subscribe.ID); 396 String destination = headers.get(Stomp.Headers.Subscribe.DESTINATION); 397 398 ActiveMQDestination actualDest = translator.convertDestination(this, destination); 399 400 if (actualDest == null) { 401 throw new ProtocolException("Invalid Destination."); 402 } 403 404 ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()); 405 ConsumerInfo consumerInfo = new ConsumerInfo(id); 406 consumerInfo.setPrefetchSize(1000); 407 consumerInfo.setDispatchAsync(true); 408 409 String selector = headers.remove(Stomp.Headers.Subscribe.SELECTOR); 410 consumerInfo.setSelector(selector); 411 412 IntrospectionSupport.setProperties(consumerInfo, headers, "activemq."); 413 414 consumerInfo.setDestination(translator.convertDestination(this, destination)); 415 416 StompSubscription stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION)); 417 stompSubscription.setDestination(actualDest); 418 419 String ackMode = headers.get(Stomp.Headers.Subscribe.ACK_MODE); 420 if (Stomp.Headers.Subscribe.AckModeValues.CLIENT.equals(ackMode)) { 421 stompSubscription.setAckMode(StompSubscription.CLIENT_ACK); 422 } else if (Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL.equals(ackMode)) { 423 stompSubscription.setAckMode(StompSubscription.INDIVIDUAL_ACK); 424 } else { 425 stompSubscription.setAckMode(StompSubscription.AUTO_ACK); 426 } 427 428 subscriptionsByConsumerId.put(id, stompSubscription); 429 sendToActiveMQ(consumerInfo, createResponseHandler(command)); 430 431 } 432 433 protected void onStompUnsubscribe(StompFrame command) throws ProtocolException { 434 checkConnected(); 435 Map<String, String> headers = command.getHeaders(); 436 437 ActiveMQDestination destination = null; 438 Object o = headers.get(Stomp.Headers.Unsubscribe.DESTINATION); 439 if (o != null) { 440 destination = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)).convertDestination(this, (String)o); 441 } 442 443 String subscriptionId = headers.get(Stomp.Headers.Unsubscribe.ID); 444 445 if (subscriptionId == null && destination == null) { 446 throw new ProtocolException("Must specify the subscriptionId or the destination you are unsubscribing from"); 447 } 448 449 // check if it is a durable subscription 450 String durable = command.getHeaders().get("activemq.subscriptionName"); 451 if (durable != null) { 452 RemoveSubscriptionInfo info = new RemoveSubscriptionInfo(); 453 info.setClientId(durable); 454 info.setSubscriptionName(durable); 455 info.setConnectionId(connectionId); 456 sendToActiveMQ(info, createResponseHandler(command)); 457 return; 458 } 459 460 // TODO: Unsubscribing using a destination is a bit wierd if multiple 461 // subscriptions 462 // are created with the same destination. Perhaps this should be 463 // removed. 464 // 465 for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) { 466 StompSubscription sub = iter.next(); 467 if ((subscriptionId != null && subscriptionId.equals(sub.getSubscriptionId())) || (destination != null && destination.equals(sub.getDestination()))) { 468 sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command)); 469 iter.remove(); 470 return; 471 } 472 } 473 474 throw new ProtocolException("No subscription matched."); 475 } 476 477 ConnectionInfo connectionInfo = new ConnectionInfo(); 478 479 protected void onStompConnect(final StompFrame command) throws ProtocolException { 480 481 if (connected.get()) { 482 throw new ProtocolException("Allready connected."); 483 } 484 485 final Map<String, String> headers = command.getHeaders(); 486 487 // allow anyone to login for now 488 String login = headers.get(Stomp.Headers.Connect.LOGIN); 489 String passcode = headers.get(Stomp.Headers.Connect.PASSCODE); 490 String clientId = headers.get(Stomp.Headers.Connect.CLIENT_ID); 491 492 493 IntrospectionSupport.setProperties(connectionInfo, headers, "activemq."); 494 495 connectionInfo.setConnectionId(connectionId); 496 if (clientId != null) { 497 connectionInfo.setClientId(clientId); 498 } else { 499 connectionInfo.setClientId("" + connectionInfo.getConnectionId().toString()); 500 } 501 502 connectionInfo.setResponseRequired(true); 503 connectionInfo.setUserName(login); 504 connectionInfo.setPassword(passcode); 505 connectionInfo.setTransportContext(stompTransport.getPeerCertificates()); 506 507 sendToActiveMQ(connectionInfo, new ResponseHandler() { 508 public void onResponse(ProtocolConverter converter, Response response) throws IOException { 509 510 if (response.isException()) { 511 // If the connection attempt fails we close the socket. 512 Throwable exception = ((ExceptionResponse)response).getException(); 513 handleException(exception, command); 514 getStompTransport().onException(IOExceptionSupport.create(exception)); 515 return; 516 } 517 518 final SessionInfo sessionInfo = new SessionInfo(sessionId); 519 sendToActiveMQ(sessionInfo, null); 520 521 final ProducerInfo producerInfo = new ProducerInfo(producerId); 522 sendToActiveMQ(producerInfo, new ResponseHandler() { 523 public void onResponse(ProtocolConverter converter, Response response) throws IOException { 524 525 if (response.isException()) { 526 // If the connection attempt fails we close the socket. 527 Throwable exception = ((ExceptionResponse)response).getException(); 528 handleException(exception, command); 529 getStompTransport().onException(IOExceptionSupport.create(exception)); 530 } 531 532 connected.set(true); 533 HashMap<String, String> responseHeaders = new HashMap<String, String>(); 534 535 responseHeaders.put(Stomp.Headers.Connected.SESSION, connectionInfo.getClientId()); 536 String requestId = headers.get(Stomp.Headers.Connect.REQUEST_ID); 537 if (requestId == null) { 538 // TODO legacy 539 requestId = headers.get(Stomp.Headers.RECEIPT_REQUESTED); 540 } 541 if (requestId != null) { 542 // TODO legacy 543 responseHeaders.put(Stomp.Headers.Connected.RESPONSE_ID, requestId); 544 responseHeaders.put(Stomp.Headers.Response.RECEIPT_ID, requestId); 545 } 546 547 StompFrame sc = new StompFrame(); 548 sc.setAction(Stomp.Responses.CONNECTED); 549 sc.setHeaders(responseHeaders); 550 sendToStomp(sc); 551 } 552 }); 553 554 } 555 }); 556 } 557 558 protected void onStompDisconnect(StompFrame command) throws ProtocolException { 559 checkConnected(); 560 sendToActiveMQ(connectionInfo.createRemoveCommand(), createResponseHandler(command)); 561 sendToActiveMQ(new ShutdownInfo(), createResponseHandler(command)); 562 connected.set(false); 563 } 564 565 protected void checkConnected() throws ProtocolException { 566 if (!connected.get()) { 567 throw new ProtocolException("Not connected."); 568 } 569 } 570 571 /** 572 * Dispatch a ActiveMQ command 573 * 574 * @param command 575 * @throws IOException 576 */ 577 public void onActiveMQCommand(Command command) throws IOException, JMSException { 578 if (command.isResponse()) { 579 580 Response response = (Response)command; 581 ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId())); 582 if (rh != null) { 583 rh.onResponse(this, response); 584 } else { 585 // Pass down any unexpected errors. Should this close the connection? 586 if (response.isException()) { 587 Throwable exception = ((ExceptionResponse)response).getException(); 588 handleException(exception, null); 589 } 590 } 591 } else if (command.isMessageDispatch()) { 592 593 MessageDispatch md = (MessageDispatch)command; 594 StompSubscription sub = subscriptionsByConsumerId.get(md.getConsumerId()); 595 if (sub != null) { 596 sub.onMessageDispatch(md); 597 } 598 } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) { 599 // Pass down any unexpected async errors. Should this close the connection? 600 Throwable exception = ((ConnectionError)command).getException(); 601 handleException(exception, null); 602 } 603 } 604 605 public ActiveMQMessage convertMessage(StompFrame command) throws IOException, JMSException { 606 ActiveMQMessage msg = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)).convertFrame(this, command); 607 return msg; 608 } 609 610 public StompFrame convertMessage(ActiveMQMessage message, boolean ignoreTransformation) throws IOException, JMSException { 611 if (ignoreTransformation == true) { 612 return frameTranslator.convertMessage(this, message); 613 } else { 614 return findTranslator(message.getStringProperty(Stomp.Headers.TRANSFORMATION)).convertMessage(this, message); 615 } 616 } 617 618 public StompTransport getStompTransport() { 619 return stompTransport; 620 } 621 622 public ActiveMQDestination createTempQueue(String name) { 623 ActiveMQDestination rc = tempDestinations.get(name); 624 if( rc == null ) { 625 rc = new ActiveMQTempQueue(connectionId, tempDestinationGenerator.getNextSequenceId()); 626 sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null); 627 tempDestinations.put(name, rc); 628 } 629 return rc; 630 } 631 632 public ActiveMQDestination createTempTopic(String name) { 633 ActiveMQDestination rc = tempDestinations.get(name); 634 if( rc == null ) { 635 rc = new ActiveMQTempTopic(connectionId, tempDestinationGenerator.getNextSequenceId()); 636 sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null); 637 tempDestinations.put(name, rc); 638 tempDestinationAmqToStompMap.put(rc.getQualifiedName(), name); 639 } 640 return rc; 641 } 642 643 public String getCreatedTempDestinationName(ActiveMQDestination destination) { 644 return tempDestinationAmqToStompMap.get(destination.getQualifiedName()); 645 } 646 }