Link Search Menu Expand Document Documentation Menu

opensearch

You can use the opensearch sink plugin to send data to an OpenSearch cluster, a legacy Elasticsearch cluster, or an Amazon OpenSearch Service domain.

The plugin supports OpenSearch 1.0 and later and Elasticsearch 7.3 and later.

Usage

To configure an opensearch sink, specify the opensearch option within the pipeline configuration:

pipeline:
  ...
  sink:
    opensearch:
      hosts: ["https://localhost:9200"]
      cert: path/to/cert
      username: YOUR_USERNAME
      password: YOUR_PASSWORD
      index_type: trace-analytics-raw
      dlq_file: /your/local/dlq-file
      max_retries: 20
      bulk_size: 4

To configure an Amazon OpenSearch Service sink, specify the domain endpoint as the hosts option, as shown in the following example:

pipeline:
  ...
  sink:
    opensearch:
      hosts: ["https://your-amazon-opensearch-service-endpoint"]
      aws_sigv4: true
      cert: path/to/cert
      insecure: false
      index_type: trace-analytics-service-map
      bulk_size: 4

Configuration options

The following table describes options you can configure for the opensearch sink.

Option Required Type Description
hosts Yes List A list of OpenSearch hosts to write to, such as ["https://localhost:9200", "https://remote-cluster:9200"].
cert No String The path to the security certificate. For example, "config/root-ca.pem" if the cluster uses the OpenSearch Security plugin.
username No String The username for HTTP basic authentication.
password No String The password for HTTP basic authentication.
aws No AWS The AWS configuration.
max_retries No Integer The maximum number of times that the opensearch sink should try to push data to the OpenSearch server before considering it to be a failure. Defaults to Integer.MAX_VALUE. When not provided, the sink will try to push data to the OpenSearch server indefinitely and exponential backoff will increase the waiting time before a retry.
aws_sigv4 No Boolean Deprecated in Data Prepper 2.7. Default is false. Whether to use AWS Identity and Access Management (IAM) signing to connect to an Amazon OpenSearch Service domain. For your access key, secret key, and optional session token, Data Prepper uses the default credential chain (environment variables, Java system properties, ~/.aws/credential).
aws_region No String Deprecated in Data Prepper 2.7. The AWS Region (for example, "us-east-1") for the domain when you are connecting to Amazon OpenSearch Service.
aws_sts_role_arn No String Deprecated in Data Prepper 2.7. The IAM role that the plugin uses to sign requests sent to Amazon OpenSearch Service. If this information is not provided, then the plugin uses the default credentials.
socket_timeout No Integer The timeout value, in milliseconds, when waiting for data to be returned (the maximum period of inactivity between two consecutive data packets). A timeout value of 0 is interpreted as an infinite timeout. If this timeout value is negative or not set, then the underlying Apache HttpClient will rely on operating system settings to manage socket timeouts.
connect_timeout No Integer The timeout value, in milliseconds, when requesting a connection from the connection manager. A timeout value of 0 is interpreted as an infinite timeout. If this timeout value is negative or not set, the underlying Apache HttpClient will rely on operating system settings to manage connection timeouts.
insecure No Boolean Whether or not to verify SSL certificates. If set to true, then certificate authority (CA) certificate verification is disabled and insecure HTTP requests are sent instead. Default is false.
proxy No String The address of the forward HTTP proxy server. The format is "<hostname or IP>:<port>" (for example, "example.com:8100", "http://example.com:8100", "112.112.112.112:8100"). The port number cannot be omitted.
index Conditionally String The name of the export index. Only required when the index_type is custom. The index can be a plain string, such as my-index-name, contain Java date-time patterns, such as my-index-${yyyy.MM.dd} or my-${yyyy-MM-dd-HH}-index, be formatted using field values, such as my-index-${/my_field}, or use Data Prepper expressions, such as my-index-${getMetadata(\"my_metadata_field\"}. All formatting options can be combined to provide flexibility when creating static, dynamic, and rolling indexes.
index_type No String Tells the sink plugin what type of data it is handling. Valid values are custom, trace-analytics-raw, trace-analytics-service-map, or management-disabled. Default is custom.
template_type No String Defines what type of OpenSearch template to use. Available options are v1 and index-template. The default value is v1, which uses the original OpenSearch templates available at the _template API endpoints. The index-template option uses composable index templates, which are available through the OpenSearch _index_template API. Composable index types offer more flexibility than the default and are necessary when an OpenSearch cluster contains existing index templates. Composable templates are available for all versions of OpenSearch and some later versions of Elasticsearch. When distribution_version is set to es6, Data Prepper enforces the template_type as v1.
template_file No String The path to a JSON index template file, such as /your/local/template-file.json, when index_type is set to custom. For an example template file, see otel-v1-apm-span-index-template.json. If you supply a template file, then it must match the template format specified by the template_type parameter.
template_content No JSON Contains all the inline JSON found inside of the index index template. For an example of template content, see the example template content.
document_id_field No String Deprecated in Data Prepper 2.7 in favor of document_id. The field from the source data to use for the OpenSearch document ID (for example, "my-field") if index_type is custom.
document_id No String A format string to use as the _id in OpenSearch documents. To specify a single field in an event, use ${/my_field}. You can also use Data Prepper expressions to construct the document_id, for example, ${getMetadata(\"some_metadata_key\")}. These options can be combined into more complex formats, such as ${/my_field}-test-${getMetadata(\"some_metadata_key\")}.
document_version No String A format string to use as the _version in OpenSearch documents. To specify a single field in an event, use ${/my_field}. You can also use Data Prepper expressions to construct the document_version, for example, ${getMetadata(\"some_metadata_key\")}. These options can be combined into more complex versions, such as ${/my_field}${getMetadata(\"some_metadata_key\")}. The document_version format must evaluate to a long type and can only be used when document_version_type is set to either external or external_gte.
document_version_type No String The document version type for index operations. Must be one of external, external_gte, or internal. If set to external or external_gte, then document_version is required.
dlq_file No String The path to your preferred dead letter queue file (such as /your/local/dlq-file). Data Prepper writes to this file when it fails to index a document on the OpenSearch cluster.
dlq No N/A DLQ configurations.
bulk_size No Integer (long) The maximum size (in MiB) of bulk requests sent to the OpenSearch cluster. Values below 0 indicate an unlimited size. If a single document exceeds the maximum bulk request size, then Data Prepper sends each request individually. Default value is 5.
ism_policy_file No String The absolute file path for an Index State Management (ISM) policy JSON file. This policy file is effective only when there is no built-in policy file for the index type. For example, the custom index type is currently the only type without a built-in policy file, so it will use this policy file if it is provided through this parameter. For more information about the policy JSON file, see ISM policies.
number_of_shards No Integer The number of primary shards that an index should have on the destination OpenSearch server. This parameter is effective only when template_file is either explicitly provided in the sink configuration or built in. If this parameter is set, then it will override the value in the index template file. For more information, see Create index.
number_of_replicas No Integer The number of replica shards that each primary shard should have on the destination OpenSearch server. For example, if you have 4 primary shards and set number_of_replicas to 3, then the index has 12 replica shards. This parameter is effective only when template_file is either explicitly provided in the sink configuration or built in. If this parameter is set, then it will override the value in the index template file. For more information, see Create index.
distribution_version No String Indicates whether the backend version of the sink is Elasticsearch 6 or later. es6 represents Elasticsearch 6. default represents the latest compatible backend version, such as Elasticsearch 7.x, OpenSearch 1.x, or OpenSearch 2.x. Default is default.
enable_request_compression No Boolean Whether to enable compression when sending requests to OpenSearch. When distribution_version is set to es6, default is false. For all other distribution versions, default is true.
action No String The OpenSearch bulk action to use for documents. Must be one of create, index, update, upsert, or delete. Default is index.
actions No List A list of actions that can be used as an alternative to action, which reads as a switch case statement that conditionally determines the bulk action to take for an event.
flush_timeout No Long A long class that contains the amount of time, in milliseconds, to try packing a bulk request up to the bulk_size before flushing the request. If this timeout expires before a bulk request has reached the bulk_size, the request will be flushed. Set to -1 to disable the flush timeout and instead flush whatever is present at the end of each batch. Default is 60,000, or 1 minute.
normalize_index No Boolean If true, then the OpenSearch sink will try to create dynamic index names. Index names with format options specified in ${}) are valid according to the index naming restrictions. Any invalid characters will be removed. Default value is false.
routing No String A string used as a hash for generating the shard_id for a document when it is stored in OpenSearch. Each incoming record is searched. When present, the string is used as the routing field for the document. When not present, the default routing mechanism (document_id) is used by OpenSearch when storing the document. Supports formatting with fields in events and Data Prepper expressions, such as ${/my_field}-test-${getMetadata(\"some_metadata_key\")}.
document_root_key No String The key in the event that will be used as the root in the document. The default is the root of the event. If the key does not exist, then the entire event is written as the document. If document_root_key is of a basic value type, such as a string or integer, then the document will have a structure of {"data": <value of the document_root_key>}.
serverless No Boolean Deprecated in Data Prepper 2.7. Use this option with the aws configuration instead. Determines whether the OpenSearch backend is Amazon OpenSearch Serverless. Set this value to true when the destination for the opensearch sink is an Amazon OpenSearch Serverless collection. Default is false.
serverless_options No Object Deprecated in Data Prepper 2.7. Use this option with the aws configuration instead. The network configuration options available when the backend of the opensearch sink is set to Amazon OpenSearch Serverless. For more information, see Serverless options.

aws

Option Required Type Description
region No String The AWS Region to use for credentials. Defaults to standard SDK behavior to determine the Region.
sts_role_arn No String The AWS Security Token Service (AWS STS) role to assume for requests to Amazon SQS and Amazon S3. Defaults to null, which will use standard SDK behavior for credentials.
sts_header_overrides No Map A map of header overrides that the IAM role assumes for the sink plugin.
sts_external_id No String The external ID to attach to AssumeRole requests from AWS STS.
serverless No Boolean Determines whether the OpenSearch backend is Amazon OpenSearch Serverless. Set this value to true when the destination for the opensearch sink is an Amazon OpenSearch Serverless collection. Default is false.
serverless_options No Object The network configuration options available when the backend of the opensearch sink is set to Amazon OpenSearch Serverless. For more information, see Serverless options.

actions

The following options can be used inside the actions option.

Option Required Type Description
type Yes String The type of bulk action to use if the when condition evaluates to true. Must be either create, index, update, upsert, or delete.
when No String A Data Prepper expression that conditionally evaluates whether an event will be sent to OpenSearch using the bulk action configured in type. When empty, the bulk action will be chosen automatically when the event is sent to OpenSearch.

Serverless options

The following options can be used in the serverless_options object.

Option Required Type Description
network_policy_name Yes String The name of the network policy to create.
collection_name Yes String The name of the Amazon OpenSearch Serverless collection to configure.
vpce_id Yes String The virtual private cloud (VPC) endpoint to which the source connects.

Configure max_retries

You can include the max_retries option in your pipeline configuration to control the number of times the source tries to write to sinks with exponential backoff. If you don’t include this option, pipelines keep retrying forever.

If you specify max_retries and a pipeline has a dead-letter queue (DLQ) configured, the pipeline will keep trying to write to sinks until it reaches the maximum number of retries, at which point it starts to send failed data to the DLQ.

If you don’t specify max_retries, only data that is rejected by sinks is written to the DLQ. Pipelines continue to try to write all other data to the sinks.

OpenSearch cluster security

In order to send data to an OpenSearch cluster using the opensearch sink plugin, you must specify your username and password within the pipeline configuration. The following example pipelines.yaml file demonstrates how to specify admin security credentials:

sink:
  - opensearch:
      username: "admin"
      password: "admin"
      ...

Alternately, rather than admin credentials, you can specify the credentials of a user mapped to a role with the minimum permissions listed in the following sections.

Cluster permissions

  • cluster_all
  • indices:admin/template/get
  • indices:admin/template/put

Index permissions

  • Index: otel-v1*; Index permission: indices_all
  • Index: .opendistro-ism-config; Index permission: indices_all
  • Index: *; Index permission: manage_aliases

For instructions on how to map users to roles, see Map users to roles.

Amazon OpenSearch Service domain security

The opensearch sink plugin can send data to an Amazon OpenSearch Service domain, which uses IAM for security. The plugin uses the default credential chain. Run aws configure using the AWS Command Line Interface (AWS CLI) to set your credentials.

Make sure the credentials that you configure have the required IAM permissions. The following domain access policy demonstrates the minimum required permissions:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::<AccountId>:user/data-prepper-user"
      },
      "Action": "es:ESHttp*",
      "Resource": [
        "arn:aws:es:us-east-1:<AccountId>:domain/<domain-name>/otel-v1*",
        "arn:aws:es:us-east-1:<AccountId>:domain/<domain-name>/_template/otel-v1*",
        "arn:aws:es:us-east-1:<AccountId>:domain/<domain-name>/_plugins/_ism/policies/raw-span-policy",
        "arn:aws:es:us-east-1:<AccountId>:domain/<domain-name>/_alias/otel-v1*",
        "arn:aws:es:us-east-1:<AccountId>:domain/<domain-name>/_alias/_bulk"
      ]
    },
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::<AccountId>:user/data-prepper-user"
      },
      "Action": "es:ESHttpGet",
      "Resource": "arn:aws:es:us-east-1:<AccountId>:domain/<domain-name>/_cluster/settings"
    }
  ]
}

For instructions on how to configure the domain access policy, see Resource-based policies in the Amazon OpenSearch Service documentation.

Fine-grained access control

If your OpenSearch Service domain uses fine-grained access control, the opensearch sink plugin requires some additional configuration.

IAM ARN as master user

If you’re using an IAM Amazon Resource Name (ARN) as the master user, include the aws_sigv4 option in your sink configuration:

...
sink:
    opensearch:
      hosts: ["https://your-fgac-amazon-opensearch-service-endpoint"]
      aws_sigv4: true

Run aws configure using the AWS CLI to use the master IAM user credentials. If you don’t want to use the master user, you can specify a different IAM role using the aws_sts_role_arn option. The plugin will then use this role to sign requests sent to the domain sink. The ARN that you specify must be included in the domain access policy.

Master user in the internal user database

If your domain uses a master user in the internal user database, specify the master username and password as well as the aws_sigv4 option:

sink:
    opensearch:
      hosts: ["https://your-fgac-amazon-opensearch-service-endpoint"]
      username: "master-username"
      password: "master-password"

For more information, see Recommended configurations in the Amazon OpenSearch Service documentation.

Note: You can create a new IAM role or internal user database user with the all_access permission and use it instead of the master user.

OpenSearch Serverless collection security

The opensearch sink plugin can send data to an Amazon OpenSearch Serverless collection.

OpenSearch Serverless collection sinks have the following limitations:

  • You can’t write to a collection that uses virtual private cloud (VPC) access. The collection must be accessible from public networks.
  • The OTel trace group processor doesn’t currently support collection sinks.

Creating a pipeline role

First, create an IAM role that the pipeline will assume in order to write to the collection. The role must have the following minimum permissions:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "aoss:BatchGetCollection"
            ],
            "Resource": "*"
        }
    ]
}

