Logstash pour les devs - 20 - Pipelines multiples et pipeline-to-pipeline

Configurer plusieurs pipelines dans une instance Logstash et les faire communiquer avec pipeline-to-pipeline.

  1. 01 Logstash pour les devs - 00 - Pourquoi Logstash existe encore en 2026
  2. 02 Logstash pour les devs - 01 - L'Elastic Stack de A a Z
  3. 03 Logstash pour les devs - 02 - Installer Logstash avec Docker en 5 minutes
  4. 04 Logstash pour les devs - 03 - Anatomie d'un pipeline Logstash
  5. 05 Logstash pour les devs - 04 - Inputs stdin et file : lire des donnees locales
  6. 06 Logstash pour les devs - 05 - Input Beats : recevoir des logs de Filebeat
  7. 07 Logstash pour les devs - 06 - Inputs HTTP, TCP et UDP : recevoir des donnees réseau
  8. 08 Logstash pour les devs - 07 - Inputs Kafka et JDBC : sources avancees
  9. 09 Logstash pour les devs - 08 - Les codecs : decoder et encoder les donnees
  10. 10 Logstash pour les devs - 09 - Le filtre Grok : parser n'importe quel log
  11. 11 Logstash pour les devs - 10 - Le filtre Dissect : parser sans regex
  12. 12 Logstash pour les devs - 11 - Le filtre Mutate : transformer les champs
  13. 13 Logstash pour les devs - 12 - Filtres Date et GeoIP : temps et geolocalisation
  14. 14 Logstash pour les devs - 13 - Filtres KV, JSON et XML : parser les formats structures
  15. 15 Logstash pour les devs - 14 - Le filtre Ruby : quand les autres ne suffisent pas
  16. 16 Logstash pour les devs - 15 - Filtres Aggregate et Metrics : correler les événements
  17. 17 Logstash pour les devs - 16 - Conditionnels et contrôle de flux
  18. 18 Logstash pour les devs - 17 - Output Elasticsearch : envoyer les donnees
  19. 19 Logstash pour les devs - 18 - Outputs file, stdout et les autres
  20. 20 Logstash pour les devs - 19 - Gerer le multiline : stack traces et logs multi-lignes
  21. 21 Logstash pour les devs - 20 - Pipelines multiples et pipeline-to-pipeline
  22. 22 Logstash pour les devs - 21 - Performance et tuning Logstash
  23. 23 Logstash pour les devs - 22 - Monitoring Logstash : metriques et alertes
  24. 24 Logstash pour les devs - 23 - Dead Letter Queue : ne plus perdre d'événements
  25. 25 Logstash pour les devs - 24 - Sécurité Logstash : SSL, auth et secrets
  26. 26 Logstash pour les devs - 25 - Debugger un pipeline Logstash
  27. 27 Logstash pour les devs - 26 - Tester ses pipelines avant la prod
  28. 28 Logstash pour les devs - 27 - Cas pratique : centraliser des logs applicatifs
  29. 29 Logstash pour les devs - 28 - Cas pratique : ETL avec Logstash et PostgreSQL
  30. 30 Logstash pour les devs - 29 - Cas pratique : enrichir des donnees en temps réel
  31. 31 Logstash pour les devs - 30 - Logstash en production : architecture et bonnes pratiques
  32. 32 Logstash pour les devs - 31 - Glossaire Logstash de A a Z

20 - Pipelines multiples et pipeline-to-pipeline

Ce que tu vas apprendre

  • Pourquoi utiliser plusieurs pipelines au lieu d'un seul
  • Configurer pipelines.yml avec des ressources par pipeline
  • Faire communiquer les pipelines entre eux (pipeline-to-pipeline)
  • Le pattern distributeur (un input, plusieurs outputs)
  • Le pattern collecteur (plusieurs inputs, un output)
  • Organiser les fichiers de config

Prerequisites


Pourquoi plusieurs pipelines

Jusqu'ici, on a mis tout dans un seul fichier .conf. Ca marche pour apprendre. En production, un pipeline unique qui gere Nginx, les logs applicatifs, le syslog et les webhooks devient un fichier de 500 lignes avec des if/else imbriques.

Les pipelines multiples resolvent ca :

  • Isolation : un pipeline qui crash ne tue pas les autres
  • Ressources : tu peux donner 4 workers au pipeline a haut debit et 1 worker au pipeline aggregate
  • Lisibilite : un fichier par type de donnees, pas un monolithe
  • Maintenance : modifier le parsing Nginx ne risque pas de casser le pipeline syslog

pipelines.yml

