18 - Outputs file, stdout et les autres
Ce que tu vas apprendre
- Écrire des événements dans des fichiers (JSON, CSV, format custom)
- Utiliser stdout pour le debug en dev
- Envoyer vers Kafka, HTTP, S3 et d'autres destinations
- Configurer du fan-out (meme événement vers plusieurs destinations)
- Combiner conditionnels et outputs pour du routage avance
Prerequisites
- Avoir compris l'output Elasticsearch (voir article 17)
- Avoir compris les conditionnels (voir article 16)
Elasticsearch n'est pas la seule destination
La majorite des pipelines envoient vers Elasticsearch. Mais Logstash sait écrire dans une vingtaine de destinations. Archiver les logs sur S3, repousser des événements dans Kafka, appeler un webhook, écrire un fichier CSV pour un data analyst qui veut Excel. Tout ca se fait dans le bloc output.
stdout : le debug du quotidien
On l'a utilise depuis le début de la serie. stdout affiche les événements dans la console de Logstash.
rubydebug (dev)
output {
stdout {
codec => rubydebug
}
}
Affichage lisible avec indentation et types. Lent, verbeux, parfait pour le dev. Ne jamais utiliser en production.
Pour voir les metadonnees (@metadata) :
output {
stdout {
codec => rubydebug { metadata => true }
}
}
json (production-friendly)
output {
stdout {
codec => json
}
}
Un objet JSON par ligne. Plus rapide que rubydebug, et parsable par d'autres outils. Utile si Logstash est un container Docker et que tu lis ses logs avec docker compose logs.
dots (mesure du debit)
output {
stdout {
codec => dots
}
}
Un point par événement. Rien d'autre. Tu vois le debit en un coup d'oeil. Si les points defilent vite, le pipeline tourne.
Supprimer stdout en production
En production, stdout n'a pas de raison d'etre. Chaque événement écrit dans stdout est un appel I/O inutile. Utilise un conditionnel ou supprime-le :
output {
if [@metadata][debug] == "true" {
stdout { codec => rubydebug }
}
elasticsearch {
hosts => ["http://elasticsearch:9200"]
index => "logs-%{+YYYY.MM.dd}"
}
}
file : écrire dans des fichiers
JSON lines (archivage)
output {
file {
path => "/data/output/logs-%{+YYYY-MM-dd}.json"
codec => json_lines
}
}
Un objet JSON par ligne. Un fichier par jour grâce à %{+YYYY-MM-dd} dans le path. C'est le format le plus courant pour l'archivage : facilement reimportable dans Logstash ou n'importe quel outil.
CSV
output {
file {
path => "/data/output/logs-%{+YYYY-MM-dd}.csv"
codec => csv {
columns => ["@timestamp", "level", "service", "status_code", "duration", "url"]
include_headers => true
separator => ","
}
}
}
Un CSV propre avec les colonnes dans l'ordre que tu specifies. include_headers => true ajoute la ligne d'en-tête. Parfait pour les non-devs qui veulent ouvrir ca dans Excel.
Format custom
output {
file {
path => "/data/output/access.log"
codec => line {
format => "%{@timestamp} %{level} [%{service}] %{http_method} %{url} %{status_code} %{duration}ms"
}
}
}
Tu reconstitues un format de log custom. Utile pour alimenter un outil qui attend un format spécifique.
Rotation des fichiers
Le plugin file ne fait pas de rotation lui-meme. Utilise %{+YYYY-MM-dd} dans le path pour générer un fichier par jour, et gere la compression/suppression a l'extérieur (cron, logrotate).
output {
file {
path => "/data/output/logs-%{+YYYY-MM-dd-HH}.json"
codec => json_lines
}
}
%{+YYYY-MM-dd-HH} généré un fichier par heure : logs-2026-03-31-14.json. Pour du haut volume, c'est plus geree qu'un gros fichier par jour.
Compression gzip
output {
file {
path => "/data/output/logs-%{+YYYY-MM-dd}.json.gz"
codec => json_lines
gzip => true
}
}
Les fichiers sont compresses a la volee. Economise 70-80% d'espace disque.
kafka : repousser dans un topic
Logstash peut écrire dans Kafka aussi bien qu'il peut en lire. C'est le pattern "fan-out" : les logs arrivent, tu les envoies a Elasticsearch ET a Kafka. Un autre service consomme le topic Kafka pour de l'alerting, de l'analytics, ou de l'archivage.
output {
kafka {
bootstrap_servers => "kafka:9092"
topic_id => "logs-processed"
codec => json
acks => "all"
retries => 3
compression_type => "snappy"
}
}
| Paramètre | Description |
|---|---|
topic_id |
Le topic de destination |
codec |
Format des messages (json, json_lines, plain) |
acks |
Niveau d'acquittement (0, 1, all) |
retries |
Nombre de retries en cas d'erreur |
compression_type |
Compression : none, gzip, snappy, lz4 |
Topic dynamique
Tu peux utiliser des champs de l'événement dans le topic :
output {
kafka {
bootstrap_servers => "kafka:9092"
topic_id => "logs-%{service}"
codec => json
}
}
Les logs du service api-users vont dans logs-api-users, ceux de api-orders dans logs-api-orders. Un topic par service.
http : appeler un webhook
Envoyer des événements a n'importe quelle API HTTP :
output {
http {
url => "https://hooks.slack.com/services/T000/B000/xxxx"
http_method => "post"
format => "json"
mapping => {
"text" => "[ALERT] %{level} on %{service}: %{message}"
}
}
}
Cet exemple envoie les événements vers un webhook Slack. Chaque événement devient un message dans un channel.
Pour des alertes conditionnelles
output {
if [level] == "ERROR" and [service] == "api-payments" {
http {
url => "https://hooks.slack.com/services/T000/B000/xxxx"
http_method => "post"
format => "json"
mapping => {
"text" => ":red_circle: *%{service}* — %{message}\nStatus: %{status_code} | Duration: %{duration}ms"
}
}
}
elasticsearch {
hosts => ["http://elasticsearch:9200"]
index => "logs-%{+YYYY.MM.dd}"
}
}
Seules les erreurs du service de paiement declenchent une notification Slack. Tous les événements vont dans Elasticsearch.
Vers une API générique
output {
http {
url => "https://api.example.com/ingest"
http_method => "post"
format => "json"
headers => {
"Authorization" => "Bearer %{[@metadata][api_token]}"
"Content-Type" => "application/json"
}
retry_failed => true
retryable_codes => [429, 500, 502, 503]
}
}
Le token d'authentification est dans @metadata pour ne pas apparaître dans les documents.
s3 : archiver sur AWS S3
output {
s3 {
region => "eu-west-1"
bucket => "logs-archive"
prefix => "logstash/%{+YYYY/MM/dd}"
codec => json_lines
rotation_strategy => "size_and_time"
size_file => 52428800
time_file => 15
encoding => "gzip"
}
}
| Paramètre | Description |
|---|---|
prefix |
Chemin dans le bucket (avec date = hiérarchie par jour) |
size_file |
Taille max d'un fichier avant rotation (50 Mo ici) |
time_file |
Duree max en minutes avant rotation |
encoding |
Compression (gzip, none) |
Les fichiers dans S3 :
s3://logs-archive/
└── logstash/
└── 2026/
└── 03/
└── 31/
├── ls.s3.abc123-14.json.gz
├── ls.s3.abc123-15.json.gz
└── ls.s3.abc123-16.json.gz
L'authentification AWS se fait via les variables d'environnement standard (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY) ou via un rôle IAM si Logstash tourne sur EC2/ECS.
Fan-out : plusieurs destinations
Un pipeline peut avoir plusieurs outputs. Chaque événement est envoye a toutes les destinations (sauf si tu utilises des conditionnels).
output {
# Destination principale : Elasticsearch
elasticsearch {
hosts => ["http://elasticsearch:9200"]
index => "logs-%{+YYYY.MM.dd}"
}
# Archivage : fichier JSON compresse
file {
path => "/data/archive/logs-%{+YYYY-MM-dd}.json.gz"
codec => json_lines
gzip => true
}
# Alerting : Kafka pour les erreurs
if [level] == "ERROR" {
kafka {
bootstrap_servers => "kafka:9092"
topic_id => "alerts"
codec => json
}
}
# Debug : stdout en dev uniquement
if [@metadata][env] == "development" {
stdout { codec => rubydebug }
}
}
Chaque output est independant. Si Kafka est lent, ca n'affecte pas Elasticsearch. Si le fichier est plein, les autres outputs continuent.
Le pattern archive + search
Un pattern courant en production : indexer dans Elasticsearch pour la recherche en temps réel (retention 30 jours), et archiver dans S3 pour le long terme (retention 1 an).
┌──────────┐ ┌──────────────┐
│ │────>│ Elasticsearch│ Retention : 30 jours
│ Logstash │ │ (recherche) │ Acces : Kibana
│ │ └──────────────┘
│ │
│ │ ┌──────────────┐
│ │────>│ S3 │ Retention : 1 an
│ │ │ (archive) │ Acces : reimport si besoin
└──────────┘ └──────────────┘
Sur paltemps.fr, c'est exactement ce qu'on fait. Elasticsearch garde 30 jours de logs pour le debug quotidien. S3 garde un an pour l'audit et la compliance. Si on a besoin de logs vieux de 3 mois, on les reimporte dans un index temporaire depuis S3.
Tableau recapitulatif des outputs
| Output | Use case | Format | Volume |
|---|---|---|---|
elasticsearch |
Recherche, dashboards | JSON | Illimite |
file |
Archivage local, export | JSON, CSV, custom | Limite par disque |
kafka |
Buffer, fan-out, stream processing | JSON | Illimite |
http |
Webhooks, alerting, APIs tierces | JSON, form | Faible a moyen |
s3 |
Archivage cloud, data lake | JSON, gzip | Illimite |
stdout |
Debug | rubydebug, json, dots | Dev uniquement |
Résumé
- stdout avec rubydebug pour le debug, json pour la prod, dots pour mesurer le debit
- Le file output écrit en JSON lines, CSV ou format custom, avec compression gzip
- Le kafka output permet le fan-out et le découplage avec d'autres consumers
- Le http output appelle des webhooks (Slack, APIs) avec des conditionnels pour filtrer
- Le s3 output archive les logs avec rotation par taille/temps et compression
- Un pipeline peut avoir plusieurs outputs independants (fan-out)
- Le pattern standard en prod : Elasticsearch (30 jours) + S3 (1 an)
Precedent : 17 - Output Elasticsearch | Suivant : 19 - Gerer le multiline