org.apache.activemq.store.kahadb
Class KahaDBPersistenceAdapter

java.lang.Object
  extended by org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter
All Implemented Interfaces:
BrokerServiceAware, Service, PersistenceAdapter

public class KahaDBPersistenceAdapter
extends java.lang.Object
implements PersistenceAdapter, BrokerServiceAware

An implementation of PersistenceAdapter designed for use with a Journal, which then checkpoints asynchronously on a timeout to some other long-term persistent storage.


Constructor Summary
KahaDBPersistenceAdapter()
           
 
Method Summary
 void beginTransaction(ConnectionContext context)
          This method starts a transaction on the persistent storage - which has nothing to do with JMS or XA transactions - it's purely a mechanism to perform multiple writes to a persistent store in one transaction as a performance optimization.
 void checkpoint(boolean sync)
          Checkpoint any pending updates to the persistent store.
 void commitTransaction(ConnectionContext context)
          Commit a persistence transaction
 MessageStore createQueueMessageStore(ActiveMQQueue destination)
          Factory method to create a new queue message store with the given destination name
 TopicMessageStore createTopicMessageStore(ActiveMQTopic destination)
          Factory method to create a new topic message store with the given destination name
 TransactionStore createTransactionStore()
          Factory method to create a new persistent prepared transaction store for XA recovery
 void deleteAllMessages()
          Deletes all the messages in the persistent store.
 long getCheckpointInterval()
          Get the checkpointInterval
 long getCleanupInterval()
          Get the cleanupInterval
 int getDatabaseLockedWaitDelay()
           
 java.util.Set<ActiveMQDestination> getDestinations()
          Returns a set of all the ActiveMQDestination objects that the persistence store is aware exist.
 java.io.File getDirectory()
          Get the directory
 java.io.File getDirectoryArchive()
           
 int getFailoverProducersAuditDepth()
           
 boolean getForceRecoverIndex()
           
 int getIndexCacheSize()
          Get the indexCacheSize
 int getIndexWriteBatchSize()
          Get the indexWriteBatchSize
 int getJournalMaxFileLength()
          Get the journalMaxFileLength
 int getJournalMaxWriteBatchSize()
          Get the journalMaxWriteBatchSize
 long getLastMessageBrokerSequenceId()
           
 long getLastProducerSequenceId(ProducerId id)
          Returns the last stored producer sequenceId for this producer Id. It is used to suppress duplicate sends at the transport when a failover reconnect occurs.
 int getMaxAsyncJobs()
           
 int getMaxFailoverProducersToTrack()
           
 KahaDBStore getStore()
           
 boolean isArchiveDataLogs()
           
 boolean isCheckForCorruptJournalFiles()
           
 boolean isChecksumJournalFiles()
           
 boolean isConcurrentStoreAndDispatchQueues()
           
 boolean isConcurrentStoreAndDispatchTopics()
           
 boolean isEnableIndexWriteAsync()
          Get the enableIndexWriteAsync
 boolean isEnableJournalDiskSyncs()
          Get the enableJournalDiskSyncs
 boolean isIgnoreMissingJournalfiles()
          Get the ignoreMissingJournalfiles
 void removeQueueMessageStore(ActiveMQQueue destination)
          Cleanup method to remove any state associated with the given destination.
 void removeTopicMessageStore(ActiveMQTopic destination)
          Cleanup method to remove any state associated with the given destination. This method does not stop the message store (it might not be cached).
 void rollbackTransaction(ConnectionContext context)
          Rollback a persistence transaction
 void setArchiveDataLogs(boolean archiveDataLogs)
           
 void setBrokerName(java.lang.String brokerName)
          Set the name of the broker using the adapter
 void setBrokerService(BrokerService brokerService)
           
 void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles)
           
 void setCheckpointInterval(long checkpointInterval)
          Set the checkpointInterval
 void setChecksumJournalFiles(boolean checksumJournalFiles)
           
 void setCleanupInterval(long cleanupInterval)
          Set the cleanupInterval
 void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch)
           
 void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch)
           
 void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay)
           
 void setDirectory(java.io.File dir)
          Set the directory where any data files should be created
 void setDirectoryArchive(java.io.File directoryArchive)
           
 void setEnableIndexWriteAsync(boolean enableIndexWriteAsync)
          Set the enableIndexWriteAsync
 void setEnableJournalDiskSyncs(boolean enableJournalDiskSyncs)
          Set the enableJournalDiskSyncs
 void setFailoverProducersAuditDepth(int failoverProducersAuditDepth)
          Set the audit window depth for duplicate suppression (should exceed the max transaction batch)
 void setForceRecoverIndex(boolean forceRecoverIndex)
           
 void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles)
          Set the ignoreMissingJournalfiles
 void setIndexCacheSize(int indexCacheSize)
          Set the indexCacheSize. When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
 void setIndexWriteBatchSize(int indexWriteBatchSize)
          Set the indexWriteBatchSize. When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
 void setJournalMaxFileLength(int journalMaxFileLength)
          When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
 void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize)
          Set the journalMaxWriteBatchSize. When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
 void setMaxAsyncJobs(int maxAsyncJobs)
           
 void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack)
          Set the max number of producers (LRU cache) to track for duplicate sends
 void setUsageManager(SystemUsage usageManager)
           
 long size()
          A hint to return the size of the store on disk
 void start()
           
 void stop()
           
 java.lang.String toString()
           
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
 

