Logstash pour les devs - 27 - Cas pratique : centraliser des logs applicatifs

Pipeline complet pour collecter, parser et visualiser des logs d'une application Node.js avec Filebeat, Logstash, Elasticsearch et Kibana.

  1. 01 Logstash pour les devs - 00 - Pourquoi Logstash existe encore en 2026
  2. 02 Logstash pour les devs - 01 - L'Elastic Stack de A a Z
  3. 03 Logstash pour les devs - 02 - Installer Logstash avec Docker en 5 minutes
  4. 04 Logstash pour les devs - 03 - Anatomie d'un pipeline Logstash
  5. 05 Logstash pour les devs - 04 - Inputs stdin et file : lire des donnees locales
  6. 06 Logstash pour les devs - 05 - Input Beats : recevoir des logs de Filebeat
  7. 07 Logstash pour les devs - 06 - Inputs HTTP, TCP et UDP : recevoir des donnees réseau
  8. 08 Logstash pour les devs - 07 - Inputs Kafka et JDBC : sources avancees
  9. 09 Logstash pour les devs - 08 - Les codecs : decoder et encoder les donnees
  10. 10 Logstash pour les devs - 09 - Le filtre Grok : parser n'importe quel log
  11. 11 Logstash pour les devs - 10 - Le filtre Dissect : parser sans regex
  12. 12 Logstash pour les devs - 11 - Le filtre Mutate : transformer les champs
  13. 13 Logstash pour les devs - 12 - Filtres Date et GeoIP : temps et geolocalisation
  14. 14 Logstash pour les devs - 13 - Filtres KV, JSON et XML : parser les formats structures
  15. 15 Logstash pour les devs - 14 - Le filtre Ruby : quand les autres ne suffisent pas
  16. 16 Logstash pour les devs - 15 - Filtres Aggregate et Metrics : correler les événements
  17. 17 Logstash pour les devs - 16 - Conditionnels et contrôle de flux
  18. 18 Logstash pour les devs - 17 - Output Elasticsearch : envoyer les donnees
  19. 19 Logstash pour les devs - 18 - Outputs file, stdout et les autres
  20. 20 Logstash pour les devs - 19 - Gerer le multiline : stack traces et logs multi-lignes
  21. 21 Logstash pour les devs - 20 - Pipelines multiples et pipeline-to-pipeline
  22. 22 Logstash pour les devs - 21 - Performance et tuning Logstash
  23. 23 Logstash pour les devs - 22 - Monitoring Logstash : metriques et alertes
  24. 24 Logstash pour les devs - 23 - Dead Letter Queue : ne plus perdre d'événements
  25. 25 Logstash pour les devs - 24 - Sécurité Logstash : SSL, auth et secrets
  26. 26 Logstash pour les devs - 25 - Debugger un pipeline Logstash
  27. 27 Logstash pour les devs - 26 - Tester ses pipelines avant la prod
  28. 28 Logstash pour les devs - 27 - Cas pratique : centraliser des logs applicatifs
  29. 29 Logstash pour les devs - 28 - Cas pratique : ETL avec Logstash et PostgreSQL
  30. 30 Logstash pour les devs - 29 - Cas pratique : enrichir des donnees en temps réel
  31. 31 Logstash pour les devs - 30 - Logstash en production : architecture et bonnes pratiques
  32. 32 Logstash pour les devs - 31 - Glossaire Logstash de A a Z

27 - Cas pratique : centraliser des logs applicatifs

Ce que tu vas apprendre

  • Monter une stack complète de A a Z : application -> logs -> Filebeat -> Logstash -> Elasticsearch -> Kibana
  • Configurer une application Node.js avec des logs JSON structures
  • Écrire un pipeline Logstash qui parse, enrichit et indexé
  • Creer un index template pour un mapping propre
  • Explorer les donnees dans Kibana

