001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq; 018 019 import java.util.HashMap; 020 import java.util.Map; 021 import java.util.concurrent.atomic.AtomicLong; 022 import javax.jms.Destination; 023 import javax.jms.IllegalStateException; 024 import javax.jms.InvalidDestinationException; 025 import javax.jms.JMSException; 026 import javax.jms.Message; 027 import org.apache.activemq.command.ActiveMQDestination; 028 import org.apache.activemq.command.ProducerAck; 029 import org.apache.activemq.command.ProducerId; 030 import org.apache.activemq.command.ProducerInfo; 031 import org.apache.activemq.management.JMSProducerStatsImpl; 032 import org.apache.activemq.management.StatsCapable; 033 import org.apache.activemq.management.StatsImpl; 034 import org.apache.activemq.usage.MemoryUsage; 035 import org.apache.activemq.util.IntrospectionSupport; 036 037 /** 038 * A client uses a <CODE>MessageProducer</CODE> object to send messages to a 039 * destination. A <CODE>MessageProducer</CODE> object is created by passing a 040 * <CODE>Destination</CODE> object to a message-producer creation method 041 * supplied by a session. 042 * <P> 043 * <CODE>MessageProducer</CODE> is the parent interface for all message 044 * producers. 045 * <P> 046 * A client also has the option of creating a message producer without supplying 047 * a destination. In this case, a destination must be provided with every send 048 * operation. A typical use for this kind of message producer is to send replies 049 * to requests using the request's <CODE>JMSReplyTo</CODE> destination. 050 * <P> 051 * A client can specify a default delivery mode, priority, and time to live for 052 * messages sent by a message producer. It can also specify the delivery mode, 053 * priority, and time to live for an individual message. 054 * <P> 055 * A client can specify a time-to-live value in milliseconds for each message it 056 * sends. This value defines a message expiration time that is the sum of the 057 * message's time-to-live and the GMT when it is sent (for transacted sends, 058 * this is the time the client sends the message, not the time the transaction 059 * is committed). 060 * <P> 061 * A JMS provider should do its best to expire messages accurately; however, the 062 * JMS API does not define the accuracy provided. 063 * 064 * 065 * @see javax.jms.TopicPublisher 066 * @see javax.jms.QueueSender 067 * @see javax.jms.Session#createProducer 068 */ 069 public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport implements StatsCapable, Disposable { 070 071 protected ProducerInfo info; 072 protected boolean closed; 073 074 private final JMSProducerStatsImpl stats; 075 private AtomicLong messageSequence; 076 private final long startTime; 077 private MessageTransformer transformer; 078 private MemoryUsage producerWindow; 079 080 protected ActiveMQMessageProducer(ActiveMQSession session, ProducerId producerId, ActiveMQDestination destination, int sendTimeout) throws JMSException { 081 super(session); 082 this.info = new ProducerInfo(producerId); 083 this.info.setWindowSize(session.connection.getProducerWindowSize()); 084 if (destination != null && destination.getOptions() != null) { 085 Map<String, String> options = new HashMap<String, String>(destination.getOptions()); 086 IntrospectionSupport.setProperties(this.info, options, "producer."); 087 } 088 this.info.setDestination(destination); 089 090 // Enable producer window flow control if protocol > 3 and the window 091 // size > 0 092 if (session.connection.getProtocolVersion() >= 3 && this.info.getWindowSize() > 0) { 093 producerWindow = new MemoryUsage("Producer Window: " + producerId); 094 producerWindow.setExecutor(session.getConnectionExecutor()); 095 producerWindow.setLimit(this.info.getWindowSize()); 096 producerWindow.start(); 097 } 098 099 this.defaultDeliveryMode = Message.DEFAULT_DELIVERY_MODE; 100 this.defaultPriority = Message.DEFAULT_PRIORITY; 101 this.defaultTimeToLive = Message.DEFAULT_TIME_TO_LIVE; 102 this.startTime = System.currentTimeMillis(); 103 this.messageSequence = new AtomicLong(0); 104 this.stats = new JMSProducerStatsImpl(session.getSessionStats(), destination); 105 this.session.addProducer(this); 106 this.session.asyncSendPacket(info); 107 this.setSendTimeout(sendTimeout); 108 setTransformer(session.getTransformer()); 109 } 110 111 public StatsImpl getStats() { 112 return stats; 113 } 114 115 public JMSProducerStatsImpl getProducerStats() { 116 return stats; 117 } 118 119 /** 120 * Gets the destination associated with this <CODE>MessageProducer</CODE>. 121 * 122 * @return this producer's <CODE>Destination/ <CODE> 123 * @throws JMSException if the JMS provider fails to close the producer due to 124 * some internal error. 125 * @since 1.1 126 */ 127 public Destination getDestination() throws JMSException { 128 checkClosed(); 129 return this.info.getDestination(); 130 } 131 132 /** 133 * Closes the message producer. 134 * <P> 135 * Since a provider may allocate some resources on behalf of a <CODE> 136 * MessageProducer</CODE> 137 * outside the Java virtual machine, clients should close them when they are 138 * not needed. Relying on garbage collection to eventually reclaim these 139 * resources may not be timely enough. 140 * 141 * @throws JMSException if the JMS provider fails to close the producer due 142 * to some internal error. 143 */ 144 public void close() throws JMSException { 145 if (!closed) { 146 dispose(); 147 this.session.asyncSendPacket(info.createRemoveCommand()); 148 } 149 } 150 151 public void dispose() { 152 if (!closed) { 153 this.session.removeProducer(this); 154 if (producerWindow != null) { 155 producerWindow.stop(); 156 } 157 closed = true; 158 } 159 } 160 161 /** 162 * Check if the instance of this producer has been closed. 163 * 164 * @throws IllegalStateException 165 */ 166 @Override 167 protected void checkClosed() throws IllegalStateException { 168 if (closed) { 169 throw new IllegalStateException("The producer is closed"); 170 } 171 } 172 173 /** 174 * Sends a message to a destination for an unidentified message producer, 175 * specifying delivery mode, priority and time to live. 176 * <P> 177 * Typically, a message producer is assigned a destination at creation time; 178 * however, the JMS API also supports unidentified message producers, which 179 * require that the destination be supplied every time a message is sent. 180 * 181 * @param destination the destination to send this message to 182 * @param message the message to send 183 * @param deliveryMode the delivery mode to use 184 * @param priority the priority for this message 185 * @param timeToLive the message's lifetime (in milliseconds) 186 * @throws JMSException if the JMS provider fails to send the message due to 187 * some internal error. 188 * @throws UnsupportedOperationException if an invalid destination is 189 * specified. 190 * @throws InvalidDestinationException if a client uses this method with an 191 * invalid destination. 192 * @see javax.jms.Session#createProducer 193 * @since 1.1 194 */ 195 public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException { 196 checkClosed(); 197 if (destination == null) { 198 if (info.getDestination() == null) { 199 throw new UnsupportedOperationException("A destination must be specified."); 200 } 201 throw new InvalidDestinationException("Don't understand null destinations"); 202 } 203 204 ActiveMQDestination dest; 205 if (destination == info.getDestination()) { 206 dest = (ActiveMQDestination)destination; 207 } else if (info.getDestination() == null) { 208 dest = ActiveMQDestination.transform(destination); 209 } else { 210 throw new UnsupportedOperationException("This producer can only send messages to: " + this.info.getDestination().getPhysicalName()); 211 } 212 if (dest == null) { 213 throw new JMSException("No destination specified"); 214 } 215 216 if (transformer != null) { 217 Message transformedMessage = transformer.producerTransform(session, this, message); 218 if (transformedMessage != null) { 219 message = transformedMessage; 220 } 221 } 222 223 if (producerWindow != null) { 224 try { 225 producerWindow.waitForSpace(); 226 } catch (InterruptedException e) { 227 throw new JMSException("Send aborted due to thread interrupt."); 228 } 229 } 230 231 this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow,sendTimeout); 232 233 stats.onMessage(); 234 } 235 236 public MessageTransformer getTransformer() { 237 return transformer; 238 } 239 240 /** 241 * Sets the transformer used to transform messages before they are sent on 242 * to the JMS bus 243 */ 244 public void setTransformer(MessageTransformer transformer) { 245 this.transformer = transformer; 246 } 247 248 /** 249 * @return the time in milli second when this object was created. 250 */ 251 protected long getStartTime() { 252 return this.startTime; 253 } 254 255 /** 256 * @return Returns the messageSequence. 257 */ 258 protected long getMessageSequence() { 259 return messageSequence.incrementAndGet(); 260 } 261 262 /** 263 * @param messageSequence The messageSequence to set. 264 */ 265 protected void setMessageSequence(AtomicLong messageSequence) { 266 this.messageSequence = messageSequence; 267 } 268 269 /** 270 * @return Returns the info. 271 */ 272 protected ProducerInfo getProducerInfo() { 273 return this.info != null ? this.info : null; 274 } 275 276 /** 277 * @param info The info to set 278 */ 279 protected void setProducerInfo(ProducerInfo info) { 280 this.info = info; 281 } 282 283 @Override 284 public String toString() { 285 return "ActiveMQMessageProducer { value=" + info.getProducerId() + " }"; 286 } 287 288 public void onProducerAck(ProducerAck pa) { 289 if (this.producerWindow != null) { 290 this.producerWindow.decreaseUsage(pa.getSize()); 291 } 292 } 293 294 }