001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq;
018    
019    import java.io.InterruptedIOException;
020    import java.util.ArrayList;
021    import java.util.Arrays;
022    import java.util.List;
023    import java.util.concurrent.ConcurrentHashMap;
024    
025    import javax.jms.JMSException;
026    import javax.jms.TransactionInProgressException;
027    import javax.jms.TransactionRolledBackException;
028    import javax.transaction.xa.XAException;
029    import javax.transaction.xa.XAResource;
030    import javax.transaction.xa.Xid;
031    
032    import org.apache.activemq.command.Command;
033    import org.apache.activemq.command.ConnectionId;
034    import org.apache.activemq.command.DataArrayResponse;
035    import org.apache.activemq.command.DataStructure;
036    import org.apache.activemq.command.IntegerResponse;
037    import org.apache.activemq.command.LocalTransactionId;
038    import org.apache.activemq.command.Response;
039    import org.apache.activemq.command.TransactionId;
040    import org.apache.activemq.command.TransactionInfo;
041    import org.apache.activemq.command.XATransactionId;
042    import org.apache.activemq.transaction.Synchronization;
043    import org.apache.activemq.util.JMSExceptionSupport;
044    import org.apache.activemq.util.LongSequenceGenerator;
045    import org.slf4j.Logger;
046    import org.slf4j.LoggerFactory;
047    
048    /**
049     * A TransactionContext provides the means to control a JMS transaction. It
050     * provides a local transaction interface and also an XAResource interface. <p/>
051     * An application server controls the transactional assignment of an XASession
052     * by obtaining its XAResource. It uses the XAResource to assign the session to
053     * a transaction, prepare and commit work on the transaction, and so on. <p/> An
054     * XAResource provides some fairly sophisticated facilities for interleaving
055     * work on multiple transactions, recovering a list of transactions in progress,
056     * and so on. A JTA aware JMS provider must fully implement this functionality.
057     * This could be done by using the services of a database that supports XA, or a
058     * JMS provider may choose to implement this functionality from scratch. <p/>
059     * 
060     * 
061     * @see javax.jms.Session
062     * @see javax.jms.QueueSession
063     * @see javax.jms.TopicSession
064     * @see javax.jms.XASession
065     */
066    public class TransactionContext implements XAResource {
067    
068        private static final Logger LOG = LoggerFactory.getLogger(TransactionContext.class);
069    
070        // XATransactionId -> ArrayList of TransactionContext objects
071        private final static ConcurrentHashMap<TransactionId, List<TransactionContext>> ENDED_XA_TRANSACTION_CONTEXTS = new ConcurrentHashMap<TransactionId, List<TransactionContext>>();
072    
073        private final ActiveMQConnection connection;
074        private final LongSequenceGenerator localTransactionIdGenerator;
075        private final ConnectionId connectionId;
076        private List<Synchronization> synchronizations;
077    
078        // To track XA transactions.
079        private Xid associatedXid;
080        private TransactionId transactionId;
081        private LocalTransactionEventListener localTransactionEventListener;
082        private int beforeEndIndex;
083    
084        public TransactionContext(ActiveMQConnection connection) {
085            this.connection = connection;
086            this.localTransactionIdGenerator = connection.getLocalTransactionIdGenerator();
087            this.connectionId = connection.getConnectionInfo().getConnectionId();
088        }
089    
090        public boolean isInXATransaction() {
091            return (transactionId != null && transactionId.isXATransaction()) || !ENDED_XA_TRANSACTION_CONTEXTS.isEmpty();
092        }
093    
094        public boolean isInLocalTransaction() {
095            return transactionId != null && transactionId.isLocalTransaction();
096        }
097    
098        public boolean isInTransaction() {
099            return transactionId != null;
100        }
101        
102        /**
103         * @return Returns the localTransactionEventListener.
104         */
105        public LocalTransactionEventListener getLocalTransactionEventListener() {
106            return localTransactionEventListener;
107        }
108    
109        /**
110         * Used by the resource adapter to listen to transaction events.
111         * 
112         * @param localTransactionEventListener The localTransactionEventListener to
113         *                set.
114         */
115        public void setLocalTransactionEventListener(LocalTransactionEventListener localTransactionEventListener) {
116            this.localTransactionEventListener = localTransactionEventListener;
117        }
118    
119        // ///////////////////////////////////////////////////////////
120        //
121        // Methods that work with the Synchronization objects registered with
122        // the transaction.
123        //
124        // ///////////////////////////////////////////////////////////
125    
126        public void addSynchronization(Synchronization s) {
127            if (synchronizations == null) {
128                synchronizations = new ArrayList<Synchronization>(10);
129            }
130            synchronizations.add(s);
131        }
132    
133        private void afterRollback() throws JMSException {
134            if (synchronizations == null) {
135                return;
136            }
137    
138            int size = synchronizations.size();
139            try {
140                for (int i = 0; i < size; i++) {
141                    synchronizations.get(i).afterRollback();
142                }
143            } catch (JMSException e) {
144                throw e;
145            } catch (Throwable e) {
146                throw JMSExceptionSupport.create(e);
147            } finally {
148                synchronizations = null;
149            }
150        }
151    
152        private void afterCommit() throws JMSException {
153            if (synchronizations == null) {
154                return;
155            }
156    
157            int size = synchronizations.size();
158            try {
159                for (int i = 0; i < size; i++) {
160                    synchronizations.get(i).afterCommit();
161                }
162            } catch (JMSException e) {
163                throw e;
164            } catch (Throwable e) {
165                throw JMSExceptionSupport.create(e);
166            } finally {
167                synchronizations = null;
168            }
169        }
170    
171        private void beforeEnd() throws JMSException {
172            if (synchronizations == null) {
173                return;
174            }
175    
176            int size = synchronizations.size();
177            try {
178                for (;beforeEndIndex < size;) {
179                    synchronizations.get(beforeEndIndex++).beforeEnd();
180                }
181            } catch (JMSException e) {
182                throw e;
183            } catch (Throwable e) {
184                throw JMSExceptionSupport.create(e);
185            }
186        }
187    
188        public TransactionId getTransactionId() {
189            return transactionId;
190        }
191    
192        // ///////////////////////////////////////////////////////////
193        //
194        // Local transaction interface.
195        //
196        // ///////////////////////////////////////////////////////////
197    
198        /**
199         * Start a local transaction.
200         * @throws javax.jms.JMSException on internal error
201         */
202        public void begin() throws JMSException {
203    
204            if (isInXATransaction()) {
205                throw new TransactionInProgressException("Cannot start local transaction.  XA transaction is already in progress.");
206            }
207            
208            if (transactionId == null) {
209                synchronizations = null;
210                beforeEndIndex = 0;
211                this.transactionId = new LocalTransactionId(connectionId, localTransactionIdGenerator.getNextSequenceId());
212                TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.BEGIN);
213                this.connection.ensureConnectionInfoSent();
214                this.connection.asyncSendPacket(info);
215    
216                // Notify the listener that the tx was started.
217                if (localTransactionEventListener != null) {
218                    localTransactionEventListener.beginEvent();
219                }
220                if (LOG.isDebugEnabled()) {
221                    LOG.debug("Begin:" + transactionId);
222                }
223            }
224            
225        }
226    
227        /**
228         * Rolls back any work done in this transaction and releases any locks
229         * currently held.
230         * 
231         * @throws JMSException if the JMS provider fails to roll back the
232         *                 transaction due to some internal error.
233         * @throws javax.jms.IllegalStateException if the method is not called by a
234         *                 transacted session.
235         */
236        public void rollback() throws JMSException {
237            if (isInXATransaction()) {
238                throw new TransactionInProgressException("Cannot rollback() if an XA transaction is already in progress ");
239            }
240            
241            try {
242                beforeEnd();
243            } catch (TransactionRolledBackException canOcurrOnFailover) {
244                LOG.warn("rollback processing error", canOcurrOnFailover);
245            }
246            if (transactionId != null) {
247                if (LOG.isDebugEnabled()) {
248                    LOG.debug("Rollback: "  + transactionId
249                    + " syncCount: " 
250                    + (synchronizations != null ? synchronizations.size() : 0));
251                }
252    
253                TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.ROLLBACK);
254                this.transactionId = null;
255                //make this synchronous - see https://issues.apache.org/activemq/browse/AMQ-2364
256                this.connection.syncSendPacket(info);
257                // Notify the listener that the tx was rolled back
258                if (localTransactionEventListener != null) {
259                    localTransactionEventListener.rollbackEvent();
260                }
261            }
262    
263            afterRollback();
264        }
265    
266        /**
267         * Commits all work done in this transaction and releases any locks
268         * currently held.
269         * 
270         * @throws JMSException if the JMS provider fails to commit the transaction
271         *                 due to some internal error.
272         * @throws javax.jms.IllegalStateException if the method is not called by a
273         *                 transacted session.
274         */
275        public void commit() throws JMSException {
276            if (isInXATransaction()) {
277                throw new TransactionInProgressException("Cannot commit() if an XA transaction is already in progress ");
278            }
279            
280            try {
281                beforeEnd();
282            } catch (JMSException e) {
283                rollback();
284                throw e;
285            }
286    
287            // Only send commit if the transaction was started.
288            if (transactionId != null) {
289                if (LOG.isDebugEnabled()) {
290                    LOG.debug("Commit: "  + transactionId
291                            + " syncCount: " 
292                            + (synchronizations != null ? synchronizations.size() : 0));
293                }
294    
295                TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.COMMIT_ONE_PHASE);
296                this.transactionId = null;
297                // Notify the listener that the tx was committed back
298                try {
299                    syncSendPacketWithInterruptionHandling(info);
300                    if (localTransactionEventListener != null) {
301                        localTransactionEventListener.commitEvent();
302                    }
303                    afterCommit();
304                } catch (JMSException cause) {
305                    LOG.info("commit failed for transaction " + info.getTransactionId(), cause);
306                    if (localTransactionEventListener != null) {
307                        localTransactionEventListener.rollbackEvent();
308                    }
309                    afterRollback();
310                    throw cause;
311                }
312                
313            }
314        }
315    
316        // ///////////////////////////////////////////////////////////
317        //
318        // XAResource Implementation
319        //
320        // ///////////////////////////////////////////////////////////
321        /**
322         * Associates a transaction with the resource.
323         */
324        public void start(Xid xid, int flags) throws XAException {
325    
326            if (LOG.isDebugEnabled()) {
327                LOG.debug("Start: " + xid);
328            }
329            if (isInLocalTransaction()) {
330                throw new XAException(XAException.XAER_PROTO);
331            }
332            // Are we already associated?
333            if (associatedXid != null) {
334                throw new XAException(XAException.XAER_PROTO);
335            }
336    
337            // if ((flags & TMJOIN) == TMJOIN) {
338            // TODO: verify that the server has seen the xid
339            // // }
340            // if ((flags & TMJOIN) == TMRESUME) {
341            // // TODO: verify that the xid was suspended.
342            // }
343    
344            // associate
345            synchronizations = null;
346            beforeEndIndex = 0;
347            setXid(xid);
348        }
349    
350        /**
351         * @return connectionId for connection
352         */
353        private ConnectionId getConnectionId() {
354            return connection.getConnectionInfo().getConnectionId();
355        }
356    
357        public void end(Xid xid, int flags) throws XAException {
358    
359            if (LOG.isDebugEnabled()) {
360                LOG.debug("End: " + xid);
361            }
362            
363            if (isInLocalTransaction()) {
364                throw new XAException(XAException.XAER_PROTO);
365            }
366            
367            if ((flags & (TMSUSPEND | TMFAIL)) != 0) {
368                // You can only suspend the associated xid.
369                if (!equals(associatedXid, xid)) {
370                    throw new XAException(XAException.XAER_PROTO);
371                }
372    
373                // TODO: we may want to put the xid in a suspended list.
374                try {
375                    beforeEnd();
376                } catch (JMSException e) {
377                    throw toXAException(e);
378                }
379                setXid(null);
380            } else if ((flags & TMSUCCESS) == TMSUCCESS) {
381                // set to null if this is the current xid.
382                // otherwise this could be an asynchronous success call
383                if (equals(associatedXid, xid)) {
384                    try {
385                        beforeEnd();
386                    } catch (JMSException e) {
387                        throw toXAException(e);
388                    }
389                    setXid(null);
390                }
391            } else {
392                throw new XAException(XAException.XAER_INVAL);
393            }
394        }
395    
396        private boolean equals(Xid xid1, Xid xid2) {
397            if (xid1 == xid2) {
398                return true;
399            }
400            if (xid1 == null ^ xid2 == null) {
401                return false;
402            }
403            return xid1.getFormatId() == xid2.getFormatId() && Arrays.equals(xid1.getBranchQualifier(), xid2.getBranchQualifier())
404                   && Arrays.equals(xid1.getGlobalTransactionId(), xid2.getGlobalTransactionId());
405        }
406    
407        public int prepare(Xid xid) throws XAException {
408            if (LOG.isDebugEnabled()) {
409                LOG.debug("Prepare: " + xid);
410            }
411            
412            // We allow interleaving multiple transactions, so
413            // we don't limit prepare to the associated xid.
414            XATransactionId x;
415            // THIS SHOULD NEVER HAPPEN because end(xid, TMSUCCESS) should have been
416            // called first
417            if (xid == null || (equals(associatedXid, xid))) {
418                throw new XAException(XAException.XAER_PROTO);
419            } else {
420                // TODO: cache the known xids so we don't keep recreating this one??
421                x = new XATransactionId(xid);
422            }
423    
424            try {
425                TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.PREPARE);
426    
427                // Find out if the server wants to commit or rollback.
428                IntegerResponse response = (IntegerResponse)syncSendPacketWithInterruptionHandling(info);
429                if (XAResource.XA_RDONLY == response.getResult()) {
430                    // transaction stops now, may be syncs that need a callback
431                    List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
432                    if (l != null && !l.isEmpty()) {
433                        if (LOG.isDebugEnabled()) {
434                            LOG.debug("firing afterCommit callbacks on XA_RDONLY from prepare: " + xid);
435                        }
436                        for (TransactionContext ctx : l) {
437                            ctx.afterCommit();
438                        }
439                    }
440                }
441                return response.getResult();
442    
443            } catch (JMSException e) {
444                LOG.warn("prepare of: " + x + " failed with: " + e, e);
445                List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
446                if (l != null && !l.isEmpty()) {
447                    for (TransactionContext ctx : l) {
448                        try {
449                            ctx.afterRollback();
450                        } catch (Throwable ignored) {
451                            if (LOG.isDebugEnabled()) {
452                                LOG.debug("failed to firing afterRollback callbacks on prepare failure, txid: " + x + ", context: " + ctx, ignored);
453                            }
454                        }
455                    }
456                }
457                throw toXAException(e);
458            }
459        }
460    
461        public void rollback(Xid xid) throws XAException {
462    
463            if (LOG.isDebugEnabled()) {
464                LOG.debug("Rollback: " + xid);
465            }
466            
467            // We allow interleaving multiple transactions, so
468            // we don't limit rollback to the associated xid.
469            XATransactionId x;
470            if (xid == null) {
471                throw new XAException(XAException.XAER_PROTO);
472            }
473            if (equals(associatedXid, xid)) {
474                // I think this can happen even without an end(xid) call. Need to
475                // check spec.
476                x = (XATransactionId)transactionId;
477            } else {
478                x = new XATransactionId(xid);
479            }
480    
481            try {
482                this.connection.checkClosedOrFailed();
483                this.connection.ensureConnectionInfoSent();
484    
485                // Let the server know that the tx is rollback.
486                TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.ROLLBACK);
487                syncSendPacketWithInterruptionHandling(info);
488    
489                List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
490                if (l != null && !l.isEmpty()) {
491                    for (TransactionContext ctx : l) {
492                        ctx.afterRollback();
493                    }
494                }
495    
496            } catch (JMSException e) {
497                throw toXAException(e);
498            }
499        }
500    
501        // XAResource interface
502        public void commit(Xid xid, boolean onePhase) throws XAException {
503    
504            if (LOG.isDebugEnabled()) {
505                LOG.debug("Commit: " + xid + ", onePhase=" + onePhase);
506            }
507            
508            // We allow interleaving multiple transactions, so
509            // we don't limit commit to the associated xid.
510            XATransactionId x;
511            if (xid == null || (equals(associatedXid, xid))) {
512                // should never happen, end(xid,TMSUCCESS) must have been previously
513                // called
514                throw new XAException(XAException.XAER_PROTO);
515            } else {
516                x = new XATransactionId(xid);
517            }
518    
519            try {
520                this.connection.checkClosedOrFailed();
521                this.connection.ensureConnectionInfoSent();
522    
523                // Notify the server that the tx was committed back
524                TransactionInfo info = new TransactionInfo(getConnectionId(), x, onePhase ? TransactionInfo.COMMIT_ONE_PHASE : TransactionInfo.COMMIT_TWO_PHASE);
525    
526                syncSendPacketWithInterruptionHandling(info);
527    
528                List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
529                if (l != null && !l.isEmpty()) {
530                    for (TransactionContext ctx : l) {
531                        ctx.afterCommit();
532                    }
533                }
534    
535            } catch (JMSException e) {
536                LOG.warn("commit of: " + x + " failed with: " + e, e);
537                if (onePhase) {
538                    List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
539                    if (l != null && !l.isEmpty()) {
540                        for (TransactionContext ctx : l) {
541                            try {
542                                ctx.afterRollback();
543                            } catch (Throwable ignored) {
544                                if (LOG.isDebugEnabled()) {
545                                    LOG.debug("failed to firing afterRollback callbacks commit failure, txid: " + x + ", context: " + ctx, ignored);
546                                }
547                            }
548                        }
549                    }
550                }
551                throw toXAException(e);
552            }
553    
554        }
555    
556        public void forget(Xid xid) throws XAException {
557            if (LOG.isDebugEnabled()) {
558                LOG.debug("Forget: " + xid);
559            }
560            
561            // We allow interleaving multiple transactions, so
562            // we don't limit forget to the associated xid.
563            XATransactionId x;
564            if (xid == null) {
565                throw new XAException(XAException.XAER_PROTO);
566            }
567            if (equals(associatedXid, xid)) {
568                // TODO determine if this can happen... I think not.
569                x = (XATransactionId)transactionId;
570            } else {
571                x = new XATransactionId(xid);
572            }
573    
574            TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.FORGET);
575    
576            try {
577                // Tell the server to forget the transaction.
578                syncSendPacketWithInterruptionHandling(info);
579            } catch (JMSException e) {
580                throw toXAException(e);
581            }
582            ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
583        }
584    
585        public boolean isSameRM(XAResource xaResource) throws XAException {
586            if (xaResource == null) {
587                return false;
588            }
589            if (!(xaResource instanceof TransactionContext)) {
590                return false;
591            }
592            TransactionContext xar = (TransactionContext)xaResource;
593            try {
594                return getResourceManagerId().equals(xar.getResourceManagerId());
595            } catch (Throwable e) {
596                throw (XAException)new XAException("Could not get resource manager id.").initCause(e);
597            }
598        }
599    
600        public Xid[] recover(int flag) throws XAException {
601            if (LOG.isDebugEnabled()) {
602                LOG.debug("Recover: " + flag);
603            }
604            
605            TransactionInfo info = new TransactionInfo(getConnectionId(), null, TransactionInfo.RECOVER);
606            try {
607                this.connection.checkClosedOrFailed();
608                this.connection.ensureConnectionInfoSent();
609    
610                DataArrayResponse receipt = (DataArrayResponse)this.connection.syncSendPacket(info);
611                DataStructure[] data = receipt.getData();
612                XATransactionId[] answer;
613                if (data instanceof XATransactionId[]) {
614                    answer = (XATransactionId[])data;
615                } else {
616                    answer = new XATransactionId[data.length];
617                    System.arraycopy(data, 0, answer, 0, data.length);
618                }
619                return answer;
620            } catch (JMSException e) {
621                throw toXAException(e);
622            }
623        }
624    
625        public int getTransactionTimeout() throws XAException {
626            return 0;
627        }
628    
629        public boolean setTransactionTimeout(int seconds) throws XAException {
630            return false;
631        }
632    
633        // ///////////////////////////////////////////////////////////
634        //
635        // Helper methods.
636        //
637        // ///////////////////////////////////////////////////////////
638        private String getResourceManagerId() throws JMSException {
639            return this.connection.getResourceManagerId();
640        }
641    
642        private void setXid(Xid xid) throws XAException {
643    
644            try {
645                this.connection.checkClosedOrFailed();
646                this.connection.ensureConnectionInfoSent();
647            } catch (JMSException e) {
648                throw toXAException(e);
649            }
650    
651            if (xid != null) {
652                // associate
653                associatedXid = xid;
654                transactionId = new XATransactionId(xid);
655    
656                TransactionInfo info = new TransactionInfo(connectionId, transactionId, TransactionInfo.BEGIN);
657                try {
658                    this.connection.asyncSendPacket(info);
659                    if (LOG.isDebugEnabled()) {
660                        LOG.debug("Started XA transaction: " + transactionId);
661                    }
662                } catch (JMSException e) {
663                    throw toXAException(e);
664                }
665    
666            } else {
667    
668                if (transactionId != null) {
669                    TransactionInfo info = new TransactionInfo(connectionId, transactionId, TransactionInfo.END);
670                    try {
671                        syncSendPacketWithInterruptionHandling(info);
672                        if (LOG.isDebugEnabled()) {
673                            LOG.debug("Ended XA transaction: " + transactionId);
674                        }
675                    } catch (JMSException e) {
676                        throw toXAException(e);
677                    }
678    
679                    // Add our self to the list of contexts that are interested in
680                    // post commit/rollback events.
681                    List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.get(transactionId);
682                    if (l == null) {
683                        l = new ArrayList<TransactionContext>(3);
684                        ENDED_XA_TRANSACTION_CONTEXTS.put(transactionId, l);
685                        l.add(this);
686                    } else if (!l.contains(this)) {
687                        l.add(this);
688                    }
689                }
690    
691                // dis-associate
692                associatedXid = null;
693                transactionId = null;
694            }
695        }
696    
697        /**
698         * Sends the given command. Also sends the command in case of interruption,
699         * so that important commands like rollback and commit are never interrupted.
700         * If interruption occurred, set the interruption state of the current 
701         * after performing the action again. 
702         * 
703         * @return the response
704         */
705        private Response syncSendPacketWithInterruptionHandling(Command command) throws JMSException {
706            try {
707                return this.connection.syncSendPacket(command);
708            } catch (JMSException e) {
709                if (e.getLinkedException() instanceof InterruptedIOException) {
710                    try {
711                        Thread.interrupted();
712                        return this.connection.syncSendPacket(command);
713                    } finally {
714                        Thread.currentThread().interrupt();
715                    }               
716                }
717                
718                throw e;
719            }
720        }
721    
722        /**
723         * Converts a JMSException from the server to an XAException. if the
724         * JMSException contained a linked XAException that is returned instead.
725         * 
726         * @param e JMSException to convert
727         * @return XAException wrapping original exception or its message
728         */
729        private XAException toXAException(JMSException e) {
730            if (e.getCause() != null && e.getCause() instanceof XAException) {
731                XAException original = (XAException)e.getCause();
732                XAException xae = new XAException(original.getMessage());
733                xae.errorCode = original.errorCode;
734                xae.initCause(original);
735                return xae;
736            }
737    
738            XAException xae = new XAException(e.getMessage());
739            xae.errorCode = XAException.XAER_RMFAIL;
740            xae.initCause(e);
741            return xae;
742        }
743    
744        public ActiveMQConnection getConnection() {
745            return connection;
746        }
747    
748        public void cleanup() {
749            associatedXid = null;
750            transactionId = null;
751        }
752    
753        @Override
754        public String toString() {
755            return "TransactionContext{" +
756                    "transactionId=" + transactionId +
757                    '}';
758        }
759    }