Logstash pour les devs - 05 - Input Beats : recevoir des logs de Filebeat

Configurer l'input Beats pour recevoir des événements de Filebeat. Architecture Filebeat + Logstash avec Docker Compose.

  1. 01 Logstash pour les devs - 00 - Pourquoi Logstash existe encore en 2026
  2. 02 Logstash pour les devs - 01 - L'Elastic Stack de A a Z
  3. 03 Logstash pour les devs - 02 - Installer Logstash avec Docker en 5 minutes
  4. 04 Logstash pour les devs - 03 - Anatomie d'un pipeline Logstash
  5. 05 Logstash pour les devs - 04 - Inputs stdin et file : lire des donnees locales
  6. 06 Logstash pour les devs - 05 - Input Beats : recevoir des logs de Filebeat
  7. 07 Logstash pour les devs - 06 - Inputs HTTP, TCP et UDP : recevoir des donnees réseau
  8. 08 Logstash pour les devs - 07 - Inputs Kafka et JDBC : sources avancees
  9. 09 Logstash pour les devs - 08 - Les codecs : decoder et encoder les donnees
  10. 10 Logstash pour les devs - 09 - Le filtre Grok : parser n'importe quel log
  11. 11 Logstash pour les devs - 10 - Le filtre Dissect : parser sans regex
  12. 12 Logstash pour les devs - 11 - Le filtre Mutate : transformer les champs
  13. 13 Logstash pour les devs - 12 - Filtres Date et GeoIP : temps et geolocalisation
  14. 14 Logstash pour les devs - 13 - Filtres KV, JSON et XML : parser les formats structures
  15. 15 Logstash pour les devs - 14 - Le filtre Ruby : quand les autres ne suffisent pas
  16. 16 Logstash pour les devs - 15 - Filtres Aggregate et Metrics : correler les événements
  17. 17 Logstash pour les devs - 16 - Conditionnels et contrôle de flux
  18. 18 Logstash pour les devs - 17 - Output Elasticsearch : envoyer les donnees
  19. 19 Logstash pour les devs - 18 - Outputs file, stdout et les autres
  20. 20 Logstash pour les devs - 19 - Gerer le multiline : stack traces et logs multi-lignes
  21. 21 Logstash pour les devs - 20 - Pipelines multiples et pipeline-to-pipeline
  22. 22 Logstash pour les devs - 21 - Performance et tuning Logstash
  23. 23 Logstash pour les devs - 22 - Monitoring Logstash : metriques et alertes
  24. 24 Logstash pour les devs - 23 - Dead Letter Queue : ne plus perdre d'événements
  25. 25 Logstash pour les devs - 24 - Sécurité Logstash : SSL, auth et secrets
  26. 26 Logstash pour les devs - 25 - Debugger un pipeline Logstash
  27. 27 Logstash pour les devs - 26 - Tester ses pipelines avant la prod
  28. 28 Logstash pour les devs - 27 - Cas pratique : centraliser des logs applicatifs
  29. 29 Logstash pour les devs - 28 - Cas pratique : ETL avec Logstash et PostgreSQL
  30. 30 Logstash pour les devs - 29 - Cas pratique : enrichir des donnees en temps réel
  31. 31 Logstash pour les devs - 30 - Logstash en production : architecture et bonnes pratiques
  32. 32 Logstash pour les devs - 31 - Glossaire Logstash de A a Z

05 - Input Beats : recevoir des logs de Filebeat

Ce que tu vas apprendre

  • Pourquoi Filebeat + Logstash est l'architecture standard
  • Configurer l'input Beats dans Logstash
  • Installer et configurer Filebeat avec Docker Compose
  • Faire circuler des logs de bout en bout : application -> fichier -> Filebeat -> Logstash -> Elasticsearch
  • Gerer les tags et les champs envoyes par Filebeat

Prerequisites


Pourquoi pas juste le file input

Dans l'article précédent, on a lu des fichiers de log directement avec le plugin file de Logstash. Ca marche pour un serveur. Mais en production, tu as 5, 10, 50 serveurs. Tu ne vas pas installer Logstash (et sa JVM a 1 Go de RAM) sur chacun.

C'est la que Filebeat entre en jeu. Filebeat est un agent leger (15 Mo de RAM) qui tourne sur chaque serveur, lit les fichiers de log, et les envoie a Logstash. Logstash est centralise, sur un seul serveur (ou deux pour la HA), et fait tout le parsing.

┌──────────┐     ┌──────────┐
│ Serveur 1│     │          │
│ app.log  │     │          │
│ Filebeat │────>│          │     ┌──────────────┐     ┌──────────┐
└──────────┘     │          │     │              │     │          │
                 │ Logstash │────>│ Elasticsearch│────>│  Kibana  │
