Logstash pour les devs - 03 - Anatomie d'un pipeline Logstash

Input, filter, output : comprendre les trois blocs d'un pipeline Logstash avec un exemple concret de bout en bout.

  1. 01 Logstash pour les devs - 00 - Pourquoi Logstash existe encore en 2026
  2. 02 Logstash pour les devs - 01 - L'Elastic Stack de A a Z
  3. 03 Logstash pour les devs - 02 - Installer Logstash avec Docker en 5 minutes
  4. 04 Logstash pour les devs - 03 - Anatomie d'un pipeline Logstash
  5. 05 Logstash pour les devs - 04 - Inputs stdin et file : lire des donnees locales
  6. 06 Logstash pour les devs - 05 - Input Beats : recevoir des logs de Filebeat
  7. 07 Logstash pour les devs - 06 - Inputs HTTP, TCP et UDP : recevoir des donnees réseau
  8. 08 Logstash pour les devs - 07 - Inputs Kafka et JDBC : sources avancees
  9. 09 Logstash pour les devs - 08 - Les codecs : decoder et encoder les donnees
  10. 10 Logstash pour les devs - 09 - Le filtre Grok : parser n'importe quel log
  11. 11 Logstash pour les devs - 10 - Le filtre Dissect : parser sans regex
  12. 12 Logstash pour les devs - 11 - Le filtre Mutate : transformer les champs
  13. 13 Logstash pour les devs - 12 - Filtres Date et GeoIP : temps et geolocalisation
  14. 14 Logstash pour les devs - 13 - Filtres KV, JSON et XML : parser les formats structures
  15. 15 Logstash pour les devs - 14 - Le filtre Ruby : quand les autres ne suffisent pas
  16. 16 Logstash pour les devs - 15 - Filtres Aggregate et Metrics : correler les événements
  17. 17 Logstash pour les devs - 16 - Conditionnels et contrôle de flux
  18. 18 Logstash pour les devs - 17 - Output Elasticsearch : envoyer les donnees
  19. 19 Logstash pour les devs - 18 - Outputs file, stdout et les autres
  20. 20 Logstash pour les devs - 19 - Gerer le multiline : stack traces et logs multi-lignes
  21. 21 Logstash pour les devs - 20 - Pipelines multiples et pipeline-to-pipeline
  22. 22 Logstash pour les devs - 21 - Performance et tuning Logstash
  23. 23 Logstash pour les devs - 22 - Monitoring Logstash : metriques et alertes
  24. 24 Logstash pour les devs - 23 - Dead Letter Queue : ne plus perdre d'événements
  25. 25 Logstash pour les devs - 24 - Sécurité Logstash : SSL, auth et secrets
  26. 26 Logstash pour les devs - 25 - Debugger un pipeline Logstash
  27. 27 Logstash pour les devs - 26 - Tester ses pipelines avant la prod
  28. 28 Logstash pour les devs - 27 - Cas pratique : centraliser des logs applicatifs
  29. 29 Logstash pour les devs - 28 - Cas pratique : ETL avec Logstash et PostgreSQL
  30. 30 Logstash pour les devs - 29 - Cas pratique : enrichir des donnees en temps réel
  31. 31 Logstash pour les devs - 30 - Logstash en production : architecture et bonnes pratiques
  32. 32 Logstash pour les devs - 31 - Glossaire Logstash de A a Z

03 - Anatomie d'un pipeline Logstash

Ce que tu vas apprendre

  • La structure d'un fichier .conf (input, filter, output)
  • Le modèle d'événement Logstash (champs, @timestamp, @metadata)
  • Le rôle de la queue interne entre les blocs
  • Comment un événement traverse le pipeline de bout en bout
  • Écrire un premier pipeline réel qui parse un log

Prerequisites

  • Avoir le lab Docker qui tourne (voir article 02)

Le pipeline, c'est trois blocs

Quand j'explique Logstash a un junior, je commence toujours par la meme analogie : c'est une usine avec trois postes.

Le premier poste recoit la matière première. Le deuxieme la transforme. Le troisieme l'emballe et l'envoie. C'est input, filter, output.

Donnees brutes                  Donnees propres
     │                               │
     ▼                               ▼
