Logstash pour les devs - 17 - Output Elasticsearch : envoyer les donnees

Configurer l'output Elasticsearch : index naming, templates, bulk settings, retry, ILM et gestion des erreurs.

  1. 01 Logstash pour les devs - 00 - Pourquoi Logstash existe encore en 2026
  2. 02 Logstash pour les devs - 01 - L'Elastic Stack de A a Z
  3. 03 Logstash pour les devs - 02 - Installer Logstash avec Docker en 5 minutes
  4. 04 Logstash pour les devs - 03 - Anatomie d'un pipeline Logstash
  5. 05 Logstash pour les devs - 04 - Inputs stdin et file : lire des donnees locales
  6. 06 Logstash pour les devs - 05 - Input Beats : recevoir des logs de Filebeat
  7. 07 Logstash pour les devs - 06 - Inputs HTTP, TCP et UDP : recevoir des donnees réseau
  8. 08 Logstash pour les devs - 07 - Inputs Kafka et JDBC : sources avancees
  9. 09 Logstash pour les devs - 08 - Les codecs : decoder et encoder les donnees
  10. 10 Logstash pour les devs - 09 - Le filtre Grok : parser n'importe quel log
  11. 11 Logstash pour les devs - 10 - Le filtre Dissect : parser sans regex
  12. 12 Logstash pour les devs - 11 - Le filtre Mutate : transformer les champs
  13. 13 Logstash pour les devs - 12 - Filtres Date et GeoIP : temps et geolocalisation
  14. 14 Logstash pour les devs - 13 - Filtres KV, JSON et XML : parser les formats structures
  15. 15 Logstash pour les devs - 14 - Le filtre Ruby : quand les autres ne suffisent pas
  16. 16 Logstash pour les devs - 15 - Filtres Aggregate et Metrics : correler les événements
  17. 17 Logstash pour les devs - 16 - Conditionnels et contrôle de flux
  18. 18 Logstash pour les devs - 17 - Output Elasticsearch : envoyer les donnees
  19. 19 Logstash pour les devs - 18 - Outputs file, stdout et les autres
  20. 20 Logstash pour les devs - 19 - Gerer le multiline : stack traces et logs multi-lignes
  21. 21 Logstash pour les devs - 20 - Pipelines multiples et pipeline-to-pipeline
  22. 22 Logstash pour les devs - 21 - Performance et tuning Logstash
  23. 23 Logstash pour les devs - 22 - Monitoring Logstash : metriques et alertes
  24. 24 Logstash pour les devs - 23 - Dead Letter Queue : ne plus perdre d'événements
  25. 25 Logstash pour les devs - 24 - Sécurité Logstash : SSL, auth et secrets
  26. 26 Logstash pour les devs - 25 - Debugger un pipeline Logstash
  27. 27 Logstash pour les devs - 26 - Tester ses pipelines avant la prod
  28. 28 Logstash pour les devs - 27 - Cas pratique : centraliser des logs applicatifs
  29. 29 Logstash pour les devs - 28 - Cas pratique : ETL avec Logstash et PostgreSQL
  30. 30 Logstash pour les devs - 29 - Cas pratique : enrichir des donnees en temps réel
  31. 31 Logstash pour les devs - 30 - Logstash en production : architecture et bonnes pratiques
  32. 32 Logstash pour les devs - 31 - Glossaire Logstash de A a Z

17 - Output Elasticsearch : envoyer les donnees

Ce que tu vas apprendre

  • Configurer l'output Elasticsearch en détail
  • Les stratégies de nommage d'index (par date, par source, par environnement)
  • Les index templates pour contrôler le mapping
  • La Bulk API : taille de batch, retries et gestion d'erreurs
  • ILM (Index Lifecycle Management) pour gerer la retention
  • L'authentification et le TLS

Prerequisites

  • Avoir un pipeline fonctionnel avec des filtres (voir articles 09 a 16)
  • Comprendre ce qu'est un index Elasticsearch

L'output le plus important

