Logstash pour les devs - 07 - Inputs Kafka et JDBC : sources avancees

Lire des messages Kafka et des tables de base de donnees avec les inputs Kafka et JDBC. Configuration complète et exemples Docker.

  1. 01 Logstash pour les devs - 00 - Pourquoi Logstash existe encore en 2026
  2. 02 Logstash pour les devs - 01 - L'Elastic Stack de A a Z
  3. 03 Logstash pour les devs - 02 - Installer Logstash avec Docker en 5 minutes
  4. 04 Logstash pour les devs - 03 - Anatomie d'un pipeline Logstash
  5. 05 Logstash pour les devs - 04 - Inputs stdin et file : lire des donnees locales
  6. 06 Logstash pour les devs - 05 - Input Beats : recevoir des logs de Filebeat
  7. 07 Logstash pour les devs - 06 - Inputs HTTP, TCP et UDP : recevoir des donnees réseau
  8. 08 Logstash pour les devs - 07 - Inputs Kafka et JDBC : sources avancees
  9. 09 Logstash pour les devs - 08 - Les codecs : decoder et encoder les donnees
  10. 10 Logstash pour les devs - 09 - Le filtre Grok : parser n'importe quel log
  11. 11 Logstash pour les devs - 10 - Le filtre Dissect : parser sans regex
  12. 12 Logstash pour les devs - 11 - Le filtre Mutate : transformer les champs
  13. 13 Logstash pour les devs - 12 - Filtres Date et GeoIP : temps et geolocalisation
  14. 14 Logstash pour les devs - 13 - Filtres KV, JSON et XML : parser les formats structures
  15. 15 Logstash pour les devs - 14 - Le filtre Ruby : quand les autres ne suffisent pas
  16. 16 Logstash pour les devs - 15 - Filtres Aggregate et Metrics : correler les événements
  17. 17 Logstash pour les devs - 16 - Conditionnels et contrôle de flux
  18. 18 Logstash pour les devs - 17 - Output Elasticsearch : envoyer les donnees
  19. 19 Logstash pour les devs - 18 - Outputs file, stdout et les autres
  20. 20 Logstash pour les devs - 19 - Gerer le multiline : stack traces et logs multi-lignes
  21. 21 Logstash pour les devs - 20 - Pipelines multiples et pipeline-to-pipeline
  22. 22 Logstash pour les devs - 21 - Performance et tuning Logstash
  23. 23 Logstash pour les devs - 22 - Monitoring Logstash : metriques et alertes
  24. 24 Logstash pour les devs - 23 - Dead Letter Queue : ne plus perdre d'événements
  25. 25 Logstash pour les devs - 24 - Sécurité Logstash : SSL, auth et secrets
  26. 26 Logstash pour les devs - 25 - Debugger un pipeline Logstash
  27. 27 Logstash pour les devs - 26 - Tester ses pipelines avant la prod
  28. 28 Logstash pour les devs - 27 - Cas pratique : centraliser des logs applicatifs
  29. 29 Logstash pour les devs - 28 - Cas pratique : ETL avec Logstash et PostgreSQL
  30. 30 Logstash pour les devs - 29 - Cas pratique : enrichir des donnees en temps réel
  31. 31 Logstash pour les devs - 30 - Logstash en production : architecture et bonnes pratiques
  32. 32 Logstash pour les devs - 31 - Glossaire Logstash de A a Z

07 - Inputs Kafka et JDBC : sources avancees

Ce que tu vas apprendre

  • Lire des messages depuis un topic Kafka
  • Comprendre les consumer groups et la gestion des offsets
  • Configurer l'input JDBC pour poller une base PostgreSQL
  • Faire de la synchronisation incrementale avec sql_last_value
  • Monter un lab Docker complet pour chaque scénario

Prerequisites

  • Le lab Docker de base (voir article 02)
  • Des notions de Kafka (topics, partitions, consumer groups) aident mais ne sont pas obligatoires
  • Des notions de SQL (SELECT, WHERE) suffisent pour la partie JDBC

Kafka : pourquoi c'est le combo parfait avec Logstash

