16 - Conditionnels et contrôle de flux
Ce que tu vas apprendre
- La syntaxe if/else if/else dans un pipeline Logstash
- Tous les opérateurs de comparaison et les opérateurs logiques
- Router les événements par tag, type ou valeur de champ
- Utiliser
@metadatapour le routage interne - Le filtre drop pour supprimer des événements
- Structurer un pipeline multi-source
Prerequisites
- Connaitre les filtres de base (voir articles 09 a 15)
Sans conditionnels, un pipeline est lineaire
Tous les événements passent par tous les filtres, dans l'ordre. Ca marche quand tu n'as qu'une source de donnees. Mais quand tu recois des logs Nginx, des logs applicatifs et du syslog dans le meme pipeline, tu as besoin de les traiter differemment.
Les conditionnels sont partout dans mes pipelines en production. Pas juste dans les filtres, dans les outputs aussi. On envoie les erreurs vers un index spécifique, les metriques vers un autre, et on drop les health checks.
La syntaxe
if / else if / else
filter {
if [level] == "ERROR" {
mutate { add_tag => ["alert"] }
} else if [level] == "WARN" {
mutate { add_tag => ["warning"] }
} else {
mutate { add_tag => ["normal"] }
}
}
Les accolades sont obligatoires, meme pour une seule instruction. Pas de version sans accolades comme en JavaScript.
Dans les outputs
output {
if "alert" in [tags] {
elasticsearch {
hosts => ["http://elasticsearch:9200"]
index => "logs-critical-%{+YYYY.MM.dd}"
}
} else {
elasticsearch {
hosts => ["http://elasticsearch:9200"]
index => "logs-%{+YYYY.MM.dd}"
}
}
}
Les événements avec le tag alert vont dans un index séparé. Les autres vont dans l'index standard.
Les opérateurs
Comparaison
| Operateur | Signification | Exemple |
|---|---|---|
== |
Egal | [status] == 200 |
!= |
Différent | [level] != "DEBUG" |
< |
Inferieur | [duration] < 100 |
> |
Superieur | [duration] > 5000 |
<= |
Inferieur ou egal | [status] <= 399 |
>= |
Superieur ou egal | [status] >= 500 |
=~ |
Matche une regex | [url] =~ /^\/api\// |
!~ |
Ne matche pas une regex | [url] !~ /\.(css|js|png)/ |
in |
Contenu dans | "error" in [tags] |
not in |
Non contenu dans | "debug" not in [tags] |
Operateurs logiques
| Operateur | Signification |
|---|---|
and |
ET logique |
or |
OU logique |
nand |
NON-ET |
xor |
OU exclusif |
! |
Negation |
filter {
if [level] == "ERROR" and [service] == "api-payments" {
mutate { add_field => { "alert_channel" => "pagerduty" } }
}
if [status] >= 500 or [duration] > 10000 {
mutate { add_tag => ["needs_review"] }
}
if !([geoip]) {
mutate { add_field => { "geo_status" => "not_resolved" } }
}
}
Tester l'existence d'un champ
# Le champ existe
if [user_id] {
# ...
}
# Le champ n'existe pas
if ![user_id] {
# ...
}
Attention : un champ qui existe mais contient une string vide "" est considéré comme existant. Pour tester la vacuite, utilise une regex :
if [user_id] and [user_id] != "" {
# le champ existe ET n'est pas vide
}
Champs imbriques
if [host][hostname] == "prod-api-01" {
# ...
}
if [geoip][country_code2] == "FR" {
# ...
}
if [@metadata][beat] == "filebeat" {
# ...
}
Router par type
Le champ type est un pattern classique pour le routage. Tu le définis dans l'input :
input {
beats {
port => 5044
type => "beats"
}
http {
port => 8080
codec => json
type => "http"
}
tcp {
port => 5000
codec => json_lines
type => "tcp"
}
}
filter {
if [type] == "beats" {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
}
if [type] == "http" {
# Les donnees sont deja en JSON, juste convertir les types
mutate {
convert => { "status_code" => "integer" }
}
}
if [type] == "tcp" {
# Parser le format syslog
grok {
match => { "message" => "%{SYSLOGLINE}" }
}
}
}
output {
elasticsearch {
hosts => ["http://elasticsearch:9200"]
index => "logs-%{type}-%{+YYYY.MM.dd}"
}
}
Chaque source a son parsing, mais elles partagent le meme output. L'index inclut le type : logs-beats-2026.03.31, logs-http-2026.03.31, etc.
Router par tag
Les tags sont un tableau de strings. Tu les ajoutes dans les inputs ou les filtres, et tu les testes avec in :
filter {
# Grok ajoute _grokparsefailure si le parsing echoue
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
# Les erreurs de parsing vont dans un index separe
if "_grokparsefailure" in [tags] {
mutate {
add_field => { "[@metadata][target_index]" => "parse-errors" }
}
} else {
mutate {
add_field => { "[@metadata][target_index]" => "logs" }
}
}
}
output {
elasticsearch {
hosts => ["http://elasticsearch:9200"]
index => "%{[@metadata][target_index]}-%{+YYYY.MM.dd}"
}
}
Le filtre drop
drop supprime un événement du pipeline. Il ne passe pas aux filtres suivants et n'est pas envoye a l'output.
filter {
# Supprimer les health checks
if [url] == "/health" or [url] == "/ready" {
drop {}
}
# Supprimer les logs DEBUG en production
if [level] == "DEBUG" {
drop {}
}
# Supprimer les evenements sans request_id
if ![request_id] {
drop {}
}
}
drop est efficace parce qu'il libéré l'événement de la mémoire immédiatement. Mets les drops le plus tot possible dans le pipeline pour éviter de traiter des événements inutiles.
drop avec un pourcentage
Tu peux echantillonner les événements a fort volume :
filter {
if [level] == "DEBUG" {
drop { percentage => 90 }
}
}
90% des événements DEBUG sont supprimes, 10% sont gardes. Utile pour réduire le volume sans perdre complètement la visibilité.
Le pattern @metadata pour le routage
Le champ @metadata est invisible dans les outputs. C'est l'endroit ideal pour stocker des informations de routage :
filter {
# Definir l'index cible selon la source
if [type] == "nginx" {
mutate { add_field => { "[@metadata][target_index]" => "nginx" } }
} else if [type] == "app" {
mutate { add_field => { "[@metadata][target_index]" => "app" } }
} else {
mutate { add_field => { "[@metadata][target_index]" => "misc" } }
}
# Definir si l'evenement doit aller dans Kafka en plus d'ES
if [level] == "ERROR" {
mutate { add_field => { "[@metadata][send_to_kafka]" => "true" } }
}
}
output {
elasticsearch {
hosts => ["http://elasticsearch:9200"]
index => "%{[@metadata][target_index]}-%{+YYYY.MM.dd}"
}
if [@metadata][send_to_kafka] == "true" {
kafka {
bootstrap_servers => "kafka:9092"
topic_id => "errors"
codec => json
}
}
}
Les champs @metadata ne se retrouvent pas dans Elasticsearch. Pas de pollution, pas de stockage inutile.
Structurer un pipeline multi-source
Voici le schema que j'utilise sur paltemps.fr pour un pipeline qui recoit de tout :
┌─────────────────────────────────────────────────────────────┐
│ Pipeline principal │
│ │
│ ┌───────┐ ┌────────────────┐ ┌──────────────────────┐ │
│ │ INPUT │──>│ CLASSIFICATION │──>│ PARSING PAR TYPE │ │
│ │ │ │ │ │ │ │
│ │ beats │ │ if beats → │ │ if nginx → grok │ │
│ │ http │ │ tag nginx │ │ if app → json │ │
│ │ tcp │ │ tag app │ │ if syslog → syslog │ │
│ │ │ │ if http → │ │ │ │
│ │ │ │ tag webhook │ │ │ │
│ └───────┘ └────────────────┘ └──────────┬───────────┘ │
│ │ │
│ ┌──────────────────┐ ┌────────────────────┐│ │
│ │ ENRICHISSEMENT │ │ ROUTAGE OUTPUT ││ │
│ │ │<──┘ ││ │
│ │ date │ │ if critical → ES ││ │
│ │ geoip │──>│ + Kafka ││ │
│ │ mutate cleanup │ │ if normal → ES ││ │
│ │ │ │ if parse_error → ││ │
│ │ │ │ fichier erreurs ││ │
│ └──────────────────┘ └────────────────────┘│ │
└─────────────────────────────────────────────────────────────┘
En code :
input {
beats { port => 5044 }
http { port => 8080; codec => json; type => "webhook" }
}
filter {
# 1. Classification
if [type] != "webhook" {
if "nginx" in [tags] {
mutate { add_field => { "[@metadata][source]" => "nginx" } }
} else {
mutate { add_field => { "[@metadata][source]" => "app" } }
}
}
# 2. Parsing par source
if [@metadata][source] == "nginx" {
grok { match => { "message" => "%{COMBINEDAPACHELOG}" } }
} else if [@metadata][source] == "app" {
json { source => "message" }
}
# 3. Enrichissement commun
date {
match => ["timestamp", "ISO8601", "dd/MMM/yyyy:HH:mm:ss Z"]
target => "@timestamp"
remove_field => ["timestamp"]
}
if [clientip] {
geoip { source => "clientip" }
}
# 4. Cleanup
mutate {
remove_field => ["agent", "ecs", "input", "log", "@version"]
}
# 5. Drop les health checks
if [url] in ["/health", "/ready", "/metrics"] {
drop {}
}
}
output {
elasticsearch {
hosts => ["http://elasticsearch:9200"]
index => "logs-%{[@metadata][source]}-%{+YYYY.MM.dd}"
}
}
Le pipeline suit un flux logique : classifier, parser, enrichir, nettoyer, router. Chaque étape utilise des conditionnels pour s'adapter a la source.
Résumé
- Les conditionnels
if/else if/elsecontrolent le flux dans les filtres et les outputs - Operateurs :
==,!=,<,>,=~(regex),in(tableaux),and,or,! - Route par
type(défini dans l'input) ou partags(ajoutes en cours de route) @metadataest le bon endroit pour les informations de routage (invisible dans l'output)drop {}supprime un événement du pipeline, a placer le plus tot possible- Un pipeline multi-source suit le flux : classifier -> parser -> enrichir -> nettoyer -> router
Precedent : 15 - Aggregate et Metrics | Suivant : 17 - Output Elasticsearch