┌──────────┐     │          │     │              │     │          │
│ Serveur 2│     │  :5044   │     │  :9200       │     │  :5601   │
│ app.log  │     │          │     │              │     │          │
│ Filebeat │────>│          │     └──────────────┘     └──────────┘
└──────────┘     │          │
                 └──────────┘
┌──────────┐          ^
│ Serveur 3│          │
│ app.log  │          │
│ Filebeat │──────────┘
└──────────┘

L'autre avantage : la backpressure. Si Logstash est surcharge ou tombe, Filebeat garde les logs en mémoire et sur disque. Quand Logstash revient, Filebeat renvoie ce qu'il a mis en file d'attente. Pas de perte.

Le lab complet : 5 services

On va monter un lab avec 5 services Docker :

  1. app : une application qui généré des logs
  2. filebeat : collecte les logs de l'app
  3. logstash : parse et enrichit les logs
  4. elasticsearch : stocke les logs
  5. kibana : visualise les logs

Structure du projet

beats-lab/
├── compose.yaml
├── app/
│   └── generate-logs.sh
├── filebeat/
│   └── filebeat.yml
├── logstash/
│   ├── config/
│   │   ├── logstash.yml
│   │   └── pipelines.yml
│   └── pipeline/
│       └── beats.conf
└── data/
    └── logs/

Cree la structure :

bashmkdir -p beats-lab/{app,filebeat,logstash/config,logstash/pipeline,data/logs}
cd beats-lab

Le generateur de logs

Un script shell qui simule une application ecrivant des logs :

bash#!/bin/bash
# app/generate-logs.sh

SERVICES=("api-users" "api-orders" "api-payments" "worker-email")
LEVELS=("INFO" "WARN" "ERROR" "DEBUG")
METHODS=("GET" "POST" "PUT" "DELETE")
URLS=("/users" "/users/42" "/orders" "/orders/123/pay" "/health")
CODES=("200" "200" "201" "400" "404" "500")
MESSAGES=("OK" "Created" "Bad request" "Not found" "Connection timeout" "Slow query: 2340ms")

LOG_FILE="/logs/app.log"

echo "Starting log generation..."

