15 - Filtres Aggregate et Metrics : correler les événements
Ce que tu vas apprendre
- Regrouper plusieurs événements en un seul avec le filtre aggregate
- Correler le début et la fin d'une transaction
- Calculer des metriques en temps réel avec le filtre metrics
- Comprendre les implications mémoire de ces filtres stateful
- Quand utiliser aggregate vs faire l'aggregation dans Elasticsearch
Prerequisites
- Maitriser les filtres de base (voir articles 09 a 14)
- Comprendre le modèle d'événement Logstash
Le problème : un événement != une transaction
Jusqu'ici, chaque ligne de log = un événement = un document dans Elasticsearch. Mais dans la vraie vie, une opération métier produit souvent plusieurs événements :
2026-03-31 14:23:01 [req-abc123] START POST /api/orders
2026-03-31 14:23:01 [req-abc123] AUTH user=42 role=admin
2026-03-31 14:23:02 [req-abc123] DB query=INSERT duration=340ms
2026-03-31 14:23:02 [req-abc123] RESPONSE status=201 duration=1024ms
Quatre lignes, une seule requête. Dans Elasticsearch, tu veux un seul document avec toutes les infos : la méthode, l'utilisateur, la duree de la DB, le status final et la duree totale.
C'est le job du filtre aggregate.
Le filtre Aggregate
Le concept
Aggregate maintient un "panier" (map) en mémoire, identifié par un ID de correlation (task_id). Chaque événement qui partage le meme task_id ajoute ses donnees dans le panier. Quand la transaction est terminee (ou qu'un timeout expire), le panier est emis comme un seul événement.
Evenement 1 (START) ──┐
Evenement 2 (AUTH) ──┤──> map[req-abc123] ──> Evenement agrege
Evenement 3 (DB) ──┤
Evenement 4 (RESPONSE)─┘
Configuration
filter {
# Parser les logs
grok {
match => {
"message" => [
"%{TIMESTAMP_ISO8601:ts} \[%{DATA:request_id}\] START %{WORD:method} %{URIPATH:url}",
"%{TIMESTAMP_ISO8601:ts} \[%{DATA:request_id}\] AUTH user=%{INT:user_id} role=%{WORD:role}",
"%{TIMESTAMP_ISO8601:ts} \[%{DATA:request_id}\] DB query=%{WORD:db_action} duration=%{INT:db_duration}ms",
"%{TIMESTAMP_ISO8601:ts} \[%{DATA:request_id}\] RESPONSE status=%{INT:status} duration=%{INT:total_duration}ms"
]
}
}
# Agreger par request_id
aggregate {
task_id => "%{request_id}"
code => '
# Initialiser le map au premier evenement
map["method"] ||= event.get("method")
map["url"] ||= event.get("url")
# Accumuler les donnees
map["user_id"] ||= event.get("user_id")
map["role"] ||= event.get("role")
map["db_action"] ||= event.get("db_action")
map["db_duration"] ||= event.get("db_duration")
map["status"] ||= event.get("status")
map["total_duration"] ||= event.get("total_duration")
'
# Quand emettre l'evenement agrege
push_map_as_event_on_timeout => true
timeout => 30
timeout_tags => ["_aggregatetimeout"]
# Ou emettre quand on a le RESPONSE
map_action => "create_or_update"
}
# Supprimer les evenements individuels (garder seulement l'agrege)
if [request_id] and "_aggregatetimeout" not in [tags] {
drop {}
}
}
Comment ca fonctionne, pas a pas
- Événement START arrive.
task_id = "req-abc123". Le map est créé.methodeturlsont stockes. - Événement AUTH arrive. Meme
task_id.user_idetrolesont ajoutes au map. - Événement DB arrive.
db_actionetdb_durationsont ajoutes. - Événement RESPONSE arrive.
statusettotal_durationsont ajoutes. - Timeout (30s) expire. Le map est emis comme un nouvel événement avec tous les champs.
- Les événements individuels sont supprimes par le
drop {}.
Résultat final dans Elasticsearch :
json{
"request_id": "req-abc123",
"method": "POST",
"url": "/api/orders",
"user_id": "42",
"role": "admin",
"db_action": "INSERT",
"db_duration": "340",
"status": "201",
"total_duration": "1024"
}
Un seul document avec toute la transaction.
Emettre sans attendre le timeout
Le timeout est un filet de sécurité. Si tu sais quand la transaction est terminee (le RESPONSE), tu peux emettre immédiatement :
filter {
aggregate {
task_id => "%{request_id}"
code => '
map["method"] ||= event.get("method")
map["url"] ||= event.get("url")
map["user_id"] ||= event.get("user_id")
map["status"] ||= event.get("status")
map["total_duration"] ||= event.get("total_duration")
# Si c'est le dernier evenement, emettre
if event.get("status")
event.set("aggregated", true)
map.each { |k, v| event.set(k, v) if v }
end
'
map_action => "create_or_update"
end_of_task => event.get("status") != nil
timeout => 30
}
# Garder seulement les evenements agreges
if ![aggregated] {
drop {}
}
}
end_of_task indique quand la transaction est terminee. Le map est nettoye et l'événement final est emis sans attendre le timeout.
Attention a la mémoire
Aggregate garde un map en mémoire pour chaque task_id actif. Si tu as 10 000 requêtes concurrentes, tu as 10 000 maps en mémoire. Avec un timeout de 30 secondes et 1000 requêtes/seconde, ca fait 30 000 maps.
Surveille la mémoire JVM de Logstash quand tu utilises aggregate. Et choisis un timeout raisonnable : assez long pour capturer toute la transaction, assez court pour libérer la mémoire.
La contrainte du single worker
Le filtre aggregate a besoin que tous les événements d'un meme task_id passent par le meme worker. Si tu as pipeline.workers: 4, les événements d'une meme requête peuvent etre repartis entre les workers, et aggregate ne les verra pas tous.
Solution : force pipeline.workers: 1 pour le pipeline qui utilise aggregate, ou utilise pipeline.ordered => true dans pipelines.yml.
yaml# pipelines.yml
- pipeline.id: aggregation
path.config: "/usr/share/logstash/pipeline/aggregate.conf"
pipeline.workers: 1
C'est une limitation importante. Un seul worker signifie que le pipeline ne peut pas paralleliser. Si le volume est eleve, fais l'aggregation dans Elasticsearch plutot que dans Logstash.
Le filtre Metrics
Le filtre metrics calcule des statistiques en temps réel sur les événements qui passent.
Comptage par intervalle
filter {
metrics {
meter => ["events"]
add_tag => ["metric"]
flush_interval => 30
}
}
output {
if "metric" in [tags] {
stdout { codec => rubydebug }
}
if "metric" not in [tags] {
elasticsearch {
hosts => ["http://elasticsearch:9200"]
index => "logs-%{+YYYY.MM.dd}"
}
}
}
Toutes les 30 secondes, metrics emet un événement avec le comptage :
json{
"events": {
"count": 1523,
"rate_1m": 50.76,
"rate_5m": 48.23,
"rate_15m": 45.12
},
"tags": ["metric"]
}
rate_1m est la moyenne mobile sur 1 minute (événements par seconde).
Timer : mesurer des durees
filter {
if [duration_ms] {
metrics {
timer => { "request_duration" => "%{duration_ms}" }
add_tag => ["metric"]
flush_interval => 60
percentiles => [50, 90, 95, 99]
}
}
}
Résultat toutes les 60 secondes :
json{
"request_duration": {
"count": 834,
"min": 2.0,
"max": 8934.0,
"mean": 156.3,
"p50": 89.0,
"p90": 423.0,
"p95": 1204.0,
"p99": 5023.0
}
}
Tu obtiens les percentiles, la moyenne, le min et le max. C'est du monitoring en temps réel, directement dans le pipeline.
Limites du filtre metrics
Metrics est stateful : il accumule des statistiques en mémoire entre les flush. Il est utile pour du monitoring du pipeline lui-meme, pas pour de l'analytics business. Pour les analytics, laisse Elasticsearch faire les aggregations (c'est son job).
Quand utiliser aggregate vs Elasticsearch
| Aggregate (Logstash) | Aggregations (Elasticsearch) | |
|---|---|---|
| Quand | Avant l'indexation | Apres l'indexation |
| Donnee stockee | 1 document par transaction | N documents par transaction |
| Stockage | Economique | Plus cher |
| Flexibilite | Fixe au moment du pipeline | Requetes ad-hoc |
| Performance pipeline | Limite (1 worker) | Pas d'impact |
| Cas d'usage | Deduplication, correlation | Dashboards, rapports |
Sur paltemps.fr, j'utilise aggregate pour un seul cas : deduplicer les événements de paiement (une transaction Stripe généré 3-4 webhooks, je ne veux qu'un seul document). Pour tout le reste, je stocke les événements individuels et j'utilise les aggregations Elasticsearch dans Kibana.
Résumé
- Le filtre aggregate regroupe plusieurs événements en un seul en utilisant un
task_idde correlation - Le
mapest un panier en mémoire qui accumule les champs de chaque événement push_map_as_event_on_timeoutemet l'événement agrege apres un delai- Aggregate nécessité
pipeline.workers: 1(pas de parallelisation) - Le filtre metrics calcule des compteurs et des percentiles en temps réel
- Pour les analytics, préféré les aggregations Elasticsearch (plus flexibles)
Precedent : 14 - Le filtre Ruby | Suivant : 16 - Conditionnels et contrôle de flux