07 - Inputs Kafka et JDBC : sources avancees
Ce que tu vas apprendre
- Lire des messages depuis un topic Kafka
- Comprendre les consumer groups et la gestion des offsets
- Configurer l'input JDBC pour poller une base PostgreSQL
- Faire de la synchronisation incrementale avec
sql_last_value - Monter un lab Docker complet pour chaque scénario
Prerequisites
- Le lab Docker de base (voir article 02)
- Des notions de Kafka (topics, partitions, consumer groups) aident mais ne sont pas obligatoires
- Des notions de SQL (SELECT, WHERE) suffisent pour la partie JDBC
Kafka : pourquoi c'est le combo parfait avec Logstash
Quand j'ai découvert Kafka + Logstash, j'ai mis un moment a comprendre pourquoi les deux ensemble. Filebeat envoie directement a Logstash, pourquoi ajouter Kafka au milieu ?
La réponse tient en un mot : découplage. Si Logstash tombe, Filebeat bufferise en local, mais sa capacité est limitee. Avec Kafka entre les deux, tu as un buffer distribue et durable. Les messages restent dans Kafka des jours, des semaines si tu veux. Logstash peut tomber, remonter, et reprendre ou il en etait. Zero perte, meme pour des interruptions longues.
L'autre avantage : tu peux avoir plusieurs consumers. Logstash lit les logs pour Elasticsearch, un autre service les lit pour de l'alerting, un troisieme pour de l'archivage. Chaque consumer avance a son rythme.
┌──────────┐ ┌───────────────────────────┐ ┌──────────┐ ┌──────────────┐
│ Filebeat │────>│ Kafka │────>│ Logstash │────>│ Elasticsearch│
│ Filebeat │────>│ │ │ │ │ │
│ Filebeat │────>│ topic: logs-app │ │ consumer │ │ │
└──────────┘ │ partitions: 3 │ │ group: │ └──────────────┘
│ retention: 7 jours │ │ logstash │
│ │ └──────────┘
│ │
│ │────>┌──────────┐
│ │ │ Alerting │
└───────────────────────────┘ └──────────┘
Lab Kafka + Logstash
On va monter un lab avec Kafka en mode KRaft (sans Zookeeper, plus simple depuis Kafka 3.3).
Structure du projet :
kafka-lab/
├── compose.yaml
├── logstash/
│ ├── config/
│ │ ├── logstash.yml
│ │ └── pipelines.yml
│ └── pipeline/
│ └── kafka.conf
└── scripts/
└── produce-messages.sh
Le compose.yaml :
yamlservices:
kafka:
image: bitnami/kafka:3.7
container_name: kafka
environment:
- KAFKA_CFG_NODE_ID=0
- KAFKA_CFG_PROCESS_ROLES=broker,controller
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
ports:
- "9092:9092"
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.17.0
container_name: es
environment:
- discovery.type=single-node
- xpack.security.enabled=false
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
ports:
- "9200:9200"
volumes:
- es-data:/usr/share/elasticsearch/data
healthcheck:
test: ["CMD-SHELL", "curl -fs http://localhost:9200/_cluster/health || exit 1"]
interval: 10s
timeout: 5s
retries: 10
logstash:
image: docker.elastic.co/logstash/logstash:8.17.0
container_name: logstash
environment:
- "LS_JAVA_OPTS=-Xms256m -Xmx256m"
volumes:
- ./logstash/pipeline/:/usr/share/logstash/pipeline/
- ./logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml:ro
- ./logstash/config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
depends_on:
elasticsearch:
condition: service_healthy
kafka:
condition: service_started
kibana:
image: docker.elastic.co/kibana/kibana:8.17.0
container_name: kibana
environment:
- ELASTICSEARCH_HOSTS=http://elasticsearch:9200
ports:
- "5601:5601"
depends_on:
elasticsearch:
condition: service_healthy
volumes:
es-data:
Le pipeline Kafka
# logstash/pipeline/kafka.conf
input {
kafka {
bootstrap_servers => "kafka:9092"
topics => ["logs-app"]
group_id => "logstash-indexer"
codec => json
consumer_threads => 2
auto_offset_reset => "earliest"
decorate_events => "basic"
}
}
filter {
# Convertir les types si necessaire
if [duration_ms] {
mutate {
convert => { "duration_ms" => "integer" }
}
}
if [status_code] {
mutate {
convert => { "status_code" => "integer" }
}
}
}
output {
stdout { codec => rubydebug }
elasticsearch {
hosts => ["http://elasticsearch:9200"]
index => "logs-kafka-%{+YYYY.MM.dd}"
}
}
Les paramètres de l'input Kafka :
| Paramètre | Description |
|---|---|
bootstrap_servers |
Adresse du broker Kafka |
topics |
Liste des topics a consommer (tableau) |
group_id |
Identifiant du consumer group. Plusieurs Logstash avec le meme group_id se partagent les partitions |
codec |
Format des messages. json si les producteurs envoient du JSON |
consumer_threads |
Nombre de threads consommateurs. Idealement = nombre de partitions du topic |
auto_offset_reset |
earliest : lit depuis le début du topic. latest : ne lit que les nouveaux messages |
decorate_events |
Ajoute des metadonnees Kafka (topic, partition, offset) a l'événement |
Produire des messages de test
bash#!/bin/bash
# scripts/produce-messages.sh
SERVICES=("api-users" "api-orders" "api-payments")
LEVELS=("INFO" "WARN" "ERROR")
for i in $(seq 1 20); do
SERVICE=${SERVICES[$RANDOM % ${#SERVICES[@]}]}
LEVEL=${LEVELS[$RANDOM % ${#LEVELS[@]}]}
DURATION=$((RANDOM % 5000))
STATUS=$((200 + (RANDOM % 4) * 100))
TS=$(date -u +"%Y-%m-%dT%H:%M:%SZ")
MSG="{\"@timestamp\":\"${TS}\",\"level\":\"${LEVEL}\",\"service\":\"${SERVICE}\",\"status_code\":${STATUS},\"duration_ms\":${DURATION}}"
docker exec kafka kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic logs-app <<< "$MSG"
sleep 0.5
done
echo "20 messages produits dans le topic logs-app"
bashchmod +x scripts/produce-messages.sh
./scripts/produce-messages.sh
Consumer groups et offsets
Le group_id est fondamental. Kafka retient la position de lecture (l'offset) par consumer group. Si Logstash redemarrage avec le meme group_id, il reprend là où il s'etait arrêté.
Si tu lances deux instances Logstash avec le meme group_id, Kafka repartit les partitions entre elles. Chaque instance traite une partie des messages. C'est du load balancing automatique.
Si tu changes le group_id, le nouveau consumer recommence depuis le début (ou la fin, selon auto_offset_reset).
Les metadonnees Kafka
Avec decorate_events => "basic", chaque événement contient les metadonnees Kafka :
json{
"@metadata": {
"kafka": {
"topic": "logs-app",
"partition": 1,
"offset": 42,
"consumer_group": "logstash-indexer",
"timestamp": 1711889400000
}
}
}
Ces metadonnees sont dans @metadata, donc invisibles dans l'output. Mais tu peux les utiliser dans les filtres et conditionnels :
output {
elasticsearch {
hosts => ["http://elasticsearch:9200"]
index => "logs-%{[@metadata][kafka][topic]}-%{+YYYY.MM.dd}"
}
}
JDBC : lire dans une base de donnees
L'input JDBC transforme Logstash en outil ETL. Tu ecris une requête SQL, Logstash l'exécuté a intervalles réguliers et envoie les résultats ligne par ligne dans le pipeline.
┌──────────────┐ ┌──────────┐ ┌──────────────┐
│ PostgreSQL │ │ Logstash │ │ Elasticsearch│
│ │ │ │ │ │
│ SELECT * │────>│ JDBC │────>│ index: │
│ FROM products│ │ input │ │ products │
│ WHERE │ │ │ │ │
│ updated_at > │ │ toutes │ │ full-text │
│ :sql_last │ │ les 30s │ │ search │
│ _value │ │ │ │ │
└──────────────┘ └──────────┘ └──────────────┘
Lab JDBC + PostgreSQL
Structure du projet :
jdbc-lab/
├── compose.yaml
├── logstash/
│ ├── config/
│ │ ├── logstash.yml
│ │ └── pipelines.yml
│ ├── pipeline/
│ │ └── jdbc.conf
│ └── drivers/
│ └── postgresql-42.7.3.jar
└── sql/
└── init.sql
Le driver JDBC PostgreSQL est un fichier .jar qu'il faut telecharger. Tu le trouves sur jdbc.postgresql.org. Place-le dans logstash/drivers/.
bash# Telecharger le driver
curl -L -o logstash/drivers/postgresql-42.7.3.jar \
https://jdbc.postgresql.org/download/postgresql-42.7.3.jar
Schema SQL
sql-- sql/init.sql
CREATE TABLE products (
id SERIAL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description TEXT,
price DECIMAL(10, 2) NOT NULL,
category VARCHAR(100),
in_stock BOOLEAN DEFAULT true,
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
);
-- Index pour la synchronisation incrementale
CREATE INDEX idx_products_updated_at ON products(updated_at);
-- Donnees de test
INSERT INTO products (name, description, price, category) VALUES
('MacBook Pro 16"', 'Laptop Apple M3 Pro, 18 Go RAM, 512 Go SSD', 2799.00, 'laptops'),
('ThinkPad X1 Carbon', 'Laptop Lenovo, Intel i7, 16 Go RAM', 1649.00, 'laptops'),
('Samsung Galaxy S24', 'Smartphone Samsung, 256 Go, 5G', 899.00, 'smartphones'),
('iPhone 15 Pro', 'Smartphone Apple A17 Pro, 256 Go', 1229.00, 'smartphones'),
('Sony WH-1000XM5', 'Casque audio sans fil, noise cancelling', 349.00, 'audio'),
('AirPods Pro 2', 'Ecouteurs Apple, USB-C, noise cancelling', 279.00, 'audio'),
('LG UltraWide 34"', 'Moniteur 34 pouces, 3440x1440, USB-C', 599.00, 'monitors'),
('Dell U2723QE', 'Moniteur 27 pouces, 4K, USB-C hub', 479.00, 'monitors'),
('Keychron K8 Pro', 'Clavier mecanique, Gateron Brown, sans fil', 109.00, 'peripherals'),
('Logitech MX Master 3S', 'Souris ergonomique, multi-device', 99.00, 'peripherals');
Le compose.yaml
yamlservices:
postgres:
image: postgres:16
container_name: postgres
environment:
- POSTGRES_DB=shop
- POSTGRES_USER=app
- POSTGRES_PASSWORD=secret
ports:
- "5432:5432"
volumes:
- ./sql/init.sql:/docker-entrypoint-initdb.d/init.sql:ro
- pg-data:/var/lib/postgresql/data
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.17.0
container_name: es
environment:
- discovery.type=single-node
- xpack.security.enabled=false
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
ports:
- "9200:9200"
volumes:
- es-data:/usr/share/elasticsearch/data
healthcheck:
test: ["CMD-SHELL", "curl -fs http://localhost:9200/_cluster/health || exit 1"]
interval: 10s
timeout: 5s
retries: 10
logstash:
image: docker.elastic.co/logstash/logstash:8.17.0
container_name: logstash
environment:
- "LS_JAVA_OPTS=-Xms256m -Xmx256m"
volumes:
- ./logstash/pipeline/:/usr/share/logstash/pipeline/
- ./logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml:ro
- ./logstash/config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
- ./logstash/drivers/:/usr/share/logstash/drivers/:ro
depends_on:
elasticsearch:
condition: service_healthy
postgres:
condition: service_started
kibana:
image: docker.elastic.co/kibana/kibana:8.17.0
container_name: kibana
environment:
- ELASTICSEARCH_HOSTS=http://elasticsearch:9200
ports:
- "5601:5601"
depends_on:
elasticsearch:
condition: service_healthy
volumes:
es-data:
pg-data:
Le pipeline JDBC
# logstash/pipeline/jdbc.conf
input {
jdbc {
# Connexion PostgreSQL
jdbc_driver_library => "/usr/share/logstash/drivers/postgresql-42.7.3.jar"
jdbc_driver_class => "org.postgresql.Driver"
jdbc_connection_string => "jdbc:postgresql://postgres:5432/shop"
jdbc_user => "app"
jdbc_password => "secret"
# Requete SQL avec synchronisation incrementale
statement => "SELECT * FROM products WHERE updated_at > :sql_last_value ORDER BY updated_at ASC"
# Planification : toutes les 30 secondes
schedule => "*/30 * * * * *"
# Tracking de la position
use_column_value => true
tracking_column => "updated_at"
tracking_column_type => "timestamp"
# Ou stocker la derniere valeur lue
last_run_metadata_path => "/usr/share/logstash/data/.logstash_jdbc_last_run"
# Identifiant unique pour l'upsert
jdbc_paging_enabled => true
jdbc_page_size => 1000
}
}
filter {
# Utiliser l'ID PostgreSQL comme document_id pour l'upsert
mutate {
add_field => { "[@metadata][document_id]" => "%{id}" }
}
# Convertir le prix en float (JDBC le renvoie en BigDecimal)
mutate {
convert => { "price" => "float" }
}
# Supprimer les champs techniques JDBC
mutate {
remove_field => ["@version"]
}
}
output {
stdout { codec => rubydebug }
elasticsearch {
hosts => ["http://elasticsearch:9200"]
index => "products"
document_id => "%{[@metadata][document_id]}"
action => "index"
}
}
Comment la synchronisation incrementale fonctionne
Le mecanisme repose sur deux choses :
sql_last_value: une variable qui contient la dernière valeur detracking_columnlue. Au premier run, elle vaut1970-01-01 00:00:00.last_run_metadata_path: un fichier ou Logstash sauvegardesql_last_valueentre les redemarrages.
Le cycle :
Premier run:
sql_last_value = 1970-01-01 00:00:00
SELECT * FROM products WHERE updated_at > '1970-01-01'
→ renvoie les 10 produits
sql_last_value = 2026-03-31 14:00:00 (max updated_at)
Runs suivants (toutes les 30s):
sql_last_value = 2026-03-31 14:00:00
SELECT * FROM products WHERE updated_at > '2026-03-31 14:00:00'
→ renvoie 0 produits (rien n'a change)
Quand un produit est modifie:
UPDATE products SET price = 2599.00, updated_at = NOW() WHERE id = 1;
→ au prochain run, Logstash detecte le changement et re-indexe le produit
C'est simple et efficace. Le seul prerequis : ta table doit avoir une colonne updated_at qui est mise à jour a chaque modification.
Tester la synchronisation
Lance le lab, attends que les 10 produits soient indexes, puis modifie un produit :
bashdocker exec postgres psql -U app -d shop -c \
"UPDATE products SET price = 2599.00, updated_at = NOW() WHERE id = 1;"
Dans les 30 secondes, Logstash détecté le changement et re-indexé le produit avec le nouveau prix. Verifie dans Elasticsearch :
bashcurl -s http://localhost:9200/products/_doc/1 | python3 -m json.tool
La syntaxe du schedule
Le schedule utilise la syntaxe cron avec une extension pour les secondes :
# secondes minutes heures jour_mois mois jour_semaine
schedule => "*/30 * * * * *" # toutes les 30 secondes
schedule => "0 */5 * * * *" # toutes les 5 minutes
schedule => "0 0 * * * *" # toutes les heures
schedule => "0 0 2 * * *" # tous les jours a 2h du matin
Pour du dev, 30 secondes c'est bien. En production, toutes les minutes ou toutes les 5 minutes suffisent pour la plupart des cas d'usage.
Kafka vs JDBC : quand utiliser quoi
| Kafka | JDBC | |
|---|---|---|
| Type de donnees | Événements, logs, messages | Tables de base de donnees |
| Temps réel | Oui (millisecondes) | Non (polling, delai = schedule) |
| Volume | Illimite (streaming) | Limite par la requête SQL |
| Infra requise | Cluster Kafka | Juste la base de donnees |
| Doublon | Non (offsets Kafka) | Possible sans upsert |
| Suppression | Non geree nativement | Detecte via soft delete |
Sur paltemps.fr, Kafka sert pour les logs et les événements temps réel. JDBC sert pour synchroniser les catalogues produits et les donnees de référencé vers Elasticsearch. Deux use cases différents, deux inputs différents.
Résumé
- L'input Kafka lit des messages depuis un ou plusieurs topics avec gestion des offsets par consumer group
auto_offset_reset => "earliest"lit depuis le début,"latest"ne lit que les nouveaux messages- L'input JDBC exécuté une requête SQL a intervalles réguliers (schedule cron)
sql_last_valuepermet la synchronisation incrementale (ne relire que les modifications)- Kafka est pour le streaming temps réel, JDBC est pour le polling de base de donnees
- Le driver JDBC (fichier .jar) doit etre telecharge et monte dans le conteneur
Precedent : 06 - Inputs HTTP, TCP, UDP | Suivant : 08 - Les codecs