┌─────────┐    ┌──────────┐    ┌──────────┐
│         │    │          │    │          │
│  INPUT  │───>│  FILTER  │───>│  OUTPUT  │
│         │    │          │    │          │
│ lit les │    │ parse    │    │ envoie   │
│ donnees │    │ enrichit │    │ les      │
│         │    │ nettoie  │    │ donnees  │
└─────────┘    └──────────┘    └──────────┘
     │              │               │
     │         (optionnel)          │
     │                              │
     └──────────────────────────────┘
           si pas de filtre,
         l'input va direct a l'output

Le bloc filter est optionnel. Tu peux envoyer des donnees d'un input a un output sans rien transformer. C'est ce qu'on a fait dans l'article 02 avec stdin -> stdout.

Le fichier .conf

Un pipeline Logstash se definit dans un fichier .conf. La syntaxe n'est ni du JSON, ni du YAML, ni du TOML. C'est un format propre a Logstash.

# logstash/pipeline/main.conf

input {
  # un ou plusieurs plugins d'entree
}

filter {
  # zero ou plusieurs plugins de transformation
}

output {
  # un ou plusieurs plugins de sortie
}

Chaque bloc contient un ou plusieurs plugins. Un plugin a un nom et des paramètres entre accolades :

input {
  file {
    path => "/data/app.log"
    start_position => "beginning"
  }
}

La syntaxe des paramètres : cle => valeur. Les chaînes sont entre guillemets doubles. Les nombres et booleens sont nus. Les listes sont entre crochets.

filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
    tag_on_failure => ["_grokparsefailure", "_parseerror"]
    add_field => { "parsed" => "true" }
  }
}

Le modèle d'événement

Tout ce qui traverse un pipeline Logstash est un événement. Un événement, c'est un objet avec des champs. Quand tu envoies la ligne de texte 2026-03-31 ERROR Connection refused a Logstash, il créé un événement qui ressemble a ca :

json{
  "@timestamp": "2026-03-31T10:15:42.123Z",
  "@version": "1",
  "message": "2026-03-31 ERROR Connection refused",
  "host": {
    "hostname": "logstash"
  },
  "event": {
    "original": "2026-03-31 ERROR Connection refused"
  }
}

Les champs qui commencent par @ sont des champs système :

  • @timestamp : le moment ou l'événement a ete créé (ou parse). C'est le champ que Kibana utilise pour la timeline.
  • @version : toujours "1". Un vestige historique.

Le champ message contient les donnees brutes. C'est sur ce champ que les filtres comme Grok vont travailler pour en extraire des informations structurees.

Champs imbriques

Les champs peuvent etre imbriques. On y accede avec la notation entre crochets :

# Acceder au champ hostname dans l'objet host
[host][hostname]

# Acceder a un champ de premier niveau
[message]

# Creer un champ imbrique dans un filtre
add_field => { "[geo][country]" => "France" }

Cette notation est partout dans Logstash. Dans les filtres, les conditionnels, les outputs. Il faut s'y habituer.

Le champ @metadata

Il existe un champ special : @metadata. C'est un objet invisible. Les champs stockes dans @metadata sont disponibles dans tout le pipeline, mais ils ne sont jamais envoyes a l'output.

filter {
  mutate {
    add_field => { "[@metadata][target_index]" => "logs-app" }
  }
}

output {
  elasticsearch {
    index => "%{[@metadata][target_index]}"
  }
}

L'index sera logs-app, mais le champ [@metadata][target_index] n'apparaitra pas dans le document Elasticsearch. C'est pratique pour stocker des informations de routage sans polluer les donnees.

La queue interne

Entre l'input et les workers de filtre/output, il y a une queue. Par défaut, c'est une queue en mémoire.

                          ┌─────────────────────────────────┐
                          │         Pipeline workers        │
                          │                                 │
┌─────────┐    ┌────────┐ │  ┌────────┐    ┌────────┐      │
│         │    │        │ │  │        │    │        │      │
│  INPUT  │───>│ QUEUE  │─┼─>│ FILTER │───>│ OUTPUT │      │
│         │    │        │ │  │        │    │        │      │
└─────────┘    │        │ │  └────────┘    └────────┘      │
               │        │ │                                 │
               │        │ │  ┌────────┐    ┌────────┐      │
               │        │─┼─>│ FILTER │───>│ OUTPUT │      │
               │        │ │  │        │    │        │      │
               └────────┘ │  └────────┘    └────────┘      │
                          │   (worker 1)    (worker 2)      │
                          └─────────────────────────────────┘

