17 - Output Elasticsearch : envoyer les donnees
Ce que tu vas apprendre
- Configurer l'output Elasticsearch en détail
- Les stratégies de nommage d'index (par date, par source, par environnement)
- Les index templates pour contrôler le mapping
- La Bulk API : taille de batch, retries et gestion d'erreurs
- ILM (Index Lifecycle Management) pour gerer la retention
- L'authentification et le TLS
Prerequisites
- Avoir un pipeline fonctionnel avec des filtres (voir articles 09 a 16)
- Comprendre ce qu'est un index Elasticsearch
L'output le plus important
95% des pipelines Logstash finissent dans Elasticsearch. C'est le couple naturel. Le reste de cette serie est construit autour de cette destination. Meme si Logstash sait envoyer vers Kafka, S3, des fichiers ou du HTTP (on verra ca dans l'article suivant), Elasticsearch est le cas standard.
Configuration de base
output {
elasticsearch {
hosts => ["http://elasticsearch:9200"]
index => "logs-%{+YYYY.MM.dd}"
}
}
Deux paramètres suffisent pour démarrer : l'adresse d'Elasticsearch et le nom de l'index. Le %{+YYYY.MM.dd} généré un index par jour : logs-2026.03.31, logs-2026.04.01, etc.
Stratégies de nommage d'index
Le nommage d'index a un impact direct sur les performances de recherche et la gestion de la retention. Voici les patterns courants.
Par date (le plus courant)
index => "logs-%{+YYYY.MM.dd}"
Un index par jour. Avantages : facile a supprimer les vieux logs (supprime l'index du jour), taille d'index previsible, recherches rapides car Elasticsearch ne scanne que les index du range de dates.
Par source et date
index => "logs-%{type}-%{+YYYY.MM.dd}"
Résultat : logs-nginx-2026.03.31, logs-app-2026.03.31. Permet de chercher dans un type spécifique sans filtre, et de gerer la retention differemment par source.
Par environnement
index => "logs-%{environment}-%{+YYYY.MM.dd}"
Résultat : logs-production-2026.03.31, logs-staging-2026.03.31. Utile pour séparer les droits d'acces (les devs voient staging, seuls les ops voient production).
Avec @metadata (propre)
filter {
mutate {
add_field => { "[@metadata][target_index]" => "logs-%{type}" }
}
}
output {
elasticsearch {
hosts => ["http://elasticsearch:9200"]
index => "%{[@metadata][target_index]}-%{+YYYY.MM.dd}"
}
}
L'index cible est calcule dans les filtres et stocke dans @metadata. Il n'apparaît pas dans le document Elasticsearch.
Index fixe (pas de date)
index => "products"
Pour les cas ETL ou tu synchronises une table de base de donnees. Pas de rotation par date, un seul index.
Document ID et upsert
Par défaut, Elasticsearch généré un ID unique pour chaque document. Si tu veux contrôler l'ID (pour de l'upsert ou de la deduplication) :
output {
elasticsearch {
hosts => ["http://elasticsearch:9200"]
index => "products"
document_id => "%{id}"
action => "index"
}
}
Avec document_id => "%{id}", si un document avec le meme ID existe deja, il est ecrase. C'est le pattern standard pour la synchronisation JDBC (voir article 07) : chaque run met à jour les documents existants.
Les actions possibles :
| Action | Comportement |
|---|---|
index (défaut) |
Cree ou ecrase le document |
create |
Cree uniquement, erreur si le document existe |
update |
Met à jour un document existant (partiel) |
delete |
Supprime le document |
Pour un update partiel :
output {
elasticsearch {
hosts => ["http://elasticsearch:9200"]
index => "products"
document_id => "%{id}"
action => "update"
doc_as_upsert => true
}
}
doc_as_upsert => true créé le document s'il n'existe pas.
Index templates
Un index template definit le mapping et les settings qui s'appliquent automatiquement quand un nouvel index est créé. Sans template, Elasticsearch utilise le dynamic mapping (il devine les types), ce qui donne souvent de mauvais résultats.
Gerer le template depuis Logstash
output {
elasticsearch {
hosts => ["http://elasticsearch:9200"]
index => "logs-app-%{+YYYY.MM.dd}"
# Template management
manage_template => true
template_name => "logs-app"
template => "/usr/share/logstash/templates/logs-app.json"
template_overwrite => true
}
}
Le fichier template :
json{
"index_patterns": ["logs-app-*"],
"template": {
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0,
"refresh_interval": "5s"
},
"mappings": {
"properties": {
"@timestamp": { "type": "date" },
"level": { "type": "keyword" },
"service": { "type": "keyword" },
"message": { "type": "text" },
"status_code": { "type": "integer" },
"duration": { "type": "integer" },
"url": { "type": "keyword" },
"http_method": { "type": "keyword" },
"client_ip": { "type": "ip" },
"geoip": {
"properties": {
"location": { "type": "geo_point" },
"country_name": { "type": "keyword" },
"city_name": { "type": "keyword" }
}
}
}
}
}
}
Monte le dossier templates dans Docker :
yamllogstash:
volumes:
- ./logstash/templates/:/usr/share/logstash/templates/:ro
Structure du projet :
logstash/
├── config/
│ ├── logstash.yml
│ └── pipelines.yml
├── pipeline/
│ └── main.conf
├── templates/
│ └── logs-app.json
├── patterns/
│ └── custom
└── scripts/
└── enrich.rb
Pourquoi définir le mapping
| Champ | Dynamic mapping (devine) | Template (explicite) |
|---|---|---|
level |
text + keyword | keyword (pas besoin de full-text) |
status_code |
long | integer (suffisant) |
client_ip |
text | ip (permet les range queries) |
geoip.location |
object | geo_point (permet les cartes Kibana) |
url |
text + keyword | keyword (aggregations + filtres exacts) |
Le dynamic mapping généré des champs text avec un sous-champ .keyword pour beaucoup de champs. Ca double l'espace disque pour des champs qui n'ont pas besoin de full-text search. Un template explicite economise du stockage et ameliore les performances.
La Bulk API sous le capot
Logstash n'envoie pas les documents un par un. Il les accumule en batch et les envoie via la Bulk API d'Elasticsearch.
┌──────────┐ batch de 125 ┌──────────────┐
│ Logstash │ ────────────────> │ Elasticsearch│
│ │ evenements │ │
│ worker │ │ Bulk API │
│ │ <──────────────── │ │
│ │ reponse bulk │ 200 OK │
└──────────┘ └──────────────┘
Paramètres de la Bulk API
output {
elasticsearch {
hosts => ["http://elasticsearch:9200"]
index => "logs-%{+YYYY.MM.dd}"
# Taille du batch
flush_size => 500
# Delai max avant d'envoyer un batch incomplet
idle_flush_time => 1
}
}
| Paramètre | Défaut | Description |
|---|---|---|
flush_size |
Herite de pipeline.batch.size |
Nombre d'événements par batch |
idle_flush_time |
1 seconde | Temps max avant d'envoyer un batch incomplet |
En pratique, la taille du batch est contrôlée par pipeline.batch.size dans pipelines.yml (défaut : 125). Si tu as 125 événements en attente, Logstash envoie. Sinon, il attend idle_flush_time secondes et envoie ce qu'il a.
Retries
Si Elasticsearch répond avec une erreur (503 Service Unavailable, 429 Too Many Requests), Logstash retente :
output {
elasticsearch {
hosts => ["http://elasticsearch:9200"]
index => "logs-%{+YYYY.MM.dd}"
# Retries
retry_max_interval => 64
retry_initial_interval => 2
}
}
Logstash utilise un backoff exponentiel : 2s, 4s, 8s, 16s, 32s, 64s. Si Elasticsearch ne revient pas apres toutes les retries, les événements vont dans la Dead Letter Queue (si activee, voir article 23).
Erreurs partielles
La Bulk API peut reussir pour certains documents et échouer pour d'autres. Un mapping conflict, par exemple : tu envoies un string dans un champ integer. Logstash retente uniquement les documents en échec, pas tout le batch.
ILM (Index Lifecycle Management)
ILM gere le cycle de vie des index automatiquement : création, rollover (changer d'index quand il atteint une taille), passage en cold storage, suppression.
output {
elasticsearch {
hosts => ["http://elasticsearch:9200"]
index => "logs-app"
ilm_enabled => true
ilm_rollover_alias => "logs-app"
ilm_pattern => "{now/d}-000001"
ilm_policy => "logs-policy"
}
}
La politique ILM se créé dans Kibana (Stack Management > Index Lifecycle Policies) ou via l'API :
bashcurl -X PUT "http://localhost:9200/_ilm/policy/logs-policy" \
-H "Content-Type: application/json" \
-d '{
"policy": {
"phases": {
"hot": {
"min_age": "0ms",
"actions": {
"rollover": {
"max_size": "50gb",
"max_age": "1d"
}
}
},
"warm": {
"min_age": "7d",
"actions": {
"shrink": { "number_of_shards": 1 },
"forcemerge": { "max_num_segments": 1 }
}
},
"delete": {
"min_age": "30d",
"actions": {
"delete": {}
}
}
}
}
}'
Cette politique :
- Hot : l'index recouvre les ecritures. Rollover quand il atteint 50 Go ou 1 jour.
- Warm (apres 7 jours) : l'index est compacte (1 shard, 1 segment). Plus d'ecritures, juste de la lecture.
- Delete (apres 30 jours) : l'index est supprime.
ILM remplace le pattern logs-%{+YYYY.MM.dd} + cron de suppression. C'est plus propre et intégré dans Elasticsearch.
Authentification
User/password
output {
elasticsearch {
hosts => ["https://elasticsearch:9200"]
user => "logstash_writer"
password => "s3cret"
}
}
API key
output {
elasticsearch {
hosts => ["https://elasticsearch:9200"]
api_key => "id:api_key_encoded"
}
}
TLS/SSL
output {
elasticsearch {
hosts => ["https://elasticsearch:9200"]
ssl_enabled => true
ssl_certificate_authorities => ["/certs/ca.crt"]
ssl_verification_mode => "full"
user => "logstash_writer"
password => "s3cret"
}
}
On detaillera la sécurité complète dans l'article 24. Pour le dev local, xpack.security.enabled=false dans Elasticsearch et pas d'auth dans Logstash.
Plusieurs clusters Elasticsearch
Tu peux envoyer les memes donnees vers deux clusters (replication, migration) :
output {
elasticsearch {
hosts => ["http://es-cluster-1:9200"]
index => "logs-%{+YYYY.MM.dd}"
}
elasticsearch {
hosts => ["http://es-cluster-2:9200"]
index => "logs-%{+YYYY.MM.dd}"
}
}
Chaque événement est envoye aux deux clusters. C'est du fan-out : si un cluster est lent, ca n'affecte pas l'autre (les outputs sont independants).
Configuration de production recommandee
output {
elasticsearch {
hosts => ["http://es-node-1:9200", "http://es-node-2:9200", "http://es-node-3:9200"]
index => "logs-%{[@metadata][source]}-%{+YYYY.MM.dd}"
# Template
manage_template => true
template_name => "logs"
template => "/usr/share/logstash/templates/logs.json"
template_overwrite => true
# Retries
retry_max_interval => 64
retry_initial_interval => 2
# Sniffing (decouvrir les noeuds du cluster automatiquement)
sniffing => false
# Compression
http_compression => true
}
}
Notes sur la production :
- Plusieurs hosts : Logstash distribue les requêtes entre les noeuds. Si un noeud tombe, Logstash utilise les autres.
- Sniffing : decouvre les noeuds du cluster automatiquement. Utile mais désactivé par défaut parce qu'il pose des problèmes avec Docker et les proxies. Je le laisse a
falsesur paltemps.fr. - http_compression : compresse les requêtes HTTP. Reduit la bande passante de 60-80% entre Logstash et Elasticsearch.
Résumé
- L'output Elasticsearch est le plus utilise, configure avec
hostsetindex - Le nommage d'index par date (
%{+YYYY.MM.dd}) facilite la retention et les recherches document_idpermet l'upsert pour la synchronisation JDBC- Les index templates controlent le mapping et evitent le dynamic mapping
- La Bulk API envoie des batches, avec retries et backoff exponentiel
- ILM gere le cycle de vie des index (rollover, warm, delete) automatiquement
- En production : plusieurs hosts, compression HTTP, template explicite
Precedent : 16 - Conditionnels | Suivant : 18 - Outputs file, stdout et autres