Logstash pour les devs - 29 - Cas pratique : enrichir des donnees en temps réel

Enrichir des événements avec des lookups externes : translate, jdbc_static, elasticsearch filter et memcached.

  1. 01 Logstash pour les devs - 00 - Pourquoi Logstash existe encore en 2026
  2. 02 Logstash pour les devs - 01 - L'Elastic Stack de A a Z
  3. 03 Logstash pour les devs - 02 - Installer Logstash avec Docker en 5 minutes
  4. 04 Logstash pour les devs - 03 - Anatomie d'un pipeline Logstash
  5. 05 Logstash pour les devs - 04 - Inputs stdin et file : lire des donnees locales
  6. 06 Logstash pour les devs - 05 - Input Beats : recevoir des logs de Filebeat
  7. 07 Logstash pour les devs - 06 - Inputs HTTP, TCP et UDP : recevoir des donnees réseau
  8. 08 Logstash pour les devs - 07 - Inputs Kafka et JDBC : sources avancees
  9. 09 Logstash pour les devs - 08 - Les codecs : decoder et encoder les donnees
  10. 10 Logstash pour les devs - 09 - Le filtre Grok : parser n'importe quel log
  11. 11 Logstash pour les devs - 10 - Le filtre Dissect : parser sans regex
  12. 12 Logstash pour les devs - 11 - Le filtre Mutate : transformer les champs
  13. 13 Logstash pour les devs - 12 - Filtres Date et GeoIP : temps et geolocalisation
  14. 14 Logstash pour les devs - 13 - Filtres KV, JSON et XML : parser les formats structures
  15. 15 Logstash pour les devs - 14 - Le filtre Ruby : quand les autres ne suffisent pas
  16. 16 Logstash pour les devs - 15 - Filtres Aggregate et Metrics : correler les événements
  17. 17 Logstash pour les devs - 16 - Conditionnels et contrôle de flux
  18. 18 Logstash pour les devs - 17 - Output Elasticsearch : envoyer les donnees
  19. 19 Logstash pour les devs - 18 - Outputs file, stdout et les autres
  20. 20 Logstash pour les devs - 19 - Gerer le multiline : stack traces et logs multi-lignes
  21. 21 Logstash pour les devs - 20 - Pipelines multiples et pipeline-to-pipeline
  22. 22 Logstash pour les devs - 21 - Performance et tuning Logstash
  23. 23 Logstash pour les devs - 22 - Monitoring Logstash : metriques et alertes
  24. 24 Logstash pour les devs - 23 - Dead Letter Queue : ne plus perdre d'événements
  25. 25 Logstash pour les devs - 24 - Sécurité Logstash : SSL, auth et secrets
  26. 26 Logstash pour les devs - 25 - Debugger un pipeline Logstash
  27. 27 Logstash pour les devs - 26 - Tester ses pipelines avant la prod
  28. 28 Logstash pour les devs - 27 - Cas pratique : centraliser des logs applicatifs
  29. 29 Logstash pour les devs - 28 - Cas pratique : ETL avec Logstash et PostgreSQL
  30. 30 Logstash pour les devs - 29 - Cas pratique : enrichir des donnees en temps réel
  31. 31 Logstash pour les devs - 30 - Logstash en production : architecture et bonnes pratiques
  32. 32 Logstash pour les devs - 31 - Glossaire Logstash de A a Z

29 - Cas pratique : enrichir des donnees en temps réel

Ce que tu vas apprendre

  • Enrichir des événements avec des donnees de référencé (lookup)
  • Le filtre translate pour les dictionnaires statiques
  • Le filtre jdbc_static pour les lookups en base de donnees
  • Le filtre elasticsearch pour chercher dans un index existant
  • Comparer les performances des trois approches

Prerequisites

  • Avoir compris les filtres de base (voir articles 09 a 16)
  • Avoir un lab Docker fonctionnel

Pourquoi enrichir

Un log applicatif contient user_id: 42. Ca ne dit rien a personne dans un dashboard. Si tu ajoutes user_name: "Alice Dupont" et user_plan: "enterprise", le dashboard devient utile. Le commercial voit que les erreurs touchent un client enterprise. Le support sait qui appeler.

L'enrichissement ajoute du contexte a des événements bruts. Logstash le fait au vol, entre l'input et l'output, sans modifier la source.

Evenement brut                     Evenement enrichi
┌─────────────────┐               ┌──────────────────────────┐
│ user_id: 42     │    lookup     │ user_id: 42              │
│ action: login   │ ──────────>  │ action: login            │
│ ip: 83.156.42.1 │               │ ip: 83.156.42.1          │
│                 │               │ user_name: Alice Dupont   │
│                 │               │ user_plan: enterprise     │
│                 │               │ geo.country: France       │
└─────────────────┘               └──────────────────────────┘