Quand j'ai découvert Kafka + Logstash, j'ai mis un moment a comprendre pourquoi les deux ensemble. Filebeat envoie directement a Logstash, pourquoi ajouter Kafka au milieu ?

La réponse tient en un mot : découplage. Si Logstash tombe, Filebeat bufferise en local, mais sa capacité est limitee. Avec Kafka entre les deux, tu as un buffer distribue et durable. Les messages restent dans Kafka des jours, des semaines si tu veux. Logstash peut tomber, remonter, et reprendre ou il en etait. Zero perte, meme pour des interruptions longues.

L'autre avantage : tu peux avoir plusieurs consumers. Logstash lit les logs pour Elasticsearch, un autre service les lit pour de l'alerting, un troisieme pour de l'archivage. Chaque consumer avance a son rythme.

┌──────────┐     ┌───────────────────────────┐     ┌──────────┐     ┌──────────────┐
│ Filebeat │────>│       Kafka               │────>│ Logstash │────>│ Elasticsearch│
│ Filebeat │────>│                           │     │          │     │              │
│ Filebeat │────>│  topic: logs-app          │     │ consumer │     │              │
└──────────┘     │  partitions: 3            │     │ group:   │     └──────────────┘
                 │  retention: 7 jours       │     │ logstash │
                 │                           │     └──────────┘
                 │                           │
                 │                           │────>┌──────────┐
                 │                           │     │ Alerting │
                 └───────────────────────────┘     └──────────┘

Lab Kafka + Logstash

On va monter un lab avec Kafka en mode KRaft (sans Zookeeper, plus simple depuis Kafka 3.3).

Structure du projet :

kafka-lab/
├── compose.yaml
├── logstash/
│   ├── config/
│   │   ├── logstash.yml
│   │   └── pipelines.yml
│   └── pipeline/
│       └── kafka.conf
└── scripts/
    └── produce-messages.sh

Le compose.yaml :

yamlservices:
  kafka:
    image: bitnami/kafka:3.7
    container_name: kafka
    environment:
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
    ports:
      - "9092:9092"

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.17.0
    container_name: es
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ports:
      - "9200:9200"
    volumes:
      - es-data:/usr/share/elasticsearch/data
    healthcheck:
      test: ["CMD-SHELL", "curl -fs http://localhost:9200/_cluster/health || exit 1"]
      interval: 10s
      timeout: 5s
      retries: 10

  logstash:
    image: docker.elastic.co/logstash/logstash:8.17.0
    container_name: logstash
    environment:
      - "LS_JAVA_OPTS=-Xms256m -Xmx256m"
    volumes:
      - ./logstash/pipeline/:/usr/share/logstash/pipeline/
      - ./logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml:ro
      - ./logstash/config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
    depends_on:
      elasticsearch:
        condition: service_healthy
      kafka:
        condition: service_started

  kibana:
    image: docker.elastic.co/kibana/kibana:8.17.0
    container_name: kibana
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    ports:
      - "5601:5601"
    depends_on:
      elasticsearch:
        condition: service_healthy

volumes:
  es-data:

Le pipeline Kafka

# logstash/pipeline/kafka.conf

input {
  kafka {
    bootstrap_servers => "kafka:9092"
    topics => ["logs-app"]
    group_id => "logstash-indexer"
    codec => json
    consumer_threads => 2
    auto_offset_reset => "earliest"
    decorate_events => "basic"
  }
}

filter {
  # Convertir les types si necessaire
  if [duration_ms] {
    mutate {
      convert => { "duration_ms" => "integer" }
    }
  }

  if [status_code] {
    mutate {
      convert => { "status_code" => "integer" }
    }
  }
}

output {
  stdout { codec => rubydebug }

  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "logs-kafka-%{+YYYY.MM.dd}"
  }
}

Les paramètres de l'input Kafka :

Paramètre Description
bootstrap_servers Adresse du broker Kafka
topics Liste des topics a consommer (tableau)
group_id Identifiant du consumer group. Plusieurs Logstash avec le meme group_id se partagent les partitions
codec Format des messages. json si les producteurs envoient du JSON
consumer_threads Nombre de threads consommateurs. Idealement = nombre de partitions du topic
auto_offset_reset earliest : lit depuis le début du topic. latest : ne lit que les nouveaux messages
decorate_events Ajoute des metadonnees Kafka (topic, partition, offset) a l'événement

