001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq;
018    
019    import java.util.HashMap;
020    import java.util.Map;
021    import java.util.concurrent.atomic.AtomicLong;
022    import javax.jms.Destination;
023    import javax.jms.IllegalStateException;
024    import javax.jms.InvalidDestinationException;
025    import javax.jms.JMSException;
026    import javax.jms.Message;
027    import org.apache.activemq.command.ActiveMQDestination;
028    import org.apache.activemq.command.ProducerAck;
029    import org.apache.activemq.command.ProducerId;
030    import org.apache.activemq.command.ProducerInfo;
031    import org.apache.activemq.management.JMSProducerStatsImpl;
032    import org.apache.activemq.management.StatsCapable;
033    import org.apache.activemq.management.StatsImpl;
034    import org.apache.activemq.usage.MemoryUsage;
035    import org.apache.activemq.util.IntrospectionSupport;
036    
037    /**
038     * A client uses a <CODE>MessageProducer</CODE> object to send messages to a
039     * destination. A <CODE>MessageProducer</CODE> object is created by passing a
040     * <CODE>Destination</CODE> object to a message-producer creation method
041     * supplied by a session.
042     * <P>
043     * <CODE>MessageProducer</CODE> is the parent interface for all message
044     * producers.
045     * <P>
046     * A client also has the option of creating a message producer without supplying
047     * a destination. In this case, a destination must be provided with every send
048     * operation. A typical use for this kind of message producer is to send replies
049     * to requests using the request's <CODE>JMSReplyTo</CODE> destination.
050     * <P>
051     * A client can specify a default delivery mode, priority, and time to live for
052     * messages sent by a message producer. It can also specify the delivery mode,
053     * priority, and time to live for an individual message.
054     * <P>
055     * A client can specify a time-to-live value in milliseconds for each message it
056     * sends. This value defines a message expiration time that is the sum of the
057     * message's time-to-live and the GMT when it is sent (for transacted sends,
058     * this is the time the client sends the message, not the time the transaction
059     * is committed).
060     * <P>
061     * A JMS provider should do its best to expire messages accurately; however, the
062     * JMS API does not define the accuracy provided.
063     * 
064     * 
065     * @see javax.jms.TopicPublisher
066     * @see javax.jms.QueueSender
067     * @see javax.jms.Session#createProducer
068     */
069    public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport implements StatsCapable, Disposable {
070    
071        protected ProducerInfo info;
072        protected boolean closed;
073    
074        private final JMSProducerStatsImpl stats;
075        private AtomicLong messageSequence;
076        private final long startTime;
077        private MessageTransformer transformer;
078        private MemoryUsage producerWindow;
079    
080        protected ActiveMQMessageProducer(ActiveMQSession session, ProducerId producerId, ActiveMQDestination destination, int sendTimeout) throws JMSException {
081            super(session);
082            this.info = new ProducerInfo(producerId);
083            this.info.setWindowSize(session.connection.getProducerWindowSize());
084            if (destination != null && destination.getOptions() != null) {
085                Map<String, String> options = new HashMap<String, String>(destination.getOptions());
086                IntrospectionSupport.setProperties(this.info, options, "producer.");
087            }
088            this.info.setDestination(destination);
089    
090            // Enable producer window flow control if protocol > 3 and the window
091            // size > 0
092            if (session.connection.getProtocolVersion() >= 3 && this.info.getWindowSize() > 0) {
093                producerWindow = new MemoryUsage("Producer Window: " + producerId);
094                producerWindow.setExecutor(session.getConnectionExecutor());
095                producerWindow.setLimit(this.info.getWindowSize());
096                producerWindow.start();
097            }
098    
099            this.defaultDeliveryMode = Message.DEFAULT_DELIVERY_MODE;
100            this.defaultPriority = Message.DEFAULT_PRIORITY;
101            this.defaultTimeToLive = Message.DEFAULT_TIME_TO_LIVE;
102            this.startTime = System.currentTimeMillis();
103            this.messageSequence = new AtomicLong(0);
104            this.stats = new JMSProducerStatsImpl(session.getSessionStats(), destination);
105            this.session.addProducer(this);
106            this.session.asyncSendPacket(info);
107            this.setSendTimeout(sendTimeout);
108            setTransformer(session.getTransformer());
109        }
110    
111        public StatsImpl getStats() {
112            return stats;
113        }
114    
115        public JMSProducerStatsImpl getProducerStats() {
116            return stats;
117        }
118    
119        /**
120         * Gets the destination associated with this <CODE>MessageProducer</CODE>.
121         * 
122         * @return this producer's <CODE>Destination/ <CODE>
123         * @throws JMSException if the JMS provider fails to close the producer due to
124         *                      some internal error.
125         * @since 1.1
126         */
127        public Destination getDestination() throws JMSException {
128            checkClosed();
129            return this.info.getDestination();
130        }
131    
132        /**
133         * Closes the message producer.
134         * <P>
135         * Since a provider may allocate some resources on behalf of a <CODE>
136         * MessageProducer</CODE>
137         * outside the Java virtual machine, clients should close them when they are
138         * not needed. Relying on garbage collection to eventually reclaim these
139         * resources may not be timely enough.
140         * 
141         * @throws JMSException if the JMS provider fails to close the producer due
142         *                 to some internal error.
143         */
144        public void close() throws JMSException {
145            if (!closed) {
146                dispose();
147                this.session.asyncSendPacket(info.createRemoveCommand());
148            }
149        }
150    
151        public void dispose() {
152            if (!closed) {
153                this.session.removeProducer(this);
154                if (producerWindow != null) {
155                    producerWindow.stop();
156                }
157                closed = true;
158            }
159        }
160    
161        /**
162         * Check if the instance of this producer has been closed.
163         * 
164         * @throws IllegalStateException
165         */
166        @Override
167        protected void checkClosed() throws IllegalStateException {
168            if (closed) {
169                throw new IllegalStateException("The producer is closed");
170            }
171        }
172    
173        /**
174         * Sends a message to a destination for an unidentified message producer,
175         * specifying delivery mode, priority and time to live.
176         * <P>
177         * Typically, a message producer is assigned a destination at creation time;
178         * however, the JMS API also supports unidentified message producers, which
179         * require that the destination be supplied every time a message is sent.
180         * 
181         * @param destination the destination to send this message to
182         * @param message the message to send
183         * @param deliveryMode the delivery mode to use
184         * @param priority the priority for this message
185         * @param timeToLive the message's lifetime (in milliseconds)
186         * @throws JMSException if the JMS provider fails to send the message due to
187         *                 some internal error.
188         * @throws UnsupportedOperationException if an invalid destination is
189         *                 specified.
190         * @throws InvalidDestinationException if a client uses this method with an
191         *                 invalid destination.
192         * @see javax.jms.Session#createProducer
193         * @since 1.1
194         */
195        public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
196            checkClosed();
197            if (destination == null) {
198                if (info.getDestination() == null) {
199                    throw new UnsupportedOperationException("A destination must be specified.");
200                }
201                throw new InvalidDestinationException("Don't understand null destinations");
202            }
203    
204            ActiveMQDestination dest;
205            if (destination == info.getDestination()) {
206                dest = (ActiveMQDestination)destination;
207            } else if (info.getDestination() == null) {
208                dest = ActiveMQDestination.transform(destination);
209            } else {
210                throw new UnsupportedOperationException("This producer can only send messages to: " + this.info.getDestination().getPhysicalName());
211            }
212            if (dest == null) {
213                throw new JMSException("No destination specified");
214            }
215    
216            if (transformer != null) {
217                Message transformedMessage = transformer.producerTransform(session, this, message);
218                if (transformedMessage != null) {
219                    message = transformedMessage;
220                }
221            }
222    
223            if (producerWindow != null) {
224                try {
225                    producerWindow.waitForSpace();
226                } catch (InterruptedException e) {
227                    throw new JMSException("Send aborted due to thread interrupt.");
228                }
229            }
230    
231            this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow,sendTimeout);
232    
233            stats.onMessage();
234        }
235    
236        public MessageTransformer getTransformer() {
237            return transformer;
238        }
239    
240        /**
241         * Sets the transformer used to transform messages before they are sent on
242         * to the JMS bus
243         */
244        public void setTransformer(MessageTransformer transformer) {
245            this.transformer = transformer;
246        }
247    
248        /**
249         * @return the time in milli second when this object was created.
250         */
251        protected long getStartTime() {
252            return this.startTime;
253        }
254    
255        /**
256         * @return Returns the messageSequence.
257         */
258        protected long getMessageSequence() {
259            return messageSequence.incrementAndGet();
260        }
261    
262        /**
263         * @param messageSequence The messageSequence to set.
264         */
265        protected void setMessageSequence(AtomicLong messageSequence) {
266            this.messageSequence = messageSequence;
267        }
268    
269        /**
270         * @return Returns the info.
271         */
272        protected ProducerInfo getProducerInfo() {
273            return this.info != null ? this.info : null;
274        }
275    
276        /**
277         * @param info The info to set
278         */
279        protected void setProducerInfo(ProducerInfo info) {
280            this.info = info;
281        }
282    
283        @Override
284        public String toString() {
285            return "ActiveMQMessageProducer { value=" + info.getProducerId() + " }";
286        }
287    
288        public void onProducerAck(ProducerAck pa) {
289            if (this.producerWindow != null) {
290                this.producerWindow.decreaseUsage(pa.getSize());
291            }
292        }
293    
294    }