Prerequisites

  • Tous les articles précédents (c'est le cas pratique qui reunit tout)
  • Node.js/Bun installe pour l'application de demo (optionnel, on fournit un container)

Le projet complet

On va construire une stack de centralisation de logs pour une API HTTP. L'API écrit ses logs en JSON dans un fichier. Filebeat les collecte. Logstash les enrichit. Elasticsearch les stocke. Kibana les affiche.

Architecture

┌─────────────────┐     ┌──────────┐     ┌──────────┐     ┌──────────────┐     ┌──────────┐
│   Application   │     │          │     │          │     │              │     │          │
│   Node.js API   │────>│ Filebeat │────>│ Logstash │────>│ Elasticsearch│────>│  Kibana  │
│                 │     │          │     │          │     │              │     │          │
│ logs/app.log    │     │ :5044    │     │ parse    │     │ :9200        │     │ :5601    │
│ (JSON lines)    │     │ collecte │     │ enrichit │     │ stocke       │     │ affiche  │
└─────────────────┘     └──────────┘     └──────────┘     └──────────────┘     └──────────┘

Structure du projet

centralised-logs/
├── compose.yaml
├── app/
│   ├── Dockerfile
│   ├── package.json
│   └── src/
│       ├── index.ts
│       └── logger.ts
├── filebeat/
│   └── filebeat.yml
├── logstash/
│   ├── config/
│   │   ├── logstash.yml
│   │   └── pipelines.yml
│   ├── pipeline/
│   │   └── app-logs.conf
│   └── templates/
│       └── app-logs.json
└── data/
    └── logs/

L'application

Une API minimaliste avec Hono qui écrit des logs JSON :

typescript// app/src/logger.ts

import { appendFileSync } from "fs";

type LogLevel = "DEBUG" | "INFO" | "WARN" | "ERROR";

const LOG_FILE = process.env.LOG_FILE || "/logs/app.log";

export function log(level: LogLevel, message: string, meta?: Record<string, unknown>) {
  const entry = {
    "@timestamp": new Date().toISOString(),
    level,
    service: process.env.SERVICE_NAME || "api",
    message,
    pid: process.pid,
    hostname: process.env.HOSTNAME || "unknown",
    ...meta,
  };

  appendFileSync(LOG_FILE, JSON.stringify(entry) + "\n");
}
typescript// app/src/index.ts

import { Hono } from "hono";
import { log } from "./logger";

const app = new Hono();

// Middleware de logging
app.use("*", async (c, next) => {
  const start = Date.now();
  const requestId = crypto.randomUUID().slice(0, 8);

  await next();

  const duration = Date.now() - start;
  const status = c.res.status;

  log(status >= 500 ? "ERROR" : status >= 400 ? "WARN" : "INFO", "Request completed", {
    request_id: requestId,
    method: c.req.method,
    url: c.req.path,
    status_code: status,
    duration_ms: duration,
    user_agent: c.req.header("user-agent") || "unknown",
    client_ip: c.req.header("x-forwarded-for") || "127.0.0.1",
  });
});

app.get("/health", (c) => c.json({ status: "ok" }));

app.get("/users", (c) => {
  log("DEBUG", "Fetching users from database", { query: "SELECT * FROM users" });
  return c.json([
    { id: 1, name: "Alice" },
    { id: 2, name: "Bob" },
  ]);
});

app.get("/users/:id", (c) => {
  const id = c.req.param("id");
  if (id === "999") {
    log("ERROR", "User not found", { user_id: id });
    return c.json({ error: "User not found" }, 404);
  }
  return c.json({ id: Number(id), name: "Alice" });
});

app.post("/orders", (c) => {
  const delay = Math.random() * 2000;
  if (delay > 1500) {
    log("WARN", "Slow database query", { duration_ms: Math.round(delay), query: "INSERT INTO orders" });
  }
  return c.json({ id: Math.floor(Math.random() * 10000), status: "created" }, 201);
});

export default {
  port: 3000,
  fetch: app.fetch,
};
dockerfile# app/Dockerfile
FROM oven/bun:1
WORKDIR /app
COPY package.json bun.lockb* ./
RUN bun install
COPY src/ src/
ENV SERVICE_NAME=api-demo
ENV LOG_FILE=/logs/app.log
CMD ["bun", "run", "src/index.ts"]
json{
  "name": "api-demo",
  "dependencies": {
    "hono": "^4.0.0"
  }
}

La configuration Filebeat

yaml# filebeat/filebeat.yml

filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /logs/app.log
    json.keys_under_root: true
    json.add_error_key: true
    json.overwrite_keys: true
    fields:
      app: api-demo
      environment: development
    fields_under_root: true

output.logstash:
  hosts: ["logstash:5044"]

logging.level: warning
logging.to_stderr: true

setup.template.enabled: false
setup.ilm.enabled: false

Le bloc json.* est important. json.keys_under_root: true met les champs JSON a la racine de l'événement (pas dans un sous-objet json). json.overwrite_keys: true permet au JSON de l'app d'ecraser les champs Filebeat (notamment @timestamp).

Le pipeline Logstash

# logstash/pipeline/app-logs.conf

input {
  beats {
    port => 5044
  }
}

filter {
  # 1. Supprimer les health checks
  if [url] == "/health" {
    drop {}
  }

  # 2. Assurer que @timestamp vient du log, pas de Filebeat
  if [json_error] {
    # Le JSON n'a pas ete parse par Filebeat, essayer manuellement
    json {
      source => "message"
      tag_on_failure => ["_json_parse_error"]
    }
  }

  # 3. Geolocaliser l'IP client
  if [client_ip] and [client_ip] != "127.0.0.1" {
    geoip {
      source => "client_ip"
      target => "geo"
      fields => ["country_name", "city_name", "location"]
    }
  }

  # 4. Classifier la severite
  if [status_code] {
    if [status_code] >= 500 {
      mutate { add_field => { "severity" => "critical" } }
      mutate { add_tag => ["alert"] }
    } else if [status_code] >= 400 {
      mutate { add_field => { "severity" => "warning" } }
    } else {
      mutate { add_field => { "severity" => "normal" } }
    }
  }

  # 5. Flaguer les requetes lentes
  if [duration_ms] and [duration_ms] > 1000 {
    mutate { add_tag => ["slow_request"] }
  }

  # 6. Nettoyer les metadonnees Filebeat
  mutate {
    remove_field => ["agent", "ecs", "input", "log", "@version", "event", "json_error"]
  }
}

output {
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "app-logs-%{+YYYY.MM.dd}"
    manage_template => true
    template_name => "app-logs"
    template => "/usr/share/logstash/templates/app-logs.json"
    template_overwrite => true
  }
}