Constructor Detail

KahaDBPersistenceAdapter

public KahaDBPersistenceAdapter()
Method Detail

beginTransaction

public void beginTransaction(ConnectionContext context)
                      throws java.io.IOException
Description copied from interface: PersistenceAdapter
This method starts a transaction on the persistent storage - which has nothing to do with JMS or XA transactions - it's purely a mechanism to perform multiple writes to a persistent store in one transaction as a performance optimization.

Typically one transaction will require one disk synchronization point, so for really high performance it's usually faster to perform many writes within the same transaction to minimize latency caused by disk synchronization. This is especially true when using tools like Berkeley DB or embedded JDBC servers.

Specified by:
beginTransaction in interface PersistenceAdapter
Parameters:
context -
Throws:
java.io.IOException
See Also:
PersistenceAdapter.beginTransaction(org.apache.activemq.broker.ConnectionContext)

checkpoint

public void checkpoint(boolean sync)
                throws java.io.IOException
Description copied from interface: PersistenceAdapter
Checkpoint any pending updates to the persistent store.

Specified by:
checkpoint in interface PersistenceAdapter
Parameters:
sync -
Throws:
java.io.IOException
See Also:
PersistenceAdapter.checkpoint(boolean)

commitTransaction

public void commitTransaction(ConnectionContext context)
                       throws java.io.IOException
Description copied from interface: PersistenceAdapter
Commit a persistence transaction

Specified by:
commitTransaction in interface PersistenceAdapter
Parameters:
context -
Throws:
java.io.IOException
See Also:
PersistenceAdapter.commitTransaction(org.apache.activemq.broker.ConnectionContext)

createQueueMessageStore

public MessageStore createQueueMessageStore(ActiveMQQueue destination)
                                     throws java.io.IOException
Description copied from interface: PersistenceAdapter
Factory method to create a new queue message store with the given destination name

Specified by:
createQueueMessageStore in interface PersistenceAdapter
Parameters:
destination -
Returns:
MessageStore
Throws:
java.io.IOException
See Also:
PersistenceAdapter.createQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)

createTopicMessageStore

public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination)
                                          throws java.io.IOException
Description copied from interface: PersistenceAdapter
Factory method to create a new topic message store with the given destination name

Specified by:
createTopicMessageStore in interface PersistenceAdapter
Parameters:
destination -
Returns:
TopicMessageStore
Throws:
java.io.IOException
See Also:
PersistenceAdapter.createTopicMessageStore(org.apache.activemq.command.ActiveMQTopic)

createTransactionStore

public TransactionStore createTransactionStore()
                                        throws java.io.IOException
Description copied from interface: PersistenceAdapter
Factory method to create a new persistent prepared transaction store for XA recovery

Specified by:
createTransactionStore in interface PersistenceAdapter
Returns:
TransactionStore
Throws:
java.io.IOException
See Also:
PersistenceAdapter.createTransactionStore()

deleteAllMessages

public void deleteAllMessages()
                       throws java.io.IOException
Description copied from interface: PersistenceAdapter
Deletes all the messages in the persistent store.

Specified by:
deleteAllMessages in interface PersistenceAdapter
Throws:
java.io.IOException
See Also:
PersistenceAdapter.deleteAllMessages()

getDestinations

public java.util.Set<ActiveMQDestination> getDestinations()
Description copied from interface: PersistenceAdapter
Returns a set of all the ActiveMQDestination objects that the persistence store is aware exist.

Specified by:
getDestinations in interface PersistenceAdapter
Returns:
destinations
See Also:
PersistenceAdapter.getDestinations()

