s3
The `s3` sink writes batches of Data Prepper events to Amazon Simple Storage Service (Amazon S3) objects. The configured codec determines how the `s3` sink serializes the data into Amazon S3.
The `s3` sink uses the following object key format when batching events:

```
${pathPrefix}events-%{yyyy-MM-dd'T'HH-mm-ss'Z'}-${currentTimeInNanos}-${uniquenessId}.${codecSuppliedExtension}
```
When a batch of events is written to S3, the resulting object key looks similar to the following:

```
my-logs/2023/06/09/06/events-2023-06-09T06-00-01-1686290401871214927-ae15b8fa-512a-59c2-b917-295a0eff97c8.json
```

For more information about how to configure the object key, see the Object key configuration section.
Usage
The following example creates a pipeline configured with an `s3` sink. It sets the event count and size thresholds that control when the pipeline writes events to S3, and it sets the codec type to `ndjson`:
```yaml
pipeline:
  ...
  sink:
    - s3:
        aws:
          region: us-east-1
          sts_role_arn: arn:aws:iam::123456789012:role/Data-Prepper
        max_retries: 5
        bucket: bucket_name
        object_key:
          path_prefix: my-logs/%{yyyy}/%{MM}/%{dd}/
        threshold:
          event_count: 10000
          maximum_size: 50mb
          event_collect_timeout: 15s
        codec:
          ndjson:
        buffer_type: in_memory
```
IAM permissions
To use the `s3` sink, configure AWS Identity and Access Management (IAM) to grant Data Prepper permissions to write to Amazon S3. You can use a policy similar to the following JSON configuration:
```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "s3-access",
      "Effect": "Allow",
      "Action": [
        "s3:PutObject"
      ],
      "Resource": "arn:aws:s3:::<YOUR-BUCKET>/*"
    }
  ]
}
```
Configuration
Use the following options when customizing the `s3` sink.
Option | Required | Type | Description |
---|---|---|---|
bucket | Yes | String | The name of the S3 bucket to which objects are stored. The name must match the name of your object store. |
codec | Yes | Codec | The codec determining the format of output data. |
aws | Yes | AWS | The AWS configuration. See aws for more information. |
threshold | Yes | Threshold | Configures when to write an object to S3. |
object_key | No | Object key | Sets the `path_prefix` and the `file_pattern` of the object store. The file pattern is always `events-%{yyyy-MM-dd'T'hh-mm-ss}`. By default, those objects are found inside the root directory of the bucket. The `path_prefix` is configurable. |
compression | No | String | The compression algorithm to apply: `none`, `gzip`, or `snappy`. Default is `none`. |
buffer_type | No | Buffer type | Determines the buffer type. |
max_retries | No | Integer | The maximum number of times a single request should retry when ingesting data to S3. Defaults to `5`. |
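As a rough sketch of how the optional settings in the preceding table combine with the required ones, the following sink fragment enables `gzip` compression and raises the retry count. The bucket name and the threshold values are placeholders chosen for illustration, not recommendations.

```yaml
# Fragment of a pipeline definition; only the sink section is shown.
sink:
  - s3:
      aws:
        region: us-east-1
      bucket: my-example-bucket   # hypothetical bucket name
      compression: gzip           # one of none, gzip, or snappy
      max_retries: 10             # overrides the default of 5
      threshold:
        event_count: 10000
        event_collect_timeout: 60s
      codec:
        ndjson:
```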
aws
Option | Required | Type | Description |
---|---|---|---|
region | No | String | The AWS Region to use for credentials. Defaults to standard SDK behavior to determine the Region. |
sts_role_arn | No | String | The AWS Security Token Service (AWS STS) role to assume for requests to Amazon S3. Defaults to `null`, which will use the standard SDK behavior for credentials. |
sts_header_overrides | No | Map | A map of header overrides that the IAM role assumes for the sink plugin. |
sts_external_id | No | String | An STS external ID used when Data Prepper assumes the role. For more information, see the ExternalId documentation in the STS AssumeRole API reference. |
Threshold configuration
Use the following options to set ingestion thresholds for the `s3` sink. When any of these conditions is met, Data Prepper writes the accumulated events to an S3 object.
Option | Required | Type | Description |
---|---|---|---|
event_count | Yes | Integer | The number of Data Prepper events to accumulate before writing an object to S3. |
maximum_size | No | String | The maximum number of bytes to accumulate before writing an object to S3. Default is `50mb`. |
event_collect_timeout | Yes | String | The maximum amount of time that Data Prepper collects events before writing an object to S3. The value should be either an ISO-8601 duration, such as `PT2M30S`, or a simple notation, such as `60s` or `1500ms`. |
Buffer type
`buffer_type` is an optional configuration that determines how Data Prepper temporarily stores data before writing an object to S3. The default value is `in_memory`. Use one of the following options:

- `in_memory`: Stores the record in memory.
- `local_file`: Flushes the record into a file on your local machine. This uses your machine’s temporary directory.
- `multipart`: Writes using the S3 multipart upload. Every 10 MB is written as a part.
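To illustrate where the buffer type is set, the following sink fragment is a minimal sketch that buffers events in a local temporary file instead of in memory. The bucket name and threshold values are hypothetical; only the `buffer_type` line differs in kind from the Usage example.

```yaml
# Fragment of a pipeline definition; only the sink section is shown.
sink:
  - s3:
      aws:
        region: us-east-1
      bucket: my-example-bucket   # hypothetical bucket name
      threshold:
        event_count: 10000
        maximum_size: 50mb
        event_collect_timeout: 15s
      codec:
        ndjson:
      buffer_type: local_file     # in_memory (default), local_file, or multipart
```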
Object key configuration
Option | Required | Type | Description |
---|---|---|---|
path_prefix | No | String | The S3 key prefix path to use for objects written to S3. Accepts date-time formatting. For example, you can use `%{yyyy}/%{MM}/%{dd}/%{HH}/` to create hourly folders in S3. The prefix path should end with `/`. By default, Data Prepper writes objects to the root of the S3 bucket. |
codec
The `codec` determines how the `s3` sink formats data written to each S3 object.
avro codec
The `avro` codec writes an event as an Apache Avro document.
Because Avro requires a schema, you can either define the schema yourself or have Data Prepper generate one automatically. In general, you should define your own schema because it will most accurately reflect your needs.
We recommend that you make your Avro fields use a null union. Without the null union, each field must be present or the data will fail to write to the sink. If you can be certain that each event has a given field, you can make that field non-nullable.
When you provide your own Avro schema, that schema defines the final structure of your data. Therefore, any extra values in incoming events that are not mapped in the Avro schema are not included in the final destination. To avoid confusion between a custom Avro schema and the `include_keys` or `exclude_keys` sink configurations, Data Prepper does not allow the use of `include_keys` or `exclude_keys` with a custom schema.
In cases where your data is uniform, you may be able to automatically generate a schema. Automatically generated schemas are based on the first event received by the codec. The schema contains only the keys from this event. Therefore, all keys must be present in all events for the automatically generated schema to work. Automatically generated schemas make all fields nullable. Use the sink’s `include_keys` and `exclude_keys` configurations to control what data is included in the auto-generated schema.
Option | Required | Type | Description |
---|---|---|---|
schema | Yes | String | The Avro schema declaration. Not required if auto_schema is set to true. |
auto_schema | No | Boolean | When set to `true`, automatically generates the Avro schema declaration from the first event. |
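The null-union recommendation can be sketched with a small custom schema. In the following fragment, the record name, namespace, and field names are hypothetical and exist only for illustration: `timestamp` is declared non-nullable on the assumption that every event carries it, while `message` and `status` use a null union so that events missing those fields can still be written.

```yaml
# Fragment of an s3 sink definition; only the codec section is shown.
codec:
  avro:
    schema: >
      {
        "type" : "record",
        "namespace" : "org.example.logs",
        "name" : "ExampleLog",
        "fields" : [
          { "name" : "timestamp", "type" : "string" },
          { "name" : "message", "type" : ["null", "string"] },
          { "name" : "status", "type" : ["null", "int"] }
        ]
      }
```

Any key in an incoming event other than these three would be dropped from the output, because the custom schema defines the final structure of the data.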
ndjson codec
The `ndjson` codec writes each event as a JSON object on its own line.
The `ndjson` codec does not take any configurations.
json codec
The `json` codec writes events in a single large JSON file. Each event is written into an object within a JSON array.
Option | Required | Type | Description |
---|---|---|---|
key_name | No | String | The name of the key for the JSON array. By default, this is `events`. |
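As a minimal sketch of the `key_name` option, the following codec fragment renames the array key. The value `records` is a hypothetical choice used only for illustration.

```yaml
# Fragment of an s3 sink definition; only the codec section is shown.
codec:
  json:
    key_name: records   # hypothetical name; the default is events
```

With this setting, the JSON array of events would sit under the `records` key rather than under the default `events` key.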
parquet codec
The `parquet` codec writes events into a Parquet file. When using the `parquet` codec, set `buffer_type` to `in_memory`.
The `parquet` codec writes data using an Avro schema. Because Parquet requires an Avro schema, you can either define the schema yourself or have Data Prepper generate one automatically. We generally recommend that you define your own schema so that it best meets your needs.
For details on the Avro schema and recommendations, see the Avro codec documentation.
Option | Required | Type | Description |
---|---|---|---|
schema | Yes | String | The Avro schema declaration. Not required if auto_schema is set to true. |
auto_schema | No | Boolean | When set to `true`, automatically generates the Avro schema declaration from the first event. |
Setting a schema with Parquet
The following example shows you how to configure the `s3` sink to write events to a Parquet file using a schema for VPC Flow Logs:
```yaml
pipeline:
  ...
  sink:
    - s3:
        aws:
          region: us-east-1
          sts_role_arn: arn:aws:iam::123456789012:role/Data-Prepper
        bucket: mys3bucket
        object_key:
          path_prefix: vpc-flow-logs/%{yyyy}/%{MM}/%{dd}/%{HH}/
        codec:
          parquet:
            schema: >
              {
                "type" : "record",
                "namespace" : "org.opensearch.dataprepper.examples",
                "name" : "VpcFlowLog",
                "fields" : [
                  { "name" : "version", "type" : ["null", "string"]},
                  { "name" : "srcport", "type": ["null", "int"]},
                  { "name" : "dstport", "type": ["null", "int"]},
                  { "name" : "accountId", "type" : ["null", "string"]},
                  { "name" : "interfaceId", "type" : ["null", "string"]},
                  { "name" : "srcaddr", "type" : ["null", "string"]},
                  { "name" : "dstaddr", "type" : ["null", "string"]},
                  { "name" : "start", "type": ["null", "int"]},
                  { "name" : "end", "type": ["null", "int"]},
                  { "name" : "protocol", "type": ["null", "int"]},
                  { "name" : "packets", "type": ["null", "int"]},
                  { "name" : "bytes", "type": ["null", "int"]},
                  { "name" : "action", "type": ["null", "string"]},
                  { "name" : "logStatus", "type" : ["null", "string"]}
                ]
              }
        threshold:
          event_count: 500000000
          maximum_size: 20mb
          event_collect_timeout: PT15M
        buffer_type: in_memory
```