Le filtre translate : dictionnaire statique

Le plus simple. Tu définis un dictionnaire (clé -> valeur) et translate remplace la clé par la valeur.

Dictionnaire inline

filter {
  translate {
    source => "[level]"
    target => "[level_fr]"
    dictionary => {
      "DEBUG" => "debogage"
      "INFO"  => "information"
      "WARN"  => "avertissement"
      "ERROR" => "erreur"
      "FATAL" => "critique"
    }
    fallback => "inconnu"
  }
}

source : le champ a chercher. target : le champ ou écrire le résultat. fallback : valeur par défaut si la clé n'existe pas dans le dictionnaire.

Dictionnaire dans un fichier YAML

Pour des dictionnaires plus gros, mets-les dans un fichier :

yaml# logstash/dictionaries/services.yml

api-users: "Equipe Backend - Users"
api-orders: "Equipe Backend - Orders"
api-payments: "Equipe Payments"
web-frontend: "Equipe Frontend"
worker-email: "Equipe Notifications"
worker-billing: "Equipe Payments"
filter {
  translate {
    source => "[service]"
    target => "[team]"
    dictionary_path => "/usr/share/logstash/dictionaries/services.yml"
    refresh_interval => 300
    fallback => "Equipe inconnue"
  }
}

refresh_interval recharge le fichier toutes les 300 secondes. Tu peux modifier le dictionnaire sans redemarrer Logstash.

Monte le dossier dans Docker :

yamllogstash:
  volumes:
    - ./logstash/dictionaries/:/usr/share/logstash/dictionaries/:ro

Cas pratique : mapper des codes erreur

yaml# logstash/dictionaries/http-status.yml

"200": "OK"
"201": "Created"
"204": "No Content"
"301": "Moved Permanently"
"400": "Bad Request"
"401": "Unauthorized"
"403": "Forbidden"
"404": "Not Found"
"429": "Too Many Requests"
"500": "Internal Server Error"
"502": "Bad Gateway"
"503": "Service Unavailable"
filter {
  translate {
    source => "[status_code]"
    target => "[status_text]"
    dictionary_path => "/usr/share/logstash/dictionaries/http-status.yml"
    fallback => "Unknown Status"
  }
}

Cas pratique : enrichir avec des IPs de menace

yaml# logstash/dictionaries/threat-ips.yml

"198.51.100.42": "botnet-mirai"
"203.0.113.99": "scanner-shodan"
"192.0.2.15": "bruteforce-ssh"
filter {
  translate {
    source => "[client_ip]"
    target => "[threat_category]"
    dictionary_path => "/usr/share/logstash/dictionaries/threat-ips.yml"
    fallback => "clean"
  }

  if [threat_category] != "clean" {
    mutate { add_tag => ["threat_detected"] }
  }
}

Chaque requête d'une IP connue est taguee. Dans Kibana, tu créés une alerte sur le tag threat_detected.

Le filtre jdbc_static : lookup en base

Translate est statique : un fichier sur disque. Si tes donnees de référencé sont dans une base de donnees et changent souvent, jdbc_static est le bon choix.

jdbc_static charge les donnees de la base en mémoire au démarrage et les rafraichit a intervalles réguliers. Le lookup est en mémoire, pas un appel SQL par événement.

filter {
  jdbc_static {
    jdbc_driver_library => "/usr/share/logstash/drivers/postgresql-42.7.3.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_connection_string => "jdbc:postgresql://postgres:5432/shop"
    jdbc_user => "app"
    jdbc_password => "${PG_PASSWORD}"

    # Charger les donnees de reference
    loaders => [
      {
        id => "users"
        query => "SELECT id, name, email, plan FROM users WHERE active = true"
        local_table => "users_lookup"
      },
      {
        id => "teams"
        query => "SELECT service_name, team_name, team_lead FROM service_teams"
        local_table => "teams_lookup"
      }
    ]

    # Tables locales (schema en memoire)
    local_db_objects => [
      {
        name => "users_lookup"
        index_columns => ["id"]
        columns => [
          ["id", "INTEGER"],
          ["name", "VARCHAR(255)"],
          ["email", "VARCHAR(255)"],
          ["plan", "VARCHAR(50)"]
        ]
      },
      {
        name => "teams_lookup"
        index_columns => ["service_name"]
        columns => [
          ["service_name", "VARCHAR(100)"],
          ["team_name", "VARCHAR(100)"],
          ["team_lead", "VARCHAR(100)"]
        ]
      }
    ]

    # Faire le lookup
    local_lookups => [
      {
        id => "user_lookup"
        query => "SELECT name, email, plan FROM users_lookup WHERE id = :user_id"
        parameters => { "user_id" => "[user_id]" }
        target => "user"
      },
      {
        id => "team_lookup"
        query => "SELECT team_name, team_lead FROM teams_lookup WHERE service_name = :service"
        parameters => { "service" => "[service]" }
        target => "team"
      }
    ]

    # Rafraichir les donnees toutes les 5 minutes
    loader_schedule => "*/5 * * * *"

    staging_directory => "/tmp/logstash-jdbc-static"
  }
}

