Anomaly detection
You can use Data Prepper to train models and generate anomalies in near real time on time-series aggregated events. You can generate anomalies either on events generated within the pipeline or on events coming directly into the pipeline, such as OpenTelemetry metrics. You can feed these tumbling-window aggregated time-series events to the anomaly_detector processor, which trains a model and generates anomalies with a grade score. You can then configure your pipeline to write the anomalies to a separate index, where you can create document monitors and trigger fast alerting.
Metrics from logs
The following apache-log-pipeline-with-metrics pipeline receives logs through the http source from a client such as FluentBit, extracts important values from the logs by matching the value in the log key against the Grok pattern for the Apache Common Log Format, and then forwards the grokked logs to both the log-to-metrics-pipeline pipeline and an OpenSearch index named logs.
The log-to-metrics-pipeline pipeline receives the grokked logs from the apache-log-pipeline-with-metrics pipeline, aggregates them, and derives histogram metrics based on the values in the clientip and request keys. It then sends the histogram metrics to an OpenSearch index named histogram_metrics as well as to the log-to-metrics-anomaly-detector-pipeline pipeline.
The log-to-metrics-anomaly-detector-pipeline pipeline receives the aggregated histogram metrics from the log-to-metrics-pipeline pipeline and sends them to the anomaly_detector processor to detect anomalies by using the Random Cut Forest algorithm. If the algorithm detects anomalies, it sends them to an OpenSearch index named log-metric-anomalies.
apache-log-pipeline-with-metrics:
  source:
    http:
      # Provide the path for ingestion. ${pipelineName} will be replaced with the pipeline name configured for this pipeline.
      # In this case it would be "/apache-log-pipeline-with-metrics/logs". This will be the FluentBit output URI value.
      path: "/${pipelineName}/logs"
  processor:
    - grok:
        match:
          log: [ "%{COMMONAPACHELOG_DATATYPED}" ]
  sink:
    - opensearch:
        ...
        index: "logs"
    - pipeline:
        name: "log-to-metrics-pipeline"
log-to-metrics-pipeline:
  source:
    pipeline:
      name: "apache-log-pipeline-with-metrics"
  processor:
    - aggregate:
        # Specify the required identification keys
        identification_keys: ["clientip", "request"]
        action:
          histogram:
            # Specify the appropriate values for each of the following fields
            key: "bytes"
            record_minmax: true
            units: "bytes"
            buckets: [0, 25000000, 50000000, 75000000, 100000000]
        # Pick the required aggregation period
        group_duration: "30s"
  sink:
    - opensearch:
        ...
        index: "histogram_metrics"
    - pipeline:
        name: "log-to-metrics-anomaly-detector-pipeline"
log-to-metrics-anomaly-detector-pipeline:
  source:
    pipeline:
      name: "log-to-metrics-pipeline"
  processor:
    - anomaly_detector:
        # Specify the key on which to run anomaly detection
        keys: [ "bytes" ]
        mode:
          random_cut_forest:
  sink:
    - opensearch:
        ...
        index: "log-metric-anomalies"
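The anomaly_detector processor in the preceding configuration leaves random_cut_forest empty, so the algorithm runs with its defaults. As a minimal sketch, assuming the shingle_size, sample_size, time_decay, and output_after options are supported by your Data Prepper version, the processor section of log-to-metrics-anomaly-detector-pipeline could set them explicitly. The values shown are illustrative, not recommendations:

```yaml
processor:
  - anomaly_detector:
      # Same key as in the pipeline above
      keys: [ "bytes" ]
      mode:
        random_cut_forest:
          # Assumption: option names and values should be checked against
          # the anomaly_detector reference for your Data Prepper version.
          shingle_size: 4     # consecutive values grouped into one data point
          sample_size: 256    # points sampled by the forest
          time_decay: 0.1     # how quickly older points lose influence
          output_after: 32    # events to observe before results are emitted
```

A larger shingle_size makes the model consider longer runs of consecutive values, which can help when anomalies show up as unusual sequences rather than single outliers.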
Metrics from traces
You can derive metrics from traces and find anomalies in those metrics. In this example, the entry-pipeline pipeline receives trace data from the OpenTelemetry Collector and forwards it to the following pipelines:
- span-pipeline: Extracts the raw spans from the traces. The pipeline sends the raw spans to any OpenSearch index prefixed with otel-v1-apm-span.
- service-map-pipeline: Aggregates and analyzes the traces to create documents that represent connections between services. The pipeline sends these documents to an OpenSearch index named otel-v1-apm-service-map. You can then see a visualization of the service map through the Trace Analytics plugin for OpenSearch Dashboards.
- trace-to-metrics-pipeline: Aggregates and derives histogram metrics from the traces based on the value of serviceName. The pipeline then sends the derived metrics to an OpenSearch index named metrics_for_traces and to the trace-to-metrics-anomaly-detector-pipeline pipeline.
The trace-to-metrics-anomaly-detector-pipeline pipeline receives the aggregated histogram metrics from the trace-to-metrics-pipeline pipeline and sends them to the anomaly_detector processor to detect anomalies by using the Random Cut Forest algorithm. If the algorithm detects any anomalies, it sends them to an OpenSearch index named trace-metric-anomalies.
entry-pipeline:
  source:
    otel_trace_source:
      # Provide the path for ingestion. ${pipelineName} will be replaced with pipeline name configured for this pipeline.
      # In this case it would be "/entry-pipeline/v1/traces". This will be endpoint URI path in OpenTelemetry Exporter
      # configuration.
      # path: "/${pipelineName}/v1/traces"
  processor:
    - trace_peer_forwarder:
  sink:
    - pipeline:
        name: "span-pipeline"
    - pipeline:
        name: "service-map-pipeline"
    - pipeline:
        name: "trace-to-metrics-pipeline"
span-pipeline:
  source:
    pipeline:
      name: "entry-pipeline"
  processor:
    - otel_trace_raw:
  sink:
    - opensearch:
        ...
        index_type: "trace-analytics-raw"
service-map-pipeline:
  source:
    pipeline:
      name: "entry-pipeline"
  processor:
    - service_map:
  sink:
    - opensearch:
        ...
        index_type: "trace-analytics-service-map"
trace-to-metrics-pipeline:
  source:
    pipeline:
      name: "entry-pipeline"
  processor:
    - aggregate:
        # Pick the required identification keys
        identification_keys: ["serviceName"]
        action:
          histogram:
            # Pick the appropriate values for each of the following fields
            key: "durationInNanos"
            record_minmax: true
            # durationInNanos and the buckets below are expressed in nanoseconds
            units: "nanoseconds"
            buckets: [0, 10000000, 50000000, 100000000]
        # Pick the required aggregation period
        group_duration: "30s"
  sink:
    - opensearch:
        ...
        index: "metrics_for_traces"
    - pipeline:
        name: "trace-to-metrics-anomaly-detector-pipeline"
trace-to-metrics-anomaly-detector-pipeline:
  source:
    pipeline:
      name: "trace-to-metrics-pipeline"
  processor:
    - anomaly_detector:
        # The following key finds anomalies in the max value of the histogram generated for durationInNanos.
        keys: [ "max" ]
        mode:
          random_cut_forest:
  sink:
    - opensearch:
        ...
        index: "trace-metric-anomalies"
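Because trace-to-metrics-pipeline aggregates the histogram metrics by serviceName, it may be useful to score each service separately rather than against one shared model. The following is a sketch only, assuming your Data Prepper version supports the identification_keys and cardinality_limit options on the anomaly_detector processor. It shows how the processor section of trace-to-metrics-anomaly-detector-pipeline might look in that case:

```yaml
processor:
  - anomaly_detector:
      # Same key as in the pipeline above: the max value of each histogram
      keys: [ "max" ]
      # Assumption: detect anomalies separately for each serviceName value
      identification_keys: [ "serviceName" ]
      # Assumption: upper bound on the number of distinct key values tracked;
      # the value is illustrative
      cardinality_limit: 5000
      mode:
        random_cut_forest:
```

With per-service detection, a service that is normally slow would not mask a latency spike in a service that is normally fast, at the cost of keeping one model per distinct serviceName.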
OpenTelemetry metrics
You can create a pipeline that receives OpenTelemetry metrics and detects anomalies in those metrics. In this example, the entry-pipeline pipeline receives metrics from the OpenTelemetry Collector and writes them to an OpenSearch index named otel-metrics. If a metric is of type GAUGE and the name of the metric is totalApiBytesSent, the pipeline also routes it to the ad-pipeline pipeline.
The ad-pipeline pipeline receives the metrics from the entry pipeline, performs anomaly detection on the metric values by using the anomaly_detector processor, and sends any anomalies to an OpenSearch index named otel-metrics-anomalies.
entry-pipeline:
  source:
    otel_metrics_source:
  processor:
    - otel_metrics:
  route:
    - gauge_route: '/kind == "GAUGE" and /name == "totalApiBytesSent"'
  sink:
    - pipeline:
        name: "ad-pipeline"
        routes:
          - gauge_route
    - opensearch:
        ...
        index: "otel-metrics"
ad-pipeline:
  source:
    pipeline:
      name: "entry-pipeline"
  processor:
    - anomaly_detector:
        # Use "value" as the key on which anomaly detector needs to be run
        keys: [ "value" ]
        mode:
          random_cut_forest:
  sink:
    - opensearch:
        ...
        index: "otel-metrics-anomalies"