Source coordination
Source coordination is the concept of coordinating and distributing work between Data Prepper data sources in a multi-node environment. Some data sources, such as Amazon Kinesis or Amazon Simple Queue Service (Amazon SQS), handle coordination natively. Other data sources, such as OpenSearch, Amazon Simple Storage Service (Amazon S3), Amazon DynamoDB, and JDBC/ODBC, do not support source coordination.
Data Prepper source coordination decides which partition of work is performed by each node in the Data Prepper cluster and prevents duplicate partitions of work.
Inspired by the Kinesis Client Library, Data Prepper utilizes a distributed store in the form of a lease to handle the distribution and deduplication of work.
Formatting partitions
Source coordination separates sources into “partitions of work.” For example, an S3 object would be a partition of work for Amazon S3, or an OpenSearch index would be a partition of work for OpenSearch.
Data Prepper takes each partition of work that is chosen by the source and creates corresponding items in the distributed store that Data Prepper uses for source coordination. Each of these items has the following standard format, which can be extended by the distributed store implementation.
Value | Type | Description |
---|---|---|
sourceIdentifier | String | The identifier for which the Data Prepper pipeline works on this partition. By default, the sourceIdentifier is prefixed by the sub-pipeline name, but an additional prefix can be configured with partition_prefix in your data-prepper-config.yaml file. |
sourcePartitionKey | String | The identifier for the partition of work associated with this item. For example, for an s3 source with scan capabilities, this identifier is the S3 bucket’s objectKey combination. |
partitionOwner | String | An identifier for the node that actively owns and is working on this partition. This ID contains the hostname of the node but is null when this partition is not owned. |
partitionProgressState | String | A JSON string object representing the progress made on a partition of work or any additional metadata that may be needed by the source in the case of another node resuming where the last node stopped during a crash. |
partitionOwnershipTimeout | Timestamp | Whenever a Data Prepper node acquires a partition, a 10-minute timeout is given to the owner of the partition to handle the event of a node crashing. The ownership is renewed with another 10 minutes when the owner saves the state of the partition. |
sourcePartitionStatus | Enum | Represents the current state of the partition: ASSIGNED means the partition is currently being processed, UNASSIGNED means the partition is waiting to be processed, CLOSED means the partition is waiting to be processed at a later date, and COMPLETED means the partition has already been processed. |
reOpenAt | Timestamp | Represents the time at which CLOSED partitions reopen and are considered to be available for processing. Only applies to CLOSED partitions. |
closedCount | Long | Tracks how many times the partition has been marked as CLOSED . |
Acquiring partitions
Partitions are acquired in the order that they are returned in the List<PartitionIdentifer>
provided by the source. When a node attempts to acquire a partition, Data Prepper performs the following steps:
- Data Prepper queries the
ASSIGNED
partitions to check whether anyASSIGNED
partitions have expired partition owners. This is intended to assign priority to partitions that have had nodes crash in the middle of processing, which can allow for using a partition state that may be time sensitive. - After querying
ASSIGNED
partitions, Data Prepper queries theCLOSED
partitions to determine whether any of the partition’sreOpenAt
timestamps have been reached. - If there are no
ASSIGNED
orCLOSED
partitions available, then Data Prepper queries theUNASSIGNED
partitions until on of these partitions isASSIGNED
.
If this flow occurs and no partition is acquired by the node, then the partition supplier function provided in the getNextPartition
method of SourceCoordinator
will create new partitions. After the supplier function completes, Data Prepper again queries the partitions for ASSIGNED
, CLOSED
, and UNASSIGNED
.
Global state
Any function that is passed to the getNextPartition
method creates new partitions with a global state of Map<String, Object>
. This state is shared between all of the nodes in the cluster and will only be run by a single node at a time, as determined by the source.
Configuration
The following table provide optional configuration values for source_coordination
.
Value | Type | Description |
---|---|---|
partition_prefix | String | A prefix to the sourceIdentifier used to differentiate between Data Prepper clusters that share the same distributed store. |
store | Object | The object that comprises the configuration for the store to be used, where the key is the name of the store, such as in_memory or dynamodb , and the value is any configuration available on that store type. |
Supported stores
As of Data Prepper 2.4, only in_memory
and dynamodb
stores are supported:
- The
in_memory
store is the default when nosource_coordination
settings are configured in thedata-prepper-config.yaml
file and should only be used for single-node configurations. - The
dynamodb
store is used for multi-node Data Prepper environments. Thedynamodb
store can be shared between one or more Data Prepper clusters that need to utilize source coordination.
DynamoDB store
Data Prepper will attempt to create the dynamodb
table on startup unless the skip_table_creation
flag is configured to true
. Optionally, you can configure the time-to-live (ttl
) on the table, which results in the store cleaning up items over time. Some sources rely on source coordination for the deduplication of data, so be sure to configure a large enough ttl
for the pipeline duration.
If ttl
is not configured on the table, any items no longer needed in the table must be cleaned manually.
The following shows the full set of permissions needed for Data Prepper to create the table, enable ttl
, and interact with the table:
{
"Sid": "ReadWriteSourceCoordinationDynamoStore",
"Effect": "Allow",
"Action": [
"dynamodb:DescribeTimeToLive",
"dynamodb:UpdateTimeToLive",
"dynamodb:DescribeTable",
"dynamodb:CreateTable",
"dynamodb:GetItem",
"dynamodb:PutItem",
"dynamodb:Query"
],
"Resource": [
"arn:aws:dynamodb:${REGION}:${AWS_ACCOUNT_ID}:table/${TABLE_NAME}",
"arn:aws:dynamodb:${REGION}:${AWS_ACCOUNT_ID}:table/${TABLE_NAME}/index/source-status"
]
}
Value | Required | Type | Description |
---|---|---|---|
table_name | Yes | String | The name of the table to be used for source coordination. |
region | Yes | String | The region of the DynamoDB table. |
sts_role_arn | No | String | The sts role that contains the table permissions. Uses default credentials when not provided. |
sts_external_id | No | String | The external ID used in the API call to assume the sts_role_arn . |
skip_table_creation | No | Boolean | If set to true when using an existing store, the attempt to create the store is skipped. Default is false . |
provisioned_write_capacity_units | No | Integer | The number of write capacity units to configure on the table. Default is 10 . |
provisioned_read_capacity_units | No | Integer | The number of read capacity units to configure on the table. Default is 10 . |
ttl | Duration | Optional. The duration of the TTL for the items in the table. The TTL is extended by this duration when an update is made to the item. Defaults to no TTL being used on the table. |
The following example shows a dynamodb
store:
source_coordination:
store:
dynamodb:
table_name: "DataPrepperSourceCoordinationStore"
region: "us-east-1"
sts_role_arn: "arn:aws:iam::##########:role/SourceCoordinationTableRole"
ttl: "P7D"
skip_table_creation: true
In-memory store (default)
The following example shows an in_memory
store, which is best used with a single-node cluster:
source_coordination:
store:
in_memory:
Metrics
Source coordination metrics are interpreted differently depending on which source is configured. The format of a source coordination metric is <sub-pipeline-name>_source_coordinator_<metric-name>
. You can use the sub-pipeline name to identify the source for these metrics because each sub-pipeline is unique to each source.
Progress metrics
The following are metrics related to partition progress:
partitionsCreatedCount
: The number of partition items that have been created. For an S3 scan, this is the number of objects that have had partitions created for them.partitionsCompleted
: The number of partitions that have been fully processed and marked asCOMPLETED
. For an S3 scan, this is the number of objects that have been processed.noPartitionsAcquired
: The number of times that a node has attempted to acquire a partition on which to perform work but has found no available partitions in the store. Use this to indicate that there is no more data coming into the source.partitionsAcquired
: The number of partitions that have been acquired by nodes on which to perform work. In non-error scenarios, this should be equal to the number of partitions created.partitionsClosed
: The number of partitions that have been marked asCLOSED
. This is only applicable to sources that use the CLOSED functionality.
The following are metrics related to partition errors:
partitionNotFoundErrors
: Indicates that a partition item that is actively owned by a node does not have a corresponding store item. This should only occur if an item in the table has been manually deleted.partitionNotOwnedErrors
: Indicates that a node that owns a partition has lost ownership due to the partition ownership timeout expiring. Unless the source is able to checkpoint the partition withsaveState
, this error results in duplicate item processing.partitionUpdateErrors
: The number of errors that were received when an update to the store for this partition item failed. Is prefixed with eithersaveState
,close
, orcomplete
to indicate which update action is failing.