95% des pipelines Logstash finissent dans Elasticsearch. C'est le couple naturel. Le reste de cette serie est construit autour de cette destination. Meme si Logstash sait envoyer vers Kafka, S3, des fichiers ou du HTTP (on verra ca dans l'article suivant), Elasticsearch est le cas standard.

Configuration de base

output {
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "logs-%{+YYYY.MM.dd}"
  }
}

Deux paramètres suffisent pour démarrer : l'adresse d'Elasticsearch et le nom de l'index. Le %{+YYYY.MM.dd} généré un index par jour : logs-2026.03.31, logs-2026.04.01, etc.

Stratégies de nommage d'index

Le nommage d'index a un impact direct sur les performances de recherche et la gestion de la retention. Voici les patterns courants.

Par date (le plus courant)

index => "logs-%{+YYYY.MM.dd}"

Un index par jour. Avantages : facile a supprimer les vieux logs (supprime l'index du jour), taille d'index previsible, recherches rapides car Elasticsearch ne scanne que les index du range de dates.

Par source et date

index => "logs-%{type}-%{+YYYY.MM.dd}"

Résultat : logs-nginx-2026.03.31, logs-app-2026.03.31. Permet de chercher dans un type spécifique sans filtre, et de gerer la retention differemment par source.

Par environnement

index => "logs-%{environment}-%{+YYYY.MM.dd}"

Résultat : logs-production-2026.03.31, logs-staging-2026.03.31. Utile pour séparer les droits d'acces (les devs voient staging, seuls les ops voient production).

Avec @metadata (propre)

filter {
  mutate {
    add_field => { "[@metadata][target_index]" => "logs-%{type}" }
  }
}

output {
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "%{[@metadata][target_index]}-%{+YYYY.MM.dd}"
  }
}

L'index cible est calcule dans les filtres et stocke dans @metadata. Il n'apparaît pas dans le document Elasticsearch.

Index fixe (pas de date)

index => "products"

Pour les cas ETL ou tu synchronises une table de base de donnees. Pas de rotation par date, un seul index.

Document ID et upsert

Par défaut, Elasticsearch généré un ID unique pour chaque document. Si tu veux contrôler l'ID (pour de l'upsert ou de la deduplication) :

output {
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "products"
    document_id => "%{id}"
    action => "index"
  }
}

Avec document_id => "%{id}", si un document avec le meme ID existe deja, il est ecrase. C'est le pattern standard pour la synchronisation JDBC (voir article 07) : chaque run met à jour les documents existants.

Les actions possibles :

Action Comportement
index (défaut) Cree ou ecrase le document
create Cree uniquement, erreur si le document existe
update Met à jour un document existant (partiel)
delete Supprime le document

Pour un update partiel :

output {
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "products"
    document_id => "%{id}"
    action => "update"
    doc_as_upsert => true
  }
}

doc_as_upsert => true créé le document s'il n'existe pas.

Index templates

Un index template definit le mapping et les settings qui s'appliquent automatiquement quand un nouvel index est créé. Sans template, Elasticsearch utilise le dynamic mapping (il devine les types), ce qui donne souvent de mauvais résultats.

Gerer le template depuis Logstash

output {
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "logs-app-%{+YYYY.MM.dd}"

    # Template management
    manage_template => true
    template_name => "logs-app"
    template => "/usr/share/logstash/templates/logs-app.json"
    template_overwrite => true
  }
}

Le fichier template :

json{
  "index_patterns": ["logs-app-*"],
  "template": {
    "settings": {
      "number_of_shards": 1,
      "number_of_replicas": 0,
      "refresh_interval": "5s"
    },
    "mappings": {
      "properties": {
        "@timestamp": { "type": "date" },
        "level": { "type": "keyword" },
        "service": { "type": "keyword" },
        "message": { "type": "text" },
        "status_code": { "type": "integer" },
        "duration": { "type": "integer" },
        "url": { "type": "keyword" },
        "http_method": { "type": "keyword" },
        "client_ip": { "type": "ip" },
        "geoip": {
          "properties": {
            "location": { "type": "geo_point" },
            "country_name": { "type": "keyword" },
            "city_name": { "type": "keyword" }
          }
        }
      }
    }
  }
}