The role must have the following trust relationship, which allows the pipeline to assume it:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::<AccountId>:root"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}

Creating a collection

Next, create a collection with the following settings:

  • Public network access to both the OpenSearch endpoint and OpenSearch Dashboards.
  • The following data access policy, which grants the required permissions to the pipeline role:

    [
     {
        "Rules":[
           {
              "Resource":[
                 "index/collection-name/*"
              ],
              "Permission":[
                 "aoss:CreateIndex",
                 "aoss:UpdateIndex",
                 "aoss:DescribeIndex",
                 "aoss:WriteDocument"
              ],
              "ResourceType":"index"
           }
        ],
        "Principal":[
           "arn:aws:iam::<AccountId>:role/PipelineRole"
        ],
        "Description":"Pipeline role access"
     }
    ]
    

    Important: Make sure to replace the ARN in the Principal element with the ARN of the pipeline role that you created in the preceding step.

    For instructions on how to create collections, see Creating collections in the Amazon OpenSearch Service documentation.

Creating a pipeline

Within your pipelines.yaml file, specify the OpenSearch Serverless collection endpoint as the hosts option. In addition, you must set the serverless option to true. Specify the pipeline role in the sts_role_arn option:

log-pipeline:
  source:
    http:
  processor:
    - date:
        from_time_received: true
        destination: "@timestamp"
  sink:
    - opensearch:
        hosts: [ "https://<serverless-public-collection-endpoint>" ]
        index: "my-serverless-index"
        aws:
          serverless: true
          sts_role_arn: "arn:aws:iam::<AccountId>:role/PipelineRole"
          region: "us-east-1"

Example with template_content and actions

The following example pipeline contains both template_content and a list of conditional actions:

log-pipeline:
  source:
    http:
  processor:
    - date:
        from_time_received: true
        destination: "@timestamp"
  sink:
    - opensearch:
        hosts: [ "https://<serverless-public-collection-endpoint>" ]
        index: "my-serverless-index"
        template_type: index-template
        template_content: >
          {
            "template" : {
              "mappings" : {
                "properties" : {
                  "Data" : {
                    "type" : "binary"
                  },
                  "EncodedColors" : {
                    "type" : "binary"
                  },
                  "Type" : {
                    "type" : "keyword"
                  },
                  "LargeDouble" : {
                    "type" : "double"
                  }          
                }
              }
            }
          }
        # index is the default case  
        actions:
         - type: "delete"
           when: '/operation == "delete"'
         - type: "update"
           when: '/operation == "update"'
         - type: "index"
        aws:
          sts_role_arn: "arn:aws:iam::<AccountId>:role/PipelineRole"
          region: "us-east-1"