001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.command;
018    
019    import java.io.IOException;
020    import java.io.UnsupportedEncodingException;
021    import java.util.Enumeration;
022    import java.util.HashMap;
023    import java.util.Iterator;
024    import java.util.List;
025    import java.util.Map;
026    import java.util.Vector;
027    import javax.jms.DeliveryMode;
028    import javax.jms.Destination;
029    import javax.jms.JMSException;
030    import javax.jms.MessageFormatException;
031    import javax.jms.MessageNotWriteableException;
032    import org.apache.activemq.ActiveMQConnection;
033    import org.apache.activemq.ScheduledMessage;
034    import org.apache.activemq.broker.scheduler.CronParser;
035    import org.apache.activemq.filter.PropertyExpression;
036    import org.apache.activemq.state.CommandVisitor;
037    import org.apache.activemq.util.Callback;
038    import org.apache.activemq.util.JMSExceptionSupport;
039    import org.apache.activemq.util.TypeConversionSupport;
040    
041    /**
042     * 
043     * @openwire:marshaller code="23"
044     */
045    public class ActiveMQMessage extends Message implements org.apache.activemq.Message, ScheduledMessage {
046        public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_MESSAGE;
047        public static final String DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY = "dlqDeliveryFailureCause";
048        private static final Map<String, PropertySetter> JMS_PROPERTY_SETERS = new HashMap<String, PropertySetter>();
049    
050        protected transient Callback acknowledgeCallback;
051    
052        public byte getDataStructureType() {
053            return DATA_STRUCTURE_TYPE;
054        }
055    
056    
057        @Override
058        public Message copy() {
059            ActiveMQMessage copy = new ActiveMQMessage();
060            copy(copy);
061            return copy;
062        }
063    
064        protected void copy(ActiveMQMessage copy) {
065            super.copy(copy);
066            copy.acknowledgeCallback = acknowledgeCallback;
067        }
068    
069        @Override
070        public int hashCode() {
071            MessageId id = getMessageId();
072            if (id != null) {
073                return id.hashCode();
074            } else {
075                return super.hashCode();
076            }
077        }
078    
079        @Override
080        public boolean equals(Object o) {
081            if (this == o) {
082                return true;
083            }
084            if (o == null || o.getClass() != getClass()) {
085                return false;
086            }
087    
088            ActiveMQMessage msg = (ActiveMQMessage) o;
089            MessageId oMsg = msg.getMessageId();
090            MessageId thisMsg = this.getMessageId();
091            return thisMsg != null && oMsg != null && oMsg.equals(thisMsg);
092        }
093    
094        public void acknowledge() throws JMSException {
095            if (acknowledgeCallback != null) {
096                try {
097                    acknowledgeCallback.execute();
098                } catch (JMSException e) {
099                    throw e;
100                } catch (Throwable e) {
101                    throw JMSExceptionSupport.create(e);
102                }
103            }
104        }
105    
106        @Override
107        public void clearBody() throws JMSException {
108            setContent(null);
109            readOnlyBody = false;
110        }
111    
112        public String getJMSMessageID() {
113            MessageId messageId = this.getMessageId();
114            if (messageId == null) {
115                return null;
116            }
117            return messageId.toString();
118        }
119    
120        /**
121         * Seems to be invalid because the parameter doesn't initialize MessageId
122         * instance variables ProducerId and ProducerSequenceId
123         *
124         * @param value
125         * @throws JMSException
126         */
127        public void setJMSMessageID(String value) throws JMSException {
128            if (value != null) {
129                try {
130                    MessageId id = new MessageId(value);
131                    this.setMessageId(id);
132                } catch (NumberFormatException e) {
133                    // we must be some foreign JMS provider or strange user-supplied
134                    // String
135                    // so lets set the IDs to be 1
136                    MessageId id = new MessageId();
137                    id.setTextView(value);
138                    this.setMessageId(messageId);
139                }
140            } else {
141                this.setMessageId(null);
142            }
143        }
144    
145        /**
146         * This will create an object of MessageId. For it to be valid, the instance
147         * variable ProducerId and producerSequenceId must be initialized.
148         *
149         * @param producerId
150         * @param producerSequenceId
151         * @throws JMSException
152         */
153        public void setJMSMessageID(ProducerId producerId, long producerSequenceId) throws JMSException {
154            MessageId id = null;
155            try {
156                id = new MessageId(producerId, producerSequenceId);
157                this.setMessageId(id);
158            } catch (Throwable e) {
159                throw JMSExceptionSupport.create("Invalid message id '" + id + "', reason: " + e.getMessage(), e);
160            }
161        }
162    
163        public long getJMSTimestamp() {
164            return this.getTimestamp();
165        }
166    
167        public void setJMSTimestamp(long timestamp) {
168            this.setTimestamp(timestamp);
169        }
170    
171        public String getJMSCorrelationID() {
172            return this.getCorrelationId();
173        }
174    
175        public void setJMSCorrelationID(String correlationId) {
176            this.setCorrelationId(correlationId);
177        }
178    
179        public byte[] getJMSCorrelationIDAsBytes() throws JMSException {
180            return encodeString(this.getCorrelationId());
181        }
182    
183        public void setJMSCorrelationIDAsBytes(byte[] correlationId) throws JMSException {
184            this.setCorrelationId(decodeString(correlationId));
185        }
186    
187        public String getJMSXMimeType() {
188            return "jms/message";
189        }
190    
191        protected static String decodeString(byte[] data) throws JMSException {
192            try {
193                if (data == null) {
194                    return null;
195                }
196                return new String(data, "UTF-8");
197            } catch (UnsupportedEncodingException e) {
198                throw new JMSException("Invalid UTF-8 encoding: " + e.getMessage());
199            }
200        }
201    
202        protected static byte[] encodeString(String data) throws JMSException {
203            try {
204                if (data == null) {
205                    return null;
206                }
207                return data.getBytes("UTF-8");
208            } catch (UnsupportedEncodingException e) {
209                throw new JMSException("Invalid UTF-8 encoding: " + e.getMessage());
210            }
211        }
212    
213        public Destination getJMSReplyTo() {
214            return this.getReplyTo();
215        }
216    
217        public void setJMSReplyTo(Destination destination) throws JMSException {
218            this.setReplyTo(ActiveMQDestination.transform(destination));
219        }
220    
221        public Destination getJMSDestination() {
222            return this.getDestination();
223        }
224    
225        public void setJMSDestination(Destination destination) throws JMSException {
226            this.setDestination(ActiveMQDestination.transform(destination));
227        }
228    
229        public int getJMSDeliveryMode() {
230            return this.isPersistent() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
231        }
232    
233        public void setJMSDeliveryMode(int mode) {
234            this.setPersistent(mode == DeliveryMode.PERSISTENT);
235        }
236    
237        public boolean getJMSRedelivered() {
238            return this.isRedelivered();
239        }
240    
241        public void setJMSRedelivered(boolean redelivered) {
242            this.setRedelivered(redelivered);
243        }
244    
245        public String getJMSType() {
246            return this.getType();
247        }
248    
249        public void setJMSType(String type) {
250            this.setType(type);
251        }
252    
253        public long getJMSExpiration() {
254            return this.getExpiration();
255        }
256    
257        public void setJMSExpiration(long expiration) {
258            this.setExpiration(expiration);
259        }
260    
261        public int getJMSPriority() {
262            return this.getPriority();
263        }
264    
265        public void setJMSPriority(int priority) {
266            this.setPriority((byte) priority);
267        }
268    
269        @Override
270        public void clearProperties() {
271            super.clearProperties();
272            readOnlyProperties = false;
273        }
274    
275        public boolean propertyExists(String name) throws JMSException {
276            try {
277                return (this.getProperties().containsKey(name) || getObjectProperty(name)!= null);
278            } catch (IOException e) {
279                throw JMSExceptionSupport.create(e);
280            }
281        }
282    
283        public Enumeration getPropertyNames() throws JMSException {
284            try {
285                Vector<String> result = new Vector<String>(this.getProperties().keySet());
286                return result.elements();
287            } catch (IOException e) {
288                throw JMSExceptionSupport.create(e);
289            }
290        }
291    
292        /**
293         * return all property names, including standard JMS properties and JMSX properties
294         * @return  Enumeration of all property names on this message
295         * @throws JMSException
296         */
297        public Enumeration getAllPropertyNames() throws JMSException {
298            try {
299                Vector<String> result = new Vector<String>(this.getProperties().keySet());
300                result.addAll(JMS_PROPERTY_SETERS.keySet());
301                return result.elements();
302            } catch (IOException e) {
303                throw JMSExceptionSupport.create(e);
304            }
305        }
306    
307        interface PropertySetter {
308    
309            void set(Message message, Object value) throws MessageFormatException;
310        }
311    
312        static {
313            JMS_PROPERTY_SETERS.put("JMSXDeliveryCount", new PropertySetter() {
314                public void set(Message message, Object value) throws MessageFormatException {
315                    Integer rc = (Integer) TypeConversionSupport.convert(value, Integer.class);
316                    if (rc == null) {
317                        throw new MessageFormatException("Property JMSXDeliveryCount cannot be set from a " + value.getClass().getName() + ".");
318                    }
319                    message.setRedeliveryCounter(rc.intValue() - 1);
320                }
321            });
322            JMS_PROPERTY_SETERS.put("JMSXGroupID", new PropertySetter() {
323                public void set(Message message, Object value) throws MessageFormatException {
324                    String rc = (String) TypeConversionSupport.convert(value, String.class);
325                    if (rc == null) {
326                        throw new MessageFormatException("Property JMSXGroupID cannot be set from a " + value.getClass().getName() + ".");
327                    }
328                    message.setGroupID(rc);
329                }
330            });
331            JMS_PROPERTY_SETERS.put("JMSXGroupSeq", new PropertySetter() {
332                public void set(Message message, Object value) throws MessageFormatException {
333                    Integer rc = (Integer) TypeConversionSupport.convert(value, Integer.class);
334                    if (rc == null) {
335                        throw new MessageFormatException("Property JMSXGroupSeq cannot be set from a " + value.getClass().getName() + ".");
336                    }
337                    message.setGroupSequence(rc.intValue());
338                }
339            });
340            JMS_PROPERTY_SETERS.put("JMSCorrelationID", new PropertySetter() {
341                public void set(Message message, Object value) throws MessageFormatException {
342                    String rc = (String) TypeConversionSupport.convert(value, String.class);
343                    if (rc == null) {
344                        throw new MessageFormatException("Property JMSCorrelationID cannot be set from a " + value.getClass().getName() + ".");
345                    }
346                    ((ActiveMQMessage) message).setJMSCorrelationID(rc);
347                }
348            });
349            JMS_PROPERTY_SETERS.put("JMSDeliveryMode", new PropertySetter() {
350                public void set(Message message, Object value) throws MessageFormatException {
351                    Integer rc = (Integer) TypeConversionSupport.convert(value, Integer.class);
352                    if (rc == null) {
353                        Boolean bool = (Boolean) TypeConversionSupport.convert(value, Boolean.class);
354                        if (bool == null) {
355                            throw new MessageFormatException("Property JMSDeliveryMode cannot be set from a " + value.getClass().getName() + ".");
356                        }
357                        else {
358                            rc = bool.booleanValue() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
359                        }
360                    }
361                    ((ActiveMQMessage) message).setJMSDeliveryMode(rc);
362                }
363            });
364            JMS_PROPERTY_SETERS.put("JMSExpiration", new PropertySetter() {
365                public void set(Message message, Object value) throws MessageFormatException {
366                    Long rc = (Long) TypeConversionSupport.convert(value, Long.class);
367                    if (rc == null) {
368                        throw new MessageFormatException("Property JMSExpiration cannot be set from a " + value.getClass().getName() + ".");
369                    }
370                    ((ActiveMQMessage) message).setJMSExpiration(rc.longValue());
371                }
372            });
373            JMS_PROPERTY_SETERS.put("JMSPriority", new PropertySetter() {
374                public void set(Message message, Object value) throws MessageFormatException {
375                    Integer rc = (Integer) TypeConversionSupport.convert(value, Integer.class);
376                    if (rc == null) {
377                        throw new MessageFormatException("Property JMSPriority cannot be set from a " + value.getClass().getName() + ".");
378                    }
379                    ((ActiveMQMessage) message).setJMSPriority(rc.intValue());
380                }
381            });
382            JMS_PROPERTY_SETERS.put("JMSRedelivered", new PropertySetter() {
383                public void set(Message message, Object value) throws MessageFormatException {
384                    Boolean rc = (Boolean) TypeConversionSupport.convert(value, Boolean.class);
385                    if (rc == null) {
386                        throw new MessageFormatException("Property JMSRedelivered cannot be set from a " + value.getClass().getName() + ".");
387                    }
388                    ((ActiveMQMessage) message).setJMSRedelivered(rc.booleanValue());
389                }
390            });
391            JMS_PROPERTY_SETERS.put("JMSReplyTo", new PropertySetter() {
392                public void set(Message message, Object value) throws MessageFormatException {
393                    ActiveMQDestination rc = (ActiveMQDestination) TypeConversionSupport.convert(value, ActiveMQDestination.class);
394                    if (rc == null) {
395                        throw new MessageFormatException("Property JMSReplyTo cannot be set from a " + value.getClass().getName() + ".");
396                    }
397                    ((ActiveMQMessage) message).setReplyTo(rc);
398                }
399            });
400            JMS_PROPERTY_SETERS.put("JMSTimestamp", new PropertySetter() {
401                public void set(Message message, Object value) throws MessageFormatException {
402                    Long rc = (Long) TypeConversionSupport.convert(value, Long.class);
403                    if (rc == null) {
404                        throw new MessageFormatException("Property JMSTimestamp cannot be set from a " + value.getClass().getName() + ".");
405                    }
406                    ((ActiveMQMessage) message).setJMSTimestamp(rc.longValue());
407                }
408            });
409            JMS_PROPERTY_SETERS.put("JMSType", new PropertySetter() {
410                public void set(Message message, Object value) throws MessageFormatException {
411                    String rc = (String) TypeConversionSupport.convert(value, String.class);
412                    if (rc == null) {
413                        throw new MessageFormatException("Property JMSType cannot be set from a " + value.getClass().getName() + ".");
414                    }
415                    ((ActiveMQMessage) message).setJMSType(rc);
416                }
417            });
418        }
419    
420        public void setObjectProperty(String name, Object value) throws JMSException {
421            setObjectProperty(name, value, true);
422        }
423    
424        public void setObjectProperty(String name, Object value, boolean checkReadOnly) throws JMSException {
425    
426            if (checkReadOnly) {
427                checkReadOnlyProperties();
428            }
429            if (name == null || name.equals("")) {
430                throw new IllegalArgumentException("Property name cannot be empty or null");
431            }
432    
433            checkValidObject(value);
434            value = convertScheduled(name, value);
435            PropertySetter setter = JMS_PROPERTY_SETERS.get(name);
436    
437            if (setter != null && value != null) {
438                setter.set(this, value);
439            } else {
440                try {
441                    this.setProperty(name, value);
442                } catch (IOException e) {
443                    throw JMSExceptionSupport.create(e);
444                }
445            }
446        }
447    
448        public void setProperties(Map properties) throws JMSException {
449            for (Iterator iter = properties.entrySet().iterator(); iter.hasNext();) {
450                Map.Entry entry = (Map.Entry) iter.next();
451    
452                // Lets use the object property method as we may contain standard
453                // extension headers like JMSXGroupID
454                setObjectProperty((String) entry.getKey(), entry.getValue());
455            }
456        }
457    
458        protected void checkValidObject(Object value) throws MessageFormatException {
459    
460            boolean valid = value instanceof Boolean || value instanceof Byte || value instanceof Short || value instanceof Integer || value instanceof Long;
461            valid = valid || value instanceof Float || value instanceof Double || value instanceof Character || value instanceof String || value == null;
462    
463            if (!valid) {
464    
465                ActiveMQConnection conn = getConnection();
466                // conn is null if we are in the broker rather than a JMS client
467                if (conn == null || conn.isNestedMapAndListEnabled()) {
468                    if (!(value instanceof Map || value instanceof List)) {
469                        throw new MessageFormatException("Only objectified primitive objects, String, Map and List types are allowed but was: " + value + " type: " + value.getClass());
470                    }
471                } else {
472                    throw new MessageFormatException("Only objectified primitive objects and String types are allowed but was: " + value + " type: " + value.getClass());
473                }
474            }
475        }
476        
477        protected void  checkValidScheduled(String name, Object value) throws MessageFormatException {
478            if (AMQ_SCHEDULED_DELAY.equals(name) || AMQ_SCHEDULED_PERIOD.equals(name) || AMQ_SCHEDULED_REPEAT.equals(name)) {
479                if (value instanceof Long == false && value instanceof Integer == false) {
480                    throw new MessageFormatException(name + " should be long or int value");
481                }
482            }
483            if (AMQ_SCHEDULED_CRON.equals(name)) {
484                CronParser.validate(value.toString());
485            }
486        }
487        
488        protected Object  convertScheduled(String name, Object value) throws MessageFormatException {
489            Object result = value;
490            if (AMQ_SCHEDULED_DELAY.equals(name)){
491                result = TypeConversionSupport.convert(value, Long.class);
492            }
493            else if (AMQ_SCHEDULED_PERIOD.equals(name)){
494                result = TypeConversionSupport.convert(value, Long.class);
495            }
496            else if (AMQ_SCHEDULED_REPEAT.equals(name)){
497                result = TypeConversionSupport.convert(value, Integer.class);
498            }
499            return result;
500        }
501    
502        public Object getObjectProperty(String name) throws JMSException {
503            if (name == null) {
504                throw new NullPointerException("Property name cannot be null");
505            }
506    
507            // PropertyExpression handles converting message headers to properties.
508            PropertyExpression expression = new PropertyExpression(name);
509            return expression.evaluate(this);
510        }
511    
512        public boolean getBooleanProperty(String name) throws JMSException {
513            Object value = getObjectProperty(name);
514            if (value == null) {
515                return false;
516            }
517            Boolean rc = (Boolean) TypeConversionSupport.convert(value, Boolean.class);
518            if (rc == null) {
519                throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a boolean");
520            }
521            return rc.booleanValue();
522        }
523    
524        public byte getByteProperty(String name) throws JMSException {
525            Object value = getObjectProperty(name);
526            if (value == null) {
527                throw new NumberFormatException("property " + name + " was null");
528            }
529            Byte rc = (Byte) TypeConversionSupport.convert(value, Byte.class);
530            if (rc == null) {
531                throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a byte");
532            }
533            return rc.byteValue();
534        }
535    
536        public short getShortProperty(String name) throws JMSException {
537            Object value = getObjectProperty(name);
538            if (value == null) {
539                throw new NumberFormatException("property " + name + " was null");
540            }
541            Short rc = (Short) TypeConversionSupport.convert(value, Short.class);
542            if (rc == null) {
543                throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a short");
544            }
545            return rc.shortValue();
546        }
547    
548        public int getIntProperty(String name) throws JMSException {
549            Object value = getObjectProperty(name);
550            if (value == null) {
551                throw new NumberFormatException("property " + name + " was null");
552            }
553            Integer rc = (Integer) TypeConversionSupport.convert(value, Integer.class);
554            if (rc == null) {
555                throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as an integer");
556            }
557            return rc.intValue();
558        }
559    
560        public long getLongProperty(String name) throws JMSException {
561            Object value = getObjectProperty(name);
562            if (value == null) {
563                throw new NumberFormatException("property " + name + " was null");
564            }
565            Long rc = (Long) TypeConversionSupport.convert(value, Long.class);
566            if (rc == null) {
567                throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a long");
568            }
569            return rc.longValue();
570        }
571    
572        public float getFloatProperty(String name) throws JMSException {
573            Object value = getObjectProperty(name);
574            if (value == null) {
575                throw new NullPointerException("property " + name + " was null");
576            }
577            Float rc = (Float) TypeConversionSupport.convert(value, Float.class);
578            if (rc == null) {
579                throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a float");
580            }
581            return rc.floatValue();
582        }
583    
584        public double getDoubleProperty(String name) throws JMSException {
585            Object value = getObjectProperty(name);
586            if (value == null) {
587                throw new NullPointerException("property " + name + " was null");
588            }
589            Double rc = (Double) TypeConversionSupport.convert(value, Double.class);
590            if (rc == null) {
591                throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a double");
592            }
593            return rc.doubleValue();
594        }
595    
596        public String getStringProperty(String name) throws JMSException {
597            Object value = null;
598            if (name.equals("JMSXUserID")) {
599                value = getUserID();
600                if (value == null) {
601                    value = getObjectProperty(name);
602                }
603            } else {
604                value = getObjectProperty(name);
605            }
606            if (value == null) {
607                return null;
608            }
609            String rc = (String) TypeConversionSupport.convert(value, String.class);
610            if (rc == null) {
611                throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a String");
612            }
613            return rc;
614        }
615    
616        public void setBooleanProperty(String name, boolean value) throws JMSException {
617            setBooleanProperty(name, value, true);
618        }
619    
620        public void setBooleanProperty(String name, boolean value, boolean checkReadOnly) throws JMSException {
621            setObjectProperty(name, Boolean.valueOf(value), checkReadOnly);
622        }
623    
624        public void setByteProperty(String name, byte value) throws JMSException {
625            setObjectProperty(name, Byte.valueOf(value));
626        }
627    
628        public void setShortProperty(String name, short value) throws JMSException {
629            setObjectProperty(name, Short.valueOf(value));
630        }
631    
632        public void setIntProperty(String name, int value) throws JMSException {
633            setObjectProperty(name, Integer.valueOf(value));
634        }
635    
636        public void setLongProperty(String name, long value) throws JMSException {
637            setObjectProperty(name, Long.valueOf(value));
638        }
639    
640        public void setFloatProperty(String name, float value) throws JMSException {
641            setObjectProperty(name, new Float(value));
642        }
643    
644        public void setDoubleProperty(String name, double value) throws JMSException {
645            setObjectProperty(name, new Double(value));
646        }
647    
648        public void setStringProperty(String name, String value) throws JMSException {
649            setObjectProperty(name, value);
650        }
651    
652        private void checkReadOnlyProperties() throws MessageNotWriteableException {
653            if (readOnlyProperties) {
654                throw new MessageNotWriteableException("Message properties are read-only");
655            }
656        }
657    
658        protected void checkReadOnlyBody() throws MessageNotWriteableException {
659            if (readOnlyBody) {
660                throw new MessageNotWriteableException("Message body is read-only");
661            }
662        }
663    
664        public Callback getAcknowledgeCallback() {
665            return acknowledgeCallback;
666        }
667    
668        public void setAcknowledgeCallback(Callback acknowledgeCallback) {
669            this.acknowledgeCallback = acknowledgeCallback;
670        }
671    
672        /**
673         * Send operation event listener. Used to get the message ready to be sent.
674         */
675        public void onSend() throws JMSException {
676            setReadOnlyBody(true);
677            setReadOnlyProperties(true);
678        }
679    
680        public Response visit(CommandVisitor visitor) throws Exception {
681            return visitor.processMessage(this);
682        }
683    }