Logstash pour les devs - 16 - Conditionnels et contrôle de flux

if/else, opérateurs, tags, types et expressions regulieres : contrôler ce que chaque événement traverse dans un pipeline.

  1. 01 Logstash pour les devs - 00 - Pourquoi Logstash existe encore en 2026
  2. 02 Logstash pour les devs - 01 - L'Elastic Stack de A a Z
  3. 03 Logstash pour les devs - 02 - Installer Logstash avec Docker en 5 minutes
  4. 04 Logstash pour les devs - 03 - Anatomie d'un pipeline Logstash
  5. 05 Logstash pour les devs - 04 - Inputs stdin et file : lire des donnees locales
  6. 06 Logstash pour les devs - 05 - Input Beats : recevoir des logs de Filebeat
  7. 07 Logstash pour les devs - 06 - Inputs HTTP, TCP et UDP : recevoir des donnees réseau
  8. 08 Logstash pour les devs - 07 - Inputs Kafka et JDBC : sources avancees
  9. 09 Logstash pour les devs - 08 - Les codecs : decoder et encoder les donnees
  10. 10 Logstash pour les devs - 09 - Le filtre Grok : parser n'importe quel log
  11. 11 Logstash pour les devs - 10 - Le filtre Dissect : parser sans regex
  12. 12 Logstash pour les devs - 11 - Le filtre Mutate : transformer les champs
  13. 13 Logstash pour les devs - 12 - Filtres Date et GeoIP : temps et geolocalisation
  14. 14 Logstash pour les devs - 13 - Filtres KV, JSON et XML : parser les formats structures
  15. 15 Logstash pour les devs - 14 - Le filtre Ruby : quand les autres ne suffisent pas
  16. 16 Logstash pour les devs - 15 - Filtres Aggregate et Metrics : correler les événements
  17. 17 Logstash pour les devs - 16 - Conditionnels et contrôle de flux
  18. 18 Logstash pour les devs - 17 - Output Elasticsearch : envoyer les donnees
  19. 19 Logstash pour les devs - 18 - Outputs file, stdout et les autres
  20. 20 Logstash pour les devs - 19 - Gerer le multiline : stack traces et logs multi-lignes
  21. 21 Logstash pour les devs - 20 - Pipelines multiples et pipeline-to-pipeline
  22. 22 Logstash pour les devs - 21 - Performance et tuning Logstash
  23. 23 Logstash pour les devs - 22 - Monitoring Logstash : metriques et alertes
  24. 24 Logstash pour les devs - 23 - Dead Letter Queue : ne plus perdre d'événements
  25. 25 Logstash pour les devs - 24 - Sécurité Logstash : SSL, auth et secrets
  26. 26 Logstash pour les devs - 25 - Debugger un pipeline Logstash
  27. 27 Logstash pour les devs - 26 - Tester ses pipelines avant la prod
  28. 28 Logstash pour les devs - 27 - Cas pratique : centraliser des logs applicatifs
  29. 29 Logstash pour les devs - 28 - Cas pratique : ETL avec Logstash et PostgreSQL
  30. 30 Logstash pour les devs - 29 - Cas pratique : enrichir des donnees en temps réel
  31. 31 Logstash pour les devs - 30 - Logstash en production : architecture et bonnes pratiques
  32. 32 Logstash pour les devs - 31 - Glossaire Logstash de A a Z

16 - Conditionnels et contrôle de flux

Ce que tu vas apprendre

  • La syntaxe if/else if/else dans un pipeline Logstash
  • Tous les opérateurs de comparaison et les opérateurs logiques
  • Router les événements par tag, type ou valeur de champ
  • Utiliser @metadata pour le routage interne
  • Le filtre drop pour supprimer des événements
  • Structurer un pipeline multi-source

Prerequisites


Sans conditionnels, un pipeline est lineaire

Tous les événements passent par tous les filtres, dans l'ordre. Ca marche quand tu n'as qu'une source de donnees. Mais quand tu recois des logs Nginx, des logs applicatifs et du syslog dans le meme pipeline, tu as besoin de les traiter differemment.

Les conditionnels sont partout dans mes pipelines en production. Pas juste dans les filtres, dans les outputs aussi. On envoie les erreurs vers un index spécifique, les metriques vers un autre, et on drop les health checks.

La syntaxe

if / else if / else

filter {
  if [level] == "ERROR" {
    mutate { add_tag => ["alert"] }
  } else if [level] == "WARN" {
    mutate { add_tag => ["warning"] }
  } else {
    mutate { add_tag => ["normal"] }
  }
}

Les accolades sont obligatoires, meme pour une seule instruction. Pas de version sans accolades comme en JavaScript.