L'input pousse les événements dans la queue. Les workers (nombre défini par pipeline.workers) prennent des lots d'événements (taille définie par pipeline.batch.size) et les font passer par les filtres puis l'output.

Deux types de queue :

Type Avantages Inconvenients
memory (défaut) Rapide, zero config Perd les donnees si Logstash crash
persisted Survit aux crashes Plus lent, nécessité du disque

On verra la persistent queue dans l'article 21 sur la performance. Pour le dev, la queue en mémoire suffit.

Un pipeline réel : parser un log applicatif

Assez de theorie. Voici un cas concret. Tu as une application qui écrit des logs comme ca :

2026-03-31 14:23:01 INFO  [api-users] GET /users/42 200 12ms
2026-03-31 14:23:01 ERROR [api-users] POST /users 500 5023ms Connection refused
2026-03-31 14:23:02 WARN  [api-orders] GET /orders?page=2 200 340ms

On veut extraire : la date, le niveau de log, le service, la méthode HTTP, l'URL, le code de réponse et la duree.

Étape 1 : créer le fichier de test

bash# data/sample.log

2026-03-31 14:23:01 INFO  [api-users] GET /users/42 200 12ms
2026-03-31 14:23:01 ERROR [api-users] POST /users 500 5023ms Connection refused
2026-03-31 14:23:02 WARN  [api-orders] GET /orders?page=2 200 340ms
2026-03-31 14:23:03 INFO  [api-users] DELETE /users/99 204 8ms
2026-03-31 14:23:03 DEBUG [api-auth] POST /auth/login 200 156ms

Étape 2 : écrire le pipeline

# logstash/pipeline/main.conf

input {
  file {
    path => "/data/sample.log"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

filter {
  # Parser la ligne de log
  grok {
    match => {
      "message" => "%{TIMESTAMP_ISO8601:log_timestamp} %{LOGLEVEL:level}\s+\[%{DATA:service}\] %{WORD:http_method} %{URIPATH:url} %{INT:status_code} %{INT:duration}ms(?:\s%{GREEDYDATA:error_message})?"
    }
  }

  # Convertir le timestamp du log en @timestamp
  date {
    match => ["log_timestamp", "yyyy-MM-dd HH:mm:ss"]
    target => "@timestamp"
    remove_field => ["log_timestamp"]
  }

  # Convertir les types
  mutate {
    convert => {
      "status_code" => "integer"
      "duration" => "integer"
    }
  }

  # Supprimer les champs inutiles
  mutate {
    remove_field => ["event", "host", "@version"]
  }
}

output {
  # Afficher dans la console pour voir le resultat
  stdout {
    codec => rubydebug
  }

  # Envoyer dans Elasticsearch
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "logs-app-%{+YYYY.MM.dd}"
  }
}

Sauvegarde le fichier. Logstash le recharge en 3 secondes.

Étape 3 : comprendre ce qui se passe

Prenons la première ligne du log et suivons-la à travers le pipeline :

Entree (input file) : Logstash lit la ligne brute.

json{
  "@timestamp": "2026-03-31T12:30:00.000Z",
  "message": "2026-03-31 14:23:01 INFO  [api-users] GET /users/42 200 12ms"
}

Apres Grok : le filtre Grok extrait les champs avec la regex.

json{
  "@timestamp": "2026-03-31T12:30:00.000Z",
  "message": "2026-03-31 14:23:01 INFO  [api-users] GET /users/42 200 12ms",
  "log_timestamp": "2026-03-31 14:23:01",
  "level": "INFO",
  "service": "api-users",
  "http_method": "GET",
  "url": "/users/42",
  "status_code": "200",
  "duration": "12"
}

Apres Date : le filtre date remplace @timestamp par la vraie date du log.

json{
  "@timestamp": "2026-03-31T14:23:01.000Z",
  "message": "2026-03-31 14:23:01 INFO  [api-users] GET /users/42 200 12ms",
  "level": "INFO",
  "service": "api-users",
  "http_method": "GET",
  "url": "/users/42",
  "status_code": "200",
  "duration": "12"
}

log_timestamp a ete supprime par remove_field.

Apres Mutate : les champs numériques sont convertis et les champs inutiles supprimes.

json{
  "@timestamp": "2026-03-31T14:23:01.000Z",
  "message": "2026-03-31 14:23:01 INFO  [api-users] GET /users/42 200 12ms",
  "level": "INFO",
  "service": "api-users",
  "http_method": "GET",
  "url": "/users/42",
  "status_code": 200,
  "duration": 12
}

status_code et duration sont maintenant des entiers, pas des strings. C'est important pour les aggregations dans Elasticsearch (calculer une moyenne de duration, par exemple).

Sortie : l'événement est affiche dans stdout et envoye a Elasticsearch dans l'index logs-app-2026.03.31.

Étape 4 : vérifier dans Kibana

Ouvre http://localhost:5601. Va dans "Stack Management" > "Index Management". Tu devrais voir un index logs-app-2026.03.31.

Va dans "Discover". Cree un data view pour le pattern logs-app-*. Tu verras les 5 événements avec tous les champs extraits : level, service, http_method, url, status_code, duration.

L'ordre des filtres compte

Les filtres s'executent dans l'ordre ou ils apparaissent dans le fichier .conf. C'est un pipeline sequentiel, pas un traitement parallèle.

filter {
  grok { ... }      # 1. Parse le message
  date { ... }      # 2. Convertit le timestamp
  mutate { ... }    # 3. Convertit les types
  geoip { ... }     # 4. Geolocalise l'IP
  mutate { ... }    # 5. Supprime les champs inutiles
}

Si tu mets le mutate remove_field avant le date, tu supprimes le champ dont date a besoin. L'ordre n'est pas arbitraire.

Regle de base : parse d'abord, enrichis ensuite, nettoie en dernier.

Plugins : ou trouver la liste

Logstash a des centaines de plugins. Chaque input, filter et output est un plugin. Les principaux sont inclus dans l'image Docker officielle.

Pour voir les plugins installes :

bashdocker exec logstash bin/logstash-plugin list

Pour installer un plugin supplementaire :

bashdocker exec logstash bin/logstash-plugin install logstash-filter-translate

Si tu as besoin de plugins supplementaires en permanence, créé un Dockerfile custom :

dockerfileFROM docker.elastic.co/logstash/logstash:8.17.0

RUN bin/logstash-plugin install logstash-filter-translate
RUN bin/logstash-plugin install logstash-input-s3

Et dans ton compose.yaml, remplace image: par build: :

yamllogstash:
  build: ./logstash
  # ...reste de la config

Les commentaires dans les fichiers .conf

La syntaxe est simple : tout ce qui commence par # est un commentaire.

# Ceci est un commentaire
input {
  file {
    path => "/data/app.log"  # commentaire en fin de ligne
  }
}

J'en abuse dans mes pipelines. Un pipeline Logstash sans commentaires, c'est un pipeline que tu ne comprendras plus dans 3 mois. Sur paltemps.fr, chaque bloc de filtre a un commentaire qui explique pourquoi il est la, pas ce qu'il fait (le code dit deja ce qu'il fait).

