001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.transaction; 018 019 import java.io.IOException; 020 import javax.transaction.xa.XAException; 021 import org.apache.activemq.broker.ConnectionContext; 022 import org.apache.activemq.command.LocalTransactionId; 023 import org.apache.activemq.command.TransactionId; 024 import org.apache.activemq.store.TransactionStore; 025 import org.slf4j.Logger; 026 import org.slf4j.LoggerFactory; 027 028 /** 029 * 030 */ 031 public class LocalTransaction extends Transaction { 032 033 private static final Logger LOG = LoggerFactory.getLogger(LocalTransaction.class); 034 035 private final TransactionStore transactionStore; 036 private final LocalTransactionId xid; 037 private final ConnectionContext context; 038 039 public LocalTransaction(TransactionStore transactionStore, LocalTransactionId xid, ConnectionContext context) { 040 this.transactionStore = transactionStore; 041 this.xid = xid; 042 this.context = context; 043 } 044 045 @Override 046 public void commit(boolean onePhase) throws XAException, IOException { 047 if (LOG.isDebugEnabled()) { 048 LOG.debug("commit: " + xid 049 + " syncCount: " + size()); 050 } 051 052 // Get ready for commit. 053 try { 054 prePrepare(); 055 } catch (XAException e) { 056 throw e; 057 } catch (Throwable e) { 058 LOG.warn("COMMIT FAILED: ", e); 059 rollback(); 060 // Let them know we rolled back. 061 XAException xae = new XAException("COMMIT FAILED: Transaction rolled back."); 062 xae.errorCode = XAException.XA_RBOTHER; 063 xae.initCause(e); 064 throw xae; 065 } 066 067 setState(Transaction.FINISHED_STATE); 068 context.getTransactions().remove(xid); 069 // Sync on transaction store to avoid out of order messages in the cursor 070 // https://issues.apache.org/activemq/browse/AMQ-2594 071 try { 072 transactionStore.commit(getTransactionId(), false,preCommitTask, postCommitTask); 073 this.waitPostCommitDone(postCommitTask); 074 } catch (Throwable t) { 075 LOG.warn("Store COMMIT FAILED: ", t); 076 rollback(); 077 XAException xae = new XAException("STORE COMMIT FAILED: Transaction rolled back."); 078 xae.errorCode = XAException.XA_RBOTHER; 079 xae.initCause(t); 080 throw xae; 081 } 082 } 083 084 @Override 085 public void rollback() throws XAException, IOException { 086 087 if (LOG.isDebugEnabled()) { 088 LOG.debug("rollback: " + xid 089 + " syncCount: " + size()); 090 } 091 setState(Transaction.FINISHED_STATE); 092 context.getTransactions().remove(xid); 093 // Sync on transaction store to avoid out of order messages in the cursor 094 // https://issues.apache.org/activemq/browse/AMQ-2594 095 synchronized (transactionStore) { 096 transactionStore.rollback(getTransactionId()); 097 098 try { 099 fireAfterRollback(); 100 } catch (Throwable e) { 101 LOG.warn("POST ROLLBACK FAILED: ", e); 102 XAException xae = new XAException("POST ROLLBACK FAILED"); 103 xae.errorCode = XAException.XAER_RMERR; 104 xae.initCause(e); 105 throw xae; 106 } 107 } 108 } 109 110 @Override 111 public int prepare() throws XAException { 112 XAException xae = new XAException("Prepare not implemented on Local Transactions."); 113 xae.errorCode = XAException.XAER_RMERR; 114 throw xae; 115 } 116 117 @Override 118 public TransactionId getTransactionId() { 119 return xid; 120 } 121 122 @Override 123 public Logger getLog() { 124 return LOG; 125 } 126 }