getLastMessageBrokerSequenceId

public long getLastMessageBrokerSequenceId()
                                    throws java.io.IOException
Specified by:
getLastMessageBrokerSequenceId in interface PersistenceAdapter
Returns:
lastMessageBrokerSequenceId
Throws:
java.io.IOException
See Also:
PersistenceAdapter.getLastMessageBrokerSequenceId()

getLastProducerSequenceId

public long getLastProducerSequenceId(ProducerId id)
                               throws java.io.IOException
Description copied from interface: PersistenceAdapter
Returns the last stored producer sequenceId for this producer Id. It is used to suppress duplicate sends at the transport when a failover reconnect occurs.

Specified by:
getLastProducerSequenceId in interface PersistenceAdapter
Parameters:
id - the producerId to find a sequenceId for
Returns:
the last stored sequence id or -1 if no suppression needed
Throws:
java.io.IOException

removeQueueMessageStore

public void removeQueueMessageStore(ActiveMQQueue destination)
Description copied from interface: PersistenceAdapter
Cleanup method to remove any state associated with the given destination. This method does not stop the message store (it might not be cached).

Specified by:
removeQueueMessageStore in interface PersistenceAdapter
Parameters:
destination -
See Also:
PersistenceAdapter.removeQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)

removeTopicMessageStore

public void removeTopicMessageStore(ActiveMQTopic destination)
Description copied from interface: PersistenceAdapter
Cleanup method to remove any state associated with the given destination. This method does not stop the message store (it might not be cached).

Specified by:
removeTopicMessageStore in interface PersistenceAdapter
Parameters:
destination -
See Also:
PersistenceAdapter.removeTopicMessageStore(org.apache.activemq.command.ActiveMQTopic)

rollbackTransaction

public void rollbackTransaction(ConnectionContext context)
                         throws java.io.IOException
Description copied from interface: PersistenceAdapter
Rollback a persistence transaction

Specified by:
rollbackTransaction in interface PersistenceAdapter
Parameters:
context -
Throws:
java.io.IOException
See Also:
PersistenceAdapter.rollbackTransaction(org.apache.activemq.broker.ConnectionContext)

setBrokerName

public void setBrokerName(java.lang.String brokerName)
Description copied from interface: PersistenceAdapter
Set the name of the broker using the adapter

Specified by:
setBrokerName in interface PersistenceAdapter
Parameters:
brokerName -
See Also:
PersistenceAdapter.setBrokerName(java.lang.String)

setUsageManager

public void setUsageManager(SystemUsage usageManager)
Specified by:
setUsageManager in interface PersistenceAdapter
Parameters:
usageManager -
See Also:
PersistenceAdapter.setUsageManager(org.apache.activemq.usage.SystemUsage)

size

public long size()
Description copied from interface: PersistenceAdapter
A hint to return the size of the store on disk

Specified by:
size in interface PersistenceAdapter
Returns:
the size of the store
See Also:
PersistenceAdapter.size()

start

public void start()
           throws java.lang.Exception
Specified by:
start in interface Service
Throws:
java.lang.Exception
See Also:
Service.start()

stop

public void stop()
          throws java.lang.Exception
Specified by:
stop in interface Service
Throws:
java.lang.Exception
See Also:
Service.stop()

getJournalMaxFileLength

public int getJournalMaxFileLength()
Get the journalMaxFileLength

Returns:
the journalMaxFileLength

setJournalMaxFileLength

public void setJournalMaxFileLength(int journalMaxFileLength)
When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used


setMaxFailoverProducersToTrack

public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack)
Set the max number of producers (LRU cache) to track for duplicate sends


getMaxFailoverProducersToTrack

public int getMaxFailoverProducersToTrack()

setFailoverProducersAuditDepth

public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth)
Set the audit window depth for duplicate suppression (should exceed the max transaction batch)


getFailoverProducersAuditDepth

public int getFailoverProducersAuditDepth()

getCheckpointInterval

public long getCheckpointInterval()
Get the checkpointInterval

Returns:
the checkpointInterval

setCheckpointInterval

public void setCheckpointInterval(long checkpointInterval)
Set the checkpointInterval

Parameters:
checkpointInterval - the checkpointInterval to set

getCleanupInterval

public long getCleanupInterval()
Get the cleanupInterval

Returns:
the cleanupInterval

setCleanupInterval

public void setCleanupInterval(long cleanupInterval)
Set the cleanupInterval

Parameters:
cleanupInterval - the cleanupInterval to set

getIndexWriteBatchSize