Monte le dossier templates dans Docker :

yamllogstash:
  volumes:
    - ./logstash/templates/:/usr/share/logstash/templates/:ro

Structure du projet :

logstash/
├── config/
│   ├── logstash.yml
│   └── pipelines.yml
├── pipeline/
│   └── main.conf
├── templates/
│   └── logs-app.json
├── patterns/
│   └── custom
└── scripts/
    └── enrich.rb

Pourquoi définir le mapping

Champ Dynamic mapping (devine) Template (explicite)
level text + keyword keyword (pas besoin de full-text)
status_code long integer (suffisant)
client_ip text ip (permet les range queries)
geoip.location object geo_point (permet les cartes Kibana)
url text + keyword keyword (aggregations + filtres exacts)

Le dynamic mapping généré des champs text avec un sous-champ .keyword pour beaucoup de champs. Ca double l'espace disque pour des champs qui n'ont pas besoin de full-text search. Un template explicite economise du stockage et ameliore les performances.

La Bulk API sous le capot

Logstash n'envoie pas les documents un par un. Il les accumule en batch et les envoie via la Bulk API d'Elasticsearch.

┌──────────┐    batch de 125    ┌──────────────┐
│ Logstash │ ────────────────> │ Elasticsearch│
│          │    evenements      │              │
│ worker   │                    │  Bulk API    │
│          │ <──────────────── │              │
│          │    reponse bulk    │  200 OK      │
└──────────┘                    └──────────────┘

Paramètres de la Bulk API

output {
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "logs-%{+YYYY.MM.dd}"

    # Taille du batch
    flush_size => 500

    # Delai max avant d'envoyer un batch incomplet
    idle_flush_time => 1
  }
}
Paramètre Défaut Description
flush_size Herite de pipeline.batch.size Nombre d'événements par batch
idle_flush_time 1 seconde Temps max avant d'envoyer un batch incomplet

En pratique, la taille du batch est contrôlée par pipeline.batch.size dans pipelines.yml (défaut : 125). Si tu as 125 événements en attente, Logstash envoie. Sinon, il attend idle_flush_time secondes et envoie ce qu'il a.

Retries

Si Elasticsearch répond avec une erreur (503 Service Unavailable, 429 Too Many Requests), Logstash retente :

output {
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "logs-%{+YYYY.MM.dd}"

    # Retries
    retry_max_interval => 64
    retry_initial_interval => 2
  }
}

Logstash utilise un backoff exponentiel : 2s, 4s, 8s, 16s, 32s, 64s. Si Elasticsearch ne revient pas apres toutes les retries, les événements vont dans la Dead Letter Queue (si activee, voir article 23).

Erreurs partielles

La Bulk API peut reussir pour certains documents et échouer pour d'autres. Un mapping conflict, par exemple : tu envoies un string dans un champ integer. Logstash retente uniquement les documents en échec, pas tout le batch.

ILM (Index Lifecycle Management)

ILM gere le cycle de vie des index automatiquement : création, rollover (changer d'index quand il atteint une taille), passage en cold storage, suppression.

output {
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "logs-app"

    ilm_enabled => true
    ilm_rollover_alias => "logs-app"
    ilm_pattern => "{now/d}-000001"
    ilm_policy => "logs-policy"
  }
}

La politique ILM se créé dans Kibana (Stack Management > Index Lifecycle Policies) ou via l'API :

bashcurl -X PUT "http://localhost:9200/_ilm/policy/logs-policy" \
  -H "Content-Type: application/json" \
  -d '{
    "policy": {
      "phases": {
        "hot": {
          "min_age": "0ms",
          "actions": {
            "rollover": {
              "max_size": "50gb",
              "max_age": "1d"
            }
          }
        },
        "warm": {
          "min_age": "7d",
          "actions": {
            "shrink": { "number_of_shards": 1 },
            "forcemerge": { "max_num_segments": 1 }
          }
        },
        "delete": {
          "min_age": "30d",
          "actions": {
            "delete": {}
          }
        }
      }
    }
  }'

