29 - Cas pratique : enrichir des donnees en temps réel
Ce que tu vas apprendre
- Enrichir des événements avec des donnees de référencé (lookup)
- Le filtre translate pour les dictionnaires statiques
- Le filtre jdbc_static pour les lookups en base de donnees
- Le filtre elasticsearch pour chercher dans un index existant
- Comparer les performances des trois approches
Prerequisites
- Avoir compris les filtres de base (voir articles 09 a 16)
- Avoir un lab Docker fonctionnel
Pourquoi enrichir
Un log applicatif contient user_id: 42. Ca ne dit rien a personne dans un dashboard. Si tu ajoutes user_name: "Alice Dupont" et user_plan: "enterprise", le dashboard devient utile. Le commercial voit que les erreurs touchent un client enterprise. Le support sait qui appeler.
L'enrichissement ajoute du contexte a des événements bruts. Logstash le fait au vol, entre l'input et l'output, sans modifier la source.
Evenement brut Evenement enrichi
┌─────────────────┐ ┌──────────────────────────┐
│ user_id: 42 │ lookup │ user_id: 42 │
│ action: login │ ──────────> │ action: login │
│ ip: 83.156.42.1 │ │ ip: 83.156.42.1 │
│ │ │ user_name: Alice Dupont │
│ │ │ user_plan: enterprise │
│ │ │ geo.country: France │
└─────────────────┘ └──────────────────────────┘
Le filtre translate : dictionnaire statique
Le plus simple. Tu définis un dictionnaire (clé -> valeur) et translate remplace la clé par la valeur.
Dictionnaire inline
filter {
translate {
source => "[level]"
target => "[level_fr]"
dictionary => {
"DEBUG" => "debogage"
"INFO" => "information"
"WARN" => "avertissement"
"ERROR" => "erreur"
"FATAL" => "critique"
}
fallback => "inconnu"
}
}
source : le champ a chercher. target : le champ ou écrire le résultat. fallback : valeur par défaut si la clé n'existe pas dans le dictionnaire.
Dictionnaire dans un fichier YAML
Pour des dictionnaires plus gros, mets-les dans un fichier :
yaml# logstash/dictionaries/services.yml
api-users: "Equipe Backend - Users"
api-orders: "Equipe Backend - Orders"
api-payments: "Equipe Payments"
web-frontend: "Equipe Frontend"
worker-email: "Equipe Notifications"
worker-billing: "Equipe Payments"
filter {
translate {
source => "[service]"
target => "[team]"
dictionary_path => "/usr/share/logstash/dictionaries/services.yml"
refresh_interval => 300
fallback => "Equipe inconnue"
}
}
refresh_interval recharge le fichier toutes les 300 secondes. Tu peux modifier le dictionnaire sans redemarrer Logstash.
Monte le dossier dans Docker :
yamllogstash:
volumes:
- ./logstash/dictionaries/:/usr/share/logstash/dictionaries/:ro
Cas pratique : mapper des codes erreur
yaml# logstash/dictionaries/http-status.yml
"200": "OK"
"201": "Created"
"204": "No Content"
"301": "Moved Permanently"
"400": "Bad Request"
"401": "Unauthorized"
"403": "Forbidden"
"404": "Not Found"
"429": "Too Many Requests"
"500": "Internal Server Error"
"502": "Bad Gateway"
"503": "Service Unavailable"
filter {
translate {
source => "[status_code]"
target => "[status_text]"
dictionary_path => "/usr/share/logstash/dictionaries/http-status.yml"
fallback => "Unknown Status"
}
}
Cas pratique : enrichir avec des IPs de menace
yaml# logstash/dictionaries/threat-ips.yml
"198.51.100.42": "botnet-mirai"
"203.0.113.99": "scanner-shodan"
"192.0.2.15": "bruteforce-ssh"
filter {
translate {
source => "[client_ip]"
target => "[threat_category]"
dictionary_path => "/usr/share/logstash/dictionaries/threat-ips.yml"
fallback => "clean"
}
if [threat_category] != "clean" {
mutate { add_tag => ["threat_detected"] }
}
}
Chaque requête d'une IP connue est taguee. Dans Kibana, tu créés une alerte sur le tag threat_detected.
Le filtre jdbc_static : lookup en base
Translate est statique : un fichier sur disque. Si tes donnees de référencé sont dans une base de donnees et changent souvent, jdbc_static est le bon choix.
jdbc_static charge les donnees de la base en mémoire au démarrage et les rafraichit a intervalles réguliers. Le lookup est en mémoire, pas un appel SQL par événement.
filter {
jdbc_static {
jdbc_driver_library => "/usr/share/logstash/drivers/postgresql-42.7.3.jar"
jdbc_driver_class => "org.postgresql.Driver"
jdbc_connection_string => "jdbc:postgresql://postgres:5432/shop"
jdbc_user => "app"
jdbc_password => "${PG_PASSWORD}"
# Charger les donnees de reference
loaders => [
{
id => "users"
query => "SELECT id, name, email, plan FROM users WHERE active = true"
local_table => "users_lookup"
},
{
id => "teams"
query => "SELECT service_name, team_name, team_lead FROM service_teams"
local_table => "teams_lookup"
}
]
# Tables locales (schema en memoire)
local_db_objects => [
{
name => "users_lookup"
index_columns => ["id"]
columns => [
["id", "INTEGER"],
["name", "VARCHAR(255)"],
["email", "VARCHAR(255)"],
["plan", "VARCHAR(50)"]
]
},
{
name => "teams_lookup"
index_columns => ["service_name"]
columns => [
["service_name", "VARCHAR(100)"],
["team_name", "VARCHAR(100)"],
["team_lead", "VARCHAR(100)"]
]
}
]
# Faire le lookup
local_lookups => [
{
id => "user_lookup"
query => "SELECT name, email, plan FROM users_lookup WHERE id = :user_id"
parameters => { "user_id" => "[user_id]" }
target => "user"
},
{
id => "team_lookup"
query => "SELECT team_name, team_lead FROM teams_lookup WHERE service_name = :service"
parameters => { "service" => "[service]" }
target => "team"
}
]
# Rafraichir les donnees toutes les 5 minutes
loader_schedule => "*/5 * * * *"
staging_directory => "/tmp/logstash-jdbc-static"
}
}
Le fonctionnement :
- Au démarrage, Logstash exécuté les requêtes
loaderset charge les résultats dans des tables SQLite en mémoire. - Pour chaque événement, les
local_lookupsfont un SELECT sur les tables en mémoire (pas sur PostgreSQL). - Toutes les 5 minutes, les tables sont rafraichies depuis PostgreSQL.
Résultat : un événement avec user_id: 42 est enrichi avec user.name, user.email, user.plan sans appel SQL par événement.
Le filtre elasticsearch : lookup dans un index
Si tes donnees de référencé sont deja dans Elasticsearch :
filter {
elasticsearch {
hosts => ["http://elasticsearch:9200"]
index => "users"
query => "user_id:%{[user_id]}"
fields => {
"name" => "[user][name]"
"email" => "[user][email]"
"plan" => "[user][plan]"
}
enable_sort => false
}
}
Pour chaque événement, Logstash envoie une requête de recherche a Elasticsearch et copie les champs du premier résultat dans l'événement.
C'est le plus flexible, mais le plus lent : un appel HTTP a Elasticsearch par événement. A éviter si tu traites plus de 1 000 événements par seconde.
Comparaison des trois approches
| translate | jdbc_static | elasticsearch filter | |
|---|---|---|---|
| Source | Fichier YAML/CSV | Base SQL | Index Elasticsearch |
| Donnees en mémoire | Oui | Oui (SQLite) | Non (requête a chaque event) |
| Latence par lookup | ~0.01 ms | ~0.05 ms | ~5-20 ms |
| Rafraichissement | refresh_interval |
loader_schedule |
Temps réel |
| Volume de référencé | < 100 000 entrees | < 1 000 000 entrees | Illimite |
| Complexite | Faible | Moyenne | Faible |
| Use case | Dictionnaires statiques | Tables de référencé | Donnees dynamiques |
Mon choix sur paltemps.fr :
- translate pour les mappings fixes (codes erreur, noms d'équipe, categories)
- jdbc_static pour les donnees qui changent (utilisateurs, produits, config)
- elasticsearch filter uniquement quand les deux autres ne suffisent pas (recherche full-text dans les donnees de référencé)
Combiner plusieurs enrichissements
Un pipeline réel combine souvent les trois :
filter {
# 1. Parser le log
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:ts} %{LOGLEVEL:level} \[%{DATA:service}\] user=%{INT:user_id} %{GREEDYDATA:msg}" }
}
# 2. GeoIP sur l'IP client
geoip {
source => "client_ip"
target => "geo"
}
# 3. Translate : mapper le service a une equipe
translate {
source => "[service]"
target => "[team]"
dictionary_path => "/usr/share/logstash/dictionaries/services.yml"
fallback => "unknown"
}
# 4. jdbc_static : enrichir avec les infos utilisateur
jdbc_static {
# ... config comme ci-dessus ...
local_lookups => [
{
id => "user_lookup"
query => "SELECT name, plan FROM users_lookup WHERE id = :uid"
parameters => { "uid" => "[user_id]" }
target => "user"
}
]
}
# 5. Translate : enrichir avec la menace IP
translate {
source => "[client_ip]"
target => "[threat]"
dictionary_path => "/usr/share/logstash/dictionaries/threat-ips.yml"
fallback => "clean"
}
}
L'événement final contient le log parse, la geolocalisation, l'équipe responsable, les infos utilisateur et le statut de menace de l'IP. Tout ca sans modifier l'application source.
Résumé
- L'enrichissement ajoute du contexte aux événements sans modifier la source
translate: dictionnaire statique (fichier YAML), le plus rapidejdbc_static: charge une table SQL en mémoire, rafraichie periodiquementelasticsearchfilter : lookup dans un index ES, le plus flexible mais le plus lent- Combine les trois dans un meme pipeline pour un enrichissement complet
- GeoIP est aussi un enrichissement (vu dans l'article 12)
Precedent : 28 - Cas pratique : ETL PostgreSQL | Suivant : 30 - Logstash en production