Dans les outputs

output {
  if "alert" in [tags] {
    elasticsearch {
      hosts => ["http://elasticsearch:9200"]
      index => "logs-critical-%{+YYYY.MM.dd}"
    }
  } else {
    elasticsearch {
      hosts => ["http://elasticsearch:9200"]
      index => "logs-%{+YYYY.MM.dd}"
    }
  }
}

Les événements avec le tag alert vont dans un index séparé. Les autres vont dans l'index standard.

Les opérateurs

Comparaison

Operateur Signification Exemple
== Egal [status] == 200
!= Différent [level] != "DEBUG"
< Inferieur [duration] < 100
> Superieur [duration] > 5000
<= Inferieur ou egal [status] <= 399
>= Superieur ou egal [status] >= 500
=~ Matche une regex [url] =~ /^\/api\//
!~ Ne matche pas une regex [url] !~ /\.(css|js|png)/
in Contenu dans "error" in [tags]
not in Non contenu dans "debug" not in [tags]

Operateurs logiques

Operateur Signification
and ET logique
or OU logique
nand NON-ET
xor OU exclusif
! Negation
filter {
  if [level] == "ERROR" and [service] == "api-payments" {
    mutate { add_field => { "alert_channel" => "pagerduty" } }
  }

  if [status] >= 500 or [duration] > 10000 {
    mutate { add_tag => ["needs_review"] }
  }

  if !([geoip]) {
    mutate { add_field => { "geo_status" => "not_resolved" } }
  }
}

Tester l'existence d'un champ

# Le champ existe
if [user_id] {
  # ...
}

# Le champ n'existe pas
if ![user_id] {
  # ...
}

Attention : un champ qui existe mais contient une string vide "" est considéré comme existant. Pour tester la vacuite, utilise une regex :

if [user_id] and [user_id] != "" {
  # le champ existe ET n'est pas vide
}

Champs imbriques

if [host][hostname] == "prod-api-01" {
  # ...
}

if [geoip][country_code2] == "FR" {
  # ...
}

if [@metadata][beat] == "filebeat" {
  # ...
}

Router par type

Le champ type est un pattern classique pour le routage. Tu le définis dans l'input :

input {
  beats {
    port => 5044
    type => "beats"
  }

  http {
    port => 8080
    codec => json
    type => "http"
  }

  tcp {
    port => 5000
    codec => json_lines
    type => "tcp"
  }
}

filter {
  if [type] == "beats" {
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
  }

  if [type] == "http" {
    # Les donnees sont deja en JSON, juste convertir les types
    mutate {
      convert => { "status_code" => "integer" }
    }
  }

  if [type] == "tcp" {
    # Parser le format syslog
    grok {
      match => { "message" => "%{SYSLOGLINE}" }
    }
  }
}

output {
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "logs-%{type}-%{+YYYY.MM.dd}"
  }
}

Chaque source a son parsing, mais elles partagent le meme output. L'index inclut le type : logs-beats-2026.03.31, logs-http-2026.03.31, etc.

Router par tag

Les tags sont un tableau de strings. Tu les ajoutes dans les inputs ou les filtres, et tu les testes avec in :

filter {
  # Grok ajoute _grokparsefailure si le parsing echoue
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }

  # Les erreurs de parsing vont dans un index separe
  if "_grokparsefailure" in [tags] {
    mutate {
      add_field => { "[@metadata][target_index]" => "parse-errors" }
    }
  } else {
    mutate {
      add_field => { "[@metadata][target_index]" => "logs" }
    }
  }
}

output {
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "%{[@metadata][target_index]}-%{+YYYY.MM.dd}"
  }
}

Le filtre drop

drop supprime un événement du pipeline. Il ne passe pas aux filtres suivants et n'est pas envoye a l'output.

filter {
  # Supprimer les health checks
  if [url] == "/health" or [url] == "/ready" {
    drop {}
  }

  # Supprimer les logs DEBUG en production
  if [level] == "DEBUG" {
    drop {}
  }

  # Supprimer les evenements sans request_id
  if ![request_id] {
    drop {}
  }
}

drop est efficace parce qu'il libéré l'événement de la mémoire immédiatement. Mets les drops le plus tot possible dans le pipeline pour éviter de traiter des événements inutiles.

drop avec un pourcentage

Tu peux echantillonner les événements a fort volume :

filter {
  if [level] == "DEBUG" {
    drop { percentage => 90 }
  }
}

90% des événements DEBUG sont supprimes, 10% sont gardes. Utile pour réduire le volume sans perdre complètement la visibilité.

Le pattern @metadata pour le routage

Le champ @metadata est invisible dans les outputs. C'est l'endroit ideal pour stocker des informations de routage :