Cette politique :

  1. Hot : l'index recouvre les ecritures. Rollover quand il atteint 50 Go ou 1 jour.
  2. Warm (apres 7 jours) : l'index est compacte (1 shard, 1 segment). Plus d'ecritures, juste de la lecture.
  3. Delete (apres 30 jours) : l'index est supprime.

ILM remplace le pattern logs-%{+YYYY.MM.dd} + cron de suppression. C'est plus propre et intégré dans Elasticsearch.

Authentification

User/password

output {
  elasticsearch {
    hosts => ["https://elasticsearch:9200"]
    user => "logstash_writer"
    password => "s3cret"
  }
}

API key

output {
  elasticsearch {
    hosts => ["https://elasticsearch:9200"]
    api_key => "id:api_key_encoded"
  }
}

TLS/SSL

output {
  elasticsearch {
    hosts => ["https://elasticsearch:9200"]
    ssl_enabled => true
    ssl_certificate_authorities => ["/certs/ca.crt"]
    ssl_verification_mode => "full"
    user => "logstash_writer"
    password => "s3cret"
  }
}

On detaillera la sécurité complète dans l'article 24. Pour le dev local, xpack.security.enabled=false dans Elasticsearch et pas d'auth dans Logstash.

Plusieurs clusters Elasticsearch

Tu peux envoyer les memes donnees vers deux clusters (replication, migration) :

output {
  elasticsearch {
    hosts => ["http://es-cluster-1:9200"]
    index => "logs-%{+YYYY.MM.dd}"
  }

  elasticsearch {
    hosts => ["http://es-cluster-2:9200"]
    index => "logs-%{+YYYY.MM.dd}"
  }
}

Chaque événement est envoye aux deux clusters. C'est du fan-out : si un cluster est lent, ca n'affecte pas l'autre (les outputs sont independants).

Configuration de production recommandee

output {
  elasticsearch {
    hosts => ["http://es-node-1:9200", "http://es-node-2:9200", "http://es-node-3:9200"]
    index => "logs-%{[@metadata][source]}-%{+YYYY.MM.dd}"

    # Template
    manage_template => true
    template_name => "logs"
    template => "/usr/share/logstash/templates/logs.json"
    template_overwrite => true

    # Retries
    retry_max_interval => 64
    retry_initial_interval => 2

    # Sniffing (decouvrir les noeuds du cluster automatiquement)
    sniffing => false

    # Compression
    http_compression => true
  }
}

Notes sur la production :

  • Plusieurs hosts : Logstash distribue les requêtes entre les noeuds. Si un noeud tombe, Logstash utilise les autres.
  • Sniffing : decouvre les noeuds du cluster automatiquement. Utile mais désactivé par défaut parce qu'il pose des problèmes avec Docker et les proxies. Je le laisse a false sur paltemps.fr.
  • http_compression : compresse les requêtes HTTP. Reduit la bande passante de 60-80% entre Logstash et Elasticsearch.

Résumé

  • L'output Elasticsearch est le plus utilise, configure avec hosts et index
  • Le nommage d'index par date (%{+YYYY.MM.dd}) facilite la retention et les recherches
  • document_id permet l'upsert pour la synchronisation JDBC
  • Les index templates controlent le mapping et evitent le dynamic mapping
  • La Bulk API envoie des batches, avec retries et backoff exponentiel
  • ILM gere le cycle de vie des index (rollover, warm, delete) automatiquement
  • En production : plusieurs hosts, compression HTTP, template explicite

Precedent : 16 - Conditionnels | Suivant : 18 - Outputs file, stdout et autres

Sources

Réservez un audit gratuit de 30 minutes. Je vous montre concrètement ce qu'on peut automatiser.