while true; do
  SERVICE=${SERVICES[$RANDOM % ${#SERVICES[@]}]}
  LEVEL=${LEVELS[$RANDOM % ${#LEVELS[@]}]}
  METHOD=${METHODS[$RANDOM % ${#METHODS[@]}]}
  URL=${URLS[$RANDOM % ${#URLS[@]}]}
  CODE=${CODES[$RANDOM % ${#CODES[@]}]}
  MSG=${MESSAGES[$RANDOM % ${#MESSAGES[@]}]}
  DURATION=$((RANDOM % 5000))
  TIMESTAMP=$(date -u +"%Y-%m-%d %H:%M:%S")

  echo "${TIMESTAMP} ${LEVEL} [${SERVICE}] ${METHOD} ${URL} ${CODE} ${DURATION}ms ${MSG}" >> "$LOG_FILE"

  sleep $((RANDOM % 3 + 1))
done

La configuration Filebeat

yaml# filebeat/filebeat.yml

filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /logs/app.log
    fields:
      environment: development
      app: myapp
    fields_under_root: true

output.logstash:
  hosts: ["logstash:5044"]

# Desactiver les modules et le monitoring pour simplifier
setup.template.enabled: false
setup.ilm.enabled: false

logging.level: info
logging.to_stderr: true

Quelques points :

  • paths : les fichiers a surveiller (dans le conteneur)
  • fields : des champs custom ajoutes a chaque événement. fields_under_root: true les met a la racine de l'événement (pas dans un objet fields)
  • output.logstash : envoie a Logstash sur le port 5044, pas directement a Elasticsearch

Le pipeline Logstash

# logstash/pipeline/beats.conf

input {
  beats {
    port => 5044
  }
}

filter {
  # Parser les logs de l'application
  grok {
    match => {
      "message" => "%{TIMESTAMP_ISO8601:log_timestamp} %{LOGLEVEL:level}\s+\[%{DATA:service}\] %{WORD:http_method} %{URIPATH:url} %{INT:status_code} %{INT:duration}ms %{GREEDYDATA:response_message}"
    }
  }

  # Utiliser le timestamp du log, pas celui de la reception
  date {
    match => ["log_timestamp", "yyyy-MM-dd HH:mm:ss"]
    target => "@timestamp"
    remove_field => ["log_timestamp"]
  }

  # Convertir les types numeriques
  mutate {
    convert => {
      "status_code" => "integer"
      "duration" => "integer"
    }
  }

  # Ajouter un flag si la requete est lente
  if [duration] > 1000 {
    mutate {
      add_tag => ["slow_request"]
      add_field => { "alert_level" => "warning" }
    }
  }

  # Ajouter un flag si erreur serveur
  if [status_code] >= 500 {
    mutate {
      add_tag => ["server_error"]
      add_field => { "alert_level" => "critical" }
    }
  }

  # Supprimer les champs Filebeat qu'on ne veut pas garder
  mutate {
    remove_field => ["agent", "ecs", "input", "log"]
  }
}

output {
  # Debug : voir les evenements dans la console
  stdout {
    codec => rubydebug
  }

  # Envoyer dans Elasticsearch
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "logs-%{[app]}-%{+YYYY.MM.dd}"
  }
}

La configuration Logstash

yaml# logstash/config/logstash.yml

api.http.host: "0.0.0.0"
api.http.port: 9600
config.reload.automatic: true
config.reload.interval: 3s
monitoring.enabled: false
yaml# logstash/config/pipelines.yml

- pipeline.id: beats
  path.config: "/usr/share/logstash/pipeline/beats.conf"
  pipeline.workers: 2
  pipeline.batch.size: 125

Le compose.yaml complet

yamlservices:
  # Application qui genere des logs
  app:
    image: bash:5
    container_name: app
    volumes:
      - ./app/generate-logs.sh:/generate-logs.sh:ro
      - logs-volume:/logs
    command: ["bash", "/generate-logs.sh"]

  # Filebeat collecte les logs de l'app
  filebeat:
    image: docker.elastic.co/beats/filebeat:8.17.0
    container_name: filebeat
    user: root
    volumes:
      - ./filebeat/filebeat.yml:/usr/share/filebeat/filebeat.yml:ro
      - logs-volume:/logs:ro
    depends_on:
      - logstash
    command: ["filebeat", "-e", "-strict.perms=false"]

  # Logstash parse et enrichit
  logstash:
    image: docker.elastic.co/logstash/logstash:8.17.0
    container_name: logstash
    environment:
      - "LS_JAVA_OPTS=-Xms256m -Xmx256m"
    volumes:
      - ./logstash/pipeline/:/usr/share/logstash/pipeline/
      - ./logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml:ro
      - ./logstash/config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
    ports:
      - "5044:5044"
      - "9600:9600"
    depends_on:
      elasticsearch:
        condition: service_healthy

  # Elasticsearch stocke
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.17.0
    container_name: es
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ports:
      - "9200:9200"
    volumes:
      - es-data:/usr/share/elasticsearch/data
    healthcheck:
      test: ["CMD-SHELL", "curl -fs http://localhost:9200/_cluster/health || exit 1"]
      interval: 10s
      timeout: 5s
      retries: 10

  # Kibana visualise
  kibana:
    image: docker.elastic.co/kibana/kibana:8.17.0
    container_name: kibana
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    ports:
      - "5601:5601"
    depends_on:
      elasticsearch:
        condition: service_healthy

volumes:
  es-data:
  logs-volume:

Le volume logs-volume est partage entre l'app (qui écrit) et Filebeat (qui lit). C'est comme ca que Filebeat accede aux logs de l'application sans etre sur le meme conteneur.

Lancer le lab

bashdocker compose up -d

Verifie que les 5 services tournent :

bashdocker compose ps

Regarde les logs de Logstash pour voir les événements arriver :

bashdocker compose logs logstash -f --tail 20

Tu devrais voir les événements parses avec les champs extraits : level, service, http_method, url, status_code, duration. Les requêtes lentes ont le tag slow_request, les erreurs 500 ont server_error.

Ce que Filebeat envoie a Logstash

Quand Filebeat lit une ligne de log, il ne se contente pas d'envoyer le texte. Il ajoute des metadonnees. Voici a quoi ressemble un événement Beats brut avant les filtres Logstash :

json{
  "@timestamp": "2026-03-31T14:30:00.000Z",
  "@metadata": {
    "beat": "filebeat",
    "type": "_doc",
    "version": "8.17.0"
  },
  "message": "2026-03-31 14:30:00 ERROR [api-users] POST /users 500 5023ms Connection timeout",
  "host": {
    "name": "filebeat"
  },
  "agent": {
    "type": "filebeat",
    "version": "8.17.0",
    "name": "filebeat",
    "id": "abc-123..."
  },
  "log": {
    "file": {
      "path": "/logs/app.log"
    },
    "offset": 1234
  },
  "input": {
    "type": "log"
  },
  "ecs": {
    "version": "8.0.0"
  },
  "environment": "development",
  "app": "myapp"
}

Beaucoup de champs. La plupart sont des metadonnees Filebeat qu'on ne veut pas dans Elasticsearch. C'est pour ca qu'on a ce remove_field dans le pipeline :

mutate {
  remove_field => ["agent", "ecs", "input", "log"]
}

Les champs environment et app sont ceux qu'on a ajoutes dans filebeat.yml avec fields et fields_under_root: true.

Le champ @metadata des Beats

Filebeat envoie aussi des metadonnees dans @metadata. Tu ne les vois pas dans stdout (rubydebug ne les affiche pas par défaut), mais tu peux les utiliser dans le pipeline :

output {
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    # Utiliser la version du beat pour le template
    manage_template => false
    index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
  }
}

%{[@metadata][beat]} vaut filebeat. L'index sera filebeat-2026.03.31.

Pour voir les metadonnees dans stdout, utilise le codec rubydebug avec l'option metadata :

output {
  stdout {
    codec => rubydebug { metadata => true }
  }
}

Plusieurs Filebeat, un Logstash

L'architecture standard en production. Chaque serveur a son Filebeat, tout pointe vers le meme Logstash.

Filebeat gere la reconnexion et le retry. Si Logstash est injoignable, Filebeat bufferise en mémoire (par défaut) et retente toutes les secondes. Quand Logstash revient, les logs en attente sont envoyes.

Tu peux identifier quel serveur a envoye quel log en ajoutant un champ dans filebeat.yml :

yaml# Sur le serveur 1
filebeat.inputs:
  - type: log
    paths:
      - /var/log/app/*.log
    fields:
      server: prod-api-01
    fields_under_root: true
yaml# Sur le serveur 2
filebeat.inputs:
  - type: log
    paths:
      - /var/log/app/*.log
    fields:
      server: prod-api-02
    fields_under_root: true

Dans Kibana, tu pourras filtrer par server: prod-api-01 pour ne voir que les logs d'un serveur precis.

Tags et routage

Filebeat peut ajouter des tags aux événements. Ces tags sont utilisables dans les conditionnels Logstash :

yaml# filebeat.yml
filebeat.inputs:
  - type: log
    paths:
      - /var/log/nginx/access.log
    tags: ["nginx", "access"]

  - type: log
    paths:
      - /var/log/app/api.log
    tags: ["app", "api"]
# logstash pipeline
filter {
  if "nginx" in [tags] {
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
  }

  if "app" in [tags] {
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:ts} %{LOGLEVEL:level} %{GREEDYDATA:msg}" }
    }
  }
}

C'est un pattern courant : un Filebeat envoie des logs de plusieurs applications, et Logstash les parse differemment selon les tags.

Verifier dans Kibana

Ouvre http://localhost:5601. Va dans Discover. Cree un data view pour logs-myapp-*.

Tu devrais voir les événements avec les champs :

  • level : INFO, WARN, ERROR, DEBUG
  • service : api-users, api-orders, etc.
  • http_method : GET, POST, PUT, DELETE
  • url : le chemin de la requête
  • status_code : le code HTTP (nombre entier)
  • duration : la duree en millisecondes (nombre entier)
  • environment : development (ajoute par Filebeat)
  • app : myapp (ajoute par Filebeat)
  • tags : slow_request et/ou server_error selon le cas

Les champs numériques (status_code, duration) permettent de créer des visualisations : histogramme des durees, top 10 des endpoints les plus lents, nombre d'erreurs par service.

Sur paltemps.fr, c'est exactement cette architecture qu'on utilise en prod. Filebeat sur chaque serveur, Logstash centralise, Elasticsearch + Kibana pour l'exploration. La stack tient avec 2 Go de RAM pour le trio central (ES 1 Go, Logstash 768 Mo, Kibana 256 Mo).

Résumé

  • Filebeat est l'agent leger (15 Mo) qui collecte les logs sur chaque serveur
  • L'input Beats de Logstash écoûte sur le port 5044
  • Filebeat ajoute des metadonnees (agent, log, ecs) qu'on supprime dans le pipeline
  • Les fields de Filebeat permettent d'ajouter des champs custom (server, env, app)
  • Les tags de Filebeat permettent de router les événements dans les filtres Logstash
  • Si Logstash tombe, Filebeat bufferise et renvoie quand Logstash revient

Precedent : 04 - Inputs stdin et file | Suivant : 06 - Inputs HTTP, TCP, UDP

Sources

Réservez un audit gratuit de 30 minutes. Je vous montre concrètement ce qu'on peut automatiser.