L'index template

json{
  "index_patterns": ["app-logs-*"],
  "template": {
    "settings": {
      "number_of_shards": 1,
      "number_of_replicas": 0,
      "refresh_interval": "5s"
    },
    "mappings": {
      "properties": {
        "@timestamp": { "type": "date" },
        "level": { "type": "keyword" },
        "severity": { "type": "keyword" },
        "service": { "type": "keyword" },
        "message": { "type": "text" },
        "request_id": { "type": "keyword" },
        "method": { "type": "keyword" },
        "url": { "type": "keyword" },
        "status_code": { "type": "integer" },
        "duration_ms": { "type": "integer" },
        "user_agent": { "type": "text", "fields": { "keyword": { "type": "keyword" } } },
        "client_ip": { "type": "ip" },
        "hostname": { "type": "keyword" },
        "pid": { "type": "integer" },
        "app": { "type": "keyword" },
        "environment": { "type": "keyword" },
        "geo": {
          "properties": {
            "location": { "type": "geo_point" },
            "country_name": { "type": "keyword" },
            "city_name": { "type": "keyword" }
          }
        },
        "tags": { "type": "keyword" }
      }
    }
  }
}

Le compose.yaml

yamlservices:
  app:
    build: ./app
    container_name: app
    ports:
      - "3000:3000"
    volumes:
      - logs-volume:/logs
    environment:
      - SERVICE_NAME=api-demo
      - LOG_FILE=/logs/app.log

  filebeat:
    image: docker.elastic.co/beats/filebeat:8.17.0
    container_name: filebeat
    user: root
    volumes:
      - ./filebeat/filebeat.yml:/usr/share/filebeat/filebeat.yml:ro
      - logs-volume:/logs:ro
    depends_on:
      - logstash
    command: ["filebeat", "-e", "-strict.perms=false"]

  logstash:
    image: docker.elastic.co/logstash/logstash:8.17.0
    container_name: logstash
    environment:
      - "LS_JAVA_OPTS=-Xms256m -Xmx256m"
    volumes:
      - ./logstash/pipeline/:/usr/share/logstash/pipeline/
      - ./logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml:ro
      - ./logstash/config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
      - ./logstash/templates/:/usr/share/logstash/templates/:ro
    ports:
      - "5044:5044"
      - "9600:9600"
    depends_on:
      elasticsearch:
        condition: service_healthy

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.17.0
    container_name: es
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ports:
      - "9200:9200"
    volumes:
      - es-data:/usr/share/elasticsearch/data
    healthcheck:
      test: ["CMD-SHELL", "curl -fs http://localhost:9200/_cluster/health || exit 1"]
      interval: 10s
      timeout: 5s
      retries: 10

  kibana:
    image: docker.elastic.co/kibana/kibana:8.17.0
    container_name: kibana
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    ports:
      - "5601:5601"
    depends_on:
      elasticsearch:
        condition: service_healthy

