001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.store.kahadb; 018 019 import java.io.DataInputStream; 020 import java.io.IOException; 021 import java.util.ArrayList; 022 import java.util.Iterator; 023 import java.util.List; 024 import java.util.Map; 025 import java.util.concurrent.CancellationException; 026 import java.util.concurrent.ConcurrentHashMap; 027 import java.util.concurrent.ExecutionException; 028 import java.util.concurrent.Future; 029 import javax.transaction.xa.XAException; 030 import org.apache.activemq.broker.ConnectionContext; 031 import org.apache.activemq.command.Message; 032 import org.apache.activemq.command.MessageAck; 033 import org.apache.activemq.command.MessageId; 034 import org.apache.activemq.command.TransactionId; 035 import org.apache.activemq.command.XATransactionId; 036 import org.apache.activemq.openwire.OpenWireFormat; 037 import org.apache.activemq.protobuf.Buffer; 038 import org.apache.activemq.store.AbstractMessageStore; 039 import org.apache.activemq.store.MessageStore; 040 import org.apache.activemq.store.ProxyMessageStore; 041 import org.apache.activemq.store.ProxyTopicMessageStore; 042 import org.apache.activemq.store.TopicMessageStore; 043 import org.apache.activemq.store.TransactionRecoveryListener; 044 import org.apache.activemq.store.TransactionStore; 045 import org.apache.activemq.store.kahadb.MessageDatabase.AddOpperation; 046 import org.apache.activemq.store.kahadb.MessageDatabase.Operation; 047 import org.apache.activemq.store.kahadb.MessageDatabase.RemoveOpperation; 048 import org.apache.activemq.store.kahadb.data.KahaCommitCommand; 049 import org.apache.activemq.store.kahadb.data.KahaPrepareCommand; 050 import org.apache.activemq.store.kahadb.data.KahaRollbackCommand; 051 import org.apache.activemq.store.kahadb.data.KahaTransactionInfo; 052 import org.apache.activemq.wireformat.WireFormat; 053 import org.slf4j.Logger; 054 import org.slf4j.LoggerFactory; 055 056 /** 057 * Provides a TransactionStore implementation that can create transaction aware 058 * MessageStore objects from non transaction aware MessageStore objects. 059 * 060 * 061 */ 062 public class KahaDBTransactionStore implements TransactionStore { 063 static final Logger LOG = LoggerFactory.getLogger(KahaDBTransactionStore.class); 064 ConcurrentHashMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>(); 065 private final WireFormat wireFormat = new OpenWireFormat(); 066 private final KahaDBStore theStore; 067 068 public KahaDBTransactionStore(KahaDBStore theStore) { 069 this.theStore = theStore; 070 } 071 072 public class Tx { 073 private final ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>(); 074 075 private final ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>(); 076 077 public void add(AddMessageCommand msg) { 078 messages.add(msg); 079 } 080 081 public void add(RemoveMessageCommand ack) { 082 acks.add(ack); 083 } 084 085 public Message[] getMessages() { 086 Message rc[] = new Message[messages.size()]; 087 int count = 0; 088 for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) { 089 AddMessageCommand cmd = iter.next(); 090 rc[count++] = cmd.getMessage(); 091 } 092 return rc; 093 } 094 095 public MessageAck[] getAcks() { 096 MessageAck rc[] = new MessageAck[acks.size()]; 097 int count = 0; 098 for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) { 099 RemoveMessageCommand cmd = iter.next(); 100 rc[count++] = cmd.getMessageAck(); 101 } 102 return rc; 103 } 104 105 /** 106 * @return true if something to commit 107 * @throws IOException 108 */ 109 public List<Future<Object>> commit() throws IOException { 110 List<Future<Object>> results = new ArrayList<Future<Object>>(); 111 // Do all the message adds. 112 for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) { 113 AddMessageCommand cmd = iter.next(); 114 results.add(cmd.run()); 115 116 } 117 // And removes.. 118 for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) { 119 RemoveMessageCommand cmd = iter.next(); 120 cmd.run(); 121 results.add(cmd.run()); 122 } 123 124 return results; 125 } 126 } 127 128 public abstract class AddMessageCommand { 129 private final ConnectionContext ctx; 130 AddMessageCommand(ConnectionContext ctx) { 131 this.ctx = ctx; 132 } 133 abstract Message getMessage(); 134 Future<Object> run() throws IOException { 135 return run(this.ctx); 136 } 137 abstract Future<Object> run(ConnectionContext ctx) throws IOException; 138 } 139 140 public abstract class RemoveMessageCommand { 141 142 private final ConnectionContext ctx; 143 RemoveMessageCommand(ConnectionContext ctx) { 144 this.ctx = ctx; 145 } 146 abstract MessageAck getMessageAck(); 147 Future<Object> run() throws IOException { 148 return run(this.ctx); 149 } 150 abstract Future<Object> run(ConnectionContext context) throws IOException; 151 } 152 153 public MessageStore proxy(MessageStore messageStore) { 154 return new ProxyMessageStore(messageStore) { 155 @Override 156 public void addMessage(ConnectionContext context, final Message send) throws IOException { 157 KahaDBTransactionStore.this.addMessage(context, getDelegate(), send); 158 } 159 160 @Override 161 public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException { 162 return KahaDBTransactionStore.this.asyncAddQueueMessage(context, getDelegate(), message); 163 } 164 165 @Override 166 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException { 167 KahaDBTransactionStore.this.removeMessage(context, getDelegate(), ack); 168 } 169 170 @Override 171 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { 172 KahaDBTransactionStore.this.removeAsyncMessage(context, getDelegate(), ack); 173 } 174 }; 175 } 176 177 public TopicMessageStore proxy(TopicMessageStore messageStore) { 178 return new ProxyTopicMessageStore(messageStore) { 179 @Override 180 public void addMessage(ConnectionContext context, final Message send) throws IOException { 181 KahaDBTransactionStore.this.addMessage(context, getDelegate(), send); 182 } 183 184 @Override 185 public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException { 186 return KahaDBTransactionStore.this.asyncAddTopicMessage(context, getDelegate(), message); 187 } 188 189 @Override 190 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException { 191 KahaDBTransactionStore.this.removeMessage(context, getDelegate(), ack); 192 } 193 194 @Override 195 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { 196 KahaDBTransactionStore.this.removeAsyncMessage(context, getDelegate(), ack); 197 } 198 199 @Override 200 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, 201 MessageId messageId, MessageAck ack) throws IOException { 202 KahaDBTransactionStore.this.acknowledge(context, (TopicMessageStore)getDelegate(), clientId, 203 subscriptionName, messageId, ack); 204 } 205 206 }; 207 } 208 209 /** 210 * @throws IOException 211 * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId) 212 */ 213 public void prepare(TransactionId txid) throws IOException { 214 inflightTransactions.remove(txid); 215 KahaTransactionInfo info = getTransactionInfo(txid); 216 theStore.store(new KahaPrepareCommand().setTransactionInfo(info), true, null, null); 217 } 218 219 public Tx getTx(Object txid) { 220 Tx tx = inflightTransactions.get(txid); 221 if (tx == null) { 222 tx = new Tx(); 223 inflightTransactions.put(txid, tx); 224 } 225 return tx; 226 } 227 228 public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit, Runnable postCommit) 229 throws IOException { 230 if (txid != null) { 231 if (!txid.isXATransaction() && theStore.isConcurrentStoreAndDispatchTransactions()) { 232 if (preCommit != null) { 233 preCommit.run(); 234 } 235 Tx tx = inflightTransactions.remove(txid); 236 if (tx != null) { 237 List<Future<Object>> results = tx.commit(); 238 boolean doneSomething = false; 239 for (Future<Object> result : results) { 240 try { 241 result.get(); 242 } catch (InterruptedException e) { 243 theStore.brokerService.handleIOException(new IOException(e.getMessage())); 244 } catch (ExecutionException e) { 245 theStore.brokerService.handleIOException(new IOException(e.getMessage())); 246 }catch(CancellationException e) { 247 } 248 if (!result.isCancelled()) { 249 doneSomething = true; 250 } 251 } 252 if (postCommit != null) { 253 postCommit.run(); 254 } 255 if (doneSomething) { 256 KahaTransactionInfo info = getTransactionInfo(txid); 257 theStore.store(new KahaCommitCommand().setTransactionInfo(info), true, null, null); 258 } 259 }else { 260 //The Tx will be null for failed over clients - lets run their post commits 261 if (postCommit != null) { 262 postCommit.run(); 263 } 264 } 265 266 } else { 267 KahaTransactionInfo info = getTransactionInfo(txid); 268 // ensure message order w.r.t to cursor and store for setBatch() 269 synchronized (this) { 270 theStore.store(new KahaCommitCommand().setTransactionInfo(info), true, preCommit, postCommit); 271 } 272 } 273 }else { 274 LOG.error("Null transaction passed on commit"); 275 } 276 } 277 278 /** 279 * @throws IOException 280 * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId) 281 */ 282 public void rollback(TransactionId txid) throws IOException { 283 if (txid.isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) { 284 KahaTransactionInfo info = getTransactionInfo(txid); 285 theStore.store(new KahaRollbackCommand().setTransactionInfo(info), false, null, null); 286 } else { 287 inflightTransactions.remove(txid); 288 } 289 } 290 291 public void start() throws Exception { 292 } 293 294 public void stop() throws Exception { 295 } 296 297 public synchronized void recover(TransactionRecoveryListener listener) throws IOException { 298 // All the inflight transactions get rolled back.. 299 // inflightTransactions.clear(); 300 for (Map.Entry<TransactionId, List<Operation>> entry : theStore.preparedTransactions.entrySet()) { 301 XATransactionId xid = (XATransactionId) entry.getKey(); 302 ArrayList<Message> messageList = new ArrayList<Message>(); 303 ArrayList<MessageAck> ackList = new ArrayList<MessageAck>(); 304 305 for (Operation op : entry.getValue()) { 306 if (op.getClass() == AddOpperation.class) { 307 AddOpperation addOp = (AddOpperation) op; 308 Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addOp.getCommand().getMessage() 309 .newInput())); 310 messageList.add(msg); 311 } else { 312 RemoveOpperation rmOp = (RemoveOpperation) op; 313 Buffer ackb = rmOp.getCommand().getAck(); 314 MessageAck ack = (MessageAck) wireFormat.unmarshal(new DataInputStream(ackb.newInput())); 315 ackList.add(ack); 316 } 317 } 318 319 Message[] addedMessages = new Message[messageList.size()]; 320 MessageAck[] acks = new MessageAck[ackList.size()]; 321 messageList.toArray(addedMessages); 322 ackList.toArray(acks); 323 listener.recover(xid, addedMessages, acks); 324 } 325 } 326 327 /** 328 * @param message 329 * @throws IOException 330 */ 331 void addMessage(ConnectionContext context, final MessageStore destination, final Message message) 332 throws IOException { 333 334 if (message.getTransactionId() != null) { 335 if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) { 336 destination.addMessage(context, message); 337 } else { 338 Tx tx = getTx(message.getTransactionId()); 339 tx.add(new AddMessageCommand(context) { 340 @Override 341 public Message getMessage() { 342 return message; 343 } 344 @Override 345 public Future<Object> run(ConnectionContext ctx) throws IOException { 346 destination.addMessage(ctx, message); 347 return AbstractMessageStore.FUTURE; 348 } 349 350 }); 351 } 352 } else { 353 destination.addMessage(context, message); 354 } 355 } 356 357 Future<Object> asyncAddQueueMessage(ConnectionContext context, final MessageStore destination, final Message message) 358 throws IOException { 359 360 if (message.getTransactionId() != null) { 361 if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) { 362 destination.addMessage(context, message); 363 return AbstractMessageStore.FUTURE; 364 } else { 365 Tx tx = getTx(message.getTransactionId()); 366 tx.add(new AddMessageCommand(context) { 367 @Override 368 public Message getMessage() { 369 return message; 370 } 371 @Override 372 public Future<Object> run(ConnectionContext ctx) throws IOException { 373 return destination.asyncAddQueueMessage(ctx, message); 374 } 375 376 }); 377 return AbstractMessageStore.FUTURE; 378 } 379 } else { 380 return destination.asyncAddQueueMessage(context, message); 381 } 382 } 383 384 Future<Object> asyncAddTopicMessage(ConnectionContext context, final MessageStore destination, final Message message) 385 throws IOException { 386 387 if (message.getTransactionId() != null) { 388 if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) { 389 destination.addMessage(context, message); 390 return AbstractMessageStore.FUTURE; 391 } else { 392 Tx tx = getTx(message.getTransactionId()); 393 tx.add(new AddMessageCommand(context) { 394 @Override 395 public Message getMessage() { 396 return message; 397 } 398 @Override 399 public Future run(ConnectionContext ctx) throws IOException { 400 return destination.asyncAddTopicMessage(ctx, message); 401 } 402 403 }); 404 return AbstractMessageStore.FUTURE; 405 } 406 } else { 407 return destination.asyncAddTopicMessage(context, message); 408 } 409 } 410 411 /** 412 * @param ack 413 * @throws IOException 414 */ 415 final void removeMessage(ConnectionContext context, final MessageStore destination, final MessageAck ack) 416 throws IOException { 417 418 if (ack.isInTransaction()) { 419 if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()== false) { 420 destination.removeMessage(context, ack); 421 } else { 422 Tx tx = getTx(ack.getTransactionId()); 423 tx.add(new RemoveMessageCommand(context) { 424 @Override 425 public MessageAck getMessageAck() { 426 return ack; 427 } 428 429 @Override 430 public Future<Object> run(ConnectionContext ctx) throws IOException { 431 destination.removeMessage(ctx, ack); 432 return AbstractMessageStore.FUTURE; 433 } 434 }); 435 } 436 } else { 437 destination.removeMessage(context, ack); 438 } 439 } 440 441 final void removeAsyncMessage(ConnectionContext context, final MessageStore destination, final MessageAck ack) 442 throws IOException { 443 444 if (ack.isInTransaction()) { 445 if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) { 446 destination.removeAsyncMessage(context, ack); 447 } else { 448 Tx tx = getTx(ack.getTransactionId()); 449 tx.add(new RemoveMessageCommand(context) { 450 @Override 451 public MessageAck getMessageAck() { 452 return ack; 453 } 454 455 @Override 456 public Future<Object> run(ConnectionContext ctx) throws IOException { 457 destination.removeMessage(ctx, ack); 458 return AbstractMessageStore.FUTURE; 459 } 460 }); 461 } 462 } else { 463 destination.removeAsyncMessage(context, ack); 464 } 465 } 466 467 final void acknowledge(ConnectionContext context, final TopicMessageStore destination, final String clientId, final String subscriptionName, 468 final MessageId messageId, final MessageAck ack) throws IOException { 469 470 if (ack.isInTransaction()) { 471 if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()== false) { 472 destination.acknowledge(context, clientId, subscriptionName, messageId, ack); 473 } else { 474 Tx tx = getTx(ack.getTransactionId()); 475 tx.add(new RemoveMessageCommand(context) { 476 public MessageAck getMessageAck() { 477 return ack; 478 } 479 480 public Future<Object> run(ConnectionContext ctx) throws IOException { 481 destination.acknowledge(ctx, clientId, subscriptionName, messageId, ack); 482 return AbstractMessageStore.FUTURE; 483 } 484 }); 485 } 486 } else { 487 destination.acknowledge(context, clientId, subscriptionName, messageId, ack); 488 } 489 } 490 491 492 private KahaTransactionInfo getTransactionInfo(TransactionId txid) { 493 return theStore.createTransactionInfo(txid); 494 } 495 496 }