Les erreurs courantes dans un .conf

Oublier les guillemets

# Mauvais
path => /data/app.log

# Bon
path => "/data/app.log"

Confondre `=>` et `=` ou `:`

# Mauvais (syntaxe YAML/JSON)
path: "/data/app.log"
path = "/data/app.log"

# Bon (syntaxe Logstash)
path => "/data/app.log"

Mettre un filter sans input

Logstash refuse de démarrer si le fichier .conf n'a pas de section input et output. Le filter est optionnel, mais les deux autres sont obligatoires.

Virgules entre les paramètres

# Mauvais (pas de virgules en Logstash)
grok {
  match => { "message" => "%{WORD:name}" },
  add_tag => ["parsed"]
}

# Bon
grok {
  match => { "message" => "%{WORD:name}" }
  add_tag => ["parsed"]
}

Pas de virgules. Chaque paramètre est sur sa propre ligne, sans separateur.

Résumé

  • Un pipeline Logstash a trois blocs : input (obligatoire), filter (optionnel), output (obligatoire)
  • Un événement est un objet avec des champs. @timestamp et message sont les plus importants
  • @metadata permet de stocker des donnees temporaires invisibles dans l'output
  • La queue interne bufferise les événements entre l'input et les workers
  • Les filtres s'executent dans l'ordre : parse d'abord, enrichis ensuite, nettoie en dernier
  • La syntaxe .conf utilise => pour les paramètres, pas de virgules, # pour les commentaires

Precedent : 02 - Installation Docker | Suivant : 04 - Inputs stdin et file

Sources

Réservez un audit gratuit de 30 minutes. Je vous montre concrètement ce qu'on peut automatiser.