## Deriving metrics from logs
You can use Data Prepper to derive metrics from logs.
The following example pipeline receives incoming logs using the `http` source plugin and parses them with the `grok` processor. It then uses the `aggregate` processor to extract the metric `bytes` aggregated during a 30-second window and derives histograms from the results.
This pipeline writes data to two different OpenSearch indexes:

- `logs`: This index stores the original, unaggregated log events after they are processed by the `grok` processor.
- `histogram_metrics`: This index stores the derived histogram metrics extracted from the log events by the `aggregate` processor.
The pipeline contains two sub-pipelines:

- `apache-log-pipeline-with-metrics`: Receives logs through an HTTP client such as FluentBit and uses `grok` to extract important values from the logs by matching the value in the `log` key against the Apache Common Log Format (a sample request is shown after this list). It then forwards the grokked logs to two destinations:
  - An OpenSearch index named `logs`, which stores the original log events.
  - The `log-to-metrics-pipeline` for further aggregation and metric derivation.
- `log-to-metrics-pipeline`: Receives the grokked logs from the `apache-log-pipeline-with-metrics` pipeline, aggregates them, and derives histogram metrics of `bytes` based on the values in the `clientip` and `request` keys. Finally, it sends the derived histogram metrics to an OpenSearch index named `histogram_metrics`.
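If you want to exercise the pipeline without a FluentBit installation, you can POST a sample Apache log event directly to the `http` source. The following is a minimal sketch that assumes a local Data Prepper instance with the `http` source listening on its default port of 2021; the host, port, and sample log line are illustrative, not part of the pipeline configuration below.

```bash
# Send one sample Apache Common Log Format event to the pipeline's ingestion path.
# Host and port are assumptions: 2021 is the http source's default port.
curl -s -X POST "http://localhost:2021/apache-log-pipeline-with-metrics/logs" \
  -H "Content-Type: application/json" \
  -d '[{"log": "127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] \"GET /apache_pb.gif HTTP/1.0\" 200 2326"}]'
```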
### Example pipeline
```yaml
apache-log-pipeline-with-metrics:
  source:
    http:
      # Provide the path for ingestion. ${pipelineName} will be replaced with the pipeline name configured for this pipeline.
      # In this case it would be "/apache-log-pipeline-with-metrics/logs". This will be the FluentBit output URI value.
      path: "/${pipelineName}/logs"
  processor:
    - grok:
        match:
          log: [ "%{COMMONAPACHELOG_DATATYPED}" ]
  sink:
    # Send the grokked logs to the logs index
    - opensearch:
        ...
        index: "logs"
    # Send the grokked logs to the log-to-metrics-pipeline for aggregation
    - pipeline:
        name: "log-to-metrics-pipeline"

log-to-metrics-pipeline:
  source:
    pipeline:
      name: "apache-log-pipeline-with-metrics"
  processor:
    - aggregate:
        # Specify the required identification keys
        identification_keys: ["clientip", "request"]
        action:
          histogram:
            # Specify the appropriate values for each of the following fields
            key: "bytes"
            record_minmax: true
            units: "bytes"
            buckets: [0, 25000000, 50000000, 75000000, 100000000]
        # Pick the required aggregation period
        group_duration: "30s"
  sink:
    - opensearch:
        ...
        index: "histogram_metrics"
```