001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.store.amq; 018 019 import java.io.File; 020 import java.io.IOException; 021 import java.io.RandomAccessFile; 022 import java.nio.channels.FileLock; 023 import java.util.Date; 024 import java.util.HashMap; 025 import java.util.HashSet; 026 import java.util.Iterator; 027 import java.util.Map; 028 import java.util.Set; 029 import java.util.concurrent.ConcurrentHashMap; 030 import java.util.concurrent.CountDownLatch; 031 import java.util.concurrent.atomic.AtomicBoolean; 032 import java.util.concurrent.atomic.AtomicInteger; 033 import java.util.concurrent.atomic.AtomicLong; 034 import org.apache.activeio.journal.Journal; 035 import org.apache.activemq.broker.BrokerService; 036 import org.apache.activemq.broker.BrokerServiceAware; 037 import org.apache.activemq.broker.ConnectionContext; 038 import org.apache.activemq.command.ActiveMQDestination; 039 import org.apache.activemq.command.ActiveMQQueue; 040 import org.apache.activemq.command.ActiveMQTopic; 041 import org.apache.activemq.command.DataStructure; 042 import org.apache.activemq.command.JournalQueueAck; 043 import 
org.apache.activemq.command.JournalTopicAck; 044 import org.apache.activemq.command.JournalTrace; 045 import org.apache.activemq.command.JournalTransaction; 046 import org.apache.activemq.command.Message; 047 import org.apache.activemq.command.ProducerId; 048 import org.apache.activemq.command.SubscriptionInfo; 049 import org.apache.activemq.filter.NonCachedMessageEvaluationContext; 050 import org.apache.activemq.kaha.impl.async.AsyncDataManager; 051 import org.apache.activemq.kaha.impl.async.Location; 052 import org.apache.activemq.kaha.impl.index.hash.HashIndex; 053 import org.apache.activemq.openwire.OpenWireFormat; 054 import org.apache.activemq.store.MessageStore; 055 import org.apache.activemq.store.PersistenceAdapter; 056 import org.apache.activemq.store.ReferenceStore; 057 import org.apache.activemq.store.ReferenceStoreAdapter; 058 import org.apache.activemq.store.TopicMessageStore; 059 import org.apache.activemq.store.TopicReferenceStore; 060 import org.apache.activemq.store.TransactionStore; 061 import org.apache.activemq.store.kahadaptor.KahaReferenceStoreAdapter; 062 import org.apache.activemq.thread.Scheduler; 063 import org.apache.activemq.thread.Task; 064 import org.apache.activemq.thread.TaskRunner; 065 import org.apache.activemq.thread.TaskRunnerFactory; 066 import org.apache.activemq.usage.SystemUsage; 067 import org.apache.activemq.usage.Usage; 068 import org.apache.activemq.usage.UsageListener; 069 import org.apache.activemq.util.ByteSequence; 070 import org.apache.activemq.util.IOExceptionSupport; 071 import org.apache.activemq.util.IOHelper; 072 import org.apache.activemq.wireformat.WireFormat; 073 import org.slf4j.Logger; 074 import org.slf4j.LoggerFactory; 075 076 077 /** 078 * An implementation of {@link PersistenceAdapter} designed for use with a 079 * {@link Journal} and then check pointing asynchronously on a timeout with some 080 * other long term persistent storage. 
081 * 082 * @org.apache.xbean.XBean element="amqPersistenceAdapter" 083 * 084 */ 085 public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, BrokerServiceAware { 086 087 private static final Logger LOG = LoggerFactory.getLogger(AMQPersistenceAdapter.class); 088 private Scheduler scheduler; 089 private final ConcurrentHashMap<ActiveMQQueue, AMQMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, AMQMessageStore>(); 090 private final ConcurrentHashMap<ActiveMQTopic, AMQTopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, AMQTopicMessageStore>(); 091 private static final String PROPERTY_PREFIX = "org.apache.activemq.store.amq"; 092 private static final boolean BROKEN_FILE_LOCK; 093 private static final boolean DISABLE_LOCKING; 094 private static final int JOURNAL_LOCKED_WAIT_DELAY = 10 * 1000; 095 private AsyncDataManager asyncDataManager; 096 private ReferenceStoreAdapter referenceStoreAdapter; 097 private TaskRunnerFactory taskRunnerFactory; 098 private WireFormat wireFormat = new OpenWireFormat(); 099 private SystemUsage usageManager; 100 private long checkpointInterval = 1000 * 20; 101 private int maxCheckpointMessageAddSize = 1024 * 4; 102 private final AMQTransactionStore transactionStore = new AMQTransactionStore(this); 103 private TaskRunner checkpointTask; 104 private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1); 105 private final AtomicBoolean started = new AtomicBoolean(false); 106 private Runnable periodicCheckpointTask; 107 private Runnable periodicCleanupTask; 108 private boolean deleteAllMessages; 109 private boolean syncOnWrite; 110 private boolean syncOnTransaction=true; 111 private String brokerName = ""; 112 private File directory; 113 private File directoryArchive; 114 private BrokerService brokerService; 115 private final AtomicLong storeSize = new AtomicLong(); 116 private boolean persistentIndex=true; 117 private boolean useNio = true; 118 private boolean archiveDataLogs=false; 
119 private long cleanupInterval = AsyncDataManager.DEFAULT_CLEANUP_INTERVAL; 120 private int maxFileLength = AsyncDataManager.DEFAULT_MAX_FILE_LENGTH; 121 private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE; 122 private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE; 123 private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE; 124 private int indexMaxBinSize = HashIndex.MAXIMUM_CAPACITY; 125 private int indexLoadFactor = HashIndex.DEFAULT_LOAD_FACTOR; 126 private int maxReferenceFileLength=AMQPersistenceAdapterFactory.DEFAULT_MAX_REFERNCE_FILE_LENGTH; 127 private final Map<AMQMessageStore,Map<Integer, AtomicInteger>> dataFilesInProgress = new ConcurrentHashMap<AMQMessageStore,Map<Integer, AtomicInteger>> (); 128 private RandomAccessFile lockFile; 129 private FileLock lock; 130 private boolean disableLocking = DISABLE_LOCKING; 131 private boolean failIfJournalIsLocked; 132 private boolean lockLogged; 133 private boolean lockAquired; 134 private boolean recoverReferenceStore=true; 135 private boolean forceRecoverReferenceStore=false; 136 private boolean useDedicatedTaskRunner=false; 137 private int journalThreadPriority = Thread.MAX_PRIORITY; 138 139 public String getBrokerName() { 140 return this.brokerName; 141 } 142 143 public void setBrokerName(String brokerName) { 144 this.brokerName = brokerName; 145 if (this.referenceStoreAdapter != null) { 146 this.referenceStoreAdapter.setBrokerName(brokerName); 147 } 148 } 149 150 public BrokerService getBrokerService() { 151 return brokerService; 152 } 153 154 public void setBrokerService(BrokerService brokerService) { 155 this.brokerService = brokerService; 156 } 157 158 public synchronized void start() throws Exception { 159 if (!started.compareAndSet(false, true)) { 160 return; 161 } 162 if (this.directory == null) { 163 if (brokerService != null) { 164 this.directory = brokerService.getBrokerDataDirectory(); 165 166 } else { 167 this.directory = new File(IOHelper.getDefaultDataDirectory(), 
IOHelper.toFileSystemSafeName(brokerName)); 168 this.directory = new File(directory, "amqstore"); 169 directory.getAbsolutePath(); 170 } 171 } 172 if (this.directoryArchive == null) { 173 this.directoryArchive = new File(this.directory,"archive"); 174 } 175 if (this.brokerService != null) { 176 this.taskRunnerFactory = this.brokerService.getTaskRunnerFactory(); 177 this.scheduler = this.brokerService.getScheduler(); 178 } else { 179 this.taskRunnerFactory = new TaskRunnerFactory("AMQPersistenceAdaptor Task", getJournalThreadPriority(), 180 true, 1000, isUseDedicatedTaskRunner()); 181 this.scheduler = new Scheduler("AMQPersistenceAdapter Scheduler"); 182 } 183 184 IOHelper.mkdirs(this.directory); 185 lockFile = new RandomAccessFile(new File(directory, "lock"), "rw"); 186 lock(); 187 LOG.info("AMQStore starting using directory: " + directory); 188 if (archiveDataLogs) { 189 IOHelper.mkdirs(this.directoryArchive); 190 } 191 192 if (this.usageManager != null) { 193 this.usageManager.getMemoryUsage().addUsageListener(this); 194 } 195 if (asyncDataManager == null) { 196 asyncDataManager = createAsyncDataManager(); 197 } 198 if (referenceStoreAdapter == null) { 199 referenceStoreAdapter = createReferenceStoreAdapter(); 200 } 201 referenceStoreAdapter.setDirectory(new File(directory, "kr-store")); 202 referenceStoreAdapter.setBrokerName(getBrokerName()); 203 referenceStoreAdapter.setUsageManager(usageManager); 204 referenceStoreAdapter.setMaxDataFileLength(getMaxReferenceFileLength()); 205 206 if (failIfJournalIsLocked) { 207 asyncDataManager.lock(); 208 } else { 209 while (true) { 210 try { 211 asyncDataManager.lock(); 212 break; 213 } catch (IOException e) { 214 LOG.info("Journal is locked... 
waiting " + (JOURNAL_LOCKED_WAIT_DELAY / 1000) + " seconds for the journal to be unlocked.", e); 215 try { 216 Thread.sleep(JOURNAL_LOCKED_WAIT_DELAY); 217 } catch (InterruptedException e1) { 218 } 219 } 220 } 221 } 222 223 asyncDataManager.start(); 224 if (deleteAllMessages) { 225 asyncDataManager.delete(); 226 try { 227 JournalTrace trace = new JournalTrace(); 228 trace.setMessage("DELETED " + new Date()); 229 Location location = asyncDataManager.write(wireFormat.marshal(trace), false); 230 asyncDataManager.setMark(location, true); 231 LOG.info("Journal deleted: "); 232 deleteAllMessages = false; 233 } catch (IOException e) { 234 throw e; 235 } catch (Throwable e) { 236 throw IOExceptionSupport.create(e); 237 } 238 referenceStoreAdapter.deleteAllMessages(); 239 } 240 referenceStoreAdapter.start(); 241 Set<Integer> files = referenceStoreAdapter.getReferenceFileIdsInUse(); 242 LOG.info("Active data files: " + files); 243 checkpointTask = taskRunnerFactory.createTaskRunner(new Task() { 244 245 public boolean iterate() { 246 doCheckpoint(); 247 return false; 248 } 249 }, "ActiveMQ Journal Checkpoint Worker"); 250 createTransactionStore(); 251 252 // 253 // The following was attempting to reduce startup times by avoiding the 254 // log 255 // file scanning that recovery performs. The problem with it is that XA 256 // transactions 257 // only live in transaction log and are not stored in the reference 258 // store, but they still 259 // need to be recovered when the broker starts up. 
260 261 if (isForceRecoverReferenceStore() 262 || (isRecoverReferenceStore() && !referenceStoreAdapter 263 .isStoreValid())) { 264 LOG.warn("The ReferenceStore is not valid - recovering ..."); 265 recover(); 266 LOG.info("Finished recovering the ReferenceStore"); 267 } else { 268 Location location = writeTraceMessage("RECOVERED " + new Date(), 269 true); 270 asyncDataManager.setMark(location, true); 271 // recover transactions 272 getTransactionStore().setPreparedTransactions( 273 referenceStoreAdapter.retrievePreparedState()); 274 } 275 276 // Do a checkpoint periodically. 277 periodicCheckpointTask = new Runnable() { 278 279 public void run() { 280 checkpoint(false); 281 } 282 }; 283 scheduler.executePeriodically(periodicCheckpointTask, getCheckpointInterval()); 284 periodicCleanupTask = new Runnable() { 285 286 public void run() { 287 cleanup(); 288 } 289 }; 290 scheduler.executePeriodically(periodicCleanupTask, getCleanupInterval()); 291 292 if (lockAquired && lockLogged) { 293 LOG.info("Aquired lock for AMQ Store" + getDirectory()); 294 if (brokerService != null) { 295 brokerService.getBroker().nowMasterBroker(); 296 } 297 } 298 299 } 300 301 public void stop() throws Exception { 302 303 if (!started.compareAndSet(true, false)) { 304 return; 305 } 306 unlock(); 307 if (lockFile != null) { 308 lockFile.close(); 309 lockFile = null; 310 } 311 this.usageManager.getMemoryUsage().removeUsageListener(this); 312 synchronized (this) { 313 scheduler.cancel(periodicCheckpointTask); 314 scheduler.cancel(periodicCleanupTask); 315 } 316 Iterator<AMQMessageStore> queueIterator = queues.values().iterator(); 317 while (queueIterator.hasNext()) { 318 AMQMessageStore ms = queueIterator.next(); 319 ms.stop(); 320 } 321 Iterator<AMQTopicMessageStore> topicIterator = topics.values().iterator(); 322 while (topicIterator.hasNext()) { 323 final AMQTopicMessageStore ms = topicIterator.next(); 324 ms.stop(); 325 } 326 // Take one final checkpoint and stop checkpoint processing. 
327 checkpoint(true); 328 synchronized (this) { 329 checkpointTask.shutdown(); 330 } 331 referenceStoreAdapter.savePreparedState(getTransactionStore().getPreparedTransactions()); 332 queues.clear(); 333 topics.clear(); 334 IOException firstException = null; 335 referenceStoreAdapter.stop(); 336 referenceStoreAdapter = null; 337 338 if (this.brokerService == null) { 339 this.taskRunnerFactory.shutdown(); 340 this.scheduler.stop(); 341 } 342 try { 343 LOG.debug("Journal close"); 344 asyncDataManager.close(); 345 } catch (Exception e) { 346 firstException = IOExceptionSupport.create("Failed to close journals: " + e, e); 347 } 348 if (firstException != null) { 349 throw firstException; 350 } 351 } 352 353 /** 354 * When we checkpoint we move all the journalled data to long term storage. 355 * 356 * @param sync 357 */ 358 public void checkpoint(boolean sync) { 359 try { 360 if (asyncDataManager == null) { 361 throw new IllegalStateException("Journal is closed."); 362 } 363 CountDownLatch latch = null; 364 synchronized (this) { 365 latch = nextCheckpointCountDownLatch; 366 checkpointTask.wakeup(); 367 } 368 if (sync) { 369 if (LOG.isDebugEnabled()) { 370 LOG.debug("Waitng for checkpoint to complete."); 371 } 372 latch.await(); 373 } 374 referenceStoreAdapter.checkpoint(sync); 375 } catch (InterruptedException e) { 376 Thread.currentThread().interrupt(); 377 LOG.warn("Request to start checkpoint failed: " + e, e); 378 } catch (IOException e) { 379 LOG.error("checkpoint failed: " + e, e); 380 } 381 } 382 383 /** 384 * This does the actual checkpoint. 
385 * 386 * @return true if successful 387 */ 388 public boolean doCheckpoint() { 389 CountDownLatch latch = null; 390 synchronized (this) { 391 latch = nextCheckpointCountDownLatch; 392 nextCheckpointCountDownLatch = new CountDownLatch(1); 393 } 394 try { 395 if (LOG.isDebugEnabled()) { 396 LOG.debug("Checkpoint started."); 397 } 398 399 Location currentMark = asyncDataManager.getMark(); 400 Location newMark = currentMark; 401 Iterator<AMQMessageStore> queueIterator = queues.values().iterator(); 402 while (queueIterator.hasNext()) { 403 final AMQMessageStore ms = queueIterator.next(); 404 Location mark = ms.getMark(); 405 if (mark != null && (newMark == null || mark.compareTo(newMark) > 0)) { 406 newMark = mark; 407 } 408 } 409 Iterator<AMQTopicMessageStore> topicIterator = topics.values().iterator(); 410 while (topicIterator.hasNext()) { 411 final AMQTopicMessageStore ms = topicIterator.next(); 412 Location mark = ms.getMark(); 413 if (mark != null && (newMark == null || mark.compareTo(newMark) > 0)) { 414 newMark = mark; 415 } 416 } 417 try { 418 if (newMark != currentMark) { 419 if (LOG.isDebugEnabled()) { 420 LOG.debug("Marking journal at: " + newMark); 421 } 422 asyncDataManager.setMark(newMark, false); 423 writeTraceMessage("CHECKPOINT " + new Date(), true); 424 } 425 } catch (Exception e) { 426 LOG.error("Failed to mark the Journal: " + e, e); 427 } 428 if (LOG.isDebugEnabled()) { 429 LOG.debug("Checkpoint done."); 430 } 431 } finally { 432 latch.countDown(); 433 } 434 return true; 435 } 436 437 /** 438 * Cleans up the data files 439 * @throws IOException 440 */ 441 public void cleanup() { 442 try { 443 Set<Integer>inProgress = new HashSet<Integer>(); 444 if (LOG.isDebugEnabled()) { 445 LOG.debug("dataFilesInProgress.values: (" + dataFilesInProgress.values().size() + ") " + dataFilesInProgress.values()); 446 } 447 for (Map<Integer, AtomicInteger> set: dataFilesInProgress.values()) { 448 inProgress.addAll(set.keySet()); 449 } 450 Integer lastDataFile = 
asyncDataManager.getCurrentDataFileId(); 451 inProgress.add(lastDataFile); 452 lastDataFile = asyncDataManager.getMark().getDataFileId(); 453 inProgress.addAll(referenceStoreAdapter.getReferenceFileIdsInUse()); 454 Location lastActiveTx = transactionStore.checkpoint(); 455 if (lastActiveTx != null) { 456 lastDataFile = Math.min(lastDataFile, lastActiveTx.getDataFileId()); 457 } 458 LOG.debug("lastDataFile: " + lastDataFile); 459 asyncDataManager.consolidateDataFilesNotIn(inProgress, lastDataFile - 1); 460 } catch (IOException e) { 461 LOG.error("Could not cleanup data files: " + e, e); 462 } 463 } 464 465 public Set<ActiveMQDestination> getDestinations() { 466 Set<ActiveMQDestination> destinations = new HashSet<ActiveMQDestination>(referenceStoreAdapter.getDestinations()); 467 destinations.addAll(queues.keySet()); 468 destinations.addAll(topics.keySet()); 469 return destinations; 470 } 471 472 MessageStore createMessageStore(ActiveMQDestination destination) throws IOException { 473 if (destination.isQueue()) { 474 return createQueueMessageStore((ActiveMQQueue)destination); 475 } else { 476 return createTopicMessageStore((ActiveMQTopic)destination); 477 } 478 } 479 480 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { 481 AMQMessageStore store = queues.get(destination); 482 if (store == null) { 483 ReferenceStore checkpointStore = referenceStoreAdapter.createQueueReferenceStore(destination); 484 store = new AMQMessageStore(this, checkpointStore, destination); 485 try { 486 store.start(); 487 } catch (Exception e) { 488 throw IOExceptionSupport.create(e); 489 } 490 queues.put(destination, store); 491 } 492 return store; 493 } 494 495 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException { 496 AMQTopicMessageStore store = topics.get(destinationName); 497 if (store == null) { 498 TopicReferenceStore checkpointStore = referenceStoreAdapter.createTopicReferenceStore(destinationName); 
499 store = new AMQTopicMessageStore(this,checkpointStore, destinationName); 500 try { 501 store.start(); 502 } catch (Exception e) { 503 throw IOExceptionSupport.create(e); 504 } 505 topics.put(destinationName, store); 506 } 507 return store; 508 } 509 510 /** 511 * Cleanup method to remove any state associated with the given destination 512 * 513 * @param destination 514 */ 515 public void removeQueueMessageStore(ActiveMQQueue destination) { 516 AMQMessageStore store= queues.remove(destination); 517 referenceStoreAdapter.removeQueueMessageStore(destination); 518 } 519 520 /** 521 * Cleanup method to remove any state associated with the given destination 522 * 523 * @param destination 524 */ 525 public void removeTopicMessageStore(ActiveMQTopic destination) { 526 topics.remove(destination); 527 } 528 529 public TransactionStore createTransactionStore() throws IOException { 530 return transactionStore; 531 } 532 533 public long getLastMessageBrokerSequenceId() throws IOException { 534 return referenceStoreAdapter.getLastMessageBrokerSequenceId(); 535 } 536 537 public void beginTransaction(ConnectionContext context) throws IOException { 538 referenceStoreAdapter.beginTransaction(context); 539 } 540 541 public void commitTransaction(ConnectionContext context) throws IOException { 542 referenceStoreAdapter.commitTransaction(context); 543 } 544 545 public void rollbackTransaction(ConnectionContext context) throws IOException { 546 referenceStoreAdapter.rollbackTransaction(context); 547 } 548 549 public boolean isPersistentIndex() { 550 return persistentIndex; 551 } 552 553 public void setPersistentIndex(boolean persistentIndex) { 554 this.persistentIndex = persistentIndex; 555 } 556 557 /** 558 * @param location 559 * @return 560 * @throws IOException 561 */ 562 public DataStructure readCommand(Location location) throws IOException { 563 try { 564 ByteSequence packet = asyncDataManager.read(location); 565 return (DataStructure)wireFormat.unmarshal(packet); 566 } catch 
(IOException e) { 567 throw createReadException(location, e); 568 } 569 } 570 571 /** 572 * Move all the messages that were in the journal into long term storage. We 573 * just replay and do a checkpoint. 574 * 575 * @throws IOException 576 * @throws IOException 577 * @throws IllegalStateException 578 */ 579 private void recover() throws IllegalStateException, IOException { 580 referenceStoreAdapter.clearMessages(); 581 Location pos = null; 582 int redoCounter = 0; 583 LOG.info("Journal Recovery Started from: " + asyncDataManager); 584 long start = System.currentTimeMillis(); 585 ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext()); 586 // While we have records in the journal. 587 while ((pos = asyncDataManager.getNextLocation(pos)) != null) { 588 ByteSequence data = asyncDataManager.read(pos); 589 DataStructure c = (DataStructure)wireFormat.unmarshal(data); 590 if (c instanceof Message) { 591 Message message = (Message)c; 592 AMQMessageStore store = (AMQMessageStore)createMessageStore(message.getDestination()); 593 if (message.isInTransaction()) { 594 transactionStore.addMessage(store, message, pos); 595 } else { 596 if (store.replayAddMessage(context, message, pos)) { 597 redoCounter++; 598 } 599 } 600 } else { 601 switch (c.getDataStructureType()) { 602 case SubscriptionInfo.DATA_STRUCTURE_TYPE: { 603 referenceStoreAdapter.recoverSubscription((SubscriptionInfo)c); 604 } 605 break; 606 case JournalQueueAck.DATA_STRUCTURE_TYPE: { 607 JournalQueueAck command = (JournalQueueAck)c; 608 AMQMessageStore store = (AMQMessageStore)createMessageStore(command.getDestination()); 609 if (command.getMessageAck().isInTransaction()) { 610 transactionStore.removeMessage(store, command.getMessageAck(), pos); 611 } else { 612 if (store.replayRemoveMessage(context, command.getMessageAck())) { 613 redoCounter++; 614 } 615 } 616 } 617 break; 618 case JournalTopicAck.DATA_STRUCTURE_TYPE: { 619 JournalTopicAck command = (JournalTopicAck)c; 620 
AMQTopicMessageStore store = (AMQTopicMessageStore)createMessageStore(command.getDestination()); 621 if (command.getTransactionId() != null) { 622 transactionStore.acknowledge(store, command, pos); 623 } else { 624 if (store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId())) { 625 redoCounter++; 626 } 627 } 628 } 629 break; 630 case JournalTransaction.DATA_STRUCTURE_TYPE: { 631 JournalTransaction command = (JournalTransaction)c; 632 try { 633 // Try to replay the packet. 634 switch (command.getType()) { 635 case JournalTransaction.XA_PREPARE: 636 transactionStore.replayPrepare(command.getTransactionId()); 637 break; 638 case JournalTransaction.XA_COMMIT: 639 case JournalTransaction.LOCAL_COMMIT: 640 AMQTx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared()); 641 if (tx == null) { 642 break; // We may be trying to replay a commit 643 } 644 // that 645 // was already committed. 646 // Replay the committed operations. 
647 tx.getOperations(); 648 for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) { 649 AMQTxOperation op = (AMQTxOperation)iter.next(); 650 if (op.replay(this, context)) { 651 redoCounter++; 652 } 653 } 654 break; 655 case JournalTransaction.LOCAL_ROLLBACK: 656 case JournalTransaction.XA_ROLLBACK: 657 transactionStore.replayRollback(command.getTransactionId()); 658 break; 659 default: 660 throw new IOException("Invalid journal command type: " + command.getType()); 661 } 662 } catch (IOException e) { 663 LOG.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e); 664 } 665 } 666 break; 667 case JournalTrace.DATA_STRUCTURE_TYPE: 668 JournalTrace trace = (JournalTrace)c; 669 LOG.debug("TRACE Entry: " + trace.getMessage()); 670 break; 671 default: 672 LOG.error("Unknown type of record in transaction log which will be discarded: " + c); 673 } 674 } 675 } 676 Location location = writeTraceMessage("RECOVERED " + new Date(), true); 677 asyncDataManager.setMark(location, true); 678 long end = System.currentTimeMillis(); 679 LOG.info("Recovered " + redoCounter + " operations from redo log in " + ((end - start) / 1000.0f) + " seconds."); 680 } 681 682 private IOException createReadException(Location location, Exception e) { 683 return IOExceptionSupport.create("Failed to read to journal for: " + location + ". Reason: " + e, e); 684 } 685 686 protected IOException createWriteException(DataStructure packet, Exception e) { 687 return IOExceptionSupport.create("Failed to write to journal for: " + packet + ". Reason: " + e, e); 688 } 689 690 protected IOException createWriteException(String command, Exception e) { 691 return IOExceptionSupport.create("Failed to write to journal for command: " + command + ". Reason: " + e, e); 692 } 693 694 protected IOException createRecoveryFailedException(Exception e) { 695 return IOExceptionSupport.create("Failed to recover from journal. 
Reason: " + e, e); 696 } 697 698 /** 699 * @param command 700 * @param syncHint 701 * @return 702 * @throws IOException 703 */ 704 public Location writeCommand(DataStructure command, boolean syncHint) throws IOException { 705 return writeCommand(command, syncHint,false); 706 } 707 708 public Location writeCommand(DataStructure command, boolean syncHint,boolean forceSync) throws IOException { 709 try { 710 return asyncDataManager.write(wireFormat.marshal(command), (forceSync||(syncHint && syncOnWrite))); 711 } catch (IOException ioe) { 712 LOG.error("Failed to write command: " + command + ". Reason: " + ioe, ioe); 713 brokerService.handleIOException(ioe); 714 throw ioe; 715 } 716 } 717 718 private Location writeTraceMessage(String message, boolean sync) throws IOException { 719 JournalTrace trace = new JournalTrace(); 720 trace.setMessage(message); 721 return writeCommand(trace, sync); 722 } 723 724 public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) { 725 newPercentUsage = (newPercentUsage / 10) * 10; 726 oldPercentUsage = (oldPercentUsage / 10) * 10; 727 if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) { 728 checkpoint(false); 729 } 730 } 731 732 public AMQTransactionStore getTransactionStore() { 733 return transactionStore; 734 } 735 736 public synchronized void deleteAllMessages() throws IOException { 737 deleteAllMessages = true; 738 } 739 740 @Override 741 public String toString() { 742 return "AMQPersistenceAdapter(" + directory + ")"; 743 } 744 745 // ///////////////////////////////////////////////////////////////// 746 // Subclass overridables 747 // ///////////////////////////////////////////////////////////////// 748 protected AsyncDataManager createAsyncDataManager() { 749 AsyncDataManager manager = new AsyncDataManager(storeSize); 750 manager.setDirectory(new File(directory, "journal")); 751 manager.setDirectoryArchive(getDirectoryArchive()); 752 manager.setArchiveDataLogs(isArchiveDataLogs()); 753 
manager.setMaxFileLength(maxFileLength); 754 manager.setUseNio(useNio); 755 return manager; 756 } 757 758 protected KahaReferenceStoreAdapter createReferenceStoreAdapter() throws IOException { 759 KahaReferenceStoreAdapter adaptor = new KahaReferenceStoreAdapter(storeSize); 760 adaptor.setPersistentIndex(isPersistentIndex()); 761 adaptor.setIndexBinSize(getIndexBinSize()); 762 adaptor.setIndexKeySize(getIndexKeySize()); 763 adaptor.setIndexPageSize(getIndexPageSize()); 764 adaptor.setIndexMaxBinSize(getIndexMaxBinSize()); 765 adaptor.setIndexLoadFactor(getIndexLoadFactor()); 766 return adaptor; 767 } 768 769 // ///////////////////////////////////////////////////////////////// 770 // Property Accessors 771 // ///////////////////////////////////////////////////////////////// 772 public AsyncDataManager getAsyncDataManager() { 773 return asyncDataManager; 774 } 775 776 public void setAsyncDataManager(AsyncDataManager asyncDataManager) { 777 this.asyncDataManager = asyncDataManager; 778 } 779 780 public ReferenceStoreAdapter getReferenceStoreAdapter() { 781 return referenceStoreAdapter; 782 } 783 784 public TaskRunnerFactory getTaskRunnerFactory() { 785 return taskRunnerFactory; 786 } 787 788 public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) { 789 this.taskRunnerFactory = taskRunnerFactory; 790 } 791 792 /** 793 * @return Returns the wireFormat. 
*/
    public WireFormat getWireFormat() {
        return wireFormat;
    }

    /**
     * @param wireFormat the wireFormat to set
     */
    public void setWireFormat(WireFormat wireFormat) {
        this.wireFormat = wireFormat;
    }

    /**
     * @return the usageManager
     */
    public SystemUsage getUsageManager() {
        return usageManager;
    }

    /**
     * @param usageManager the usageManager to set
     */
    public void setUsageManager(SystemUsage usageManager) {
        this.usageManager = usageManager;
    }

    /**
     * @return the maxCheckpointMessageAddSize
     */
    public int getMaxCheckpointMessageAddSize() {
        return maxCheckpointMessageAddSize;
    }

    /**
     * Sets the maxCheckpointMessageAddSize.
     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
     */
    public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize) {
        this.maxCheckpointMessageAddSize = maxCheckpointMessageAddSize;
    }

    /**
     * Reads the store directory while holding this adapter's monitor.
     *
     * @return the directory
     */
    public synchronized File getDirectory() {
        return directory;
    }

    /**
     * Replaces the store directory while holding this adapter's monitor.
     *
     * @param directory the directory to set
     */
    public synchronized void setDirectory(File directory) {
        this.directory = directory;
    }

    /**
     * @return the syncOnWrite flag
     */
    public boolean isSyncOnWrite() {
        return this.syncOnWrite;
    }

    /**
     * @param syncOnWrite the syncOnWrite to set
     */
    public void setSyncOnWrite(boolean syncOnWrite) {
        this.syncOnWrite = syncOnWrite;
    }

    /**
     * @return the syncOnTransaction flag
     */
    public boolean isSyncOnTransaction() {
        return syncOnTransaction;
    }

    /**
     * @param syncOnTransaction the syncOnTransaction to set
     */
    public void setSyncOnTransaction(boolean syncOnTransaction) {
        this.syncOnTransaction = syncOnTransaction;
    }

    /**
     * @param referenceStoreAdapter the referenceStoreAdapter to set
     */
    public void setReferenceStoreAdapter(ReferenceStoreAdapter referenceStoreAdapter) {
        this.referenceStoreAdapter = referenceStoreAdapter;
    }

    /**
     * @return the current value of the storeSize counter
     */
    public long size() {
        return storeSize.get();
    }

    /**
     * @return the useNio flag
     */
    public boolean isUseNio() {
        return useNio;
    }

    /**
     * @param useNio the useNio to set
     */
    public void setUseNio(boolean useNio) {
        this.useNio = useNio;
    }

    /**
     * @return the maxFileLength
     */
    public int getMaxFileLength() {
        return maxFileLength;
    }

    /**
     * Sets the maxFileLength.
     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
     */
    public void setMaxFileLength(int maxFileLength) {
        this.maxFileLength = maxFileLength;
    }

    /**
     * @return the cleanupInterval
     */
    public long getCleanupInterval() {
        return cleanupInterval;
    }

    /**
     * @param cleanupInterval the cleanupInterval to set
     */
    public void setCleanupInterval(long cleanupInterval) {
        this.cleanupInterval = cleanupInterval;
    }

    /**
     * @return the checkpointInterval
     */
    public long getCheckpointInterval() {
        return checkpointInterval;
    }

    /**
     * @param checkpointInterval the checkpointInterval to set
     */
    public void setCheckpointInterval(long checkpointInterval) {
        this.checkpointInterval = checkpointInterval;
    }

    /**
     * @return the indexBinSize
     */
    public int getIndexBinSize() {
        return indexBinSize;
    }

    /**
     * @param indexBinSize the indexBinSize to set
     */
    public void setIndexBinSize(int indexBinSize) {
        this.indexBinSize = indexBinSize;
    }

    /**
     * @return the indexKeySize
     */
    public int getIndexKeySize() {
        return indexKeySize;
    }

    /**
     * @param indexKeySize the indexKeySize to set
     */
    public void setIndexKeySize(int indexKeySize) {
        this.indexKeySize = indexKeySize;
    }

    /**
     * @return the indexPageSize
     */
    public int getIndexPageSize() {
        return indexPageSize;
    }

    /**
     * @return the indexMaxBinSize
     */
    public int getIndexMaxBinSize() {
        return indexMaxBinSize;
    }

    /**
     * @param maxBinSize the indexMaxBinSize to set
     */
    public void setIndexMaxBinSize(int maxBinSize) {
        this.indexMaxBinSize = maxBinSize;
    }

    /**
     * Sets the indexPageSize.
     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
     */
    public void setIndexPageSize(int indexPageSize) {
        this.indexPageSize = indexPageSize;
    }

    /**
     * @param factor the indexLoadFactor to set
     */
    public void setIndexLoadFactor(int factor) {
        this.indexLoadFactor = factor;
    }

    /**
     * @return the indexLoadFactor
     */
    public int getIndexLoadFactor() {
        return this.indexLoadFactor;
    }

    /**
     * @return the maxReferenceFileLength
     */
    public int getMaxReferenceFileLength() {
        return maxReferenceFileLength;
    }

    /**
     * Sets the maxReferenceFileLength.
     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
     */
    public void setMaxReferenceFileLength(int maxReferenceFileLength) {
        this.maxReferenceFileLength = maxReferenceFileLength;
    }

    /**
     * @return the directoryArchive
     */
    public File getDirectoryArchive() {
        return directoryArchive;
    }

    /**
     * @param directoryArchive the directoryArchive to set
     */
    public void setDirectoryArchive(File directoryArchive) {
        this.directoryArchive = directoryArchive;
    }

    /**
     * @return the archiveDataLogs flag
     */
    public boolean isArchiveDataLogs() {
        return archiveDataLogs;
    }

    /**
     * @param archiveDataLogs the archiveDataLogs to set
     */
    public void setArchiveDataLogs(boolean archiveDataLogs) {
        this.archiveDataLogs = archiveDataLogs;
    }

    /**
     * @return the disableLocking flag
     */
    public boolean isDisableLocking() {
        return disableLocking;
    }

    /**
     * @param disableLocking the disableLocking to set
     */
    public void setDisableLocking(boolean disableLocking) {
        this.disableLocking = disableLocking;
    }

    /**
     * @return the recoverReferenceStore
     */
    public boolean isRecoverReferenceStore() {
        return recoverReferenceStore;
    }

    /**
     * @param recoverReferenceStore the recoverReferenceStore to set
     */
    public void setRecoverReferenceStore(boolean recoverReferenceStore) {
        this.recoverReferenceStore = recoverReferenceStore;
    }

    /**
     * @return the forceRecoverReferenceStore
     */
    public boolean isForceRecoverReferenceStore() {
        return forceRecoverReferenceStore;
    }

    /**
     * @param forceRecoverReferenceStore the forceRecoverReferenceStore to set
     */
    public void setForceRecoverReferenceStore(boolean forceRecoverReferenceStore) {
        this.forceRecoverReferenceStore = forceRecoverReferenceStore;
    }

    /**
     * @return the useDedicatedTaskRunner flag
     */
    public boolean isUseDedicatedTaskRunner() {
        return useDedicatedTaskRunner;
    }

    /**
     * @param useDedicatedTaskRunner the useDedicatedTaskRunner to set
     */
    public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
        this.useDedicatedTaskRunner = useDedicatedTaskRunner;
    }

    /**
     * @return the journalThreadPriority
     */
    public int getJournalThreadPriority() {
        return this.journalThreadPriority;
    }

    /**
     * @param journalThreadPriority the journalThreadPriority to set
     */
    public void setJournalThreadPriority(int journalThreadPriority) {
        this.journalThreadPriority = journalThreadPriority;
    }

    /**
     * Increments the in-progress counter kept for a data file on behalf of a
     * message store. The per-store map and the per-file counter are created
     * the first time they are needed.
     *
     * @param store the message store the data file is in progress for
     * @param dataFileId the id of the data file
     */
    protected void addInProgressDataFile(AMQMessageStore store, int dataFileId) {
        Map<Integer, AtomicInteger> countsByFile = dataFilesInProgress.get(store);
        if (countsByFile == null) {
            countsByFile = new ConcurrentHashMap<Integer, AtomicInteger>();
            dataFilesInProgress.put(store, countsByFile);
        }
        AtomicInteger counter = countsByFile.get(dataFileId);
        if (counter == null) {
            counter = new AtomicInteger(0);
            countsByFile.put(dataFileId, counter);
        }
        counter.incrementAndGet();
    }

    /**
     * Decrements the in-progress counter kept for a data file on behalf of a
     * message store. The data file entry is removed once its counter drops to
     * zero or below, and the store's map is removed once it holds no entries.
     * Nothing happens when the store has no map registered.
     *
     * @param store the message store the data file was in progress for
     * @param dataFileId the id of the data file
     */
    protected void removeInProgressDataFile(AMQMessageStore store, int dataFileId) {
        Map<Integer, AtomicInteger> countsByFile = dataFilesInProgress.get(store);
        if (countsByFile == null) {
            return;
        }
        AtomicInteger counter = countsByFile.get(dataFileId);
        if (counter != null && counter.decrementAndGet() <= 0) {
            countsByFile.remove(dataFileId);
        }
        if (countsByFile.isEmpty()) {
            dataFilesInProgress.remove(store);
        }
    }

    /**
     * Blocks until {@link #doLock()} reports success. Between failed attempts
     * the calling thread sleeps for one second; a warning is logged on the
     * first failed attempt only. The loop also ends if disableLocking is found
     * to be set after a failed attempt.
     *
     * @throws Exception if {@link #doLock()} fails or the sleep is interrupted
     */
    protected void lock() throws Exception {
        lockLogged = false;
        lockAquired = false;
        do {
            if (doLock()) {
                lockAquired = true;
            } else {
                if (!lockLogged) {
                    LOG.warn("Waiting to Lock the Store " + getDirectory());
                    lockLogged = true;
                }
                Thread.sleep(1000);
            }
        } while (!lockAquired && !disableLocking);
    }

    /**
     * Gives up the store lock, if locking is enabled and a lock is held: the
     * marker system property is removed, the file lock is released and its
     * channel is closed.
     *
     * @throws IOException if the property key cannot be computed, or releasing
     *         the lock or closing its channel fails
     */
    private synchronized void unlock() throws IOException {
        if (disableLocking || lock == null) {
            return;
        }
        // Compute the key once: it resolves the canonical path of the directory.
        String key = getPropertyKey();
        //clear property doesn't work on some platforms
        System.getProperties().remove(key);
        System.clearProperty(key);
        assert System.getProperty(key) == null;

        // Forget the lock before releasing it so that a failing release cannot
        // leave a stale FileLock behind for doLock() to mistake for a held lock.
        FileLock held = lock;
        lock = null;
        if (held.isValid()) {
            try {
                held.release();
            } finally {
                // Close the channel even when release() throws.
                held.channel().close();
            }
        }
    }

    /**
     * Makes a single, non-blocking attempt to lock the store.
     * <p>
     * The attempt trivially succeeds when locking is disabled, no directory is
     * set, or a lock object is already held. It fails when the marker system
     * property for the directory is already present. Otherwise, unless
     * BROKEN_FILE_LOCK is set, a file lock is requested on the lock file and
     * the marker system property is set once the lock is obtained.
     *
     * @return true if the store may be used by this adapter, false if it is
     *         locked elsewhere
     * @throws IOException if the property key cannot be computed or the lock
     *         file channel cannot be queried or locked
     */
    protected boolean doLock() throws IOException {
        if (disableLocking || directory == null || lock != null) {
            return true;
        }
        String key = getPropertyKey();
        if (System.getProperty(key) != null) {
            // already locked
            return false;
        }
        if (BROKEN_FILE_LOCK) {
            return true;
        }
        // Always lock at least one byte: an empty lock file would otherwise
        // yield a zero-length region, which covers no bytes and therefore is
        // not guaranteed to exclude any other holder.
        long regionSize = Math.max(1, lockFile.getChannel().size());
        try {
            lock = lockFile.getChannel().tryLock(0, regionSize, false);
        } catch (java.nio.channels.OverlappingFileLockException e) {
            // An overlapping lock is already held inside this JVM; report it as
            // "locked elsewhere" so that lock() keeps waiting instead of failing.
            return false;
        }
        if (lock == null) {
            return false;
        }
        System.setProperty(key, new Date().toString());
        return true;
    }

    /**
     * Builds the name of the system property used to mark the directory as
     * locked: this class's name, ".lock." and the canonical path of the
     * directory.
     *
     * @return the system property key for the current directory
     * @throws IOException if the canonical path cannot be resolved
     */
    private String getPropertyKey() throws IOException {
        return getClass().getName() + ".lock." + directory.getCanonicalPath();
    }

    // Both switches are read once, at class initialization, from system
    // properties; only the exact value "true" enables them.
    static {
        BROKEN_FILE_LOCK = "true".equals(System.getProperty(PROPERTY_PREFIX
                                                            + ".FileLockBroken",
                                                            "false"));
        DISABLE_LOCKING = "true".equals(System.getProperty(PROPERTY_PREFIX
                                                           + ".DisableLocking",
                                                           "false"));
    }

    /**
     * Always reports that no sequence id is known for the producer.
     *
     * @param id the producer id (not consulted)
     * @return -1
     */
    public long getLastProducerSequenceId(ProducerId id) {
        // reference store send has adequate duplicate suppression
        return -1;
    }
}