Source coordination is the concept of coordinating and distributing work between Data Prepper data sources in a multi-node environment. Some data sources, such as Amazon Kinesis or Amazon Simple Queue Service (Amazon SQS), handle coordination natively. Other data sources, such as OpenSearch, Amazon Simple Storage Service (Amazon S3), Amazon DynamoDB, and JDBC/ODBC, do not handle coordination natively and instead rely on Data Prepper source coordination.
Data Prepper source coordination decides which partition of work is performed by each node in the Data Prepper cluster and prevents duplicate partitions of work.
Inspired by the Kinesis Client Library, Data Prepper uses a distributed store to lease partitions of work, handling both the distribution and the deduplication of work across nodes.
Source coordination separates sources into “partitions of work.” For example, an S3 object would be a partition of work for Amazon S3, or an OpenSearch index would be a partition of work for OpenSearch.
Data Prepper takes each partition of work that is chosen by the source and creates corresponding items in the distributed store that Data Prepper uses for source coordination. Each of these items has the following standard format, which can be extended by the distributed store implementation.
| Value | Definition |
| :--- | :--- |
| `sourceIdentifier` | The identifier for which Data Prepper pipeline works on this partition. By default, the `sourceIdentifier` is prefixed by the sub-pipeline name, but an additional prefix can be configured with `partition_prefix` in your `data-prepper-config.yaml` file. |
| `sourcePartitionKey` | The identifier for the partition of work associated with this item. For example, for an `s3` source with scan capabilities, this identifier is the S3 bucket's `objectKey`. |
| `partitionOwner` | An identifier for the node that actively owns and is working on this partition. This ID contains the hostname of the node but is `null` when the partition is not owned. |
| `partitionProgressState` | A JSON string object representing the progress made on a partition of work, or any additional metadata that may be needed by the source in the case of another node resuming where the last node stopped during a crash. |
| `partitionOwnershipTimeout` | Whenever a Data Prepper node acquires a partition, a 10-minute timeout is given to the owner of the partition to handle the event of a node crashing. The ownership is renewed for another 10 minutes when the owner saves the state of the partition. |
| `sourcePartitionStatus` | Represents the current state of the partition: `ASSIGNED` means the partition is currently being processed, `UNASSIGNED` means the partition is waiting to be processed, `CLOSED` means the partition is waiting to be processed at a later date, and `COMPLETED` means the partition has already been processed. |
| `reOpenAt` | Represents the time at which `CLOSED` partitions reopen and are considered available for processing. Only applies to `CLOSED` partitions. |
| `closedCount` | Tracks how many times the partition has been marked as `CLOSED`. |
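For example, a store item for an S3 scan partition might look similar to the following. This is a hypothetical illustration; the exact attribute layout depends on the store implementation, and all values are placeholders:

```json
{
  "sourceIdentifier": "example-pipeline|s3",
  "sourcePartitionKey": "my-bucket|logs/2023/10/01/file.json",
  "partitionOwner": "node-1.example.com",
  "partitionProgressState": "{\"processedRecords\": 1000}",
  "partitionOwnershipTimeout": "2023-10-01T00:10:00Z",
  "sourcePartitionStatus": "ASSIGNED",
  "reOpenAt": null,
  "closedCount": 0
}
```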
Partitions are acquired in the order in which they are returned in the `List<PartitionIdentifier>` provided by the source. When a node attempts to acquire a partition, Data Prepper performs the following steps:
- Data Prepper queries the `ASSIGNED` partitions to check whether any have expired partition owners. This gives priority to partitions whose processing node crashed mid-work, which allows another node to pick up partition state that may be time sensitive.
- After querying the `ASSIGNED` partitions, Data Prepper queries the `CLOSED` partitions to determine whether any of the partitions' `reOpenAt` timestamps have been reached.
- If there are no `CLOSED` partitions available, then Data Prepper queries the `UNASSIGNED` partitions until one of these partitions is `ASSIGNED`.
If this flow occurs and no partition is acquired by the node, then the partition supplier function provided in the `getNextPartition` method of `SourceCoordinator` will create new partitions. After the supplier function completes, Data Prepper again queries the partitions for acquisition.
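This selection order can be summarized with a minimal sketch in Java. Everything here (`PartitionItem`, `selectNext`) is illustrative only and not part of the Data Prepper API:

```java
import java.time.Instant;
import java.util.List;
import java.util.Optional;

// Illustrative partition item; mirrors the store item format described above.
record PartitionItem(String key, String status, String owner,
                     Instant ownershipTimeout, Instant reOpenAt) {}

class AcquisitionOrder {
    // Selects the next partition a node should acquire, in the priority order
    // described above: expired ASSIGNED, then reopenable CLOSED, then UNASSIGNED.
    static Optional<PartitionItem> selectNext(List<PartitionItem> items, Instant now) {
        // 1. ASSIGNED partitions whose owner's ownership timeout has expired
        //    (the owning node likely crashed mid-processing).
        Optional<PartitionItem> expired = items.stream()
                .filter(p -> p.status().equals("ASSIGNED"))
                .filter(p -> p.ownershipTimeout().isBefore(now))
                .findFirst();
        if (expired.isPresent()) return expired;

        // 2. CLOSED partitions whose reOpenAt timestamp has been reached.
        Optional<PartitionItem> reopened = items.stream()
                .filter(p -> p.status().equals("CLOSED"))
                .filter(p -> !p.reOpenAt().isAfter(now))
                .findFirst();
        if (reopened.isPresent()) return reopened;

        // 3. Any UNASSIGNED partition, in the order returned by the source.
        return items.stream()
                .filter(p -> p.status().equals("UNASSIGNED"))
                .findFirst();
    }
}
```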
Any function that is passed to the `getNextPartition` method creates new partitions using a global state of `Map<String, Object>`. This state is shared between all of the nodes in the cluster, and the supplier function will only be run by a single node at a time, as determined by the source.
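As a rough illustration, a partition supplier of this shape might look like the following. The `PartitionIdentifier` record, the state key, and the sample partition are all stand-ins for illustration, not the actual Data Prepper API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Stand-in for Data Prepper's partition identifier type; illustrative only.
record PartitionIdentifier(String partitionKey) {}

class SupplierSketch {
    // A hypothetical supplier of the shape passed to getNextPartition: it reads
    // shared global state and returns the new partitions of work to be created
    // in the store. Only one node in the cluster runs this function at a time.
    static final Function<Map<String, Object>, List<PartitionIdentifier>> SUPPLIER =
            globalState -> {
                long lastScanTime = (long) globalState.getOrDefault("lastScanTime", 0L);
                List<PartitionIdentifier> newPartitions = new ArrayList<>();
                // A real source would list work discovered since lastScanTime here
                // (for example, newly created S3 objects).
                newPartitions.add(new PartitionIdentifier("my-bucket|logs/new-object.json"));
                globalState.put("lastScanTime", System.currentTimeMillis());
                return newPartitions;
            };
}
```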
The following table provides optional configuration values for `source_coordination`.
| Option | Description |
| :--- | :--- |
| `partition_prefix` | A prefix to the `sourceIdentifier` used to differentiate between Data Prepper clusters that share the same distributed store. |
| `store` | The object that comprises the configuration for the store to be used, where the key is the name of the store, such as `dynamodb`, and the value is any configuration available on that store type. |
As of Data Prepper 2.4, only `in_memory` and `dynamodb` stores are supported:

- The `in_memory` store is the default when no `source_coordination` settings are configured in the `data-prepper-config.yaml` file and should only be used for single-node configurations.
- The `dynamodb` store is used for multi-node Data Prepper environments. The `dynamodb` store can be shared between one or more Data Prepper clusters that need to utilize source coordination.
Data Prepper will attempt to create the `dynamodb` table on startup unless the `skip_table_creation` flag is set to `true`. Optionally, you can configure time to live (`ttl`) on the table, which results in the store cleaning up items over time. Some sources rely on source coordination for the deduplication of data, so be sure to configure a large enough `ttl` for the pipeline duration. If `ttl` is not configured on the table, any items no longer needed in the table must be cleaned up manually.
The following shows the full set of permissions needed for Data Prepper to create the table, enable `ttl`, and interact with the table:
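The policy below is a sketch based on those operations (table creation, TTL management, and item reads and writes); the account ID, Region, and table name in the ARNs are placeholders to replace with your own:

```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "SourceCoordinationDynamoStoreAccess",
            "Effect": "Allow",
            "Action": [
                "dynamodb:CreateTable",
                "dynamodb:DescribeTable",
                "dynamodb:DescribeTimeToLive",
                "dynamodb:UpdateTimeToLive",
                "dynamodb:GetItem",
                "dynamodb:PutItem",
                "dynamodb:DeleteItem",
                "dynamodb:Query"
            ],
            "Resource": [
                "arn:aws:dynamodb:us-east-1:123456789012:table/DataPrepperSourceCoordinationStore",
                "arn:aws:dynamodb:us-east-1:123456789012:table/DataPrepperSourceCoordinationStore/index/*"
            ]
        }
    ]
}
```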
The `dynamodb` store supports the following configuration options.

| Option | Description |
| :--- | :--- |
| `table_name` | The name of the table to be used for source coordination. |
| `region` | The AWS Region of the DynamoDB table. |
| `sts_role_arn` | The STS role that contains the table permissions. Uses default credentials when not provided. |
| `sts_external_id` | The external ID used in the API call to assume the `sts_role_arn`. |
| `skip_table_creation` | If set to `true` when using an existing store, the attempt to create the store is skipped. Default is `false`. |
| `provisioned_write_capacity_units` | The number of write capacity units to configure on the table. Default is `10`. |
| `provisioned_read_capacity_units` | The number of read capacity units to configure on the table. Default is `10`. |
| `ttl` | Optional. The duration of the TTL for the items in the table. The TTL is extended by this duration when an update is made to the item. Defaults to no TTL being used on the table. |
DynamoDB store
The following example shows a `dynamodb` store:
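```yaml
source_coordination:
  store:
    dynamodb:
      # Placeholder values; replace with your own table name, Region, and role.
      table_name: "DataPrepperSourceCoordinationStore"
      region: "us-east-1"
      sts_role_arn: "arn:aws:iam::123456789012:role/SourceCoordinationTableRole"
      skip_table_creation: true
      ttl: "P7D"
```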
In-memory store (default)
The following example shows an `in_memory` store, which is best used with a single-node cluster:
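```yaml
source_coordination:
  store:
    # The in_memory store requires no additional configuration.
    in_memory:
```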
Source coordination metrics are interpreted differently depending on which source is configured. The format of a source coordination metric is `<sub-pipeline-name>_source_coordinator_<metric-name>`. You can use the sub-pipeline name to identify the source for these metrics because each sub-pipeline is unique to each source.
The following are metrics related to partition progress:
- `partitionsCreatedCount`: The number of partition items that have been created. For an S3 scan, this is the number of objects that have had partitions created for them.
- `partitionsCompleted`: The number of partitions that have been fully processed and marked as `COMPLETED`. For an S3 scan, this is the number of objects that have been processed.
- `noPartitionsAcquired`: The number of times that a node has attempted to acquire a partition on which to perform work but has found no available partitions in the store. Use this to indicate that there is no more data coming into the source.
- `partitionsAcquired`: The number of partitions that have been acquired by nodes on which to perform work. In non-error scenarios, this should be equal to the number of partitions created.
- `partitionsClosed`: The number of partitions that have been marked as `CLOSED`. This is only applicable to sources that use the `CLOSED` functionality.
The following are metrics related to partition errors:
- `partitionNotFoundErrors`: Indicates that a partition item that is actively owned by a node does not have a corresponding store item. This should only occur if an item in the table has been manually deleted.
- `partitionNotOwnedErrors`: Indicates that a node that owns a partition has lost ownership due to the partition ownership timeout expiring. Unless the source is able to checkpoint the partition with `saveState`, this error results in duplicate item processing.
- `partitionUpdateErrors`: The number of errors that were received when an update to the store for this partition item failed. Is prefixed with either `saveState`, `close`, or `complete` to indicate which update action is failing.