public int getIndexWriteBatchSize()
Get the indexWriteBatchSize

Returns:
the indexWriteBatchSize

setIndexWriteBatchSize

public void setIndexWriteBatchSize(int indexWriteBatchSize)
Set the indexWriteBatchSize. When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used

Parameters:
indexWriteBatchSize - the indexWriteBatchSize to set

getJournalMaxWriteBatchSize

public int getJournalMaxWriteBatchSize()
Get the journalMaxWriteBatchSize

Returns:
the journalMaxWriteBatchSize

setJournalMaxWriteBatchSize

public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize)
Set the journalMaxWriteBatchSize. When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used

Parameters:
journalMaxWriteBatchSize - the journalMaxWriteBatchSize to set

isEnableIndexWriteAsync

public boolean isEnableIndexWriteAsync()
Get the enableIndexWriteAsync

Returns:
the enableIndexWriteAsync

setEnableIndexWriteAsync

public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync)
Set the enableIndexWriteAsync

Parameters:
enableIndexWriteAsync - the enableIndexWriteAsync to set

getDirectory

public java.io.File getDirectory()
Get the directory

Returns:
the directory

setDirectory

public void setDirectory(java.io.File dir)
Description copied from interface: PersistenceAdapter
Set the directory where any data files should be created

Specified by:
setDirectory in interface PersistenceAdapter
Parameters:
dir -
See Also:
PersistenceAdapter.setDirectory(java.io.File)

isEnableJournalDiskSyncs

public boolean isEnableJournalDiskSyncs()
Get the enableJournalDiskSyncs

Returns:
the enableJournalDiskSyncs

setEnableJournalDiskSyncs

public void setEnableJournalDiskSyncs(boolean enableJournalDiskSyncs)
Set the enableJournalDiskSyncs

Parameters:
enableJournalDiskSyncs - the enableJournalDiskSyncs to set

getIndexCacheSize

public int getIndexCacheSize()
Get the indexCacheSize

Returns:
the indexCacheSize

setIndexCacheSize

public void setIndexCacheSize(int indexCacheSize)
Set the indexCacheSize. When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used

Parameters:
indexCacheSize - the indexCacheSize to set

isIgnoreMissingJournalfiles

public boolean isIgnoreMissingJournalfiles()
Get the ignoreMissingJournalfiles

Returns:
the ignoreMissingJournalfiles

setIgnoreMissingJournalfiles

public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles)
Set the ignoreMissingJournalfiles

Parameters:
ignoreMissingJournalfiles - the ignoreMissingJournalfiles to set

isChecksumJournalFiles

public boolean isChecksumJournalFiles()

isCheckForCorruptJournalFiles

public boolean isCheckForCorruptJournalFiles()

setChecksumJournalFiles

public void setChecksumJournalFiles(boolean checksumJournalFiles)

setCheckForCorruptJournalFiles

public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles)

setBrokerService

public void setBrokerService(BrokerService brokerService)
Specified by:
setBrokerService in interface BrokerServiceAware

isArchiveDataLogs

public boolean isArchiveDataLogs()

setArchiveDataLogs

public void setArchiveDataLogs(boolean archiveDataLogs)

getDirectoryArchive

public java.io.File getDirectoryArchive()

setDirectoryArchive

public void setDirectoryArchive(java.io.File directoryArchive)

isConcurrentStoreAndDispatchQueues

public boolean isConcurrentStoreAndDispatchQueues()

setConcurrentStoreAndDispatchQueues

public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch)

isConcurrentStoreAndDispatchTopics

public boolean isConcurrentStoreAndDispatchTopics()

setConcurrentStoreAndDispatchTopics

public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch)

getMaxAsyncJobs

public int getMaxAsyncJobs()

setMaxAsyncJobs

public void setMaxAsyncJobs(int maxAsyncJobs)
Parameters:
maxAsyncJobs - the maxAsyncJobs to set

getDatabaseLockedWaitDelay

public int getDatabaseLockedWaitDelay()
Returns:
the databaseLockedWaitDelay

setDatabaseLockedWaitDelay

public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay)
Parameters:
databaseLockedWaitDelay - the databaseLockedWaitDelay to set

getForceRecoverIndex

public boolean getForceRecoverIndex()

setForceRecoverIndex

public void setForceRecoverIndex(boolean forceRecoverIndex)

getStore

public KahaDBStore getStore()

toString

public java.lang.String toString()
Overrides:
toString in class java.lang.Object


Copyright © 2005-2011. All Rights Reserved.