Le fichier pipelines.yml definit la liste des pipelines et leurs paramètres :

yaml# logstash/config/pipelines.yml

- pipeline.id: nginx
  path.config: "/usr/share/logstash/pipeline/nginx.conf"
  pipeline.workers: 4
  pipeline.batch.size: 250

- pipeline.id: app
  path.config: "/usr/share/logstash/pipeline/app.conf"
  pipeline.workers: 2
  pipeline.batch.size: 125

- pipeline.id: syslog
  path.config: "/usr/share/logstash/pipeline/syslog.conf"
  pipeline.workers: 1
  pipeline.batch.size: 125

- pipeline.id: dead-letters
  path.config: "/usr/share/logstash/pipeline/dlq.conf"
  pipeline.workers: 1
  pipeline.batch.size: 50

Chaque pipeline a son propre fichier .conf, ses propres workers et sa propre batch size.

Structure des fichiers

logstash/
├── config/
│   ├── logstash.yml
│   └── pipelines.yml
├── pipeline/
│   ├── nginx.conf
│   ├── app.conf
│   ├── syslog.conf
│   └── dlq.conf
├── patterns/
│   └── custom
└── templates/
    └── logs.json

Un fichier par pipeline. Chaque fichier contient son input, ses filtres et son output. Pas de confusion.

Paramètres par pipeline

Paramètre Description Défaut
pipeline.id Identifiant unique obligatoire
path.config Chemin vers le .conf (ou un glob) obligatoire
pipeline.workers Nombre de threads de traitement nb de CPUs
pipeline.batch.size Événements par batch 125
pipeline.batch.delay Delai max avant d'envoyer un batch incomplet (ms) 50
pipeline.ordered Forcer l'ordre des événements (auto, true, false) auto
queue.type Type de queue (memory, persisted) memory
queue.max_bytes Taille max de la persistent queue 1 Go

Tu peux donner des ressources différentes selon les besoins. Le pipeline Nginx a haut debit a 4 workers, le pipeline DLQ de reprocessing n'en a besoin que d'un.

Globs dans path.config

Tu peux découper un pipeline en plusieurs fichiers avec un glob :

yaml- pipeline.id: nginx
  path.config: "/usr/share/logstash/pipeline/nginx/*.conf"
logstash/pipeline/nginx/
├── 01-input.conf
├── 02-filter.conf
└── 03-output.conf

Logstash concatene les fichiers dans l'ordre alphabetique. C'est pour ca qu'on les prefixe avec des numeros. L'ordre compte : l'input doit etre avant le filter, qui doit etre avant l'output.

Pipeline-to-pipeline

Les pipelines peuvent communiquer entre eux via des inputs et outputs pipeline. C'est une communication interne, en mémoire, sans réseau.

Le pattern distributeur

Un pipeline recoit toutes les donnees et les distribue vers des pipelines specialises :

                          ┌──────────────────┐
                          │ Pipeline: nginx  │
                     ┌───>│ filter + output  │
                     │    └──────────────────┘
┌──────────────────┐ │
│ Pipeline: input  │ │    ┌──────────────────┐
│ beats :5044      │─┼───>│ Pipeline: app    │
│ classify + route │ │    │ filter + output  │
└──────────────────┘ │    └──────────────────┘
                     │
                     │    ┌──────────────────┐
                     └───>│ Pipeline: syslog │
                          │ filter + output  │
                          └──────────────────┘

Le pipeline d'entree :

# logstash/pipeline/input-distributor.conf

input {
  beats { port => 5044 }
}

output {
  if "nginx" in [tags] {
    pipeline { send_to => ["nginx"] }
  } else if "app" in [tags] {
    pipeline { send_to => ["app"] }
  } else {
    pipeline { send_to => ["syslog"] }
  }
}

Les pipelines recepteurs :

# logstash/pipeline/nginx.conf

input {
  pipeline { address => "nginx" }
}

filter {
  grok { match => { "message" => "%{COMBINEDAPACHELOG}" } }
  # ... autres filtres nginx
}

output {
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "nginx-%{+YYYY.MM.dd}"
  }
}
# logstash/pipeline/app.conf

input {
  pipeline { address => "app" }
}

filter {
  json { source => "message" }
  # ... autres filtres app
}

output {
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "app-%{+YYYY.MM.dd}"
  }
}

Le pipelines.yml :

yaml- pipeline.id: input-distributor
  path.config: "/usr/share/logstash/pipeline/input-distributor.conf"
  pipeline.workers: 2

- pipeline.id: nginx
  path.config: "/usr/share/logstash/pipeline/nginx.conf"
  pipeline.workers: 4

