kinesis
You can use the OpenSearch Data Prepper kinesis source to ingest records from one or more Amazon Kinesis Data Streams.
Usage
The following example pipeline specifies Kinesis as a source. The pipeline ingests data from multiple Kinesis data streams named stream1 and stream2 and sets the initial_position to indicate the starting point for reading the stream records:
version: "2"
kinesis-pipeline:
  source:
    kinesis:
      streams:
        - stream_name: "stream1"
          initial_position: "LATEST"
        - stream_name: "stream2"
          initial_position: "LATEST"
      aws:
        region: "us-west-2"
        sts_role_arn: "arn:aws:iam::123456789012:role/my-iam-role"
Configuration options
The kinesis source supports the following configuration options.
Option | Required | Type | Description |
---|---|---|---|
aws | Yes | AWS | Specifies the AWS configuration. See aws. |
acknowledgments | No | Boolean | When set to true, enables the kinesis source to receive end-to-end acknowledgments when events are received by OpenSearch sinks. |
streams | Yes | List | Configures the list of Kinesis data streams from which the kinesis source reads records. You can configure up to four streams. See Streams. |
codec | Yes | Codec | Specifies the codec to apply. |
buffer_timeout | No | Duration | Sets the amount of time allowed for writing events to the OpenSearch Data Prepper buffer before a timeout occurs. Any events that the source cannot write to the buffer within the specified amount of time are discarded. Default is 1s. |
records_to_accumulate | No | Integer | Determines the number of messages that accumulate before being written to the buffer. Default is 100. |
consumer_strategy | No | String | Selects the consumer strategy to use for ingesting Kinesis data streams. The default is fan-out, but polling can also be used. If polling is used, the additional polling configuration is required. |
polling | No | polling | See polling. |
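As a minimal sketch of how these top-level options fit together, the following pipeline sets codec, acknowledgments, buffer_timeout, and records_to_accumulate alongside the required streams and aws options. The pipeline name, the option values, and the opensearch sink endpoint and index are illustrative assumptions, not recommended settings:

```yaml
version: "2"
kinesis-options-pipeline:            # hypothetical pipeline name
  source:
    kinesis:
      streams:
        - stream_name: "stream1"
      codec:
        newline:
      acknowledgments: true          # wait for OpenSearch sinks to confirm receipt
      buffer_timeout: "5s"           # illustrative value; default is 1s
      records_to_accumulate: 200     # illustrative value; default is 100
      aws:
        region: "us-west-2"
        sts_role_arn: "arn:aws:iam::123456789012:role/my-iam-role"
  sink:
    - opensearch:
        hosts: ["https://localhost:9200"]   # placeholder endpoint (assumption)
        index: "kinesis-records"            # placeholder index name (assumption)
```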
Streams
You can use the following options in the streams array.
Option | Required | Type | Description |
---|---|---|---|
stream_name | Yes | String | Defines the name of each Kinesis data stream. |
initial_position | No | String | Determines the point at which the kinesis source starts reading stream records. Use LATEST to start from the most recent record or EARLIEST to start from the beginning of the stream. Default is LATEST. |
checkpoint_interval | No | Duration | Sets the interval at which the kinesis source checkpoints Kinesis data streams, which helps avoid duplicate record processing. Default is PT2M. |
compression | No | String | Specifies the compression format. To decompress records added by a CloudWatch Logs Subscription Filter to Kinesis, use the gzip compression format. |
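The per-stream options can differ from one stream to the next. The following sketch reads one stream from the beginning with a longer checkpoint interval and decompresses gzip records on a second stream. The pipeline name, the second stream name, the PT5M value, and the stdout sink are assumptions chosen for illustration:

```yaml
version: "2"
kinesis-streams-pipeline:                       # hypothetical pipeline name
  source:
    kinesis:
      streams:
        - stream_name: "stream1"
          initial_position: "EARLIEST"          # start from the beginning of the stream
          checkpoint_interval: "PT5M"           # illustrative value; default is PT2M
        - stream_name: "cloudwatch-logs-stream" # hypothetical stream name
          compression: "gzip"                   # records written by a CloudWatch Logs subscription filter
      codec:
        newline:
      aws:
        region: "us-west-2"
        sts_role_arn: "arn:aws:iam::123456789012:role/my-iam-role"
  sink:
    - stdout:
```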
codec
The codec determines how the kinesis source parses each Kinesis stream record. For more efficient processing, you can combine codecs with certain processors.
json codec
The json codec reads JSON objects from a JSON array and creates an OpenSearch Data Prepper event for each object in the array. It can be used to parse nested CloudWatch events into individual log entries. The json codec supports the following configuration options.
Option | Required | Type | Description |
---|---|---|---|
key_name | No | String | The name of the input field from which to extract the JSON array and create OpenSearch Data Prepper events. |
include_keys | No | List | The list of input fields to be extracted and added as additional fields in the OpenSearch Data Prepper event. |
include_keys_metadata | No | List | The list of input fields to be extracted and added to the OpenSearch Data Prepper event metadata object. |
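For example, a CloudWatch Logs subscription filter delivers a gzip-compressed JSON document in which the log entries sit in a logEvents array next to fields such as owner, logGroup, and logStream. The following sketch, which assumes that payload shape, creates one event per entry in logEvents and copies the surrounding fields onto each event. The pipeline name and the stdout sink are hypothetical:

```yaml
version: "2"
cloudwatch-logs-pipeline:                  # hypothetical pipeline name
  source:
    kinesis:
      streams:
        - stream_name: "stream1"
          compression: "gzip"              # subscription filter records are gzip-compressed
      codec:
        json:
          key_name: "logEvents"            # array to split into individual events
          include_keys: ["owner", "logGroup", "logStream"]   # copied onto each event
      aws:
        region: "us-west-2"
        sts_role_arn: "arn:aws:iam::123456789012:role/my-iam-role"
  sink:
    - stdout:
```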
newline codec
The newline codec parses each Kinesis stream record as a single log event, making it ideal for processing single-line records. It also works well with the parse_json processor to parse each line.
You can use the following options to configure the newline codec.
Option | Required | Type | Description |
---|---|---|---|
skip_lines | No | Integer | Sets the number of lines to skip before creating events. You can use this configuration to skip common header rows. Default is 0. |
header_destination | No | String | Defines a key value to assign to the header line of the stream event. If this option is specified, then each event will contain a header_destination field. |
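The combination of the newline codec and the parse_json processor might look like the following sketch. It assumes that each record is a single line of JSON and that the codec places the line in a field named message, which the processor then parses. The pipeline name and the stdout sink are hypothetical:

```yaml
version: "2"
kinesis-newline-pipeline:          # hypothetical pipeline name
  source:
    kinesis:
      streams:
        - stream_name: "stream1"
      codec:
        newline:
      aws:
        region: "us-west-2"
        sts_role_arn: "arn:aws:iam::123456789012:role/my-iam-role"
  processor:
    - parse_json:
        source: "message"          # assumes the raw line is stored in the message field
  sink:
    - stdout:
```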
polling
When the consumer_strategy is set to polling, the kinesis source uses a polling-based approach to read records from the Kinesis data streams instead of the default fan-out approach.
Option | Required | Type | Description |
---|---|---|---|
max_polling_records | No | Integer | Sets the number of records to fetch from Kinesis during a single call. |
idle_time_between_reads | No | Duration | Defines the amount of idle time between calls. |
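A polling configuration could be sketched as follows. The values for max_polling_records and idle_time_between_reads, the pipeline name, and the stdout sink are illustrative assumptions, not documented defaults:

```yaml
version: "2"
kinesis-polling-pipeline:               # hypothetical pipeline name
  source:
    kinesis:
      streams:
        - stream_name: "stream1"
      codec:
        newline:
      consumer_strategy: "polling"      # use polling instead of the default fan-out
      polling:
        max_polling_records: 100        # illustrative value
        idle_time_between_reads: "250ms" # illustrative value
      aws:
        region: "us-west-2"
        sts_role_arn: "arn:aws:iam::123456789012:role/my-iam-role"
  sink:
    - stdout:
```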
aws
You can use the following options in the aws configuration.
Option | Required | Type | Description |
---|---|---|---|
region | No | String | Sets the AWS Region to use for credentials. Defaults to the standard SDK behavior for determining the Region. |
sts_role_arn | No | String | Defines the AWS Security Token Service (AWS STS) role to assume for requests to Amazon Kinesis Data Streams and Amazon DynamoDB. Defaults to null, which uses the standard SDK behavior for credentials. |
aws_sts_header_overrides | No | Map | Defines a map of header overrides that the AWS Identity and Access Management (IAM) role assumes for the source plugin. |
Exposed metadata attributes
The kinesis source adds the following metadata to each processed event. You can access the metadata attributes using the getMetadata function of the expression syntax.
stream_name: Contains the name of the Kinesis data stream from which the event was obtained.
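One way to use this attribute is to copy it into the event body so that it reaches the sink. The following sketch assumes the add_entries processor with a value_expression entry. The pipeline name, the source_stream key, and the stdout sink are hypothetical names chosen for illustration:

```yaml
version: "2"
kinesis-metadata-pipeline:          # hypothetical pipeline name
  source:
    kinesis:
      streams:
        - stream_name: "stream1"
        - stream_name: "stream2"
      codec:
        newline:
      aws:
        region: "us-west-2"
        sts_role_arn: "arn:aws:iam::123456789012:role/my-iam-role"
  processor:
    - add_entries:
        entries:
          - key: "source_stream"    # hypothetical field name
            value_expression: 'getMetadata("stream_name")'
  sink:
    - stdout:
```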
Permissions
The following minimum permissions are required in order to run kinesis as a source:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStream",
        "kinesis:DescribeStreamConsumer",
        "kinesis:DescribeStreamSummary",
        "kinesis:GetRecords",
        "kinesis:GetShardIterator",
        "kinesis:ListShards",
        "kinesis:ListStreams",
        "kinesis:ListStreamConsumers",
        "kinesis:RegisterStreamConsumer",
        "kinesis:SubscribeToShard"
      ],
      "Resource": [
        "arn:aws:kinesis:us-east-1:{account-id}:stream/stream1",
        "arn:aws:kinesis:us-east-1:{account-id}:stream/stream2"
      ]
    },
    {
      "Sid": "allowCreateTable",
      "Effect": "Allow",
      "Action": [
        "dynamodb:CreateTable",
        "dynamodb:PutItem",
        "dynamodb:DescribeTable",
        "dynamodb:DeleteItem",
        "dynamodb:GetItem",
        "dynamodb:Scan",
        "dynamodb:UpdateItem",
        "dynamodb:Query"
      ],
      "Resource": [
        "arn:aws:dynamodb:us-east-1:{account-id}:table/kinesis-pipeline"
      ]
    }
  ]
}
The kinesis source uses a DynamoDB table for ingestion coordination among multiple workers, so you need DynamoDB permissions.
Metrics
The kinesis source includes the following metrics.
Counters
recordsProcessed: Counts the number of processed stream records.
recordProcessingErrors: Counts the number of stream record processing errors.
acknowledgementSetSuccesses: Counts the number of processed stream records that were successfully added to the sink.
acknowledgementSetFailures: Counts the number of processed stream records that failed to be added to the sink.