
S3 logs

OpenSearch Data Prepper allows you to load logs from Amazon Simple Storage Service (Amazon S3), including traditional logs, JSON documents, and CSV logs.

Architecture

OpenSearch Data Prepper can read objects from S3 buckets using an Amazon Simple Queue Service (Amazon SQS) queue and Amazon S3 Event Notifications.

OpenSearch Data Prepper polls the Amazon SQS queue for S3 event notifications. When it receives a notification that an S3 object was created, it reads and parses that S3 object.

The following diagram shows the overall architecture of the components involved.

S3 source architecture

The component data flow is as follows:

  1. A system produces logs into the S3 bucket.
  2. S3 creates an S3 event notification in the SQS queue.
  3. OpenSearch Data Prepper polls Amazon SQS for messages and then receives a message.
  4. OpenSearch Data Prepper downloads the content from the S3 object.
  5. OpenSearch Data Prepper sends a document to OpenSearch for the content in the S3 object.
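
To make steps 2 through 4 more concrete, the following Python sketch shows how a consumer might extract the bucket and object key from an S3 event notification message. It is an illustration only and does not reflect how OpenSearch Data Prepper is implemented. The function name, bucket name, and object key are hypothetical, and the sample message is trimmed to the fields the sketch reads.

```python
import json
from urllib.parse import unquote_plus


def objects_from_s3_event(message_body: str) -> list:
    """Return (bucket, key) pairs for created objects in an S3 event notification."""
    event = json.loads(message_body)
    created = []
    # Messages without a "Records" list (for example, S3 test events) yield nothing.
    for record in event.get("Records", []):
        # Only object-creation events are relevant to this flow.
        if not record.get("eventName", "").startswith("ObjectCreated:"):
            continue
        bucket = record["s3"]["bucket"]["name"]
        # Object keys are URL-encoded in S3 event notifications.
        key = unquote_plus(record["s3"]["object"]["key"])
        created.append((bucket, key))
    return created


# Hypothetical message body, trimmed to the fields used above.
sample_body = json.dumps({
    "Records": [
        {
            "eventName": "ObjectCreated:Put",
            "s3": {
                "bucket": {"name": "example-log-bucket"},
                "object": {"key": "logs/2024/app+server.log.gz"},
            },
        }
    ]
})

print(objects_from_s3_event(sample_body))
```

The key is decoded before use because a key containing spaces or special characters arrives encoded in the notification.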

Pipeline overview

OpenSearch Data Prepper supports reading data from S3 using the s3 source.

The following diagram shows a conceptual outline of an OpenSearch Data Prepper pipeline reading from S3.

S3 source architecture

Prerequisites

Before OpenSearch Data Prepper can read log data from S3, you need the following prerequisites:

  • An S3 bucket.
  • A log producer that writes logs to S3. The exact log producer varies depending on your use case, but could be an application that writes logs directly to S3 or a service such as Amazon CloudWatch.

Getting started

Use the following steps to begin loading logs from S3 with OpenSearch Data Prepper.

  1. Create an SQS standard queue for your S3 event notifications.
  2. Configure bucket notifications for SQS. Use the s3:ObjectCreated:* event type.
  3. Grant AWS IAM permissions to OpenSearch Data Prepper for accessing SQS and S3.
  4. (Recommended) Create an SQS dead-letter queue (DLQ).
  5. (Recommended) Configure an SQS redrive policy to move failed messages into the DLQ.
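
As a sketch of step 2, the following Python snippet builds a bucket notification configuration that sends s3:ObjectCreated:* events to an SQS queue. The dictionary follows the shape of the S3 notification configuration document. The function name and the queue ARN are hypothetical placeholders, and the snippet only prints the configuration rather than applying it. The queue's access policy must also allow S3 to send messages to the queue, which is not shown here.

```python
import json


def queue_notification_config(queue_arn: str) -> dict:
    """Build an S3 notification configuration that targets one SQS queue."""
    return {
        "QueueConfigurations": [
            {
                "QueueArn": queue_arn,
                # The event type recommended in the steps above.
                "Events": ["s3:ObjectCreated:*"],
            }
        ]
    }


# Hypothetical queue ARN; replace it with your own.
config = queue_notification_config(
    "arn:aws:sqs:us-east-1:123456789012:example-queue"
)
print(json.dumps(config, indent=2))
```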

Setting permissions for OpenSearch Data Prepper

To view S3 logs, OpenSearch Data Prepper needs access to Amazon SQS and S3. Use the following example to set up permissions:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "S3Access",
            "Effect": "Allow",
            "Action": "s3:GetObject",
            "Resource": "arn:aws:s3:::<YOUR-BUCKET>/*"
        },
        {
            "Sid": "SqsAccess",
            "Effect": "Allow",
            "Action": [
                "sqs:DeleteMessage",
                "sqs:ReceiveMessage"
            ],
            "Resource": "arn:aws:sqs:<YOUR-REGION>:<123456789012>:<YOUR-SQS-QUEUE>"
        },
        {
            "Sid": "KmsAccess",
            "Effect": "Allow",
            "Action": "kms:Decrypt",
            "Resource": "arn:aws:kms:<YOUR-REGION>:<123456789012>:key/<YOUR-KMS-KEY>"
        }
    ]
}

If your S3 objects or SQS queues do not use KMS, you can remove the kms:Decrypt permission.
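
If you generate this policy for several environments, a small helper can include the KMS statement only when a key is in use. The following Python sketch is one possible approach. The function name and all resource values are hypothetical, and the statement IDs use only letters and digits, which is an assumption made here to stay within what IAM accepts for the Sid element.

```python
import json
from typing import Optional


def build_policy(bucket: str, queue_arn: str, kms_key_arn: Optional[str] = None) -> dict:
    """Build the permissions policy, adding kms:Decrypt only when a key ARN is given."""
    statements = [
        {
            "Sid": "S3Access",
            "Effect": "Allow",
            "Action": "s3:GetObject",
            "Resource": f"arn:aws:s3:::{bucket}/*",
        },
        {
            "Sid": "SqsAccess",
            "Effect": "Allow",
            "Action": ["sqs:DeleteMessage", "sqs:ReceiveMessage"],
            "Resource": queue_arn,
        },
    ]
    if kms_key_arn is not None:
        statements.append(
            {
                "Sid": "KmsAccess",
                "Effect": "Allow",
                "Action": "kms:Decrypt",
                "Resource": kms_key_arn,
            }
        )
    return {"Version": "2012-10-17", "Statement": statements}


# Hypothetical values; no KMS key is passed, so the KMS statement is omitted.
policy = build_policy(
    "example-log-bucket",
    "arn:aws:sqs:us-east-1:123456789012:example-queue",
)
print(json.dumps(policy, indent=4))
```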

SQS dead-letter queue

The following two options can be used to handle S3 object processing errors:

  • Use an SQS dead-letter queue (DLQ) to track the failure. This is the recommended approach.
  • Delete the message from SQS. You must manually find the S3 object and correct the error.

The following diagram shows the system architecture when using SQS with DLQ.

S3 source architecture with DLQ

To use an SQS dead-letter queue, perform the following steps:

  1. Create a new SQS standard queue to act as the DLQ.
  2. Configure a redrive policy on your source SQS queue that targets the DLQ. Consider using a low value such as 2 or 3 for the Maximum Receives setting.
  3. Configure the OpenSearch Data Prepper s3 source to use retain_messages for on_error. This is the default behavior.
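
The redrive policy in step 2 is stored on the source queue as a JSON string in the RedrivePolicy queue attribute. The following Python sketch builds that attribute. The function name and the DLQ ARN are hypothetical, and the snippet only prints the attribute rather than applying it to a queue.

```python
import json


def redrive_policy_attributes(dlq_arn: str, max_receive_count: int = 3) -> dict:
    """Build the SQS queue attribute that routes failed messages to a DLQ."""
    if max_receive_count < 1:
        raise ValueError("max_receive_count must be at least 1")
    return {
        # SQS expects the redrive policy as a JSON-encoded string.
        "RedrivePolicy": json.dumps(
            {
                "deadLetterTargetArn": dlq_arn,
                "maxReceiveCount": max_receive_count,
            }
        )
    }


# Hypothetical DLQ ARN; a low receive count follows the guidance above.
attributes = redrive_policy_attributes(
    "arn:aws:sqs:us-east-1:123456789012:example-dlq", max_receive_count=2
)
print(attributes["RedrivePolicy"])
```

With a receive count of 2, a message that fails processing twice is moved to the DLQ instead of being retried indefinitely.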

Pipeline design

Create a pipeline to read logs from S3, starting with an s3 source plugin. Use the following example for guidance.

s3-log-pipeline:
   source:
     s3:
       notification_type: sqs
       compression: gzip
       codec:
         newline:
       sqs:
         # Change this value to your SQS Queue URL
         queue_url: "https://sqs.<YOUR-REGION>.amazonaws.com/<123456789012>/<YOUR-SQS-QUEUE>"
         visibility_timeout: "2m"

Configure the following options according to your use case:

  • queue_url: This is the SQS queue URL and is always unique to your pipeline.
  • codec: The codec determines how to parse the incoming data.
  • visibility_timeout: Configure this value to be large enough for OpenSearch Data Prepper to process 10 S3 objects. However, if you make this value too large, messages that fail to process will take at least as long as the specified value before OpenSearch Data Prepper retries.
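
The queue_url option expects a queue URL, while IAM policies and redrive policies refer to the same queue by its ARN. If you only have the ARN at hand, the following Python sketch derives the URL from it. The function name is hypothetical, and the sketch assumes the standard aws partition and the common https://sqs.<region>.amazonaws.com/<account>/<queue> URL form. Treat the value that SQS itself reports for your queue as authoritative.

```python
def queue_url_from_arn(queue_arn: str) -> str:
    """Derive an SQS queue URL from a queue ARN (standard aws partition only)."""
    parts = queue_arn.split(":")
    # Expected form: arn:aws:sqs:<region>:<account-id>:<queue-name>
    if len(parts) != 6 or parts[:3] != ["arn", "aws", "sqs"]:
        raise ValueError(f"not a standard-partition SQS queue ARN: {queue_arn!r}")
    region, account_id, queue_name = parts[3], parts[4], parts[5]
    return f"https://sqs.{region}.amazonaws.com/{account_id}/{queue_name}"


# Hypothetical ARN used for illustration.
print(queue_url_from_arn("arn:aws:sqs:us-east-1:123456789012:example-queue"))
```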

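The default values for each option work for the majority of use cases. For all available options for the S3 source, see s3.

The following example shows a complete pipeline that adds AWS settings, an optional grok processor, and an OpenSearch sink: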

s3-log-pipeline:
   source:
     s3:
       notification_type: sqs
       compression: gzip
       codec:
         newline:
       sqs:
         # Change this value to your SQS Queue URL
         queue_url: "https://sqs.<YOUR-REGION>.amazonaws.com/<123456789012>/<YOUR-SQS-QUEUE>"
         visibility_timeout: "2m"
       aws:
         # Specify the correct region
         region: "<YOUR-REGION>"
         # This shows using an STS role, but you can also use your system's default permissions.
         sts_role_arn: "arn:aws:iam::<123456789012>:role/<DATA-PREPPER-ROLE>"
   processor:
     # You can configure a grok pattern to enrich your documents in OpenSearch.
     #- grok:
     #    match:
     #      message: [ "%{COMMONAPACHELOG}" ]
   sink:
     - opensearch:
         hosts: [ "https://localhost:9200" ]
         # Change to your credentials
         username: "admin"
         password: "admin"
         index: s3_logs

Multiple OpenSearch Data Prepper pipelines

It is recommended that you have one SQS queue per OpenSearch Data Prepper pipeline. In addition, you can have multiple nodes in the same cluster reading from the same SQS queue, which doesn’t require additional OpenSearch Data Prepper configuration.

If you have multiple pipelines, you must create a separate SQS queue for each pipeline, even if the pipelines use the same S3 bucket.

Amazon SNS fanout pattern

To meet the scale of logs produced by S3, some users require multiple SQS queues for their logs. You can use Amazon Simple Notification Service (Amazon SNS) to fan out S3 event notifications to multiple SQS queues. With this pattern, S3 sends all event notifications to a single SNS topic, and you subscribe each SQS queue to that topic.

To make sure that OpenSearch Data Prepper can directly parse the event from the SNS topic, configure raw message delivery on the SNS-to-SQS subscription. Applying this option does not affect other SQS queues subscribed to the SNS topic.
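
To illustrate why raw message delivery matters, the following Python sketch compares the two message bodies a queue can receive. Without raw message delivery, SNS wraps the S3 event in an envelope and places the original event in the envelope's Message field as a JSON string. With raw message delivery, the body is the S3 event itself. The function name and sample values are hypothetical, and the sketch is not a description of OpenSearch Data Prepper's internals.

```python
import json


def s3_event_from_sqs_body(body: str) -> dict:
    """Return the S3 event from an SQS body, unwrapping an SNS envelope if present."""
    payload = json.loads(body)
    # An SNS envelope carries the original message as a JSON string in "Message".
    if payload.get("Type") == "Notification" and "Message" in payload:
        return json.loads(payload["Message"])
    # With raw message delivery, the body is already the S3 event.
    return payload


# Hypothetical S3 event, trimmed to a single field for brevity.
s3_event = {"Records": [{"eventName": "ObjectCreated:Put"}]}

raw_body = json.dumps(s3_event)
wrapped_body = json.dumps({"Type": "Notification", "Message": json.dumps(s3_event)})

print(s3_event_from_sqs_body(raw_body) == s3_event_from_sqs_body(wrapped_body))
```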

Filtering and retrieving data using Amazon S3 Select

If a pipeline uses an S3 source, you can use SQL expressions to perform filtering and computations on the contents of S3 objects before ingesting them into the pipeline.

The s3_select option supports objects in the Parquet File Format. It also works with objects that are compressed with GZIP or BZIP2 (for CSV and JSON objects only) and supports columnar compression for the Parquet File Format using GZIP and Snappy.

Refer to Filtering and retrieving data using Amazon S3 Select and SQL reference for Amazon S3 Select for comprehensive information about using Amazon S3 Select.

The following example pipeline retrieves all data from S3 objects encoded in the Parquet File Format:

pipeline:
  source:
    s3:
      s3_select:
        expression: "select * from s3object s"
        input_serialization: parquet
      notification_type: "sqs"
...

The following example pipeline retrieves only the first 10,000 records in the objects:

pipeline:
  source:
    s3:
      s3_select:
        expression: "select * from s3object s LIMIT 10000"
        input_serialization: parquet
      notification_type: "sqs"
...

The following example pipeline retrieves records from S3 objects that have a data_value greater than 200 and less than 500:

pipeline:
  source:
    s3:
      s3_select:
        expression: "select s.* from s3object s where s.data_value > 200 and s.data_value < 500"
        input_serialization: parquet
      notification_type: "sqs"
...
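
To make the range semantics of the last expression concrete, the following Python sketch applies the same predicate to a few in-memory records. It does not call S3 Select. The records and the function name are hypothetical. Both comparisons are strict, so values of exactly 200 or 500 are excluded.

```python
def in_open_range(record: dict, low: int = 200, high: int = 500) -> bool:
    """Mirror of: s.data_value > 200 and s.data_value < 500."""
    return low < record["data_value"] < high


# Hypothetical records standing in for rows of an S3 object.
records = [
    {"id": 1, "data_value": 200},
    {"id": 2, "data_value": 350},
    {"id": 3, "data_value": 500},
    {"id": 4, "data_value": 201},
]

selected = [record for record in records if in_open_range(record)]
print([record["id"] for record in selected])
```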