
s3

The s3 sink saves and writes batches of Data Prepper events to Amazon Simple Storage Service (Amazon S3) objects. The configured codec determines how the s3 sink serializes the data into Amazon S3.

The s3 sink uses the following format when batching events:

${pathPrefix}events-%{yyyy-MM-dd'T'HH-mm-ss'Z'}-${currentTimeInNanos}-${uniquenessId}.${codecSuppliedExtension}

When a batch of objects is written to Amazon S3, the objects are formatted similarly to the following:

my-logs/2023/06/09/06/events-2023-06-09T06-00-01-1686290401871214927-ae15b8fa-512a-59c2-b917-295a0eff97c8.json

For more information about how to configure an object, refer to Object key.

Usage

The following example creates a pipeline configured with an s3 sink. It contains additional options for customizing the event and size thresholds for the pipeline and sets the codec type to ndjson:

pipeline:
  ...
  sink:
    - s3:
        aws:
          region: us-east-1
          sts_role_arn: arn:aws:iam::123456789012:role/Data-Prepper
        max_retries: 5
        bucket: bucket_name
        object_key:
          path_prefix: my-logs/%{yyyy}/%{MM}/%{dd}/
        threshold:
          event_count: 10000
          maximum_size: 50mb
          event_collect_timeout: 15s
        codec:
          ndjson:
        buffer_type: in_memory

IAM permissions

To use the s3 sink, configure AWS Identity and Access Management (IAM) to grant Data Prepper permissions to write to Amazon S3. You can use a configuration similar to the following JSON configuration:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "s3-access",
            "Effect": "Allow",
            "Action": [
              "s3:PutObject"
            ],
            "Resource": "arn:aws:s3:::<YOUR-BUCKET>/*"
        }
    ]
}

Cross-account S3 access

When Data Prepper writes data to an S3 bucket, it verifies bucket ownership using a bucket owner condition.

By default, the s3 sink does not require bucket_owners. If bucket_owners is configured and a bucket is not included in one of the mapped configurations, default_bucket_owner defaults to the account ID in aws.sts_role_arn. You can configure both bucket_owners and default_bucket_owner and apply the settings together.

When ingesting data from multiple S3 buckets with different account associations, configure Data Prepper for cross-account S3 access based on the following conditions:

  • For S3 buckets belonging to the same account, set default_bucket_owner to that account’s ID.
  • For S3 buckets belonging to multiple accounts, use a bucket_owners map.

A bucket_owners map specifies account IDs for buckets belonging to multiple accounts. For example, in the following configuration, my-bucket-01 is owned by 123456789012 and my-bucket-02 is owned by 999999999999:

sink:
  - s3:
      default_bucket_owner: 111111111111
      bucket_owners:
        my-bucket-01: 123456789012
        my-bucket-02: 999999999999

Configuration

Use the following options when customizing the s3 sink.

Option Required Type Description
bucket Yes String Specifies the sink’s S3 bucket name. Supports dynamic bucket naming using Data Prepper expressions, for example, test-${/bucket_id}. If a dynamic bucket is inaccessible and no default_bucket is configured, then the object data is dropped.
default_bucket No String A static fallback bucket used when a dynamically named bucket specified in bucket is inaccessible.
bucket_owners No Map A map of bucket names and their account owner IDs for cross-account access. Refer to Cross-account S3 access.
default_bucket_owner No String The AWS account ID for an S3 bucket owner. Refer to Cross-account S3 access.
codec Yes Codec Serializes data in S3 objects.
aws Yes AWS The AWS configuration. Refer to aws.
threshold Yes Threshold Condition for writing objects to S3.
aggregate_threshold No Aggregate threshold A condition for flushing objects with a dynamic path_prefix.
object_key No Object key Sets path_prefix and file_pattern for object storage. The file pattern is events-%{yyyy-MM-dd'T'hh-mm-ss}. By default, these objects are found in the bucket’s root directory. path_prefix is configurable.
compression No String The compression algorithm: either none, gzip, or snappy. Default is none.
buffer_type No Buffer type The buffer type configuration.
max_retries No Integer The maximum number of retries for S3 ingestion requests. Default is 5.
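
For example, the following sketch routes events to a bucket named from a hypothetical bucket_id event field, with a static fallback bucket used when the dynamic bucket is inaccessible. The field name and bucket names are placeholders, and other required sink options are omitted:

sink:
  - s3:
      bucket: test-${/bucket_id}
      default_bucket: my-fallback-bucket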

aws

Option Required Type Description
region No String The AWS Region to use for credentials. Defaults to standard SDK behavior to determine the Region.
sts_role_arn No String The AWS Security Token Service (AWS STS) role to assume for requests to Amazon S3. Defaults to null, which uses the standard SDK behavior for credentials.
sts_header_overrides No Map A map of header overrides that the IAM role assumes for the sink plugin.
sts_external_id No String An AWS STS external ID used when Data Prepper assumes the role. For more information, refer to the ExternalId section under AssumeRole in the AWS STS API reference.
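
For example, an aws block that assumes a role with an external ID and a custom header override might look like the following sketch. The external ID and the header name and value are placeholders:

aws:
  region: us-east-1
  sts_role_arn: arn:aws:iam::123456789012:role/Data-Prepper
  sts_external_id: my-external-id
  sts_header_overrides:
    x-custom-header: custom-value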

Threshold configuration

Use the following options to set ingestion thresholds for the s3 sink. Data Prepper writes events to an S3 object when any of these conditions is met.

Option Required Type Description
event_count Yes Integer The number of Data Prepper events to accumulate before writing an object to S3.
maximum_size No String The maximum number of bytes to accumulate before writing an object to S3. Default is 50mb.
event_collect_timeout Yes String The maximum amount of time before Data Prepper writes an event to S3. The value should be either an ISO-8601 duration, such as PT2M30S, or a simple notation, such as 60s or 1500ms.
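
For example, the following sketch flushes an object after 2,000 events, 50 MB of accumulated data, or 2 minutes and 30 seconds, whichever comes first. The values are illustrative:

threshold:
  event_count: 2000
  maximum_size: 50mb
  event_collect_timeout: PT2M30S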

Aggregate threshold configuration

Use the following options to set limits that force-flush objects when an aggregated value crosses a defined threshold.

Option Required Type Description
flush_capacity_ratio No Float The proportion of groups to force-flush when the aggregate_threshold maximum_size is reached, expressed as a number between 0.0 and 1.0. Default is 0.5.
maximum_size Yes String The maximum number of bytes to accumulate before force-flushing objects. For example, 128mb.
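
For example, the following sketch pairs a dynamic path_prefix with an aggregate_threshold so that half of the accumulated groups are force-flushed once the aggregate maximum_size of 128 MB is reached. The tenant_id field and the values shown are placeholders:

sink:
  - s3:
      object_key:
        path_prefix: logs/${/tenant_id}/%{yyyy}/%{MM}/%{dd}/
      aggregate_threshold:
        maximum_size: 128mb
        flush_capacity_ratio: 0.5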

Buffer type

buffer_type is an optional configuration that determines how Data Prepper temporarily stores data before writing an object to S3. The default value is in_memory.

Use one of the following options:

  • in_memory: Stores the record in memory.
  • local_file: Flushes the record into a file on your local machine. This option uses your machine’s temporary directory.
  • multipart: Writes using the S3 multipart upload. Every 10 MB is written as a part.
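
For example, to buffer records in a temporary file on local disk instead of in memory, you might set the following in the s3 sink configuration:

buffer_type: local_file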

Object key configuration

Use the following options to define how object keys are constructed for objects stored in S3.

Option Required Type Description
path_prefix No String The S3 key prefix path to use for objects written to S3. Accepts date-time formatting and dynamic injection of values using Data Prepper expressions. For example, you can use /${/my_partition_key}/%{yyyy}/%{MM}/%{dd}/%{HH}/ to create hourly folders in S3 based on the my_partition_key value. The prefix path should end with /. By default, Data Prepper writes objects to the S3 bucket root.
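
For example, the following sketch uses the expression from the preceding table to write objects into hourly folders keyed by a hypothetical my_partition_key event field:

object_key:
  path_prefix: /${/my_partition_key}/%{yyyy}/%{MM}/%{dd}/%{HH}/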

codec

The codec determines how the s3 sink formats data written to each S3 object.

avro codec

The avro codec writes an event as an Apache Avro document. Because Avro requires a schema, you can either define the schema yourself or have Data Prepper generate it automatically. Defining your own schema is recommended because you can tailor it to your particular use case.

When you provide your own Avro schema, that schema defines the final structure of your data. Any values in incoming events that are not mapped in the Avro schema are not included in the final destination. To avoid confusion between a custom Avro schema and the include_keys or exclude_keys sink configurations, Data Prepper does not allow include_keys or exclude_keys to be used with a custom schema.

In cases where your data is uniform, you may be able to automatically generate a schema. Automatically generated schemas are based on the first event that the codec receives. The schema will only contain keys from this event, and all keys must be present in all events in order to automatically generate a working schema. Automatically generated schemas make all fields nullable. Use the include_keys and exclude_keys sink configurations to control which data is included in the automatically generated schema.

Avro fields should use a null union so that missing values are allowed. Otherwise, every required field must be present in each event. Use non-nullable fields only when you are certain they exist.

Use the following options to configure the codec.

Option Required Type Description
schema Yes String The Avro schema declaration. Not required if auto_schema is set to true.
auto_schema No Boolean When set to true, automatically generates the Avro schema declaration from the first event.
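
For example, the following sketch configures the avro codec with a small custom schema. The record name and fields are placeholders; the null unions follow the guidance above so that missing values are allowed:

codec:
  avro:
    schema: >
      {
        "type" : "record",
        "namespace" : "org.example",
        "name" : "LogEvent",
        "fields" : [
          { "name" : "message", "type" : ["null", "string"] },
          { "name" : "status", "type" : ["null", "int"] }
        ]
      }

Alternatively, set auto_schema: true to generate the schema from the first event instead of providing one.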

ndjson codec

The ndjson codec writes each event as a JSON object on its own line (newline-delimited JSON). The ndjson codec does not take any configuration options.
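
For example, two events written with the ndjson codec might look like the following. The field names and values are illustrative:

{"time":"2023-06-09T06:00:01Z","message":"login succeeded","status":200}
{"time":"2023-06-09T06:00:02Z","message":"login failed","status":401}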

json codec

The json codec writes all events to a single large JSON file. Each event is written as an object within a JSON array.

Use the following options to configure the codec.

Option Required Type Description
key_name No String The name of the key for the JSON array. By default this is events.
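
For example, with the default key_name of events, the resulting object might look like the following. The event contents are illustrative:

{"events":[{"message":"login succeeded"},{"message":"login failed"}]}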

parquet codec

The parquet codec writes events into a Parquet file. When using the codec, set buffer_type to in_memory.

The parquet codec writes data using an Avro schema. Because Parquet requires an Avro schema, you can either define the schema yourself or have Data Prepper generate it automatically. Defining your own schema is recommended because you can tailor it to your particular use case.

For more information about the Avro schema, refer to Avro codec.

Use the following options to configure the codec.

Option Required Type Description
schema Yes String The Avro schema declaration. Not required if auto_schema is set to true.
auto_schema No Boolean When set to true, automatically generates the Avro schema declaration from the first event.

Setting a schema with Parquet

The following example pipeline shows how to configure the s3 sink to write Parquet data into a Parquet file using a schema for VPC Flow Logs:

pipeline:
  ...
  sink:
    - s3:
        aws:
          region: us-east-1
          sts_role_arn: arn:aws:iam::123456789012:role/Data-Prepper
        bucket: mys3bucket
        object_key:
          path_prefix: vpc-flow-logs/%{yyyy}/%{MM}/%{dd}/%{HH}/
        codec:
          parquet:
            schema: >
              {
                "type" : "record",
                "namespace" : "org.opensearch.dataprepper.examples",
                "name" : "VpcFlowLog",
                "fields" : [
                  { "name" : "version", "type" : ["null", "string"]},
                  { "name" : "srcport", "type": ["null", "int"]},
                  { "name" : "dstport", "type": ["null", "int"]},
                  { "name" : "accountId", "type" : ["null", "string"]},
                  { "name" : "interfaceId", "type" : ["null", "string"]},
                  { "name" : "srcaddr", "type" : ["null", "string"]},
                  { "name" : "dstaddr", "type" : ["null", "string"]},
                  { "name" : "start", "type": ["null", "int"]},
                  { "name" : "end", "type": ["null", "int"]},
                  { "name" : "protocol", "type": ["null", "int"]},
                  { "name" : "packets", "type": ["null", "int"]},
                  { "name" : "bytes", "type": ["null", "int"]},
                  { "name" : "action", "type": ["null", "string"]},
                  { "name" : "logStatus", "type" : ["null", "string"]}
                ]
              }
        threshold:
          event_count: 500000000
          maximum_size: 20mb
          event_collect_timeout: PT15M
        buffer_type: in_memory