Le fonctionnement :

  1. Au démarrage, Logstash exécuté les requêtes loaders et charge les résultats dans des tables SQLite en mémoire.
  2. Pour chaque événement, les local_lookups font un SELECT sur les tables en mémoire (pas sur PostgreSQL).
  3. Toutes les 5 minutes, les tables sont rafraichies depuis PostgreSQL.

Résultat : un événement avec user_id: 42 est enrichi avec user.name, user.email, user.plan sans appel SQL par événement.

Le filtre elasticsearch : lookup dans un index

Si tes donnees de référencé sont deja dans Elasticsearch :

filter {
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "users"
    query => "user_id:%{[user_id]}"
    fields => {
      "name" => "[user][name]"
      "email" => "[user][email]"
      "plan" => "[user][plan]"
    }
    enable_sort => false
  }
}

Pour chaque événement, Logstash envoie une requête de recherche a Elasticsearch et copie les champs du premier résultat dans l'événement.

C'est le plus flexible, mais le plus lent : un appel HTTP a Elasticsearch par événement. A éviter si tu traites plus de 1 000 événements par seconde.

Comparaison des trois approches

translate jdbc_static elasticsearch filter
Source Fichier YAML/CSV Base SQL Index Elasticsearch
Donnees en mémoire Oui Oui (SQLite) Non (requête a chaque event)
Latence par lookup ~0.01 ms ~0.05 ms ~5-20 ms
Rafraichissement refresh_interval loader_schedule Temps réel
Volume de référencé < 100 000 entrees < 1 000 000 entrees Illimite
Complexite Faible Moyenne Faible
Use case Dictionnaires statiques Tables de référencé Donnees dynamiques

Mon choix sur paltemps.fr :

  • translate pour les mappings fixes (codes erreur, noms d'équipe, categories)
  • jdbc_static pour les donnees qui changent (utilisateurs, produits, config)
  • elasticsearch filter uniquement quand les deux autres ne suffisent pas (recherche full-text dans les donnees de référencé)

Combiner plusieurs enrichissements

Un pipeline réel combine souvent les trois :

filter {
  # 1. Parser le log
  grok {
    match => { "message" => "%{TIMESTAMP_ISO8601:ts} %{LOGLEVEL:level} \[%{DATA:service}\] user=%{INT:user_id} %{GREEDYDATA:msg}" }
  }

  # 2. GeoIP sur l'IP client
  geoip {
    source => "client_ip"
    target => "geo"
  }

  # 3. Translate : mapper le service a une equipe
  translate {
    source => "[service]"
    target => "[team]"
    dictionary_path => "/usr/share/logstash/dictionaries/services.yml"
    fallback => "unknown"
  }

  # 4. jdbc_static : enrichir avec les infos utilisateur
  jdbc_static {
    # ... config comme ci-dessus ...
    local_lookups => [
      {
        id => "user_lookup"
        query => "SELECT name, plan FROM users_lookup WHERE id = :uid"
        parameters => { "uid" => "[user_id]" }
        target => "user"
      }
    ]
  }

  # 5. Translate : enrichir avec la menace IP
  translate {
    source => "[client_ip]"
    target => "[threat]"
    dictionary_path => "/usr/share/logstash/dictionaries/threat-ips.yml"
    fallback => "clean"
  }
}

L'événement final contient le log parse, la geolocalisation, l'équipe responsable, les infos utilisateur et le statut de menace de l'IP. Tout ca sans modifier l'application source.

Résumé

  • L'enrichissement ajoute du contexte aux événements sans modifier la source
  • translate : dictionnaire statique (fichier YAML), le plus rapide
  • jdbc_static : charge une table SQL en mémoire, rafraichie periodiquement
  • elasticsearch filter : lookup dans un index ES, le plus flexible mais le plus lent
  • Combine les trois dans un meme pipeline pour un enrichissement complet
  • GeoIP est aussi un enrichissement (vu dans l'article 12)

Precedent : 28 - Cas pratique : ETL PostgreSQL | Suivant : 30 - Logstash en production

Sources

Réservez un audit gratuit de 30 minutes. Je vous montre concrètement ce qu'on peut automatiser.