filter {
  # Definir l'index cible selon la source
  if [type] == "nginx" {
    mutate { add_field => { "[@metadata][target_index]" => "nginx" } }
  } else if [type] == "app" {
    mutate { add_field => { "[@metadata][target_index]" => "app" } }
  } else {
    mutate { add_field => { "[@metadata][target_index]" => "misc" } }
  }

  # Definir si l'evenement doit aller dans Kafka en plus d'ES
  if [level] == "ERROR" {
    mutate { add_field => { "[@metadata][send_to_kafka]" => "true" } }
  }
}

output {
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "%{[@metadata][target_index]}-%{+YYYY.MM.dd}"
  }

  if [@metadata][send_to_kafka] == "true" {
    kafka {
      bootstrap_servers => "kafka:9092"
      topic_id => "errors"
      codec => json
    }
  }
}

Les champs @metadata ne se retrouvent pas dans Elasticsearch. Pas de pollution, pas de stockage inutile.

Structurer un pipeline multi-source

Voici le schema que j'utilise sur paltemps.fr pour un pipeline qui recoit de tout :

┌─────────────────────────────────────────────────────────────┐
│                    Pipeline principal                        │
│                                                             │
│  ┌───────┐   ┌────────────────┐   ┌──────────────────────┐ │
│  │ INPUT │──>│ CLASSIFICATION │──>│ PARSING PAR TYPE     │ │
│  │       │   │                │   │                      │ │
│  │ beats │   │ if beats →     │   │ if nginx → grok      │ │
│  │ http  │   │   tag nginx    │   │ if app → json        │ │
│  │ tcp   │   │   tag app      │   │ if syslog → syslog   │ │
│  │       │   │ if http →      │   │                      │ │
│  │       │   │   tag webhook  │   │                      │ │
│  └───────┘   └────────────────┘   └──────────┬───────────┘ │
│                                               │             │
│  ┌──────────────────┐   ┌────────────────────┐│             │
│  │ ENRICHISSEMENT   │   │ ROUTAGE OUTPUT     ││             │
│  │                  │<──┘                    ││             │
│  │ date             │   │ if critical → ES   ││             │
│  │ geoip            │──>│   + Kafka          ││             │
│  │ mutate cleanup   │   │ if normal → ES     ││             │
│  │                  │   │ if parse_error →   ││             │
│  │                  │   │   fichier erreurs  ││             │
│  └──────────────────┘   └────────────────────┘│             │
└─────────────────────────────────────────────────────────────┘

En code :

input {
  beats { port => 5044 }
  http { port => 8080; codec => json; type => "webhook" }
}

filter {
  # 1. Classification
  if [type] != "webhook" {
    if "nginx" in [tags] {
      mutate { add_field => { "[@metadata][source]" => "nginx" } }
    } else {
      mutate { add_field => { "[@metadata][source]" => "app" } }
    }
  }

  # 2. Parsing par source
  if [@metadata][source] == "nginx" {
    grok { match => { "message" => "%{COMBINEDAPACHELOG}" } }
  } else if [@metadata][source] == "app" {
    json { source => "message" }
  }

  # 3. Enrichissement commun
  date {
    match => ["timestamp", "ISO8601", "dd/MMM/yyyy:HH:mm:ss Z"]
    target => "@timestamp"
    remove_field => ["timestamp"]
  }

  if [clientip] {
    geoip { source => "clientip" }
  }

  # 4. Cleanup
  mutate {
    remove_field => ["agent", "ecs", "input", "log", "@version"]
  }

  # 5. Drop les health checks
  if [url] in ["/health", "/ready", "/metrics"] {
    drop {}
  }
}

output {
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "logs-%{[@metadata][source]}-%{+YYYY.MM.dd}"
  }
}

Le pipeline suit un flux logique : classifier, parser, enrichir, nettoyer, router. Chaque étape utilise des conditionnels pour s'adapter a la source.

Résumé

  • Les conditionnels if/else if/else controlent le flux dans les filtres et les outputs
  • Operateurs : ==, !=, <, >, =~ (regex), in (tableaux), and, or, !
  • Route par type (défini dans l'input) ou par tags (ajoutes en cours de route)
  • @metadata est le bon endroit pour les informations de routage (invisible dans l'output)
  • drop {} supprime un événement du pipeline, a placer le plus tot possible
  • Un pipeline multi-source suit le flux : classifier -> parser -> enrichir -> nettoyer -> router

Precedent : 15 - Aggregate et Metrics | Suivant : 17 - Output Elasticsearch

Sources

Réservez un audit gratuit de 30 minutes. Je vous montre concrètement ce qu'on peut automatiser.