001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq;
018    
019    import java.net.URI;
020    import java.net.URISyntaxException;
021    import java.util.HashMap;
022    import java.util.Map;
023    import java.util.Properties;
024    import java.util.concurrent.Executor;
025    import java.util.concurrent.ScheduledThreadPoolExecutor;
026    import java.util.concurrent.ThreadFactory;
027    import javax.jms.Connection;
028    import javax.jms.ConnectionFactory;
029    import javax.jms.ExceptionListener;
030    import javax.jms.JMSException;
031    import javax.jms.QueueConnection;
032    import javax.jms.QueueConnectionFactory;
033    import javax.jms.TopicConnection;
034    import javax.jms.TopicConnectionFactory;
035    import javax.naming.Context;
036    import org.apache.activemq.blob.BlobTransferPolicy;
037    import org.apache.activemq.jndi.JNDIBaseStorable;
038    import org.apache.activemq.management.JMSStatsImpl;
039    import org.apache.activemq.management.StatsCapable;
040    import org.apache.activemq.management.StatsImpl;
041    import org.apache.activemq.transport.Transport;
042    import org.apache.activemq.transport.TransportFactory;
043    import org.apache.activemq.transport.TransportListener;
044    import org.apache.activemq.util.IdGenerator;
045    import org.apache.activemq.util.IntrospectionSupport;
046    import org.apache.activemq.util.JMSExceptionSupport;
047    import org.apache.activemq.util.URISupport;
048    import org.apache.activemq.util.URISupport.CompositeData;
049    
050    /**
051     * A ConnectionFactory is an an Administered object, and is used for creating
052     * Connections. <p/> This class also implements QueueConnectionFactory and
053     * TopicConnectionFactory. You can use this connection to create both
054     * QueueConnections and TopicConnections.
055     * 
056     * 
057     * @see javax.jms.ConnectionFactory
058     */
059    public class ActiveMQConnectionFactory extends JNDIBaseStorable implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, StatsCapable, Cloneable {
060    
061        public static final String DEFAULT_BROKER_BIND_URL = "tcp://localhost:61616";
062        public static final String DEFAULT_BROKER_URL = "failover://"+DEFAULT_BROKER_BIND_URL;
063        public static final String DEFAULT_USER = null;
064        public static final String DEFAULT_PASSWORD = null;
065        public static final int DEFAULT_PRODUCER_WINDOW_SIZE = 0;
066    
067        protected static final Executor DEFAULT_CONNECTION_EXECUTOR = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
068            public Thread newThread(Runnable run) {
069                Thread thread = new Thread(run);
070                thread.setPriority(ThreadPriorities.INBOUND_CLIENT_CONNECTION);
071                return thread;
072            }
073        });
074    
075        protected URI brokerURL;
076        protected String userName;
077        protected String password;
078        protected String clientID;
079        protected boolean dispatchAsync=true;
080        protected boolean alwaysSessionAsync=true;
081    
082        JMSStatsImpl factoryStats = new JMSStatsImpl();
083    
084        private IdGenerator clientIdGenerator;
085        private String clientIDPrefix;
086    
087        // client policies
088        private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
089        private RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
090        private BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy();
091        private MessageTransformer transformer;
092    
093        private boolean disableTimeStampsByDefault;
094        private boolean optimizedMessageDispatch = true;
095        private boolean copyMessageOnSend = true;
096        private boolean useCompression;
097        private boolean objectMessageSerializationDefered;
098        private boolean useAsyncSend;
099        private boolean optimizeAcknowledge;
100        private int closeTimeout = 15000;
101        private boolean useRetroactiveConsumer;
102        private boolean exclusiveConsumer;
103        private boolean nestedMapAndListEnabled = true;
104        private boolean alwaysSyncSend;
105        private boolean watchTopicAdvisories = true;
106        private int producerWindowSize = DEFAULT_PRODUCER_WINDOW_SIZE;
107        private long warnAboutUnstartedConnectionTimeout = 500L;
108        private int sendTimeout = 0;
109        private boolean sendAcksAsync=true;
110        private TransportListener transportListener;
111        private ExceptionListener exceptionListener;
112        private int auditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE;
113        private int auditMaximumProducerNumber = ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT;
114        private boolean useDedicatedTaskRunner;
115        private long consumerFailoverRedeliveryWaitPeriod = 0;
116        private boolean checkForDuplicates = true;
117        private ClientInternalExceptionListener clientInternalExceptionListener;
118        private boolean messagePrioritySupported = true;
119    
120        // /////////////////////////////////////////////
121        //
122        // ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory Methods
123        //
124        // /////////////////////////////////////////////
125    
126        public ActiveMQConnectionFactory() {
127            this(DEFAULT_BROKER_URL);
128        }
129    
130        public ActiveMQConnectionFactory(String brokerURL) {
131            this(createURI(brokerURL));
132        }
133    
134        public ActiveMQConnectionFactory(URI brokerURL) {
135            setBrokerURL(brokerURL.toString());
136        }
137    
138        public ActiveMQConnectionFactory(String userName, String password, URI brokerURL) {
139            setUserName(userName);
140            setPassword(password);
141            setBrokerURL(brokerURL.toString());
142        }
143    
144        public ActiveMQConnectionFactory(String userName, String password, String brokerURL) {
145            setUserName(userName);
146            setPassword(password);
147            setBrokerURL(brokerURL);
148        }
149    
150        /**
151         * Returns a copy of the given connection factory
152         */
153        public ActiveMQConnectionFactory copy() {
154            try {
155                return (ActiveMQConnectionFactory)super.clone();
156            } catch (CloneNotSupportedException e) {
157                throw new RuntimeException("This should never happen: " + e, e);
158            }
159        }
160    
161        /**
162         * @param brokerURL
163         * @return
164         * @throws URISyntaxException
165         */
166        private static URI createURI(String brokerURL) {
167            try {
168                return new URI(brokerURL);
169            } catch (URISyntaxException e) {
170                throw (IllegalArgumentException)new IllegalArgumentException("Invalid broker URI: " + brokerURL).initCause(e);
171            }
172        }
173    
174        /**
175         * @return Returns the Connection.
176         */
177        public Connection createConnection() throws JMSException {
178            return createActiveMQConnection();
179        }
180    
181        /**
182         * @return Returns the Connection.
183         */
184        public Connection createConnection(String userName, String password) throws JMSException {
185            return createActiveMQConnection(userName, password);
186        }
187    
188        /**
189         * @return Returns the QueueConnection.
190         * @throws JMSException
191         */
192        public QueueConnection createQueueConnection() throws JMSException {
193            return createActiveMQConnection();
194        }
195    
196        /**
197         * @return Returns the QueueConnection.
198         */
199        public QueueConnection createQueueConnection(String userName, String password) throws JMSException {
200            return createActiveMQConnection(userName, password);
201        }
202    
203        /**
204         * @return Returns the TopicConnection.
205         * @throws JMSException
206         */
207        public TopicConnection createTopicConnection() throws JMSException {
208            return createActiveMQConnection();
209        }
210    
211        /**
212         * @return Returns the TopicConnection.
213         */
214        public TopicConnection createTopicConnection(String userName, String password) throws JMSException {
215            return createActiveMQConnection(userName, password);
216        }
217    
218        public StatsImpl getStats() {
219            // TODO
220            return null;
221        }
222    
223        // /////////////////////////////////////////////
224        //
225        // Implementation methods.
226        //
227        // /////////////////////////////////////////////
228    
229        protected ActiveMQConnection createActiveMQConnection() throws JMSException {
230            return createActiveMQConnection(userName, password);
231        }
232    
233        /**
234         * Creates a Transport based on this object's connection settings. Separated
235         * from createActiveMQConnection to allow for subclasses to override.
236         * 
237         * @return The newly created Transport.
238         * @throws JMSException If unable to create trasnport.
239         * @author sepandm@gmail.com
240         */
241        protected Transport createTransport() throws JMSException {
242            try {
243                return TransportFactory.connect(brokerURL, DEFAULT_CONNECTION_EXECUTOR);
244            } catch (Exception e) {
245                throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e);
246            }
247        }
248    
249        /**
250         * @return Returns the Connection.
251         */
252        protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException {
253            if (brokerURL == null) {
254                throw new ConfigurationException("brokerURL not set.");
255            }
256            ActiveMQConnection connection = null;
257            try {
258                Transport transport = createTransport();
259                connection = createActiveMQConnection(transport, factoryStats);
260    
261                connection.setUserName(userName);
262                connection.setPassword(password);
263    
264                configureConnection(connection);
265    
266                transport.start();
267    
268                if (clientID != null) {
269                    connection.setDefaultClientID(clientID);
270                }
271    
272                return connection;
273            } catch (JMSException e) {
274                // Clean up!
275                try {
276                    connection.close();
277                } catch (Throwable ignore) {
278                }
279                throw e;
280            } catch (Exception e) {
281                // Clean up!
282                try {
283                    connection.close();
284                } catch (Throwable ignore) {
285                }
286                throw JMSExceptionSupport.create("Could not connect to broker URL: " + brokerURL + ". Reason: " + e, e);
287            }
288        }
289    
290        protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception {
291            ActiveMQConnection connection = new ActiveMQConnection(transport, getClientIdGenerator(), stats);
292            return connection;
293        }
294    
295        protected void configureConnection(ActiveMQConnection connection) throws JMSException {
296            connection.setPrefetchPolicy(getPrefetchPolicy());
297            connection.setDisableTimeStampsByDefault(isDisableTimeStampsByDefault());
298            connection.setOptimizedMessageDispatch(isOptimizedMessageDispatch());
299            connection.setCopyMessageOnSend(isCopyMessageOnSend());
300            connection.setUseCompression(isUseCompression());
301            connection.setObjectMessageSerializationDefered(isObjectMessageSerializationDefered());
302            connection.setDispatchAsync(isDispatchAsync());
303            connection.setUseAsyncSend(isUseAsyncSend());
304            connection.setAlwaysSyncSend(isAlwaysSyncSend());
305            connection.setAlwaysSessionAsync(isAlwaysSessionAsync());
306            connection.setOptimizeAcknowledge(isOptimizeAcknowledge());
307            connection.setUseRetroactiveConsumer(isUseRetroactiveConsumer());
308            connection.setExclusiveConsumer(isExclusiveConsumer());
309            connection.setRedeliveryPolicy(getRedeliveryPolicy());
310            connection.setTransformer(getTransformer());
311            connection.setBlobTransferPolicy(getBlobTransferPolicy().copy());
312            connection.setWatchTopicAdvisories(isWatchTopicAdvisories());
313            connection.setProducerWindowSize(getProducerWindowSize());
314            connection.setWarnAboutUnstartedConnectionTimeout(getWarnAboutUnstartedConnectionTimeout());
315            connection.setSendTimeout(getSendTimeout());
316            connection.setCloseTimeout(getCloseTimeout());
317            connection.setSendAcksAsync(isSendAcksAsync());
318            connection.setAuditDepth(getAuditDepth());
319            connection.setAuditMaximumProducerNumber(getAuditMaximumProducerNumber());
320            connection.setUseDedicatedTaskRunner(isUseDedicatedTaskRunner());
321            connection.setConsumerFailoverRedeliveryWaitPeriod(getConsumerFailoverRedeliveryWaitPeriod());
322            connection.setCheckForDuplicates(isCheckForDuplicates());
323            connection.setMessagePrioritySupported(isMessagePrioritySupported());
324            if (transportListener != null) {
325                connection.addTransportListener(transportListener);
326            }
327            if (exceptionListener != null) {
328                    connection.setExceptionListener(exceptionListener);
329            }
330            if (clientInternalExceptionListener != null) {
331                connection.setClientInternalExceptionListener(clientInternalExceptionListener);
332            }
333        }
334    
335        // /////////////////////////////////////////////
336        //
337        // Property Accessors
338        //
339        // /////////////////////////////////////////////
340    
341        public String getBrokerURL() {
342            return brokerURL == null ? null : brokerURL.toString();
343        }
344    
345        /**
346         * Sets the <a
347         * href="http://activemq.apache.org/configuring-transports.html">connection
348         * URL</a> used to connect to the ActiveMQ broker.
349         */
350        public void setBrokerURL(String brokerURL) {
351            this.brokerURL = createURI(brokerURL);
352    
353            // Use all the properties prefixed with 'jms.' to set the connection
354            // factory
355            // options.
356            if (this.brokerURL.getQuery() != null) {
357                // It might be a standard URI or...
358                try {
359    
360                    Map map = URISupport.parseQuery(this.brokerURL.getQuery());
361                    if (buildFromMap(IntrospectionSupport.extractProperties(map, "jms."))) {
362                        this.brokerURL = URISupport.createRemainingURI(this.brokerURL, map);
363                    }
364    
365                } catch (URISyntaxException e) {
366                }
367    
368            } else {
369    
370                // It might be a composite URI.
371                try {
372                    CompositeData data = URISupport.parseComposite(this.brokerURL);
373                    if (buildFromMap(IntrospectionSupport.extractProperties(data.getParameters(), "jms."))) {
374                        this.brokerURL = data.toURI();
375                    }
376                } catch (URISyntaxException e) {
377                }
378            }
379        }
380    
381        public String getClientID() {
382            return clientID;
383        }
384    
385        /**
386         * Sets the JMS clientID to use for the created connection. Note that this
387         * can only be used by one connection at once so generally its a better idea
388         * to set the clientID on a Connection
389         */
390        public void setClientID(String clientID) {
391            this.clientID = clientID;
392        }
393    
394        public boolean isCopyMessageOnSend() {
395            return copyMessageOnSend;
396        }
397    
398        /**
399         * Should a JMS message be copied to a new JMS Message object as part of the
400         * send() method in JMS. This is enabled by default to be compliant with the
401         * JMS specification. You can disable it if you do not mutate JMS messages
402         * after they are sent for a performance boost
403         */
404        public void setCopyMessageOnSend(boolean copyMessageOnSend) {
405            this.copyMessageOnSend = copyMessageOnSend;
406        }
407    
408        public boolean isDisableTimeStampsByDefault() {
409            return disableTimeStampsByDefault;
410        }
411    
412        /**
413         * Sets whether or not timestamps on messages should be disabled or not. If
414         * you disable them it adds a small performance boost.
415         */
416        public void setDisableTimeStampsByDefault(boolean disableTimeStampsByDefault) {
417            this.disableTimeStampsByDefault = disableTimeStampsByDefault;
418        }
419    
420        public boolean isOptimizedMessageDispatch() {
421            return optimizedMessageDispatch;
422        }
423    
424        /**
425         * If this flag is set then an larger prefetch limit is used - only
426         * applicable for durable topic subscribers.
427         */
428        public void setOptimizedMessageDispatch(boolean optimizedMessageDispatch) {
429            this.optimizedMessageDispatch = optimizedMessageDispatch;
430        }
431    
432        public String getPassword() {
433            return password;
434        }
435    
436        /**
437         * Sets the JMS password used for connections created from this factory
438         */
439        public void setPassword(String password) {
440            this.password = password;
441        }
442    
443        public ActiveMQPrefetchPolicy getPrefetchPolicy() {
444            return prefetchPolicy;
445        }
446    
447        /**
448         * Sets the <a
449         * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch
450         * policy</a> for consumers created by this connection.
451         */
452        public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
453            this.prefetchPolicy = prefetchPolicy;
454        }
455    
456        public boolean isUseAsyncSend() {
457            return useAsyncSend;
458        }
459    
460        public BlobTransferPolicy getBlobTransferPolicy() {
461            return blobTransferPolicy;
462        }
463    
464        /**
465         * Sets the policy used to describe how out-of-band BLOBs (Binary Large
466         * OBjects) are transferred from producers to brokers to consumers
467         */
468        public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
469            this.blobTransferPolicy = blobTransferPolicy;
470        }
471    
472        /**
473         * Forces the use of <a
474         * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which
475         * adds a massive performance boost; but means that the send() method will
476         * return immediately whether the message has been sent or not which could
477         * lead to message loss.
478         */
479        public void setUseAsyncSend(boolean useAsyncSend) {
480            this.useAsyncSend = useAsyncSend;
481        }
482    
483        public synchronized boolean isWatchTopicAdvisories() {
484            return watchTopicAdvisories;
485        }
486    
487        public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) {
488            this.watchTopicAdvisories = watchTopicAdvisories;
489        }
490    
491        /**
492         * @return true if always sync send messages
493         */
494        public boolean isAlwaysSyncSend() {
495            return this.alwaysSyncSend;
496        }
497    
498        /**
499         * Set true if always require messages to be sync sent
500         * 
501         * @param alwaysSyncSend
502         */
503        public void setAlwaysSyncSend(boolean alwaysSyncSend) {
504            this.alwaysSyncSend = alwaysSyncSend;
505        }
506    
507        public String getUserName() {
508            return userName;
509        }
510    
511        /**
512         * Sets the JMS userName used by connections created by this factory
513         */
514        public void setUserName(String userName) {
515            this.userName = userName;
516        }
517    
518        public boolean isUseRetroactiveConsumer() {
519            return useRetroactiveConsumer;
520        }
521    
522        /**
523         * Sets whether or not retroactive consumers are enabled. Retroactive
524         * consumers allow non-durable topic subscribers to receive old messages
525         * that were published before the non-durable subscriber started.
526         */
527        public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) {
528            this.useRetroactiveConsumer = useRetroactiveConsumer;
529        }
530    
531        public boolean isExclusiveConsumer() {
532            return exclusiveConsumer;
533        }
534    
535        /**
536         * Enables or disables whether or not queue consumers should be exclusive or
537         * not for example to preserve ordering when not using <a
538         * href="http://activemq.apache.org/message-groups.html">Message Groups</a>
539         * 
540         * @param exclusiveConsumer
541         */
542        public void setExclusiveConsumer(boolean exclusiveConsumer) {
543            this.exclusiveConsumer = exclusiveConsumer;
544        }
545    
546        public RedeliveryPolicy getRedeliveryPolicy() {
547            return redeliveryPolicy;
548        }
549    
550        /**
551         * Sets the global redelivery policy to be used when a message is delivered
552         * but the session is rolled back
553         */
554        public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
555            this.redeliveryPolicy = redeliveryPolicy;
556        }
557    
558        public MessageTransformer getTransformer() {
559            return transformer;
560        }
561        
562        /**
563         * @return the sendTimeout
564         */
565        public int getSendTimeout() {
566            return sendTimeout;
567        }
568    
569        /**
570         * @param sendTimeout the sendTimeout to set
571         */
572        public void setSendTimeout(int sendTimeout) {
573            this.sendTimeout = sendTimeout;
574        }
575        
576        /**
577         * @return the sendAcksAsync
578         */
579        public boolean isSendAcksAsync() {
580            return sendAcksAsync;
581        }
582    
583        /**
584         * @param sendAcksAsync the sendAcksAsync to set
585         */
586        public void setSendAcksAsync(boolean sendAcksAsync) {
587            this.sendAcksAsync = sendAcksAsync;
588        }
589        
590        /**
591         * @return the messagePrioritySupported
592         */
593        public boolean isMessagePrioritySupported() {
594            return this.messagePrioritySupported;
595        }
596    
597        /**
598         * @param messagePrioritySupported the messagePrioritySupported to set
599         */
600        public void setMessagePrioritySupported(boolean messagePrioritySupported) {
601            this.messagePrioritySupported = messagePrioritySupported;
602        }
603    
604    
605        /**
606         * Sets the transformer used to transform messages before they are sent on
607         * to the JMS bus or when they are received from the bus but before they are
608         * delivered to the JMS client
609         */
610        public void setTransformer(MessageTransformer transformer) {
611            this.transformer = transformer;
612        }
613    
614        @Override
615        public void buildFromProperties(Properties properties) {
616    
617            if (properties == null) {
618                properties = new Properties();
619            }
620    
621            String temp = properties.getProperty(Context.PROVIDER_URL);
622            if (temp == null || temp.length() == 0) {
623                temp = properties.getProperty("brokerURL");
624            }
625            if (temp != null && temp.length() > 0) {
626                setBrokerURL(temp);
627            }
628    
629            Map<String, Object> p = new HashMap(properties);
630            buildFromMap(p);
631        }
632    
633        public boolean buildFromMap(Map<String, Object> properties) {
634            boolean rc = false;
635    
636            ActiveMQPrefetchPolicy p = new ActiveMQPrefetchPolicy();
637            if (IntrospectionSupport.setProperties(p, properties, "prefetchPolicy.")) {
638                setPrefetchPolicy(p);
639                rc = true;
640            }
641    
642            RedeliveryPolicy rp = new RedeliveryPolicy();
643            if (IntrospectionSupport.setProperties(rp, properties, "redeliveryPolicy.")) {
644                setRedeliveryPolicy(rp);
645                rc = true;
646            }
647    
648            BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy();
649            if (IntrospectionSupport.setProperties(blobTransferPolicy, properties, "blobTransferPolicy.")) {
650                setBlobTransferPolicy(blobTransferPolicy);
651                rc = true;
652            }
653    
654            rc |= IntrospectionSupport.setProperties(this, properties);
655    
656            return rc;
657        }
658    
659        @Override
660        public void populateProperties(Properties props) {
661            props.setProperty("dispatchAsync", Boolean.toString(isDispatchAsync()));
662    
663            if (getBrokerURL() != null) {
664                props.setProperty(Context.PROVIDER_URL, getBrokerURL());
665                props.setProperty("brokerURL", getBrokerURL());
666            }
667    
668            if (getClientID() != null) {
669                props.setProperty("clientID", getClientID());
670            }
671    
672            IntrospectionSupport.getProperties(getPrefetchPolicy(), props, "prefetchPolicy.");
673            IntrospectionSupport.getProperties(getRedeliveryPolicy(), props, "redeliveryPolicy.");
674            IntrospectionSupport.getProperties(getBlobTransferPolicy(), props, "blobTransferPolicy.");
675    
676            props.setProperty("copyMessageOnSend", Boolean.toString(isCopyMessageOnSend()));
677            props.setProperty("disableTimeStampsByDefault", Boolean.toString(isDisableTimeStampsByDefault()));
678            props.setProperty("objectMessageSerializationDefered", Boolean.toString(isObjectMessageSerializationDefered()));
679            props.setProperty("optimizedMessageDispatch", Boolean.toString(isOptimizedMessageDispatch()));
680    
681            if (getPassword() != null) {
682                props.setProperty("password", getPassword());
683            }
684    
685            props.setProperty("useAsyncSend", Boolean.toString(isUseAsyncSend()));
686            props.setProperty("useCompression", Boolean.toString(isUseCompression()));
687            props.setProperty("useRetroactiveConsumer", Boolean.toString(isUseRetroactiveConsumer()));
688            props.setProperty("watchTopicAdvisories", Boolean.toString(isWatchTopicAdvisories()));
689    
690            if (getUserName() != null) {
691                props.setProperty("userName", getUserName());
692            }
693    
694            props.setProperty("closeTimeout", Integer.toString(getCloseTimeout()));
695            props.setProperty("alwaysSessionAsync", Boolean.toString(isAlwaysSessionAsync()));
696            props.setProperty("optimizeAcknowledge", Boolean.toString(isOptimizeAcknowledge()));
697            props.setProperty("statsEnabled", Boolean.toString(isStatsEnabled()));
698            props.setProperty("alwaysSyncSend", Boolean.toString(isAlwaysSyncSend()));
699            props.setProperty("producerWindowSize", Integer.toString(getProducerWindowSize()));
700            props.setProperty("sendTimeout", Integer.toString(getSendTimeout()));
701            props.setProperty("sendAcksAsync",Boolean.toString(isSendAcksAsync()));
702            props.setProperty("auditDepth", Integer.toString(getAuditDepth()));
703            props.setProperty("auditMaximumProducerNumber", Integer.toString(getAuditMaximumProducerNumber()));
704            props.setProperty("checkForDuplicates", Boolean.toString(isCheckForDuplicates()));
705            props.setProperty("messagePrioritySupported", Boolean.toString(isMessagePrioritySupported()));
706        }
707    
708        public boolean isUseCompression() {
709            return useCompression;
710        }
711    
712        /**
713         * Enables the use of compression of the message bodies
714         */
715        public void setUseCompression(boolean useCompression) {
716            this.useCompression = useCompression;
717        }
718    
719        public boolean isObjectMessageSerializationDefered() {
720            return objectMessageSerializationDefered;
721        }
722    
723        /**
724         * When an object is set on an ObjectMessage, the JMS spec requires the
725         * object to be serialized by that set method. Enabling this flag causes the
726         * object to not get serialized. The object may subsequently get serialized
727         * if the message needs to be sent over a socket or stored to disk.
728         */
729        public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) {
730            this.objectMessageSerializationDefered = objectMessageSerializationDefered;
731        }
732    
733        public boolean isDispatchAsync() {
734            return dispatchAsync;
735        }
736    
737        /**
738         * Enables or disables the default setting of whether or not consumers have
739         * their messages <a
740         * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched
741         * synchronously or asynchronously by the broker</a>. For non-durable
742         * topics for example we typically dispatch synchronously by default to
743         * minimize context switches which boost performance. However sometimes its
744         * better to go slower to ensure that a single blocked consumer socket does
745         * not block delivery to other consumers.
746         * 
747         * @param asyncDispatch If true then consumers created on this connection
748         *                will default to having their messages dispatched
749         *                asynchronously. The default value is false.
750         */
751        public void setDispatchAsync(boolean asyncDispatch) {
752            this.dispatchAsync = asyncDispatch;
753        }
754    
755        /**
756         * @return Returns the closeTimeout.
757         */
758        public int getCloseTimeout() {
759            return closeTimeout;
760        }
761    
762        /**
763         * Sets the timeout before a close is considered complete. Normally a
764         * close() on a connection waits for confirmation from the broker; this
765         * allows that operation to timeout to save the client hanging if there is
766         * no broker
767         */
768        public void setCloseTimeout(int closeTimeout) {
769            this.closeTimeout = closeTimeout;
770        }
771    
772        /**
773         * @return Returns the alwaysSessionAsync.
774         */
775        public boolean isAlwaysSessionAsync() {
776            return alwaysSessionAsync;
777        }
778    
779        /**
780         * If this flag is set then a separate thread is not used for dispatching
781         * messages for each Session in the Connection. However, a separate thread
782         * is always used if there is more than one session, or the session isn't in
783         * auto acknowledge or duplicates ok mode
784         */
785        public void setAlwaysSessionAsync(boolean alwaysSessionAsync) {
786            this.alwaysSessionAsync = alwaysSessionAsync;
787        }
788    
789        /**
790         * @return Returns the optimizeAcknowledge.
791         */
792        public boolean isOptimizeAcknowledge() {
793            return optimizeAcknowledge;
794        }
795    
796        /**
797         * @param optimizeAcknowledge The optimizeAcknowledge to set.
798         */
799        public void setOptimizeAcknowledge(boolean optimizeAcknowledge) {
800            this.optimizeAcknowledge = optimizeAcknowledge;
801        }
802    
803        public boolean isNestedMapAndListEnabled() {
804            return nestedMapAndListEnabled;
805        }
806    
807        /**
808         * Enables/disables whether or not Message properties and MapMessage entries
809         * support <a
810         * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested
811         * Structures</a> of Map and List objects
812         */
813        public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) {
814            this.nestedMapAndListEnabled = structuredMapsEnabled;
815        }
816    
817        public String getClientIDPrefix() {
818            return clientIDPrefix;
819        }
820    
821        /**
822         * Sets the prefix used by autogenerated JMS Client ID values which are used
823         * if the JMS client does not explicitly specify on.
824         * 
825         * @param clientIDPrefix
826         */
827        public void setClientIDPrefix(String clientIDPrefix) {
828            this.clientIDPrefix = clientIDPrefix;
829        }
830    
831        protected synchronized IdGenerator getClientIdGenerator() {
832            if (clientIdGenerator == null) {
833                if (clientIDPrefix != null) {
834                    clientIdGenerator = new IdGenerator(clientIDPrefix);
835                } else {
836                    clientIdGenerator = new IdGenerator();
837                }
838            }
839            return clientIdGenerator;
840        }
841    
842        protected void setClientIdGenerator(IdGenerator clientIdGenerator) {
843            this.clientIdGenerator = clientIdGenerator;
844        }
845    
846        /**
847         * @return the statsEnabled
848         */
849        public boolean isStatsEnabled() {
850            return this.factoryStats.isEnabled();
851        }
852    
853        /**
854         * @param statsEnabled the statsEnabled to set
855         */
856        public void setStatsEnabled(boolean statsEnabled) {
857            this.factoryStats.setEnabled(statsEnabled);
858        }
859    
860        public synchronized int getProducerWindowSize() {
861            return producerWindowSize;
862        }
863    
864        public synchronized void setProducerWindowSize(int producerWindowSize) {
865            this.producerWindowSize = producerWindowSize;
866        }
867    
868        public long getWarnAboutUnstartedConnectionTimeout() {
869            return warnAboutUnstartedConnectionTimeout;
870        }
871    
872        /**
873         * Enables the timeout from a connection creation to when a warning is
874         * generated if the connection is not properly started via
875         * {@link Connection#start()} and a message is received by a consumer. It is
876         * a very common gotcha to forget to <a
877         * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start
878         * the connection</a> so this option makes the default case to create a
879         * warning if the user forgets. To disable the warning just set the value to <
880         * 0 (say -1).
881         */
882        public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) {
883            this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout;
884        }
885    
886        public TransportListener getTransportListener() {
887            return transportListener;
888        }
889    
890        /**
891         * Allows a listener to be configured on the ConnectionFactory so that when this factory is used
892         * with frameworks which don't expose the Connection such as Spring JmsTemplate, you can still register
893         * a transport listener.
894         *
895         * @param transportListener sets the listener to be registered on all connections
896         * created by this factory
897         */
898        public void setTransportListener(TransportListener transportListener) {
899            this.transportListener = transportListener;
900        }
901        
902        
903        public ExceptionListener getExceptionListener() {
904            return exceptionListener;
905        }
906        
907        /**
908         * Allows an {@link ExceptionListener} to be configured on the ConnectionFactory so that when this factory
909         * is used by frameworks which don't expose the Connection such as Spring JmsTemplate, you can register
910         * an exception listener.
911         * <p> Note: access to this exceptionLinstener will <b>not</b> be serialized if it is associated with more than
912         * on connection (as it will be if more than one connection is subsequently created by this connection factory)
913         * @param exceptionListener sets the exception listener to be registered on all connections
914         * created by this factory
915         */
916        public void setExceptionListener(ExceptionListener exceptionListener) {
917            this.exceptionListener = exceptionListener;
918        }
919    
920            public int getAuditDepth() {
921                    return auditDepth;
922            }
923    
924            public void setAuditDepth(int auditDepth) {
925                    this.auditDepth = auditDepth;
926            }
927    
928            public int getAuditMaximumProducerNumber() {
929                    return auditMaximumProducerNumber;
930            }
931    
932            public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
933                    this.auditMaximumProducerNumber = auditMaximumProducerNumber;
934            }
935    
936        public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
937            this.useDedicatedTaskRunner = useDedicatedTaskRunner;
938        }
939        
940        public boolean isUseDedicatedTaskRunner() {
941            return useDedicatedTaskRunner;
942        }
943        
944        public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) {
945            this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod;
946        }
947        
948        public long getConsumerFailoverRedeliveryWaitPeriod() {
949            return consumerFailoverRedeliveryWaitPeriod;
950        }
951    
952        public ClientInternalExceptionListener getClientInternalExceptionListener() {
953            return clientInternalExceptionListener;
954        }
955        
956        /**
957         * Allows an {@link ClientInternalExceptionListener} to be configured on the ConnectionFactory so that when this factory
958         * is used by frameworks which don't expose the Connection such as Spring JmsTemplate, you can register
959         * an exception listener.
960         * <p> Note: access to this clientInternalExceptionListener will <b>not</b> be serialized if it is associated with more than
961         * on connection (as it will be if more than one connection is subsequently created by this connection factory)
962         * @param clientInternalExceptionListener sets the exception listener to be registered on all connections
963         * created by this factory
964         */
965        public void setClientInternalExceptionListener(
966                ClientInternalExceptionListener clientInternalExceptionListener) {
967            this.clientInternalExceptionListener = clientInternalExceptionListener;
968        }
969    
970        /**
971         * @return the checkForDuplicates
972         */
973        public boolean isCheckForDuplicates() {
974            return this.checkForDuplicates;
975        }
976    
977        /**
978         * @param checkForDuplicates the checkForDuplicates to set
979         */
980        public void setCheckForDuplicates(boolean checkForDuplicates) {
981            this.checkForDuplicates = checkForDuplicates;
982        }
983    }