001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.stomp;
018    
019    import java.io.IOException;
020    import java.util.HashMap;
021    import java.util.Map;
022    
023    import javax.jms.Destination;
024    import javax.jms.JMSException;
025    
026    import com.thoughtworks.xstream.XStream;
027    import com.thoughtworks.xstream.io.json.JsonHierarchicalStreamDriver;
028    import org.apache.activemq.advisory.AdvisorySupport;
029    import org.apache.activemq.command.*;
030    
031    /**
032     * Implements ActiveMQ 4.0 translations
033     */
034    public class LegacyFrameTranslator implements FrameTranslator {
035    
036    
037        public ActiveMQMessage convertFrame(ProtocolConverter converter, StompFrame command) throws JMSException, ProtocolException {
038            final Map headers = command.getHeaders();
039            final ActiveMQMessage msg;
040            /*
041             * To reduce the complexity of this method perhaps a Chain of Responsibility
042             * would be a better implementation
043             */
044            if (headers.containsKey(Stomp.Headers.AMQ_MESSAGE_TYPE)) {
045                String intendedType = (String)headers.get(Stomp.Headers.AMQ_MESSAGE_TYPE);
046                if(intendedType.equalsIgnoreCase("text")){
047                    ActiveMQTextMessage text = new ActiveMQTextMessage();
048                    try {
049                        text.setText(new String(command.getContent(), "UTF-8"));
050                    } catch (Throwable e) {
051                        throw new ProtocolException("Text could not bet set: " + e, false, e);
052                    }
053                    msg = text;
054                } else if(intendedType.equalsIgnoreCase("bytes")) {
055                    ActiveMQBytesMessage byteMessage = new ActiveMQBytesMessage();
056                    byteMessage.writeBytes(command.getContent());
057                    msg = byteMessage;
058                } else {
059                    throw new ProtocolException("Unsupported message type '"+intendedType+"'",false);
060                }
061            }else if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH)) {
062                headers.remove(Stomp.Headers.CONTENT_LENGTH);
063                ActiveMQBytesMessage bm = new ActiveMQBytesMessage();
064                bm.writeBytes(command.getContent());
065                msg = bm;
066            } else {
067                ActiveMQTextMessage text = new ActiveMQTextMessage();
068                try {
069                    text.setText(new String(command.getContent(), "UTF-8"));
070                } catch (Throwable e) {
071                    throw new ProtocolException("Text could not bet set: " + e, false, e);
072                }
073                msg = text;
074            }
075            FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(converter, command, msg, this);
076            return msg;
077        }
078    
079        public StompFrame convertMessage(ProtocolConverter converter, ActiveMQMessage message) throws IOException, JMSException {
080            StompFrame command = new StompFrame();
081            command.setAction(Stomp.Responses.MESSAGE);
082            Map<String, String> headers = new HashMap<String, String>(25);
083            command.setHeaders(headers);
084    
085            FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(converter, message, command, this);
086    
087            if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) {
088    
089                ActiveMQTextMessage msg = (ActiveMQTextMessage)message.copy();
090                command.setContent(msg.getText().getBytes("UTF-8"));
091    
092            } else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) {
093    
094                ActiveMQBytesMessage msg = (ActiveMQBytesMessage)message.copy();
095                msg.setReadOnlyBody(true);
096                byte[] data = new byte[(int)msg.getBodyLength()];
097                msg.readBytes(data);
098    
099                headers.put(Stomp.Headers.CONTENT_LENGTH, "" + data.length);
100                command.setContent(data);
101            } else if (message.getDataStructureType() == ActiveMQMessage.DATA_STRUCTURE_TYPE &&
102                    AdvisorySupport.ADIVSORY_MESSAGE_TYPE.equals(message.getType())) {
103    
104                FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
105                                            converter, message, command, this);
106    
107                String body = marshallAdvisory(message.getDataStructure());
108                command.setContent(body.getBytes("UTF-8"));
109            }
110            return command;
111        }
112    
113        public String convertDestination(ProtocolConverter converter, Destination d) {
114            if (d == null) {
115                return null;
116            }
117            ActiveMQDestination activeMQDestination = (ActiveMQDestination)d;
118            String physicalName = activeMQDestination.getPhysicalName();
119    
120            String rc = converter.getCreatedTempDestinationName(activeMQDestination);
121            if( rc!=null ) {
122                    return rc;
123            }
124    
125            StringBuffer buffer = new StringBuffer();
126            if (activeMQDestination.isQueue()) {
127                if (activeMQDestination.isTemporary()) {
128                    buffer.append("/remote-temp-queue/");
129                } else {
130                    buffer.append("/queue/");
131                }
132            } else {
133                if (activeMQDestination.isTemporary()) {
134                    buffer.append("/remote-temp-topic/");
135                } else {
136                    buffer.append("/topic/");
137                }
138            }
139            buffer.append(physicalName);
140            return buffer.toString();
141        }
142    
143        public ActiveMQDestination convertDestination(ProtocolConverter converter, String name) throws ProtocolException {
144            if (name == null) {
145                return null;
146            } else if (name.startsWith("/queue/")) {
147                String qName = name.substring("/queue/".length(), name.length());
148                return ActiveMQDestination.createDestination(qName, ActiveMQDestination.QUEUE_TYPE);
149            } else if (name.startsWith("/topic/")) {
150                String tName = name.substring("/topic/".length(), name.length());
151                return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TOPIC_TYPE);
152            } else if (name.startsWith("/remote-temp-queue/")) {
153                String tName = name.substring("/remote-temp-queue/".length(), name.length());
154                return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TEMP_QUEUE_TYPE);
155            } else if (name.startsWith("/remote-temp-topic/")) {
156                String tName = name.substring("/remote-temp-topic/".length(), name.length());
157                return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TEMP_TOPIC_TYPE);
158            } else if (name.startsWith("/temp-queue/")) {
159                return converter.createTempQueue(name);
160            } else if (name.startsWith("/temp-topic/")) {
161                return converter.createTempTopic(name);
162            } else {
163                throw new ProtocolException("Illegal destination name: [" + name + "] -- ActiveMQ STOMP destinations "
164                                            + "must begin with one of: /queue/ /topic/ /temp-queue/ /temp-topic/");
165            }
166        }
167    
168        /**
169         * Return an Advisory message as a JSON formatted string
170         * @param ds
171         * @return
172         */
173        protected String marshallAdvisory(final DataStructure ds) {
174            XStream xstream = new XStream(new JsonHierarchicalStreamDriver());
175            xstream.setMode(XStream.NO_REFERENCES);
176            xstream.aliasPackage("", "org.apache.activemq.command");
177            return xstream.toXML(ds);
178        }
179    }