# s3

The `s3` sink saves and writes batches of Data Prepper events to Amazon Simple Storage Service (Amazon S3) objects. The configured codec determines how the `s3` sink serializes the data into Amazon S3.

The `s3` sink uses the following format when batching events:

```
${pathPrefix}events-%{yyyy-MM-dd'T'HH-mm-ss'Z'}-${currentTimeInNanos}-${uniquenessId}.${codecSuppliedExtension}
```

When a batch of objects is written to Amazon S3, the objects are formatted similarly to the following:

```
my-logs/2023/06/09/06/events-2023-06-09T06-00-01-1686290401871214927-ae15b8fa-512a-59c2-b917-295a0eff97c8.json
```

For more information about how to configure an object, refer to Object key configuration.
## Usage

The following example creates a pipeline configured with an `s3` sink. It contains additional options for customizing the event and size thresholds for the pipeline and sets the codec type as `ndjson`:

```yaml
pipeline:
  ...
  sink:
    - s3:
        aws:
          region: us-east-1
          sts_role_arn: arn:aws:iam::123456789012:role/Data-Prepper
        max_retries: 5
        bucket: bucket_name
        object_key:
          path_prefix: my-logs/%{yyyy}/%{MM}/%{dd}/
        threshold:
          event_count: 10000
          maximum_size: 50mb
          event_collect_timeout: 15s
        codec:
          ndjson:
        buffer_type: in_memory
```
## IAM permissions

To use the `s3` sink, configure AWS Identity and Access Management (IAM) to grant Data Prepper permissions to write to Amazon S3. You can use a configuration similar to the following JSON configuration:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "s3-access",
      "Effect": "Allow",
      "Action": [
        "s3:PutObject"
      ],
      "Resource": "arn:aws:s3:::<YOUR-BUCKET>/*"
    }
  ]
}
```
## Cross-account S3 access

When Data Prepper writes data to an S3 bucket, it verifies bucket ownership using a bucket owner condition.

By default, the `s3` sink does not require `bucket_owners`. If `bucket_owners` is configured and a bucket is not included in one of the mapped configurations, `default_bucket_owner` defaults to the account ID in `aws.sts_role_arn`. You can configure both `bucket_owners` and `default_bucket_owner` and apply the settings together.

When writing data to multiple S3 buckets with different account associations, configure Data Prepper for cross-account S3 access based on the following conditions:

- For S3 buckets belonging to the same account, set `default_bucket_owner` to that account's ID.
- For S3 buckets belonging to multiple accounts, use a `bucket_owners` map.

A `bucket_owners` map specifies account IDs for buckets belonging to multiple accounts. For example, in the following configuration, `my-bucket-01` is owned by `123456789012` and `my-bucket-02` is owned by `999999999999`:

```yaml
sink:
  - s3:
      default_bucket_owner: 111111111111
      bucket_owners:
        my-bucket-01: 123456789012
        my-bucket-02: 999999999999
```
## Configuration

Use the following options when customizing the `s3` sink.

Option | Required | Type | Description |
---|---|---|---|
`bucket` | Yes | String | The name of the sink's S3 bucket. Supports dynamic bucket naming using Data Prepper expressions, for example, `test-${/bucket_id}`. If a dynamic bucket is inaccessible and no `default_bucket` is configured, then the object data is dropped. |
`default_bucket` | No | String | A static fallback bucket used when a dynamic bucket in `bucket` is inaccessible. |
`bucket_owners` | No | Map | A map of bucket names and their account owner IDs for cross-account access. Refer to Cross-account S3 access. |
`default_bucket_owner` | No | String | The AWS account ID of the S3 bucket owner. Refer to Cross-account S3 access. |
`codec` | Yes | Codec | Serializes data in S3 objects. Refer to codec. |
`aws` | Yes | AWS | The AWS configuration. Refer to aws. |
`threshold` | Yes | Threshold | The conditions under which objects are written to S3. Refer to Threshold configuration. |
`aggregate_threshold` | No | Aggregate threshold | The conditions under which objects with a dynamic `path_prefix` are flushed. Refer to Aggregate threshold configuration. |
`object_key` | No | Object key | Sets the `path_prefix` and `file_pattern` for stored objects. The file pattern is `events-%{yyyy-MM-dd'T'hh-mm-ss}`. By default, objects are written to the bucket's root directory; `path_prefix` is configurable. Refer to Object key configuration. |
`compression` | No | String | The compression algorithm: `none`, `gzip`, or `snappy`. Default is `none`. |
`buffer_type` | No | Buffer type | The buffer type configuration. Refer to Buffer type. |
`max_retries` | No | Integer | The maximum number of retries for S3 ingestion requests. Default is `5`. |
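
To show how a few of these options combine, the following partial configuration is a sketch of dynamic bucket naming with a fallback bucket. The bucket expression `test-${/bucket_id}` and the fallback bucket name `all-other-logs` are hypothetical values used only for illustration:

```yaml
sink:
  - s3:
      # Hypothetical dynamic bucket name resolved from the /bucket_id field of each event.
      bucket: test-${/bucket_id}
      # Hypothetical static fallback bucket used when the dynamic bucket is inaccessible.
      default_bucket: all-other-logs
      compression: gzip
      max_retries: 5
      # The required aws, codec, and threshold options are omitted here for brevity.
```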
## aws

Option | Required | Type | Description |
---|---|---|---|
`region` | No | String | The AWS Region to use for credentials. Defaults to the standard SDK behavior for determining the Region. |
`sts_role_arn` | No | String | The AWS Security Token Service (AWS STS) role to assume for requests to Amazon S3. Defaults to `null`, which uses the standard SDK behavior for credentials. |
`sts_header_overrides` | No | Map | A map of header overrides that the IAM role assumes for the sink plugin. |
`sts_external_id` | No | String | An AWS STS external ID used when Data Prepper assumes the role. For more information, refer to the `ExternalId` section under AssumeRole in the AWS STS API reference. |
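
As a sketch of how these options fit under the `aws` block, the following example uses placeholder values. The role ARN, external ID, and header override are hypothetical and shown only for illustration:

```yaml
aws:
  region: us-east-1
  # Placeholder role ARN; replace with a role that has s3:PutObject permissions.
  sts_role_arn: arn:aws:iam::123456789012:role/Data-Prepper
  # Hypothetical external ID expected by the role's trust policy.
  sts_external_id: my-external-id
  sts_header_overrides:
    # Hypothetical header override applied when assuming the role.
    x-custom-header: custom-value
```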
## Threshold configuration

Use the following options to set ingestion thresholds for the `s3` sink. Data Prepper writes events to an S3 object when any of these conditions are met.

Option | Required | Type | Description |
---|---|---|---|
`event_count` | Yes | Integer | The number of Data Prepper events to accumulate before writing an object to S3. |
`maximum_size` | No | String | The maximum number of bytes to accumulate before writing an object to S3. Default is `50mb`. |
`event_collect_timeout` | Yes | String | The maximum amount of time before Data Prepper writes an event to S3. The value should be either an ISO-8601 duration, such as `PT2M30S`, or a simple notation, such as `60s` or `1500ms`. |
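
For illustration, the following sketch flushes an object when 2,000 events accumulate, 50 MB is buffered, or 2.5 minutes pass, whichever comes first. The specific values are arbitrary examples:

```yaml
threshold:
  event_count: 2000
  maximum_size: 50mb
  # ISO-8601 duration; simple notation such as 150s would also work.
  event_collect_timeout: PT2M30S
```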
## Aggregate threshold configuration

Use the following options to set rules or limits that trigger certain actions or behavior when an aggregated value crosses a defined threshold.

Option | Required | Type | Description |
---|---|---|---|
`flush_capacity_ratio` | No | Float | The proportion of groups to force-flush when the `aggregate_threshold` `maximum_size` is reached, expressed as a number between `0.0` and `1.0`. Default is `0.5`. |
`maximum_size` | Yes | String | The maximum number of bytes to accumulate before force-flushing objects. For example, `128mb`. |
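
As an illustrative sketch, the following partial configuration pairs a hypothetical dynamic `path_prefix` (based on a `/tenant_id` field) with an aggregate threshold that force-flushes half of the open prefix groups once 128 MB has accumulated:

```yaml
- s3:
    # ...other s3 sink options omitted...
    object_key:
      # Hypothetical dynamic prefix; objects are grouped by the resolved prefix.
      path_prefix: logs/${/tenant_id}/%{yyyy}/%{MM}/%{dd}/
    aggregate_threshold:
      maximum_size: 128mb
      flush_capacity_ratio: 0.5
```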
## Buffer type

`buffer_type` is an optional configuration that determines how Data Prepper temporarily stores data before writing an object to S3. The default value is `in_memory`.

Use one of the following options:

- `in_memory`: Stores the record in memory.
- `local_file`: Flushes the record into a file on your local machine. This option uses your machine's temporary directory.
- `multipart`: Writes using the S3 multipart upload. Every 10 MB is written as a part.
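
As a minimal sketch, the snippet below switches a sink from the default `in_memory` buffer to the `local_file` buffer; the rest of the sink configuration is omitted:

```yaml
- s3:
    # ...other s3 sink options omitted...
    # Buffer records in a temporary file on local disk instead of in memory.
    buffer_type: local_file
```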
## Object key configuration

Use the following options to define how object keys are constructed for objects stored in S3.

Option | Required | Type | Description |
---|---|---|---|
`path_prefix` | No | String | The S3 key prefix path to use for objects written to S3. Accepts date-time formatting and dynamic injection of values using Data Prepper expressions. For example, you can use `/${/my_partition_key}/%{yyyy}/%{MM}/%{dd}/%{HH}/` to create hourly folders in S3 based on the `my_partition_key` value. The prefix path should end with `/`. By default, Data Prepper writes objects to the S3 bucket root. |
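
As a short sketch, the following `object_key` block builds hourly folders from a hypothetical `/service_name` field in each event; the prefix ends with `/` as recommended above:

```yaml
object_key:
  # Hypothetical /service_name field used as a dynamic path segment.
  path_prefix: app-logs/${/service_name}/%{yyyy}/%{MM}/%{dd}/%{HH}/
```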
## codec

The `codec` determines how the `s3` sink formats data written to each S3 object.
### `avro` codec

The `avro` codec writes an event as an Apache Avro document. Because Avro requires a schema, you can either define the schema yourself or have Data Prepper automatically generate it. Defining your own schema is recommended because you can tailor it to your particular use case.

When you provide your own Avro schema, that schema defines the final structure of your data. Any values in incoming events that are not mapped in the Avro schema are not included in the final destination. Data Prepper does not allow the use of `include_keys` or `exclude_keys` with a custom schema in order to avoid confusion between a custom Avro schema and the `include_keys` or `exclude_keys` sink configurations.

In cases where your data is uniform, you may be able to automatically generate a schema. Automatically generated schemas are based on the first event that the codec receives. The schema contains only the keys from this event, and all of those keys must be present in all events in order for the automatically generated schema to work. Automatically generated schemas make all fields nullable. Use the `include_keys` and `exclude_keys` sink configurations to control which data is included in the automatically generated schema.

Avro fields should use a null union so that missing values are allowed. Otherwise, all required fields must be present in each event. Use non-nullable fields only when you are certain they exist.

Use the following options to configure the codec.

Option | Required | Type | Description |
---|---|---|---|
`schema` | Yes | String | The Avro schema declaration. Not required if `auto_schema` is set to `true`. |
`auto_schema` | No | Boolean | When set to `true`, automatically generates the Avro schema declaration from the first event. |
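
As an illustration only, the following sketch shows what a hand-written `avro` codec configuration might look like. The record name `LogEvent` and the `message` and `status_code` fields are hypothetical; each field uses a null union so that missing values are allowed:

```yaml
codec:
  avro:
    # Hypothetical schema with nullable fields; replace with a schema that matches your events.
    schema: >
      {
        "type": "record",
        "name": "LogEvent",
        "fields": [
          { "name": "message", "type": ["null", "string"], "default": null },
          { "name": "status_code", "type": ["null", "int"], "default": null }
        ]
      }
```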
### `ndjson` codec

The `ndjson` codec writes each event as a JSON object on its own line. The `ndjson` codec does not take any configurations.
### `json` codec

The `json` codec writes events to a single large JSON file, with each event written as an object within a JSON array.

Use the following options to configure the codec.

Option | Required | Type | Description |
---|---|---|---|
`key_name` | No | String | The name of the key for the JSON array. Default is `events`. |
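
As a brief sketch, the following hypothetical configuration writes events into a JSON array under the key `log_events` instead of the default `events`:

```yaml
codec:
  json:
    # Hypothetical key name for the JSON array; omit to use the default, events.
    key_name: log_events
```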
### `parquet` codec

The `parquet` codec writes events into a Parquet file. When using this codec, set `buffer_type` to `in_memory`.

The `parquet` codec writes data using the configured schema. Because Parquet requires an Avro schema, you can either define the schema yourself or have Data Prepper automatically generate it. Defining your own schema is recommended because you can tailor it to your particular use case.

For more information about the Avro schema, refer to the `avro` codec.

Use the following options to configure the codec.

Option | Required | Type | Description |
---|---|---|---|
`schema` | Yes | String | The Avro schema declaration. Not required if `auto_schema` is set to `true`. |
`auto_schema` | No | Boolean | When set to `true`, automatically generates the Avro schema declaration from the first event. |
### Setting a schema with Parquet

The following example pipeline shows how to configure the `s3` sink to write VPC Flow Log events to a Parquet file using a custom schema:

```yaml
pipeline:
  ...
  sink:
    - s3:
        aws:
          region: us-east-1
          sts_role_arn: arn:aws:iam::123456789012:role/Data-Prepper
        bucket: mys3bucket
        object_key:
          path_prefix: vpc-flow-logs/%{yyyy}/%{MM}/%{dd}/%{HH}/
        codec:
          parquet:
            schema: >
              {
                "type" : "record",
                "namespace" : "org.opensearch.dataprepper.examples",
                "name" : "VpcFlowLog",
                "fields" : [
                  { "name" : "version", "type" : ["null", "string"]},
                  { "name" : "srcport", "type": ["null", "int"]},
                  { "name" : "dstport", "type": ["null", "int"]},
                  { "name" : "accountId", "type" : ["null", "string"]},
                  { "name" : "interfaceId", "type" : ["null", "string"]},
                  { "name" : "srcaddr", "type" : ["null", "string"]},
                  { "name" : "dstaddr", "type" : ["null", "string"]},
                  { "name" : "start", "type": ["null", "int"]},
                  { "name" : "end", "type": ["null", "int"]},
                  { "name" : "protocol", "type": ["null", "int"]},
                  { "name" : "packets", "type": ["null", "int"]},
                  { "name" : "bytes", "type": ["null", "int"]},
                  { "name" : "action", "type": ["null", "string"]},
                  { "name" : "logStatus", "type" : ["null", "string"]}
                ]
              }
        threshold:
          event_count: 500000000
          maximum_size: 20mb
          event_collect_timeout: PT15M
        buffer_type: in_memory
```