Announcing Data Prepper 2.4.0

Mon, Aug 28, 2023 · Sudipto Guha, Krishna Kondaka, Asif Sohail Mohammed,

Data Prepper 2.4.0 is now available for download. This release introduces a number of exciting new features, including a new Apache Kafka source, Amazon S3 batch processing, filtering inside of sinks, new S3 sink codecs, and streaming anomaly detection with high cardinality.

Kafka source

Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

Deploying Data Prepper pipelines prior to data storage can lead to substantial performance improvements and reduce the need for large amounts of storage. Therefore, Data Prepper 2.4 adds support for Kafka and Amazon Managed Streaming for Apache Kafka (Amazon MSK) as a source, allowing your ingestion pipeline to consume data from one or more topics in a Kafka cluster. The pipeline with the Kafka source then transforms the data and sends it to your storage solution of choice, including OpenSearch or Amazon Simple Storage Service (Amazon S3).

Furthermore, multiple Data Prepper pipelines can read data from the same Kafka topics, providing you the capabilities to configure Kafka parameters, such as the number of consumers per topic, or tune different fetch parameters for high- and low-priority data.

To get started using Kafka as a source, add the following basic configuration to your pipeline:

    - name: topic1
    group_id: "group_id1"
    - name: topic2
    group_id: "group_id1"

For more information about using Kafka and Amazon MSK in Data Prepper, including guidance on schema registries and data durability, see the kafka source documentation.

S3 batch processing

Data Prepper 2.4.0 adds support for S3 scan functionality, which scans Amazon S3 buckets to process existing objects without having to set up Amazon S3 Event Notifications. This is ideal for use cases where large amounts of historical data need to be migrated or for users who want to run night scan jobs on data uploaded to S3 buckets.

Use the following source configuration to get started with S3 scan:

    acknowledgments: true
      start_time: 2023-01-01T00:00:00
      end_time: 2023-12-31T23:59:59
      - bucket:
          # start_time: 2023-01-01T00:00:00
          # end_time: 2023-12-31T23:59:59
          name: "s3-scan-bucket"
            - "*.log"
            - "prefix1/"
    delete_s3_objects_on_read: true
      region: "us-east-1"
       sts_role_arn: "arn:aws:iam::1234567890:role/scan-role"

For more information about how to configure S3 scan options, see the list of configurable options in the s3 source documentation.

Filtering in sinks

Data Prepper 2.4.0 adds the include_keys and exclude_keys options for sinks, which gives you the flexibility to ingest data from any source and apply common enrichment using a processor chain. You can also selectively send data to a specific sink, like OpenSearch or S3, for archival purposes.

The following example shows how to implement filters inside a sink:

 - opensearch:
     - srcport
     - dstport
     - srcaddr
     - dstaddr
     - start
     - end
 - s3:
    - version
    - interfaceId

S3 sink codecs

Data Prepper 2.4.0 adds new codecs to the S3 sink:

  • JSON codec: The JSON codec writes an S3 object as a valid JSON object and includes all events sent to S3 in the JSON array. This is useful when integrating with systems that expect JSON objects.
  • Avro codec: The Apache Avro codec gives you the ability to define Avro schemas and write events in the Avro format.
  • Parquet codec: The Apache Parquet codec lets you use the Parquet columnar format. By saving your data in the Parquet format, you can efficiently retrieve that data at a later time for analysis. You can also define your schema in the Avro format inside the Parquet codec.

The following example shows the configuration of an Avro codec with a schema for network traffic:

      schema: >
          "type" : "record",
          "namespace" : "org.opensearch.dataprepper.examples",
          "name" : "NetworkTraffic",
          "fields" : [
            { "name" : "sourcePort", "type": "int"},
            { "name" : "destinationPort", "type": "int"},
            { "name" : "sourceAddress", "type" : "string"},
            { "name" : "destinationAddress", "type" : "string"},
            { "name" : "bytes", "type": "int"},

Additionally, the S3 sink can now write compressed gzip or Snappy files to reduce your network or storage needs. To use this setting, add the following to your pipeline:

  compression: gzip

Streaming anomaly detection with high cardinality

The streaming anomaly_detector now contains the identification_keys option, which creates a Random Cut Forest (RCF) model for each value in your time-series data. With the identification_keys option, anomalies can be detected in a unique set of keys.

The following example shows how to create an anomaly_detector processor that detects anomalies for each IP address:

- anomaly_detector:
   identification_keys: ["ip"]
   keys: ["latency"]

You can write anomalies detected by the processor to a separate index and create alerts using document-level monitors.

Furthermore, you can use the verbose configuration in the anomaly detector processor to change the number of alerts and anomalies shown by the processor: false for fewer, true for more.

OpenSearch sink with Elasticsearch 6.8

The opensearch sink can now write to Elasticsearch 6.8 by changing the distribution_version setting to es6.

Upcoming changes

Future versions of Data Prepper will include:

  • A geoip processor for extracting locations from IP addresses.
  • The ability to migrate from older versions of OpenSearch.
  • The ability to write to Amazon Simple Notification Service (Amazon SNS) using an SNS sink.

For a list of additional features we plan to include in future releases, see the project roadmap.

Getting started

Thanks to our contributors!