03 - Anatomie d'un pipeline Logstash
Ce que tu vas apprendre
- La structure d'un fichier
.conf(input, filter, output) - Le modèle d'événement Logstash (champs, @timestamp, @metadata)
- Le rôle de la queue interne entre les blocs
- Comment un événement traverse le pipeline de bout en bout
- Écrire un premier pipeline réel qui parse un log
Prerequisites
- Avoir le lab Docker qui tourne (voir article 02)
Le pipeline, c'est trois blocs
Quand j'explique Logstash a un junior, je commence toujours par la meme analogie : c'est une usine avec trois postes.
Le premier poste recoit la matière première. Le deuxieme la transforme. Le troisieme l'emballe et l'envoie. C'est input, filter, output.
Donnees brutes Donnees propres
│ │
▼ ▼
┌─────────┐ ┌──────────┐ ┌──────────┐
│ │ │ │ │ │
│ INPUT │───>│ FILTER │───>│ OUTPUT │
│ │ │ │ │ │
│ lit les │ │ parse │ │ envoie │
│ donnees │ │ enrichit │ │ les │
│ │ │ nettoie │ │ donnees │
└─────────┘ └──────────┘ └──────────┘
│ │ │
│ (optionnel) │
│ │
└──────────────────────────────┘
si pas de filtre,
l'input va direct a l'output
Le bloc filter est optionnel. Tu peux envoyer des donnees d'un input a un output sans rien transformer. C'est ce qu'on a fait dans l'article 02 avec stdin -> stdout.
Le fichier .conf
Un pipeline Logstash se definit dans un fichier .conf. La syntaxe n'est ni du JSON, ni du YAML, ni du TOML. C'est un format propre a Logstash.
# logstash/pipeline/main.conf
input {
# un ou plusieurs plugins d'entree
}
filter {
# zero ou plusieurs plugins de transformation
}
output {
# un ou plusieurs plugins de sortie
}
Chaque bloc contient un ou plusieurs plugins. Un plugin a un nom et des paramètres entre accolades :
input {
file {
path => "/data/app.log"
start_position => "beginning"
}
}
La syntaxe des paramètres : cle => valeur. Les chaînes sont entre guillemets doubles. Les nombres et booleens sont nus. Les listes sont entre crochets.
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
tag_on_failure => ["_grokparsefailure", "_parseerror"]
add_field => { "parsed" => "true" }
}
}
Le modèle d'événement
Tout ce qui traverse un pipeline Logstash est un événement. Un événement, c'est un objet avec des champs. Quand tu envoies la ligne de texte 2026-03-31 ERROR Connection refused a Logstash, il créé un événement qui ressemble a ca :
json{
"@timestamp": "2026-03-31T10:15:42.123Z",
"@version": "1",
"message": "2026-03-31 ERROR Connection refused",
"host": {
"hostname": "logstash"
},
"event": {
"original": "2026-03-31 ERROR Connection refused"
}
}
Les champs qui commencent par @ sont des champs système :
@timestamp: le moment ou l'événement a ete créé (ou parse). C'est le champ que Kibana utilise pour la timeline.@version: toujours "1". Un vestige historique.
Le champ message contient les donnees brutes. C'est sur ce champ que les filtres comme Grok vont travailler pour en extraire des informations structurees.
Champs imbriques
Les champs peuvent etre imbriques. On y accede avec la notation entre crochets :
# Acceder au champ hostname dans l'objet host
[host][hostname]
# Acceder a un champ de premier niveau
[message]
# Creer un champ imbrique dans un filtre
add_field => { "[geo][country]" => "France" }
Cette notation est partout dans Logstash. Dans les filtres, les conditionnels, les outputs. Il faut s'y habituer.
Le champ @metadata
Il existe un champ special : @metadata. C'est un objet invisible. Les champs stockes dans @metadata sont disponibles dans tout le pipeline, mais ils ne sont jamais envoyes a l'output.
filter {
mutate {
add_field => { "[@metadata][target_index]" => "logs-app" }
}
}
output {
elasticsearch {
index => "%{[@metadata][target_index]}"
}
}
L'index sera logs-app, mais le champ [@metadata][target_index] n'apparaitra pas dans le document Elasticsearch. C'est pratique pour stocker des informations de routage sans polluer les donnees.
La queue interne
Entre l'input et les workers de filtre/output, il y a une queue. Par défaut, c'est une queue en mémoire.
┌─────────────────────────────────┐
│ Pipeline workers │
│ │
┌─────────┐ ┌────────┐ │ ┌────────┐ ┌────────┐ │
│ │ │ │ │ │ │ │ │ │
│ INPUT │───>│ QUEUE │─┼─>│ FILTER │───>│ OUTPUT │ │
│ │ │ │ │ │ │ │ │ │
└─────────┘ │ │ │ └────────┘ └────────┘ │
│ │ │ │
│ │ │ ┌────────┐ ┌────────┐ │
│ │─┼─>│ FILTER │───>│ OUTPUT │ │
│ │ │ │ │ │ │ │
└────────┘ │ └────────┘ └────────┘ │
│ (worker 1) (worker 2) │
└─────────────────────────────────┘
L'input pousse les événements dans la queue. Les workers (nombre défini par pipeline.workers) prennent des lots d'événements (taille définie par pipeline.batch.size) et les font passer par les filtres puis l'output.
Deux types de queue :
| Type | Avantages | Inconvenients |
|---|---|---|
memory (défaut) |
Rapide, zero config | Perd les donnees si Logstash crash |
persisted |
Survit aux crashes | Plus lent, nécessité du disque |
On verra la persistent queue dans l'article 21 sur la performance. Pour le dev, la queue en mémoire suffit.
Un pipeline réel : parser un log applicatif
Assez de theorie. Voici un cas concret. Tu as une application qui écrit des logs comme ca :
2026-03-31 14:23:01 INFO [api-users] GET /users/42 200 12ms
2026-03-31 14:23:01 ERROR [api-users] POST /users 500 5023ms Connection refused
2026-03-31 14:23:02 WARN [api-orders] GET /orders?page=2 200 340ms
On veut extraire : la date, le niveau de log, le service, la méthode HTTP, l'URL, le code de réponse et la duree.
Étape 1 : créer le fichier de test
bash# data/sample.log
2026-03-31 14:23:01 INFO [api-users] GET /users/42 200 12ms
2026-03-31 14:23:01 ERROR [api-users] POST /users 500 5023ms Connection refused
2026-03-31 14:23:02 WARN [api-orders] GET /orders?page=2 200 340ms
2026-03-31 14:23:03 INFO [api-users] DELETE /users/99 204 8ms
2026-03-31 14:23:03 DEBUG [api-auth] POST /auth/login 200 156ms
Étape 2 : écrire le pipeline
# logstash/pipeline/main.conf
input {
file {
path => "/data/sample.log"
start_position => "beginning"
sincedb_path => "/dev/null"
}
}
filter {
# Parser la ligne de log
grok {
match => {
"message" => "%{TIMESTAMP_ISO8601:log_timestamp} %{LOGLEVEL:level}\s+\[%{DATA:service}\] %{WORD:http_method} %{URIPATH:url} %{INT:status_code} %{INT:duration}ms(?:\s%{GREEDYDATA:error_message})?"
}
}
# Convertir le timestamp du log en @timestamp
date {
match => ["log_timestamp", "yyyy-MM-dd HH:mm:ss"]
target => "@timestamp"
remove_field => ["log_timestamp"]
}
# Convertir les types
mutate {
convert => {
"status_code" => "integer"
"duration" => "integer"
}
}
# Supprimer les champs inutiles
mutate {
remove_field => ["event", "host", "@version"]
}
}
output {
# Afficher dans la console pour voir le resultat
stdout {
codec => rubydebug
}
# Envoyer dans Elasticsearch
elasticsearch {
hosts => ["http://elasticsearch:9200"]
index => "logs-app-%{+YYYY.MM.dd}"
}
}
Sauvegarde le fichier. Logstash le recharge en 3 secondes.
Étape 3 : comprendre ce qui se passe
Prenons la première ligne du log et suivons-la à travers le pipeline :
Entree (input file) : Logstash lit la ligne brute.
json{
"@timestamp": "2026-03-31T12:30:00.000Z",
"message": "2026-03-31 14:23:01 INFO [api-users] GET /users/42 200 12ms"
}
Apres Grok : le filtre Grok extrait les champs avec la regex.
json{
"@timestamp": "2026-03-31T12:30:00.000Z",
"message": "2026-03-31 14:23:01 INFO [api-users] GET /users/42 200 12ms",
"log_timestamp": "2026-03-31 14:23:01",
"level": "INFO",
"service": "api-users",
"http_method": "GET",
"url": "/users/42",
"status_code": "200",
"duration": "12"
}
Apres Date : le filtre date remplace @timestamp par la vraie date du log.
json{
"@timestamp": "2026-03-31T14:23:01.000Z",
"message": "2026-03-31 14:23:01 INFO [api-users] GET /users/42 200 12ms",
"level": "INFO",
"service": "api-users",
"http_method": "GET",
"url": "/users/42",
"status_code": "200",
"duration": "12"
}
log_timestamp a ete supprime par remove_field.
Apres Mutate : les champs numériques sont convertis et les champs inutiles supprimes.
json{
"@timestamp": "2026-03-31T14:23:01.000Z",
"message": "2026-03-31 14:23:01 INFO [api-users] GET /users/42 200 12ms",
"level": "INFO",
"service": "api-users",
"http_method": "GET",
"url": "/users/42",
"status_code": 200,
"duration": 12
}
status_code et duration sont maintenant des entiers, pas des strings. C'est important pour les aggregations dans Elasticsearch (calculer une moyenne de duration, par exemple).
Sortie : l'événement est affiche dans stdout et envoye a Elasticsearch dans l'index logs-app-2026.03.31.
Étape 4 : vérifier dans Kibana
Ouvre http://localhost:5601. Va dans "Stack Management" > "Index Management". Tu devrais voir un index logs-app-2026.03.31.
Va dans "Discover". Cree un data view pour le pattern logs-app-*. Tu verras les 5 événements avec tous les champs extraits : level, service, http_method, url, status_code, duration.
L'ordre des filtres compte
Les filtres s'executent dans l'ordre ou ils apparaissent dans le fichier .conf. C'est un pipeline sequentiel, pas un traitement parallèle.
filter {
grok { ... } # 1. Parse le message
date { ... } # 2. Convertit le timestamp
mutate { ... } # 3. Convertit les types
geoip { ... } # 4. Geolocalise l'IP
mutate { ... } # 5. Supprime les champs inutiles
}
Si tu mets le mutate remove_field avant le date, tu supprimes le champ dont date a besoin. L'ordre n'est pas arbitraire.
Regle de base : parse d'abord, enrichis ensuite, nettoie en dernier.
Plugins : ou trouver la liste
Logstash a des centaines de plugins. Chaque input, filter et output est un plugin. Les principaux sont inclus dans l'image Docker officielle.
Pour voir les plugins installes :
bashdocker exec logstash bin/logstash-plugin list
Pour installer un plugin supplementaire :
bashdocker exec logstash bin/logstash-plugin install logstash-filter-translate
Si tu as besoin de plugins supplementaires en permanence, créé un Dockerfile custom :
dockerfileFROM docker.elastic.co/logstash/logstash:8.17.0
RUN bin/logstash-plugin install logstash-filter-translate
RUN bin/logstash-plugin install logstash-input-s3
Et dans ton compose.yaml, remplace image: par build: :
yamllogstash:
build: ./logstash
# ...reste de la config
Les commentaires dans les fichiers .conf
La syntaxe est simple : tout ce qui commence par # est un commentaire.
# Ceci est un commentaire
input {
file {
path => "/data/app.log" # commentaire en fin de ligne
}
}
J'en abuse dans mes pipelines. Un pipeline Logstash sans commentaires, c'est un pipeline que tu ne comprendras plus dans 3 mois. Sur paltemps.fr, chaque bloc de filtre a un commentaire qui explique pourquoi il est la, pas ce qu'il fait (le code dit deja ce qu'il fait).
Les erreurs courantes dans un .conf
Oublier les guillemets
# Mauvais
path => /data/app.log
# Bon
path => "/data/app.log"
Confondre `=>` et `=` ou `:`
# Mauvais (syntaxe YAML/JSON)
path: "/data/app.log"
path = "/data/app.log"
# Bon (syntaxe Logstash)
path => "/data/app.log"
Mettre un filter sans input
Logstash refuse de démarrer si le fichier .conf n'a pas de section input et output. Le filter est optionnel, mais les deux autres sont obligatoires.
Virgules entre les paramètres
# Mauvais (pas de virgules en Logstash)
grok {
match => { "message" => "%{WORD:name}" },
add_tag => ["parsed"]
}
# Bon
grok {
match => { "message" => "%{WORD:name}" }
add_tag => ["parsed"]
}
Pas de virgules. Chaque paramètre est sur sa propre ligne, sans separateur.
Résumé
- Un pipeline Logstash a trois blocs : input (obligatoire), filter (optionnel), output (obligatoire)
- Un événement est un objet avec des champs.
@timestampetmessagesont les plus importants @metadatapermet de stocker des donnees temporaires invisibles dans l'output- La queue interne bufferise les événements entre l'input et les workers
- Les filtres s'executent dans l'ordre : parse d'abord, enrichis ensuite, nettoie en dernier
- La syntaxe
.confutilise=>pour les paramètres, pas de virgules,#pour les commentaires
Precedent : 02 - Installation Docker | Suivant : 04 - Inputs stdin et file