Produire des messages de test

bash#!/bin/bash
# scripts/produce-messages.sh

SERVICES=("api-users" "api-orders" "api-payments")
LEVELS=("INFO" "WARN" "ERROR")

for i in $(seq 1 20); do
  SERVICE=${SERVICES[$RANDOM % ${#SERVICES[@]}]}
  LEVEL=${LEVELS[$RANDOM % ${#LEVELS[@]}]}
  DURATION=$((RANDOM % 5000))
  STATUS=$((200 + (RANDOM % 4) * 100))
  TS=$(date -u +"%Y-%m-%dT%H:%M:%SZ")

  MSG="{\"@timestamp\":\"${TS}\",\"level\":\"${LEVEL}\",\"service\":\"${SERVICE}\",\"status_code\":${STATUS},\"duration_ms\":${DURATION}}"

  docker exec kafka kafka-console-producer.sh \
    --bootstrap-server localhost:9092 \
    --topic logs-app <<< "$MSG"

  sleep 0.5
done

echo "20 messages produits dans le topic logs-app"
bashchmod +x scripts/produce-messages.sh
./scripts/produce-messages.sh

Consumer groups et offsets

Le group_id est fondamental. Kafka retient la position de lecture (l'offset) par consumer group. Si Logstash redemarrage avec le meme group_id, il reprend là où il s'etait arrêté.

Si tu lances deux instances Logstash avec le meme group_id, Kafka repartit les partitions entre elles. Chaque instance traite une partie des messages. C'est du load balancing automatique.

Si tu changes le group_id, le nouveau consumer recommence depuis le début (ou la fin, selon auto_offset_reset).

Les metadonnees Kafka

Avec decorate_events => "basic", chaque événement contient les metadonnees Kafka :

json{
  "@metadata": {
    "kafka": {
      "topic": "logs-app",
      "partition": 1,
      "offset": 42,
      "consumer_group": "logstash-indexer",
      "timestamp": 1711889400000
    }
  }
}

Ces metadonnees sont dans @metadata, donc invisibles dans l'output. Mais tu peux les utiliser dans les filtres et conditionnels :

output {
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "logs-%{[@metadata][kafka][topic]}-%{+YYYY.MM.dd}"
  }
}

JDBC : lire dans une base de donnees

L'input JDBC transforme Logstash en outil ETL. Tu ecris une requête SQL, Logstash l'exécuté a intervalles réguliers et envoie les résultats ligne par ligne dans le pipeline.

┌──────────────┐     ┌──────────┐     ┌──────────────┐
│ PostgreSQL   │     │ Logstash │     │ Elasticsearch│
│              │     │          │     │              │
│ SELECT *     │────>│  JDBC    │────>│ index:       │
│ FROM products│     │  input   │     │ products     │
│ WHERE        │     │          │     │              │
│ updated_at > │     │ toutes   │     │ full-text    │
│ :sql_last    │     │ les 30s  │     │ search       │
│ _value       │     │          │     │              │
└──────────────┘     └──────────┘     └──────────────┘

Lab JDBC + PostgreSQL

Structure du projet :

jdbc-lab/
├── compose.yaml
├── logstash/
│   ├── config/
│   │   ├── logstash.yml
│   │   └── pipelines.yml
│   ├── pipeline/
│   │   └── jdbc.conf
│   └── drivers/
│       └── postgresql-42.7.3.jar
└── sql/
    └── init.sql

Le driver JDBC PostgreSQL est un fichier .jar qu'il faut telecharger. Tu le trouves sur jdbc.postgresql.org. Place-le dans logstash/drivers/.

bash# Telecharger le driver
curl -L -o logstash/drivers/postgresql-42.7.3.jar \
  https://jdbc.postgresql.org/download/postgresql-42.7.3.jar

Schema SQL

sql-- sql/init.sql

CREATE TABLE products (
  id SERIAL PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  description TEXT,
  price DECIMAL(10, 2) NOT NULL,
  category VARCHAR(100),
  in_stock BOOLEAN DEFAULT true,
  created_at TIMESTAMP DEFAULT NOW(),
  updated_at TIMESTAMP DEFAULT NOW()
);

-- Index pour la synchronisation incrementale
CREATE INDEX idx_products_updated_at ON products(updated_at);

-- Donnees de test
INSERT INTO products (name, description, price, category) VALUES
  ('MacBook Pro 16"', 'Laptop Apple M3 Pro, 18 Go RAM, 512 Go SSD', 2799.00, 'laptops'),
  ('ThinkPad X1 Carbon', 'Laptop Lenovo, Intel i7, 16 Go RAM', 1649.00, 'laptops'),
  ('Samsung Galaxy S24', 'Smartphone Samsung, 256 Go, 5G', 899.00, 'smartphones'),
  ('iPhone 15 Pro', 'Smartphone Apple A17 Pro, 256 Go', 1229.00, 'smartphones'),
  ('Sony WH-1000XM5', 'Casque audio sans fil, noise cancelling', 349.00, 'audio'),
  ('AirPods Pro 2', 'Ecouteurs Apple, USB-C, noise cancelling', 279.00, 'audio'),
  ('LG UltraWide 34"', 'Moniteur 34 pouces, 3440x1440, USB-C', 599.00, 'monitors'),
  ('Dell U2723QE', 'Moniteur 27 pouces, 4K, USB-C hub', 479.00, 'monitors'),
  ('Keychron K8 Pro', 'Clavier mecanique, Gateron Brown, sans fil', 109.00, 'peripherals'),
  ('Logitech MX Master 3S', 'Souris ergonomique, multi-device', 99.00, 'peripherals');

Le compose.yaml

yamlservices:
  postgres:
    image: postgres:16
    container_name: postgres
    environment:
      - POSTGRES_DB=shop
      - POSTGRES_USER=app
      - POSTGRES_PASSWORD=secret
    ports:
      - "5432:5432"
    volumes:
      - ./sql/init.sql:/docker-entrypoint-initdb.d/init.sql:ro
      - pg-data:/var/lib/postgresql/data

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.17.0
    container_name: es
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ports:
      - "9200:9200"
    volumes:
      - es-data:/usr/share/elasticsearch/data
    healthcheck:
      test: ["CMD-SHELL", "curl -fs http://localhost:9200/_cluster/health || exit 1"]
      interval: 10s
      timeout: 5s
      retries: 10

  logstash:
    image: docker.elastic.co/logstash/logstash:8.17.0
    container_name: logstash
    environment:
      - "LS_JAVA_OPTS=-Xms256m -Xmx256m"
    volumes:
      - ./logstash/pipeline/:/usr/share/logstash/pipeline/
      - ./logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml:ro
      - ./logstash/config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
      - ./logstash/drivers/:/usr/share/logstash/drivers/:ro
    depends_on:
      elasticsearch:
        condition: service_healthy
      postgres:
        condition: service_started

  kibana:
    image: docker.elastic.co/kibana/kibana:8.17.0
    container_name: kibana
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    ports:
      - "5601:5601"
    depends_on:
      elasticsearch:
        condition: service_healthy

volumes:
  es-data:
  pg-data:

Le pipeline JDBC

# logstash/pipeline/jdbc.conf

input {
  jdbc {
    # Connexion PostgreSQL
    jdbc_driver_library => "/usr/share/logstash/drivers/postgresql-42.7.3.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_connection_string => "jdbc:postgresql://postgres:5432/shop"
    jdbc_user => "app"
    jdbc_password => "secret"

    # Requete SQL avec synchronisation incrementale
    statement => "SELECT * FROM products WHERE updated_at > :sql_last_value ORDER BY updated_at ASC"

    # Planification : toutes les 30 secondes
    schedule => "*/30 * * * * *"

    # Tracking de la position
    use_column_value => true
    tracking_column => "updated_at"
    tracking_column_type => "timestamp"

    # Ou stocker la derniere valeur lue
    last_run_metadata_path => "/usr/share/logstash/data/.logstash_jdbc_last_run"

    # Identifiant unique pour l'upsert
    jdbc_paging_enabled => true
    jdbc_page_size => 1000
  }
}

filter {
  # Utiliser l'ID PostgreSQL comme document_id pour l'upsert
  mutate {
    add_field => { "[@metadata][document_id]" => "%{id}" }
  }

  # Convertir le prix en float (JDBC le renvoie en BigDecimal)
  mutate {
    convert => { "price" => "float" }
  }

  # Supprimer les champs techniques JDBC
  mutate {
    remove_field => ["@version"]
  }
}

output {
  stdout { codec => rubydebug }

  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "products"
    document_id => "%{[@metadata][document_id]}"
    action => "index"
  }
}

Comment la synchronisation incrementale fonctionne

Le mecanisme repose sur deux choses :

  1. sql_last_value : une variable qui contient la dernière valeur de tracking_column lue. Au premier run, elle vaut 1970-01-01 00:00:00.

  2. last_run_metadata_path : un fichier ou Logstash sauvegarde sql_last_value entre les redemarrages.

Le cycle :

Premier run:
  sql_last_value = 1970-01-01 00:00:00
  SELECT * FROM products WHERE updated_at > '1970-01-01'
  → renvoie les 10 produits
  sql_last_value = 2026-03-31 14:00:00 (max updated_at)

Runs suivants (toutes les 30s):
  sql_last_value = 2026-03-31 14:00:00
  SELECT * FROM products WHERE updated_at > '2026-03-31 14:00:00'
  → renvoie 0 produits (rien n'a change)

Quand un produit est modifie:
  UPDATE products SET price = 2599.00, updated_at = NOW() WHERE id = 1;
  → au prochain run, Logstash detecte le changement et re-indexe le produit

C'est simple et efficace. Le seul prerequis : ta table doit avoir une colonne updated_at qui est mise à jour a chaque modification.

Tester la synchronisation

Lance le lab, attends que les 10 produits soient indexes, puis modifie un produit :

bashdocker exec postgres psql -U app -d shop -c \
  "UPDATE products SET price = 2599.00, updated_at = NOW() WHERE id = 1;"

Dans les 30 secondes, Logstash détecté le changement et re-indexé le produit avec le nouveau prix. Verifie dans Elasticsearch :

bashcurl -s http://localhost:9200/products/_doc/1 | python3 -m json.tool

La syntaxe du schedule

Le schedule utilise la syntaxe cron avec une extension pour les secondes :

# secondes minutes heures jour_mois mois jour_semaine

schedule => "*/30 * * * * *"     # toutes les 30 secondes
schedule => "0 */5 * * * *"      # toutes les 5 minutes
schedule => "0 0 * * * *"        # toutes les heures
schedule => "0 0 2 * * *"        # tous les jours a 2h du matin

Pour du dev, 30 secondes c'est bien. En production, toutes les minutes ou toutes les 5 minutes suffisent pour la plupart des cas d'usage.

Kafka vs JDBC : quand utiliser quoi

Kafka JDBC
Type de donnees Événements, logs, messages Tables de base de donnees
Temps réel Oui (millisecondes) Non (polling, delai = schedule)
Volume Illimite (streaming) Limite par la requête SQL
Infra requise Cluster Kafka Juste la base de donnees
Doublon Non (offsets Kafka) Possible sans upsert
Suppression Non geree nativement Detecte via soft delete

Sur paltemps.fr, Kafka sert pour les logs et les événements temps réel. JDBC sert pour synchroniser les catalogues produits et les donnees de référencé vers Elasticsearch. Deux use cases différents, deux inputs différents.

Résumé

  • L'input Kafka lit des messages depuis un ou plusieurs topics avec gestion des offsets par consumer group
  • auto_offset_reset => "earliest" lit depuis le début, "latest" ne lit que les nouveaux messages
  • L'input JDBC exécuté une requête SQL a intervalles réguliers (schedule cron)
  • sql_last_value permet la synchronisation incrementale (ne relire que les modifications)
  • Kafka est pour le streaming temps réel, JDBC est pour le polling de base de donnees
  • Le driver JDBC (fichier .jar) doit etre telecharge et monte dans le conteneur

Precedent : 06 - Inputs HTTP, TCP, UDP | Suivant : 08 - Les codecs

Sources

Réservez un audit gratuit de 30 minutes. Je vous montre concrètement ce qu'on peut automatiser.