volumes:
  es-data:
  logs-volume:

Lancer et tester

bash# Demarrer la stack
docker compose up -d --build

# Attendre que tout demarre (30-60 secondes)
docker compose ps

# Generer du trafic
for i in $(seq 1 50); do
  curl -s http://localhost:3000/users > /dev/null
  curl -s http://localhost:3000/users/1 > /dev/null
  curl -s http://localhost:3000/users/999 > /dev/null
  curl -s http://localhost:3000/orders -X POST > /dev/null
  curl -s http://localhost:3000/health > /dev/null
done

echo "50 iterations de trafic envoyees"

Verifie que les donnees arrivent dans Elasticsearch :

bashcurl -s "http://localhost:9200/app-logs-*/_count" | python3 -m json.tool

Tu devrais voir un count supérieur a 0 (les health checks sont filtres, donc pas 250).

Explorer dans Kibana

Ouvre http://localhost:5601.

  1. Creer un Data View : Stack Management > Data Views > Create. Pattern : app-logs-*. Timestamp field : @timestamp.

  2. Discover : filtre par level: ERROR pour voir les 404. Filtre par tags: slow_request pour les requêtes lentes.

  3. Dashboard : créé un dashboard avec :

    • Un histogramme de @timestamp (volume de logs dans le temps)
    • Un camembert de level (repartition INFO/WARN/ERROR)
    • Un top 10 de url (endpoints les plus appeles)
    • Une metrique de duration_ms (percentile 95)
    • Un tableau des derniers événements level: ERROR

C'est la stack complète que j'utilise sur paltemps.fr pour chaque projet. Le pipeline change selon l'application, mais l'architecture est toujours la meme : JSON logs -> Filebeat -> Logstash -> Elasticsearch -> Kibana.

Résumé

  • Une stack de logs complète tient en 5 services Docker et ~10 fichiers de config
  • L'application écrit des logs JSON structures (un objet par ligne)
  • Filebeat collecte et parse le JSON avec json.keys_under_root
  • Logstash enrichit (GeoIP, classification, tags), nettoie et indexé
  • Un index template explicite évité les surprises du dynamic mapping
  • Kibana Discover + Dashboard pour l'exploration et le monitoring

Precedent : 26 - Testing | Suivant : 28 - Cas pratique : ETL PostgreSQL

Sources

Réservez un audit gratuit de 30 minutes. Je vous montre concrètement ce qu'on peut automatiser.