Class EventHandlingProcessor
java.lang.Object
com.opencqrs.framework.eventhandler.EventHandlingProcessor
- All Implemented Interfaces:
Runnable
Asynchronous event processor observing an event stream to be handled by matching EventHandlerDefinitions, all belonging to the same processing group and partition, with configurable progress tracking and retry in case of errors.
-
Constructor Summary
Constructors
Constructor: EventHandlingProcessor(long partition, String subject, Boolean recursive, EventReader eventReader, ProgressTracker progressTracker, EventSequenceResolver eventSequenceResolver, PartitionKeyResolver partitionKeyResolver, List<EventHandlerDefinition> eventHandlerDefinitions, BackOff backoff)
Description: Creates a pre-configured instance of this.
-
Method Summary
Modifier and Type, Method, Description
long getPartition()
void run(): Enters the event processing loop, running infinitely unless interrupted or CqrsFrameworkException.NonTransientException is thrown.
Future<?> start()
void stop()
-
Constructor Details
-
EventHandlingProcessor
public EventHandlingProcessor(long partition, String subject, Boolean recursive, EventReader eventReader, ProgressTracker progressTracker, EventSequenceResolver eventSequenceResolver, PartitionKeyResolver partitionKeyResolver, List<EventHandlerDefinition> eventHandlerDefinitions, BackOff backoff)
Creates a pre-configured instance of this.
- Parameters:
partition - the partition number handled by this with respect to the processing group
subject - the subject to observe
recursive - whether the subject should be observed recursively, that is including child subjects
eventReader - the event source
progressTracker - the progress tracker to maintain the progress within the observed event stream
eventSequenceResolver - the event sequence resolver to determine the event sequence id
partitionKeyResolver - the partition key resolver to determine if the event needs to be handled by this
eventHandlerDefinitions - a list of EventHandlerDefinition to dispatch events to
backoff - a configurable back-off strategy for retryable errors
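The partition and partitionKeyResolver parameters together decide whether an event is handled by a given processor instance. How the framework maps a sequence id to a partition is not stated here, so the following is only a sketch of one common approach, assuming a CRC32 checksum of the sequence id taken modulo the number of partitions. The class PartitionFilterSketch, its methods, and the sample sequence ids are hypothetical and not part of the framework.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

/** Hypothetical illustration of partition filtering; not the framework's implementation. */
final class PartitionFilterSketch {

    /** Maps a sequence id to a partition in [0, partitionCount). Assumed scheme: CRC32 modulo. */
    static long partitionOf(String sequenceId, long partitionCount) {
        CRC32 crc = new CRC32();
        crc.update(sequenceId.getBytes(StandardCharsets.UTF_8));
        return crc.getValue() % partitionCount; // getValue() is never negative
    }

    /** True if the processor responsible for the given partition should handle the event. */
    static boolean isRelevant(String sequenceId, long partition, long partitionCount) {
        return partitionOf(sequenceId, partitionCount) == partition;
    }

    public static void main(String[] args) {
        long partitionCount = 4; // assumed number of processor instances in the group
        for (String sequenceId : new String[] {"order-1", "order-2", "customer-7"}) {
            System.out.println(sequenceId + " -> partition " + partitionOf(sequenceId, partitionCount));
        }
    }
}
```

With such a scheme every event maps to exactly one partition, so each processor of a processing group can observe the same stream and skip the events owned by its siblings.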
-
-
Method Details
-
getPartition
public long getPartition() -
getGroupId
-
run
public void run()
Enters the event processing loop, running infinitely unless interrupted or CqrsFrameworkException.NonTransientException is thrown. This involves:
- fetching the current progress (ProgressTracker.current(String, long)) for the configured processing group and partition
- observing the event stream for the configured subject starting from the current progress
- checking if the raw event's sequence id is relevant for this partition, otherwise skipping it
- upcasting any observed event
- resolving the Java event type
- converting the upcasted event to a Java object
- checking if the converted event's sequence id is relevant for this partition, otherwise skipping it
- passing the event (and associated information) to each matching EventHandler
- proceeding the progress (ProgressTracker.proceed(String, long, Supplier)) of the event handling loop iteration (also for non-relevant events previously skipped)
Errors are handled as follows:
- CqrsFrameworkException.NonTransientExceptions thrown by any matching EventHandler won't be retried and will terminate the processing loop unrecoverably
- any other Throwable thrown by any matching EventHandler is subject to retry
- CqrsFrameworkException.TransientExceptions thrown by framework components are subject to retry
- any thread interruption before or after calling the EventHandler will terminate the processing loop, assuming this was stopped
- any other Throwable thrown by any of the framework components won't be retried and will terminate the processing loop unrecoverably
Retryable errors from EventHandlers or framework components will cause the event processor to back off from the event processing loop, waiting before retrying the failed event according to the aforementioned event processing loop. Once the back-off is exhausted, the erroneous event will be skipped, continuing with the next observable event once available.
This method is assumed to be started within a thread pool with one additional spare thread, which is used to dispatch any raw Event received via EventReader.consumeRaw(EventReader.ClientRequestor, BiConsumer). This effectively offloads event upcasting, type resolution, deserialization, and the actual event handling from the underlying HttpClient HttpResponse.BodySubscriber thread, in order to be able to stop() this properly.
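The retry rules described for run() can be illustrated with a small, self-contained sketch. It is not the framework's code: RetryLoopSketch, its nested NonTransientException, and the fixed delay are hypothetical stand-ins for the real exception types and the configurable BackOff. As a simplification, only RuntimeExceptions are retried here, whereas the description above covers any Throwable thrown by an EventHandler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for the retry behaviour of the processing loop; not the framework's code. */
final class RetryLoopSketch {

    /** Hypothetical marker for errors that must never be retried. */
    static final class NonTransientException extends RuntimeException {
        NonTransientException(String message) {
            super(message);
        }
    }

    /**
     * Handles the events in order, retrying a failing event up to maxRetries times
     * with a fixed delay. Returns the events skipped after retries were exhausted.
     */
    static List<String> process(List<String> events, Consumer<String> handler,
                                int maxRetries, long delayMillis) {
        List<String> skipped = new ArrayList<>();
        for (String event : events) {
            int retries = 0;
            while (true) {
                try {
                    handler.accept(event);
                    break; // handled successfully, continue with the next event
                } catch (NonTransientException e) {
                    throw e; // never retried: terminates the loop
                } catch (RuntimeException e) {
                    if (retries++ >= maxRetries) {
                        skipped.add(event); // back-off exhausted: skip the event
                        break;
                    }
                    try {
                        Thread.sleep(delayMillis); // wait before retrying the same event
                    } catch (InterruptedException interrupted) {
                        Thread.currentThread().interrupt(); // keep the interrupt visible
                        return skipped; // interruption ends the loop
                    }
                }
            }
        }
        return skipped;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        List<String> skipped = process(List.of("a", "b", "c"), event -> {
            calls[0]++;
            if (event.equals("b")) {
                throw new IllegalStateException("always fails");
            }
        }, 2, 10L);
        System.out.println("skipped=" + skipped + ", handler calls=" + calls[0]);
    }
}
```

With two retries allowed, the handler is called once each for "a" and "c" and three times for "b", so main prints skipped=[b], handler calls=5.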
start
Starts this using an Executors.newFixedThreadPool(int) with size 2. The first thread within the pool is used to start the event processing loop, while the second one is used for the raw Event dispatching.
- Returns:
a future for the event processing loop to determine when it ends (prematurely)
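The start/stop lifecycle described above can be sketched using only java.util.concurrent. The class TwoThreadProcessorSketch, its counter, and the use of shutdownNow() for stopping are assumptions made for illustration. The sketch only mirrors the documented shape: a fixed pool of size 2, one thread running the loop, one spare thread receiving the dispatched work, and a Future that completes when the loop ends.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical illustration of a two-thread start/stop lifecycle; not the framework's code. */
final class TwoThreadProcessorSketch implements Runnable {

    private final ExecutorService pool = Executors.newFixedThreadPool(2);
    private final AtomicInteger handled = new AtomicInteger();

    /** Runs on the first pool thread and hands each unit of work to the spare thread. */
    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                // Dispatch to the spare thread and wait for completion.
                pool.submit(() -> { handled.incrementAndGet(); }).get();
                Thread.sleep(5); // stands in for waiting on the next event
            }
        } catch (InterruptedException | RejectedExecutionException e) {
            // The pool was shut down, i.e. stop() was called: leave the loop quietly.
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    /** Starts the loop on the pool and returns a future that completes when the loop ends. */
    Future<?> start() {
        return pool.submit(this);
    }

    /** Interrupts both pool threads, which ends the loop. */
    void stop() {
        pool.shutdownNow();
    }

    int handledCount() {
        return handled.get();
    }

    public static void main(String[] args) throws Exception {
        TwoThreadProcessorSketch processor = new TwoThreadProcessorSketch();
        Future<?> loop = processor.start();
        Thread.sleep(100);
        processor.stop();
        loop.get(1, TimeUnit.SECONDS); // returns once the loop has ended
        System.out.println("loop ended: " + loop.isDone());
    }
}
```

Keeping the work off the loop thread means an interrupt can always reach the loop while it waits, which is what makes a prompt stop possible in this sketch.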
-
stop
public void stop()
-