- pipeline.id: app
  path.config: "/usr/share/logstash/pipeline/app.conf"
  pipeline.workers: 2

- pipeline.id: syslog
  path.config: "/usr/share/logstash/pipeline/syslog.conf"
  pipeline.workers: 1

L'avantage : chaque pipeline a ses propres workers. Le pipeline Nginx peut avoir 4 workers sans affecter le pipeline syslog qui n'en a besoin que d'un.

Le pattern collecteur

L'inverse : plusieurs pipelines envoient vers un seul pipeline de sortie.

┌──────────────────┐
│ Pipeline: beats  │
│ input beats      │───┐
│ filter           │   │
└──────────────────┘   │    ┌──────────────────┐
                       ├───>│ Pipeline: output │
┌──────────────────┐   │    │ elasticsearch    │
│ Pipeline: http   │   │    │ s3 archive       │
│ input http       │───┘    └──────────────────┘
│ filter           │   │
└──────────────────┘   │
                       │
┌──────────────────┐   │
│ Pipeline: kafka  │   │
│ input kafka      │───┘
│ filter           │
└──────────────────┘

Les pipelines sources :

# logstash/pipeline/input-beats.conf

input {
  beats { port => 5044 }
}

filter {
  # parsing specifique beats
}

output {
  pipeline { send_to => ["output-main"] }
}
# logstash/pipeline/input-http.conf

input {
  http { port => 8080; codec => json }
}

filter {
  # parsing specifique http
}

output {
  pipeline { send_to => ["output-main"] }
}

Le pipeline de sortie centralise :

# logstash/pipeline/output-main.conf

input {
  pipeline { address => "output-main" }
}

output {
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "logs-%{[@metadata][source]}-%{+YYYY.MM.dd}"
  }

  s3 {
    bucket => "logs-archive"
    prefix => "logstash/%{+YYYY/MM/dd}"
    codec => json_lines
    encoding => "gzip"
  }
}

L'avantage : la configuration de l'output Elasticsearch (hosts, templates, auth) est a un seul endroit. Si tu changes le cluster ES, tu modifies un seul fichier.

Le pattern distributeur + collecteur combine

L'architecture la plus propre combine les deux :

                     ┌─────────────┐
                ┌───>│ filter-nginx│───┐
┌────────────┐  │    └─────────────┘   │    ┌──────────────┐
│   input    │──┤                      ├───>│    output    │
│ distributor│  │    ┌─────────────┐   │    │  collecteur  │
└────────────┘  ├───>│ filter-app  │───┤    └──────────────┘
                │    └─────────────┘   │
                │    ┌─────────────┐   │
                └───>│ filter-sys  │───┘
                     └─────────────┘

L'input est centralise, les filtres sont isoles par type, l'output est centralise. Chaque composant est remplacable indépendamment.

Monitoring des pipelines multiples

L'API de monitoring (localhost:9600) montre les stats par pipeline :

bashcurl -s http://localhost:9600/_node/stats/pipelines | python3 -m json.tool

Tu verras les metriques de chaque pipeline : events in/out, duree de filtre, duree d'output. C'est indispensable pour identifier quel pipeline est le bottleneck.

Quand utiliser des pipelines multiples

Situation Un seul pipeline Pipelines multiples
Dev / apprentissage Oui Non
Une seule source de donnees Oui Non
2-3 sources, parsing simple Oui (avec if/else) Optionnel
4+ sources, parsing complexe Non Oui
Besoin de workers différents Non Oui
Pipeline aggregate (workers: 1) Bloque tout Isole
Production Possible Recommande

Sur paltemps.fr, on utilise 3 pipelines : un input distributeur, un filtre par type de donnees (app, nginx, system), et un output collecteur. C'est plus de fichiers, mais chaque fichier fait moins de 50 lignes et ne gere qu'une seule responsabilité.

Résumé

  • pipelines.yml definit plusieurs pipelines avec des ressources independantes
  • Chaque pipeline a son propre fichier .conf, ses workers et sa batch size
  • Pipeline-to-pipeline permet la communication interne via pipeline { send_to } et pipeline { address }
  • Le pattern distributeur : un input repartit vers plusieurs filtres
  • Le pattern collecteur : plusieurs filtres convergent vers un output
  • Combine les deux pour une architecture propre et maintenable
  • L'API _node/stats/pipelines monitore chaque pipeline séparément

Precedent : 19 - Multiline | Suivant : 21 - Performance et tuning

Sources

Réservez un audit gratuit de 30 minutes. Je vous montre concrètement ce qu'on peut automatiser.