org.apache.uima.collection.impl.cpm.engine
Class CPMEngine

java.lang.Object
  extended by java.lang.Thread
      extended by org.apache.uima.collection.impl.cpm.engine.CPMEngine
All Implemented Interfaces:
java.lang.Runnable

public class CPMEngine
extends java.lang.Thread

Responsible for creating and initializing processing threads. This instance manages the lifecycle of the CPE components. It exposes an API for plugging in components programmatically instead of declaratively. Running in its own thread, this component creates separate Processing Pipelines for Analysis Engines and Cas Consumers, launches the configured CollectionReader, and attaches all of those components to form a pipeline from source to sink. The Collection Reader feeds Processing Threads containing Analysis Engines, and the Analysis Engines feed their results of analysis to Cas Consumers.
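The following sketch models the source-to-sink flow described above in plain Java: a reader thread fills a bounded work queue, several identical processing threads move entities to a bounded output queue, and a consumer thread drains it. All names (MiniCpe, the EOF sentinel, upper-casing as a stand-in for analysis) are hypothetical; this is not the CPMEngine implementation, and the queue capacity and worker count only loosely mirror setInputQueueSize, setOutputQueueSize and setConcurrentThreadSize.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical model of reader -> processing threads -> consumer; not part of the UIMA API.
final class MiniCpe {
  // Poison-pill sentinel standing in for the EOF token. It is a distinct String instance
  // compared by identity (==), so no real document can be mistaken for it.
  private static final String EOF = new String("EOF");

  /** Runs the pipeline; assumes workers >= 1 and queueCapacity >= 1. */
  static List<String> run(List<String> docs, int workers, int queueCapacity)
      throws InterruptedException {
    BlockingQueue<String> workQueue = new ArrayBlockingQueue<>(queueCapacity);
    BlockingQueue<String> outputQueue = new ArrayBlockingQueue<>(queueCapacity);
    List<String> consumed = Collections.synchronizedList(new ArrayList<>());

    // Reader: fills the bounded work queue (blocking when full), then one EOF per worker.
    Thread reader = new Thread(() -> {
      try {
        for (String d : docs) workQueue.put(d);
        for (int i = 0; i < workers; i++) workQueue.put(EOF);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }, "reader");

    // Processing pipelines: identical workers that "analyze" each entity.
    List<Thread> pipelines = new ArrayList<>();
    for (int i = 0; i < workers; i++) {
      pipelines.add(new Thread(() -> {
        try {
          for (String d = workQueue.take(); d != EOF; d = workQueue.take()) {
            outputQueue.put(d.toUpperCase()); // stand-in for analysis
          }
          outputQueue.put(EOF); // tell the consumer this pipeline is done
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }, "pipeline-" + i));
    }

    // Consumer: drains results until every pipeline has sent its EOF.
    Thread consumer = new Thread(() -> {
      try {
        int finished = 0;
        while (finished < workers) {
          String r = outputQueue.take();
          if (r == EOF) finished++;
          else consumed.add(r);
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }, "consumer");

    reader.start();
    for (Thread t : pipelines) t.start();
    consumer.start();
    reader.join();
    for (Thread t : pipelines) t.join();
    consumer.join();
    return consumed;
  }
}
```

Because the reader blocks whenever the work queue is full, a small capacity bounds memory use while keeping the reader decoupled from the processing threads.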


Nested Class Summary
 
Nested classes/interfaces inherited from class java.lang.Thread
java.lang.Thread.State, java.lang.Thread.UncaughtExceptionHandler
 
Field Summary
 CPECasPool casPool
           
protected  boolean isRunning
           
protected  boolean killed
           
 java.lang.Object lockForPause
           
protected  BoundedWorkQueue outputQueue
           
protected  boolean pause
           
protected  ProcessingUnit[] processingUnits
           
protected  boolean stopped
           
protected  BoundedWorkQueue workQueue
           
 
Fields inherited from class java.lang.Thread
MAX_PRIORITY, MIN_PRIORITY, NORM_PRIORITY
 
Constructor Summary
CPMEngine(CPMThreadGroup aThreadGroup, CPEFactory aCpeFactory, ProcessTrace aProcTr, CheckpointData aCheckpointData)
          Initializes Collection Processing Engine.
 
Method Summary
 void addCasProcessor(CasProcessor aCasProcessor)
          Adds a CASProcessor to the processing pipeline.
 void addCasProcessor(CasProcessor aCasProcessor, int aIndex)
          Adds a CASProcessor to the processing pipeline at a given place in the processing pipeline
 void addStatusCallbackListener(BaseStatusCallbackListener aListener)
           
 void asynchStop()
          Deprecated.  
static void callEntityProcessCompleteWithCAS(StatusCallbackListener statCL, CAS cas, EntityProcessStatus eps)
          Internal use only, public for cross-package access.
 void cleanup()
          Null out fields of this object.
 void deployCasProcessors()
          Starts CASProcessor containers one at a time.
 void disableCasProcessor(int aCasProcessorIndex)
          Disable a CASProcessor in the processing pipeline
 void disableCasProcessor(java.lang.String aCasProcessorName)
          Disable a CASProcessor in the processing pipeline
 boolean dropCasOnException()
           
 void enableCasProcessor(java.lang.String aCasProcessorName)
          Enables a CASProcessor in the processing pipeline
 java.util.LinkedList getAllProcessingContainers()
          Returns a list of All Processing Containers.
 java.util.ArrayList getCallbackListeners()
          Returns a list of ALL callback listeners currently registered with the CPM
 CasProcessor[] getCasProcessors()
          Returns all CASProcessors in the processing pipeline
protected  CpeConfiguration getCpeConfig()
           
 java.lang.String getLastDocRepository()
           
 java.lang.String getLastProcessedDocId()
          Returns Id of the last document processed
 java.util.Properties getPerformanceTuningSettings()
           
 int getPoolSize()
           
 java.util.LinkedList getProcessingContainers()
          Returns a list of Processing Containers for Analysis Engines.
 Progress[] getProgress()
          Returns collectionReader progress.
 java.util.Map getStats()
          Returns CPE stats
 int getThreadCount()
          Returns number of processing threads
 void invalidateCASes(CAS[] aCASList)
           
 boolean isHardKilled()
          Returns whether the CPE was killed hard.
 boolean isKilled()
          Returns true if this engine has been killed
 boolean isParallizable(CasProcessor aProcessor, java.lang.String aCpName)
          Determines if a given Cas Processor is parallelizable.
 boolean isPaused()
          Returns a global flag indicating if this Thread is in pause state
 boolean isPauseOnException()
          Returns whether the CPM should pause when an exception occurs
 boolean isRunning()
          Returns a global flag indicating if this Thread is in processing state
 void killIt()
          Kills the CPM the hard way.
 void pauseIt()
          Pauses this thread
 void pipelineKilled(java.lang.String aPipelineThreadName)
          Callback method used to notify the engine when a processing pipeline is killed due to excessive errors.
 void redeployAnalysisEngine(ProcessingContainer aProcessingContainer)
          Deploys CasProcessor and associates it with a ProcessingContainer
 void releaseCASes(CAS[] aCASList)
          Releases the given CASes back to the pool.
 void removeCasProcessor(int aCasProcessorIndex)
          Removes a CASProcessor from the processing pipeline
 void removeStatusCallbackListener(BaseStatusCallbackListener aListener)
          Unregisters given listener from the CPM
 void resumeIt()
          Resumes this thread
 void run()
          Using given configuration creates and starts CPE processing pipeline.
 void runSingleThreaded()
          Runs the CPE in a single thread without queues.
 void setCollectionReader(BaseCollectionReader aCollectionReader)
          Sets CollectionReader to use during processing
 void setConcurrentThreadSize(int aConcurrentThreadSize)
          Defines number of threads executing the processing pipeline concurrently.
 void setInputQueueSize(int aInputQueueSize)
          Defines the size of inputQueue.
 void setNumToProcess(long aNumToProcess)
          Defines the size of the batch
 void setOutputQueueSize(int aOutputQueueSize)
          Defines the size of outputQueue.
 void setPauseOnException(boolean aPause)
          Sets a global flag to indicate to the CPM that it should pause whenever an exception occurs
 void setPerformanceTuningSettings(java.util.Properties aPerformanceTuningSettings)
          Overrides the default performance tuning settings for this CPE.
 void setPoolSize(int aPoolSize)
          Defines the size of Cas Pool.
 void setProcessControllerAdapter(ProcessControllerAdapter aPca)
           
 void setStats(java.util.Map aMap)
          Plugs in a map where the engine stores performance info at runtime
 void stopCasProcessors(boolean kill)
          Stops all Cas Processors and optionally changes their status according to the kill flag
 void stopIt()
          Stops execution of the Processing Pipeline and this thread.
 
Methods inherited from class java.lang.Thread
activeCount, checkAccess, clone, countStackFrames, currentThread, destroy, dumpStack, enumerate, getAllStackTraces, getContextClassLoader, getDefaultUncaughtExceptionHandler, getId, getName, getPriority, getStackTrace, getState, getThreadGroup, getUncaughtExceptionHandler, holdsLock, interrupt, interrupted, isAlive, isDaemon, isInterrupted, join, join, join, resume, setContextClassLoader, setDaemon, setDefaultUncaughtExceptionHandler, setName, setPriority, setUncaughtExceptionHandler, sleep, sleep, start, stop, stop, suspend, toString, yield
 
Methods inherited from class java.lang.Object
equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
 

Field Detail

casPool

public CPECasPool casPool

lockForPause

public final java.lang.Object lockForPause

pause

protected boolean pause

isRunning

protected volatile boolean isRunning

stopped

protected volatile boolean stopped

killed

protected volatile boolean killed

processingUnits

protected ProcessingUnit[] processingUnits

outputQueue

protected BoundedWorkQueue outputQueue

workQueue

protected BoundedWorkQueue workQueue
Constructor Detail

CPMEngine

public CPMEngine(CPMThreadGroup aThreadGroup,
                 CPEFactory aCpeFactory,
                 ProcessTrace aProcTr,
                 CheckpointData aCheckpointData)
          throws java.lang.Exception
Initializes Collection Processing Engine. Assigns this thread and all processing threads created by this component to a common Thread Group.

Parameters:
aThreadGroup - contains all CPM-related threads
aCpeFactory - CPE factory object responsible for parsing the CPE descriptor and creating components
aProcTr - instance of the ProcessTrace where the CPM accumulates stats
aCheckpointData - checkpoint object facilitating restart from the last known point
Throws:
java.lang.Exception
Method Detail

getProcessingContainers

public java.util.LinkedList getProcessingContainers()
Returns a list of Processing Containers for Analysis Engines. Each CasProcessor is managed by its own container.


getAllProcessingContainers

public java.util.LinkedList getAllProcessingContainers()
Returns a list of All Processing Containers. Each CasProcessor is managed by its own container.


getThreadCount

public int getThreadCount()
                   throws ResourceConfigurationException
Returns the number of processing threads

Returns:
number of processing threads
Throws:
ResourceConfigurationException

setStats

public void setStats(java.util.Map aMap)
Plugs in a map where the engine stores performance info at runtime

Parameters:
aMap - map for runtime stats and totals

getStats

public java.util.Map getStats()
Returns CPE stats

Returns:
Map containing CPE stats

setPauseOnException

public void setPauseOnException(boolean aPause)
Sets a global flag to indicate to the CPM that it should pause whenever an exception occurs

Parameters:
aPause - true if pause is requested on exception, false otherwise

isPauseOnException

public boolean isPauseOnException()
Returns whether the CPM should pause when an exception occurs

Returns:
true if the CPM pauses when an exception occurs, false otherwise

setInputQueueSize

public void setInputQueueSize(int aInputQueueSize)
Defines the size of the input queue. The queue stores this many entities read from the CollectionReader. Every processing pipeline thread reads its entities from this input queue. The CollectionReader is decoupled from the consumers of entities and continuously replenishes the input queue.

Parameters:
aInputQueueSize - the maximum number of entities held in the input queue.

setOutputQueueSize

public void setOutputQueueSize(int aOutputQueueSize)
Defines the size of the output queue. The queue stores this many entities enqueued by the processing pipeline threads. The results of analysis are placed in this queue for the consumer thread to consume.

Parameters:
aOutputQueueSize - the maximum number of entities held in the output queue.

setPoolSize

public void setPoolSize(int aPoolSize)
Defines the size of Cas Pool.

Parameters:
aPoolSize - the size of the Cas pool.

getPoolSize

public int getPoolSize()

setConcurrentThreadSize

public void setConcurrentThreadSize(int aConcurrentThreadSize)
Defines the number of threads executing the processing pipeline concurrently.

Parameters:
aConcurrentThreadSize - the number of processing pipeline threads.

addStatusCallbackListener

public void addStatusCallbackListener(BaseStatusCallbackListener aListener)

getCallbackListeners

public java.util.ArrayList getCallbackListeners()
Returns a list of all callback listeners currently registered with the CPM

Returns:
list of registered callback listeners

removeStatusCallbackListener

public void removeStatusCallbackListener(BaseStatusCallbackListener aListener)
Unregisters given listener from the CPM

Parameters:
aListener - instance of BaseStatusCallbackListener to unregister

isKilled

public boolean isKilled()
Returns true if this engine has been killed

Returns:
true if this engine has been killed, false otherwise

killIt

public void killIt()
Kills the CPM the hard way. None of the entities in the queues will be processed. This method simply empties all queues and then adds an EOFToken to the work queue so that all threads exit.
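A minimal sketch of the hard-kill behavior, assuming a LinkedBlockingQueue as the work queue and a hypothetical EOF sentinel object (HardKill is not a UIMA class). It posts one token per consumer thread; that is an assumption, since the description above only says an EOFToken is added. In a CPE, the discarded entities would hold CASes that still need to be returned to the pool.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of a "hard kill": discard queued work, then post EOF tokens so that
// every thread blocked on take() wakes up and exits. Not the UIMA implementation.
final class HardKill {
  static final Object EOF = new Object();

  /** Empties the queue, returns what was discarded, and enqueues one EOF per consumer thread. */
  static List<Object> killIt(LinkedBlockingQueue<Object> workQueue, int consumerThreads) {
    List<Object> discarded = new ArrayList<>();
    workQueue.drainTo(discarded);   // none of these entities will be processed
    for (int i = 0; i < consumerThreads; i++) {
      workQueue.offer(EOF);         // unbounded queue, so offer always succeeds
    }
    return discarded;
  }
}
```

A soft stop differs only in skipping the drain: the EOF tokens go in behind the remaining entities, so in-transit work finishes first.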


isHardKilled

public boolean isHardKilled()
Returns whether the CPE was killed hard. A soft kill allows the CPE to finish processing all in-transit CASes. A hard kill causes the CPM to stop processing and to throw away all unprocessed CASes from its queues.

Returns:
true if the CPE was killed hard, false otherwise

asynchStop

@Deprecated
public void asynchStop()
Deprecated. 


stopIt

public void stopIt()
Stops execution of the Processing Pipeline and this thread.


isParallizable

public boolean isParallizable(CasProcessor aProcessor,
                              java.lang.String aCpName)
                       throws java.lang.Exception
Determines whether a given Cas Processor is parallelizable. Remote Cas Processors are parallelizable by default. For integrated and managed Cas Processors, the CPM consults the Cas Processor's descriptor to determine whether it is parallelizable.

Parameters:
aProcessor - Cas Processor being checked
aCpName - name of the CP
Returns:
true if the CP is parallelizable, false otherwise
Throws:
java.lang.Exception
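The rule above can be written as a small decision function. DeployMode, ParallelCheck and the boolean descriptor flag are hypothetical stand-ins; the text does not name the descriptor setting the CPM actually reads.

```java
// Hypothetical model of the parallelizability rule; these are illustrative types, not UIMA types.
enum DeployMode { REMOTE, INTEGRATED, MANAGED }

final class ParallelCheck {
  /**
   * Remote processors are parallelizable by default; for integrated and managed
   * processors the answer comes from the processor's descriptor (here a plain boolean).
   */
  static boolean isParallelizable(DeployMode mode, boolean descriptorAllowsParallel) {
    if (mode == DeployMode.REMOTE) {
      return true;
    }
    return descriptorAllowsParallel;
  }
}
```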

addCasProcessor

public void addCasProcessor(CasProcessor aCasProcessor)
                     throws ResourceConfigurationException
Adds a CASProcessor to the processing pipeline. If the CasProcessor already exists and its status is DISABLED, this method re-enables the CasProcessor.

Parameters:
aCasProcessor - CASProcessor to be added to the processing pipeline
Throws:
ResourceConfigurationException

addCasProcessor

public void addCasProcessor(CasProcessor aCasProcessor,
                            int aIndex)
                     throws ResourceConfigurationException
Adds a CASProcessor to the processing pipeline at a given position

Parameters:
aCasProcessor - CASProcessor to be added to the processing pipeline
aIndex - insertion point for the given CasProcessor
Throws:
ResourceConfigurationException

removeCasProcessor

public void removeCasProcessor(int aCasProcessorIndex)
Removes a CASProcessor from the processing pipeline

Parameters:
aCasProcessorIndex - position of the CasProcessor in the processing pipeline

disableCasProcessor

public void disableCasProcessor(int aCasProcessorIndex)
Disables a CASProcessor in the processing pipeline

Parameters:
aCasProcessorIndex - position of the CasProcessor to disable

disableCasProcessor

public void disableCasProcessor(java.lang.String aCasProcessorName)
Disables a CASProcessor in the processing pipeline

Parameters:
aCasProcessorName - name of the CasProcessor to disable

enableCasProcessor

public void enableCasProcessor(java.lang.String aCasProcessorName)
Enables a CASProcessor in the processing pipeline

Parameters:
aCasProcessorName - name of the CasProcessor to enable
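To make the add, insert, remove, disable and enable operations concrete, here is a hypothetical in-memory model (PipelineModel is not a UIMA class). It follows the documented rule that adding an existing, disabled processor re-enables it; behavior in other edge cases, such as inserting a duplicate name at an index, is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the pipeline-management operations; names are illustrative only.
final class PipelineModel {
  enum Status { ENABLED, DISABLED }

  static final class Entry {
    final String name;
    Status status = Status.ENABLED;
    Entry(String name) { this.name = name; }
  }

  private final List<Entry> entries = new ArrayList<>();

  /** Appends a processor, or re-enables it if it already exists and is DISABLED. */
  void add(String name) {
    Entry existing = find(name);
    if (existing != null) {
      existing.status = Status.ENABLED;
    } else {
      entries.add(new Entry(name));
    }
  }

  void add(String name, int index) { entries.add(index, new Entry(name)); }
  void remove(int index) { entries.remove(index); }
  void disable(int index) { entries.get(index).status = Status.DISABLED; }
  void disable(String name) { setStatus(name, Status.DISABLED); }
  void enable(String name) { setStatus(name, Status.ENABLED); }

  /** Names of the processors that would actually see CASes, in pipeline order. */
  List<String> active() {
    List<String> names = new ArrayList<>();
    for (Entry e : entries) {
      if (e.status == Status.ENABLED) names.add(e.name);
    }
    return names;
  }

  private void setStatus(String name, Status s) {
    Entry e = find(name);
    if (e == null) throw new IllegalArgumentException("unknown processor: " + name);
    e.status = s;
  }

  private Entry find(String name) {
    for (Entry e : entries) {
      if (e.name.equals(name)) return e;
    }
    return null;
  }
}
```

Disabling keeps a processor's position, so re-enabling it restores the original ordering without a second insert.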

getCasProcessors

public CasProcessor[] getCasProcessors()
Returns all CASProcessors in the processing pipeline


redeployAnalysisEngine

public void redeployAnalysisEngine(ProcessingContainer aProcessingContainer)
                            throws java.lang.Exception
Deploys CasProcessor and associates it with a ProcessingContainer

Parameters:
aProcessingContainer - the ProcessingContainer to associate the deployed CasProcessor with
Throws:
java.lang.Exception

deployCasProcessors

public void deployCasProcessors()
                         throws AbortCPMException
Starts CASProcessor containers one at a time. During this phase each container deploys a TAE as a local, remote, or integrated CasProcessor.

Throws:
AbortCPMException

isRunning

public boolean isRunning()
Returns a global flag indicating if this Thread is in processing state


isPaused

public boolean isPaused()
Returns a global flag indicating if this Thread is in pause state


pauseIt

public void pauseIt()
Pauses this thread


resumeIt

public void resumeIt()
Resumes this thread
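Pausing and resuming a set of worker threads is commonly done with wait/notifyAll on a shared monitor. The sketch below assumes that pattern, using a private lock object named after the public lockForPause field; Pausable and awaitIfPaused are hypothetical names, and the sketch is not taken from CPMEngine.

```java
// Hypothetical pause/resume sketch built on a shared monitor; the real CPMEngine logic may differ.
final class Pausable {
  private final Object lockForPause = new Object();
  private boolean pause; // guarded by lockForPause

  void pauseIt() {
    synchronized (lockForPause) {
      pause = true;
    }
  }

  void resumeIt() {
    synchronized (lockForPause) {
      pause = false;
      lockForPause.notifyAll(); // wake every worker blocked in awaitIfPaused()
    }
  }

  boolean isPaused() {
    synchronized (lockForPause) {
      return pause;
    }
  }

  /** Called by a worker between entities; blocks while the engine is paused. */
  void awaitIfPaused() throws InterruptedException {
    synchronized (lockForPause) {
      while (pause) {          // loop guards against spurious wakeups
        lockForPause.wait();
      }
    }
  }
}
```

Workers only block at the points where they call awaitIfPaused(), so an entity already in progress finishes before the pause takes effect.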


setCollectionReader

public void setCollectionReader(BaseCollectionReader aCollectionReader)
Sets the CollectionReader to use during processing

Parameters:
aCollectionReader - the CollectionReader to use

setNumToProcess

public void setNumToProcess(long aNumToProcess)
Defines the size of the batch


getLastProcessedDocId

public java.lang.String getLastProcessedDocId()
Returns Id of the last document processed


getLastDocRepository

public java.lang.String getLastDocRepository()

pipelineKilled

public void pipelineKilled(java.lang.String aPipelineThreadName)
Callback method used to notify the engine when a processing pipeline is killed due to excessive errors. This method is only called if the processing pipeline is unable to acquire a connection to a remote service and the configuration specifies 'kill-pipeline' as the action to take on excessive errors. When running with multiple pipelines, this routine decrements a global pipeline counter and tests whether any pipelines remain. When all pipelines have been killed this way, the CPM needs to terminate. Because the pipelines were killed prematurely, artifacts (CASes) remain in the work queue. These must be removed from the work queue and released back to the CAS pool so that the Collection Reader thread exits properly.

Parameters:
aPipelineThreadName - name of the pipeline thread exiting from its run() method
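The counter-and-drain logic described above can be sketched as follows, assuming an AtomicInteger for the global pipeline counter and a BlockingQueue standing in for the CAS pool. PipelineTracker is a hypothetical name, and the element type T stands in for a CAS.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: count live pipelines; when the last one dies, drain leftover
// work and return it to the pool so a producer waiting on the pool is not stuck.
final class PipelineTracker<T> {
  private final AtomicInteger livePipelines;
  private final BlockingQueue<T> workQueue;
  private final BlockingQueue<T> pool; // stand-in for the CAS pool

  PipelineTracker(int pipelines, BlockingQueue<T> workQueue, BlockingQueue<T> pool) {
    this.livePipelines = new AtomicInteger(pipelines);
    this.workQueue = workQueue;
    this.pool = pool;
  }

  /** Returns true if this was the last pipeline, i.e. the engine should terminate. */
  boolean pipelineKilled(String pipelineThreadName) { // name unused in this sketch
    if (livePipelines.decrementAndGet() > 0) {
      return false; // other pipelines are still consuming the work queue
    }
    List<T> leftovers = new ArrayList<>();
    workQueue.drainTo(leftovers);
    pool.addAll(leftovers); // release the stranded items back to the pool
    return true;
  }
}
```

Using decrementAndGet makes the "last one out" check atomic, so exactly one caller performs the drain even if several pipelines die at the same time.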

run

public void run()
Using the given configuration, creates and starts the CPE processing pipeline. The pipeline is either single-threaded or multi-threaded; which one is used depends on the configuration defined in the CPE descriptor. In multi-threaded mode, the CPE starts a number of threads:
1) ArtifactProducer thread - contains the Collection Reader. It runs asynchronously and fills a WorkQueue with CASes.
2) CasConsumer thread - an optional thread, instantiated only if there are Cas Consumers in the pipeline.
3) Processing threads - one or more identically configured threads that perform the analysis. How many are started depends on the configuration in the CPE descriptor.

All threads started here are placed in a ThreadGroup. This provides a catch-all mechanism for errors that may occur in the CPM: if an error is thrown, the ThreadGroup is notified, and it then notifies all registered listeners to give the application a chance to report the error and do the necessary cleanup.

This routine manages all the threads and makes sure that all of them are cleaned up before returning. The ThreadGroup must clean up all threads under its control, otherwise a memory leak occurs. Even threads that are never started must be cleaned up, because they end up in the ThreadGroup when instantiated. The code uses a number of state variables to make decisions during cleanup.

Specified by:
run in interface java.lang.Runnable
Overrides:
run in class java.lang.Thread
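The ThreadGroup catch-all can be illustrated with the standard ThreadGroup.uncaughtException hook. This is a hypothetical sketch (NotifyingThreadGroup and ErrorListener are made-up names), not the CPMThreadGroup class; it only shows how an error thrown in any thread of the group can reach application listeners.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical ThreadGroup that forwards uncaught errors from its threads to listeners.
final class NotifyingThreadGroup extends ThreadGroup {
  interface ErrorListener {
    void onError(Thread t, Throwable e);
  }

  private final List<ErrorListener> listeners = new CopyOnWriteArrayList<>();

  NotifyingThreadGroup(String name) {
    super(name);
  }

  void addListener(ErrorListener l) {
    listeners.add(l);
  }

  @Override
  public void uncaughtException(Thread t, Throwable e) {
    // Invoked for threads of this group that have no handler of their own:
    // give the application a chance to report the error and clean up.
    for (ErrorListener l : listeners) {
      l.onError(t, e);
    }
  }
}
```

Threads must be constructed with the group, for example new Thread(group, runnable, name), for the hook to apply; a per-thread UncaughtExceptionHandler would take precedence over it.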

cleanup

public void cleanup()
Null out fields of this object. Call this only when this object is no longer needed.


stopCasProcessors

public void stopCasProcessors(boolean kill)
                       throws CasProcessorDeploymentException
Stops all Cas Processors and optionally changes their status according to the kill flag

Parameters:
kill - true if the CPE has been stopped before completing normally
Throws:
CasProcessorDeploymentException

getProgress

public Progress[] getProgress()
Returns collectionReader progress.


invalidateCASes

public void invalidateCASes(CAS[] aCASList)

releaseCASes

public void releaseCASes(CAS[] aCASList)
Releases the given CASes back to the pool.

Parameters:
aCASList - list of CASes to release
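A fixed-size pool, as configured by setPoolSize and replenished by releaseCASes, can be sketched with an ArrayBlockingQueue. FixedPool is hypothetical and holds arbitrary objects rather than CASes; a real CAS pool would presumably also reset each CAS on release.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Supplier;

// Hypothetical fixed-size object pool; not UIMA's CPECasPool.
final class FixedPool<T> {
  private final BlockingQueue<T> free;

  FixedPool(int poolSize, Supplier<T> factory) {
    free = new ArrayBlockingQueue<>(poolSize);
    for (int i = 0; i < poolSize; i++) {
      free.add(factory.get());
    }
  }

  /** Blocks until an instance is free, which throttles the producer when all are in use. */
  T take() throws InterruptedException {
    return free.take();
  }

  /**
   * Returns instances to the pool; callers must not use them afterwards. Releasing the
   * same instance twice overfills the queue and makes add() throw IllegalStateException.
   */
  void release(List<T> items) {
    for (T item : items) {
      free.add(item);
    }
  }

  int available() {
    return free.size();
  }
}
```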

setPerformanceTuningSettings

public void setPerformanceTuningSettings(java.util.Properties aPerformanceTuningSettings)
Overrides the default performance tuning settings for this CPE. This affects things such as CAS sizing parameters.

Parameters:
aPerformanceTuningSettings - the new settings
See Also:
UIMAFramework#getDefaultPerformanceTuningSettings()
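One way to realize "overrides the default settings" is the defaults chain built into java.util.Properties. The sketch assumes the new settings are layered over the defaults rather than replacing them wholesale, which the text above does not specify; TuningSettings is a hypothetical helper, and no UIMA property keys are assumed.

```java
import java.util.Properties;

// Hypothetical helper: overrides win, and any key not overridden falls back to the defaults.
final class TuningSettings {
  static Properties withOverrides(Properties defaults, Properties overrides) {
    Properties effective = new Properties(defaults); // getProperty() consults defaults on a miss
    effective.putAll(overrides);
    return effective;
  }
}
```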

getPerformanceTuningSettings

public java.util.Properties getPerformanceTuningSettings()
Returns:
Returns the PerformanceTuningSettings.

setProcessControllerAdapter

public void setProcessControllerAdapter(ProcessControllerAdapter aPca)
Parameters:
aPca -

getCpeConfig

protected CpeConfiguration getCpeConfig()
                                 throws java.lang.Exception
Throws:
java.lang.Exception

dropCasOnException

public boolean dropCasOnException()

runSingleThreaded

public void runSingleThreaded()
                       throws java.lang.Exception
Runs the CPE in a single thread without queues.

Throws:
java.lang.Exception
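For contrast with the queued, multi-threaded mode, a single-threaded run can be sketched as one loop that reads an entity, passes it through each analysis step in order, and hands the result to the consumer. The Iterator, UnaryOperator and List parameters are hypothetical stand-ins for the collection reader, analysis engines and CAS consumers; this is not the UIMA code.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical single-threaded loop: read, analyze, consume, one entity at a time,
// with no work or output queues in between.
final class SingleThreadedRun {
  static List<String> run(Iterator<String> reader,
                          List<UnaryOperator<String>> analysisEngines,
                          List<String> sink) {
    while (reader.hasNext()) {
      String entity = reader.next();
      for (UnaryOperator<String> ae : analysisEngines) {
        entity = ae.apply(entity); // each engine sees the previous engine's result
      }
      sink.add(entity);            // stand-in for the CAS consumers
    }
    return sink;
  }
}
```

Without queues there is no buffering or thread coordination, which makes this mode simpler to debug at the cost of any parallelism.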

callEntityProcessCompleteWithCAS

public static void callEntityProcessCompleteWithCAS(StatusCallbackListener statCL,
                                                    CAS cas,
                                                    EntityProcessStatus eps)
Internal use only, public for cross-package access. Switches class loaders and locks the CAS.

Parameters:
statCL - status call back listener
cas - cas
eps - entity process status
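The class-loader switch mentioned above follows the usual save, set, restore pattern around a callback. The sketch below shows only that part, with a hypothetical Listener interface and CallbackInvoker class; the CAS locking step is left out because its mechanism is not described here.

```java
// Hypothetical sketch of switching the thread context class loader around a listener call.
final class CallbackInvoker {
  interface Listener<C> {
    void entityProcessComplete(C cas, Object status);
  }

  static <C> void callWithLoader(Listener<C> listener, C cas, Object status, ClassLoader loader) {
    Thread current = Thread.currentThread();
    ClassLoader saved = current.getContextClassLoader();
    current.setContextClassLoader(loader);   // switch for the duration of the callback
    try {
      listener.entityProcessComplete(cas, status);
    } finally {
      current.setContextClassLoader(saved);  // always restore, even if the listener throws
    }
  }
}
```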


Copyright © 2011. All Rights Reserved.