Class EventHandlingProcessor
java.lang.Object
com.opencqrs.framework.eventhandler.EventHandlingProcessor
- All Implemented Interfaces:
Runnable
Asynchronous event processor observing an event stream to be handled by matching EventHandlerDefinitions, all belonging to the same processing group and partition, with configurable progress tracking and retry in case of errors.
-
Constructor Summary
Constructor / Description
EventHandlingProcessor(long partition, String subject, Boolean recursive, EventReader eventReader, ProgressTracker progressTracker, EventSequenceResolver eventSequenceResolver, PartitionKeyResolver partitionKeyResolver, List<EventHandlerDefinition> eventHandlerDefinitions, BackOff backoff)
Creates a pre-configured instance of this.
-
Method Summary
Modifier and Type / Method / Description
long getPartition()
void run()
Enters the event processing loop, running indefinitely unless interrupted or a CqrsFrameworkException.NonTransientException is thrown.
Future<?> start()
void stop()
-
Constructor Details
-
EventHandlingProcessor
public EventHandlingProcessor(long partition, String subject, Boolean recursive, EventReader eventReader, ProgressTracker progressTracker, EventSequenceResolver eventSequenceResolver, PartitionKeyResolver partitionKeyResolver, List<EventHandlerDefinition> eventHandlerDefinitions, BackOff backoff)
Creates a pre-configured instance of this.
Parameters:
partition - the partition number handled by this with respect to the processing group
subject - the subject to observe
recursive - whether the subject should be observed recursively, that is, including child subjects
eventReader - the event source
progressTracker - the progress tracker to maintain the progress within the observed event stream
eventSequenceResolver - the event sequence resolver to determine the event sequence id
partitionKeyResolver - the partition key resolver to determine if the event needs to be handled by this
eventHandlerDefinitions - a list of EventHandlerDefinition to dispatch events to
backoff - a configurable back-off strategy for retryable errors
-
-
Method Details
-
getPartition
public long getPartition()
-
getGroupId
-
run
public void run()
Enters the event processing loop, running indefinitely unless interrupted or a CqrsFrameworkException.NonTransientException is thrown. This involves:
- fetching the current progress (ProgressTracker.current(String, long)) for the configured processing group and partition
- observing the event stream for the configured subject starting from the current progress
- checking if the raw event's sequence id is relevant for this partition, otherwise skipping it
- upcasting any observed event
- resolving the Java event type
- converting the upcasted event to a Java object
- checking if the converted event's sequence id is relevant for this partition, otherwise skipping it
- passing the event (and associated information) to each matching EventHandler
- advancing the progress (ProgressTracker.proceed(String, long, Supplier)) of the event handling loop iteration (also for non-relevant events previously skipped)
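The steps above can be illustrated with a minimal, self-contained sketch. It is not the framework's implementation: RawEvent, Result, isRelevant and process are hypothetical names, and hashing the sequence id onto a fixed number of partitions is only an assumed way of deciding relevance. The sketch collapses the two relevance checks into one and replaces upcasting, conversion and handler dispatch with a simple list append. What it does show is that progress advances for skipped events too.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: none of these names belong to the framework.
class PartitionedLoopSketch {

    // Stand-in for an observed event, reduced to an id and a sequence id.
    static final class RawEvent {
        final String id;
        final String sequenceId;

        RawEvent(String id, String sequenceId) {
            this.id = id;
            this.sequenceId = sequenceId;
        }
    }

    // Collects the ids of handled events and the last position the loop moved past.
    static final class Result {
        final List<String> handled = new ArrayList<>();
        String progress; // id of the last event processed or skipped, null if none
    }

    // Assumption: relevance is decided by hashing the sequence id onto a fixed partition count.
    static boolean isRelevant(String sequenceId, long partition, long partitions) {
        return Math.floorMod((long) sequenceId.hashCode(), partitions) == partition;
    }

    static Result process(List<RawEvent> stream, long partition, long partitions) {
        Result result = new Result();
        for (RawEvent event : stream) {
            if (isRelevant(event.sequenceId, partition, partitions)) {
                // Stand-in for upcasting, type resolution, conversion and handler dispatch.
                result.handled.add(event.id);
            }
            // Progress advances for relevant and skipped events alike.
            result.progress = event.id;
        }
        return result;
    }
}
```

With two partitions, every event in a stream is handled by exactly one of them, while both end up at the same progress.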
Errors are handled as follows:
- CqrsFrameworkException.NonTransientExceptions thrown by any matching EventHandler won't be retried and will terminate the processing loop unrecoverably
- any other Throwable thrown by any matching EventHandler is subject to retry
- CqrsFrameworkException.TransientExceptions thrown by framework components are subject to retry
- any thread interruption before or after calling the EventHandler will terminate the processing loop, assuming this was stopped
- any other Throwable thrown by any of the framework components won't be retried and will terminate the processing loop unrecoverably
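The rules above amount to a small decision table, sketched here in a self-contained form. Everything in it is hypothetical: Origin, Action and classify are illustrative names, the two nested exception classes merely stand in for CqrsFrameworkException.NonTransientException and CqrsFrameworkException.TransientException, and representing thread interruption as an InterruptedException is an assumption.

```java
// Hypothetical sketch of the error rules; not the framework's code.
class ErrorPolicySketch {

    enum Origin { EVENT_HANDLER, FRAMEWORK }

    enum Action { TERMINATE, RETRY }

    // Stand-ins for the framework's non-transient and transient exception types.
    static class NonTransientException extends RuntimeException {}

    static class TransientException extends RuntimeException {}

    static Action classify(Throwable error, Origin origin) {
        // Assumption: interruption surfaces as InterruptedException and always ends the loop.
        if (error instanceof InterruptedException) {
            return Action.TERMINATE;
        }
        if (origin == Origin.EVENT_HANDLER) {
            // Handlers: only non-transient errors terminate, anything else is retried.
            return error instanceof NonTransientException ? Action.TERMINATE : Action.RETRY;
        }
        // Framework components: only transient errors are retried, anything else terminates.
        return error instanceof TransientException ? Action.RETRY : Action.TERMINATE;
    }
}
```

Note the asymmetry: an unclassified Throwable is retried when it comes from an EventHandler but terminates the loop when it comes from a framework component.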
Errors subject to retry, whether thrown by EventHandlers or framework components, will cause the event processor to back off from the event processing loop, waiting before retrying the failed event according to the aforementioned event processing loop. Once the back off is exhausted, the erroneous event will be skipped, continuing with the next observable event, once available.
This method is assumed to be started within a thread pool with one additional spare thread, which is used to dispatch any raw Event received via EventReader.consumeRaw(EventReader.ClientRequestor, BiConsumer). This effectively offloads event upcasting, type resolution, deserialization, and the actual event handling from the underlying HttpClient HttpResponse.BodySubscriber thread, in order to be able to stop() this properly.
-
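The back-off behaviour described for run() (wait, retry the same event, skip it once the back-off is exhausted) can be sketched as follows. This is an illustration under assumptions, not the framework's code: handleWithRetry and Outcome are hypothetical names, a plain array of delays stands in for the configured BackOff, and every RuntimeException is treated as retryable.

```java
import java.util.function.Consumer;

// Hypothetical sketch of retry with back-off; a long[] of delays stands in for BackOff.
class BackOffRetrySketch {

    enum Outcome { HANDLED, SKIPPED, INTERRUPTED }

    static Outcome handleWithRetry(String event, Consumer<String> handler, long[] delaysMillis) {
        int attempt = 0;
        while (true) {
            try {
                handler.accept(event);
                return Outcome.HANDLED;
            } catch (RuntimeException retryable) {
                if (attempt >= delaysMillis.length) {
                    // Back-off exhausted: give up on this event and move on to the next one.
                    return Outcome.SKIPPED;
                }
                try {
                    // Wait before retrying the same event.
                    Thread.sleep(delaysMillis[attempt++]);
                } catch (InterruptedException interrupted) {
                    // Restore the interrupt flag and end processing.
                    Thread.currentThread().interrupt();
                    return Outcome.INTERRUPTED;
                }
            }
        }
    }
}
```

With n configured delays the handler is invoked at most n + 1 times for a given event before that event is skipped.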
start
Starts this using an Executors.newFixedThreadPool(int) with size 2. The first thread within the pool is used to start the event processing loop, while the second one is used for the raw Event dispatching.
Returns:
a future for the event processing loop, to determine when it ends (prematurely)
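The start() contract can be pictured with plain JDK concurrency classes. The sketch below is hypothetical and self-contained: StartStopSketch is not the framework class, its loop merely sleeps in place of waiting for events, the second pool thread stays idle, and stopping via shutdownNow() is an assumption, since stop() is not documented here.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: a Runnable loop submitted to a fixed pool of two threads.
class StartStopSketch implements Runnable {

    // Two threads: one for the loop, one spare (unused in this sketch).
    private final ExecutorService pool = Executors.newFixedThreadPool(2);

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                Thread.sleep(10); // stand-in for waiting on the next event
            }
        } catch (InterruptedException e) {
            // Interruption ends the loop; restore the flag for the caller.
            Thread.currentThread().interrupt();
        }
    }

    // Submits the loop and returns a future that completes when the loop ends.
    Future<?> start() {
        return pool.submit(this);
    }

    // Assumption: stopping interrupts the loop thread by shutting the pool down.
    void stop() {
        pool.shutdownNow();
    }
}
```

A caller can hold on to the returned future and call get() on it to detect when the loop has ended, for example after stop() or after a premature termination.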
-
stop
public void stop()
-