28 - Cas pratique : ETL avec Logstash et PostgreSQL
Ce que tu vas apprendre
- Monter un projet ETL complet : PostgreSQL -> Logstash -> Elasticsearch
- Configurer la synchronisation incrementale (ne re-indexer que les changements)
- Gerer la synchronisation full (re-indexation complète)
- Faire de l'upsert dans Elasticsearch (créer ou mettre à jour)
- Gerer les suppressions (soft delete)
Prerequisites
- Avoir compris l'input JDBC (voir article 07)
- Avoir compris l'output Elasticsearch (voir article 17)
Le use case
Tu as une base PostgreSQL avec un catalogue de produits. Tu veux que ces produits soient cherchables avec la recherche full-text d'Elasticsearch. Les utilisateurs tapent "casque bluetooth noise cancelling" et trouvent les bons produits.
PostgreSQL est la source de vérité. Elasticsearch est le moteur de recherche. Logstash synchronise les deux.
┌──────────────┐ ┌──────────┐ ┌──────────────┐ ┌──────────┐
│ PostgreSQL │ │ Logstash │ │ Elasticsearch│ │ API │
│ │────>│ │────>│ │<────│ Search │
│ products │ │ JDBC │ │ products │ │ │
│ categories │ │ polling │ │ (full-text) │ │ /search │
│ │ │ 30s │ │ │ │ │
└──────────────┘ └──────────┘ └──────────────┘ └──────────┘
^ │
│ │
└─── Admin ecrit ici Utilisateurs lisent ici┘
Structure du projet
etl-lab/
├── compose.yaml
├── sql/
│ ├── 01-schema.sql
│ └── 02-seed.sql
├── logstash/
│ ├── config/
│ │ ├── logstash.yml
│ │ └── pipelines.yml
│ ├── pipeline/
│ │ ├── products-incremental.conf
│ │ └── products-full.conf
│ ├── drivers/
│ │ └── postgresql-42.7.3.jar
│ └── templates/
│ └── products.json
└── scripts/
├── test-sync.sh
└── trigger-full-sync.sh
Le schema SQL
sql-- sql/01-schema.sql
CREATE TABLE categories (
id SERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL,
slug VARCHAR(100) UNIQUE NOT NULL
);
CREATE TABLE products (
id SERIAL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description TEXT,
price DECIMAL(10, 2) NOT NULL,
category_id INTEGER REFERENCES categories(id),
brand VARCHAR(100),
sku VARCHAR(50) UNIQUE,
in_stock BOOLEAN DEFAULT true,
rating DECIMAL(2, 1) DEFAULT 0,
review_count INTEGER DEFAULT 0,
tags TEXT[],
is_deleted BOOLEAN DEFAULT false,
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
);
CREATE INDEX idx_products_updated_at ON products(updated_at);
CREATE INDEX idx_products_is_deleted ON products(is_deleted);
-- Trigger pour mettre a jour updated_at automatiquement
CREATE OR REPLACE FUNCTION update_timestamp()
RETURNS TRIGGER AS $
BEGIN
NEW.updated_at = NOW();
RETURN NEW;
END;
$ LANGUAGE plpgsql;
CREATE TRIGGER products_updated_at
BEFORE UPDATE ON products
FOR EACH ROW EXECUTE FUNCTION update_timestamp();
Le trigger update_timestamp() met automatiquement updated_at à jour a chaque modification. C'est ce champ que Logstash utilise pour détecter les changements.
sql-- sql/02-seed.sql
INSERT INTO categories (name, slug) VALUES
('Laptops', 'laptops'),
('Smartphones', 'smartphones'),
('Audio', 'audio'),
('Moniteurs', 'monitors'),
('Peripheriques', 'peripherals');
INSERT INTO products (name, description, price, category_id, brand, sku, in_stock, rating, review_count, tags) VALUES
('MacBook Pro 16"', 'Laptop Apple M3 Pro, 18 Go RAM, 512 Go SSD. Ecran Liquid Retina XDR.', 2799.00, 1, 'Apple', 'MBP16-M3-512', true, 4.8, 342, ARRAY['laptop', 'apple', 'pro', 'retina']),
('ThinkPad X1 Carbon Gen 11', 'Ultrabook Lenovo, Intel i7-1365U, 16 Go RAM, 512 Go SSD.', 1649.00, 1, 'Lenovo', 'TP-X1C-G11', true, 4.5, 189, ARRAY['laptop', 'lenovo', 'ultrabook', 'business']),
('Galaxy S24 Ultra', 'Smartphone Samsung, Snapdragon 8 Gen 3, 256 Go, 5G, S Pen.', 1299.00, 2, 'Samsung', 'GS24U-256', true, 4.6, 567, ARRAY['smartphone', 'samsung', '5g', 's-pen']),
('iPhone 15 Pro', 'Smartphone Apple A17 Pro, 256 Go, USB-C, titane.', 1229.00, 2, 'Apple', 'IP15P-256', true, 4.7, 891, ARRAY['smartphone', 'apple', 'usb-c', 'titane']),
('Sony WH-1000XM5', 'Casque audio sans fil, noise cancelling adaptatif, 30h autonomie.', 349.00, 3, 'Sony', 'WH1000XM5', true, 4.7, 1203, ARRAY['casque', 'bluetooth', 'noise-cancelling', 'sony']),
('AirPods Pro 2', 'Ecouteurs Apple, USB-C, noise cancelling adaptatif, audio spatial.', 279.00, 3, 'Apple', 'APP2-USBC', true, 4.6, 2341, ARRAY['ecouteurs', 'apple', 'bluetooth', 'noise-cancelling']),
('Dell U2723QE', 'Moniteur 27 pouces, 4K UHD, USB-C hub 90W, IPS Black.', 479.00, 4, 'Dell', 'U2723QE', true, 4.4, 156, ARRAY['moniteur', '4k', 'usb-c', 'dell']),
('LG UltraWide 34WP85C', 'Moniteur 34 pouces, 3440x1440, courbe, USB-C 90W.', 599.00, 4, 'LG', '34WP85C', false, 4.3, 98, ARRAY['moniteur', 'ultrawide', 'courbe', 'lg']),
('Keychron K8 Pro', 'Clavier mecanique, Gateron Brown, sans fil Bluetooth, retroeclaire.', 109.00, 5, 'Keychron', 'K8P-BROWN', true, 4.5, 423, ARRAY['clavier', 'mecanique', 'bluetooth', 'keychron']),
('Logitech MX Master 3S', 'Souris ergonomique, multi-device, USB-C, silencieuse.', 99.00, 5, 'Logitech', 'MXM3S', true, 4.6, 1567, ARRAY['souris', 'ergonomique', 'bluetooth', 'logitech']);
Le pipeline de synchronisation incrementale
# logstash/pipeline/products-incremental.conf
input {
jdbc {
jdbc_driver_library => "/usr/share/logstash/drivers/postgresql-42.7.3.jar"
jdbc_driver_class => "org.postgresql.Driver"
jdbc_connection_string => "jdbc:postgresql://postgres:5432/shop"
jdbc_user => "app"
jdbc_password => "${PG_PASSWORD}"
# Requete avec JOIN pour inclure la categorie
statement => "
SELECT
p.id,
p.name,
p.description,
p.price,
p.brand,
p.sku,
p.in_stock,
p.rating,
p.review_count,
p.tags,
p.is_deleted,
p.created_at,
p.updated_at,
c.name AS category_name,
c.slug AS category_slug
FROM products p
LEFT JOIN categories c ON p.category_id = c.id
WHERE p.updated_at > :sql_last_value
ORDER BY p.updated_at ASC
"
schedule => "*/30 * * * * *"
use_column_value => true
tracking_column => "updated_at"
tracking_column_type => "timestamp"
last_run_metadata_path => "/usr/share/logstash/data/.products_last_run"
jdbc_paging_enabled => true
jdbc_page_size => 500
}
}
filter {
# Stocker l'ID pour l'upsert
mutate {
add_field => { "[@metadata][document_id]" => "%{id}" }
}
# Convertir les types
mutate {
convert => {
"price" => "float"
"rating" => "float"
"review_count" => "integer"
}
}
# Structurer la categorie dans un objet
mutate {
rename => {
"category_name" => "[category][name]"
"category_slug" => "[category][slug]"
}
}
# Convertir le tableau PostgreSQL en tableau JSON
if [tags] {
ruby {
code => '
tags_str = event.get("tags")
if tags_str.is_a?(String) && tags_str.start_with?("{")
tags_array = tags_str.gsub(/[{}]/, "").split(",")
event.set("tags", tags_array)
end
'
}
}
# Gerer les soft deletes
if [is_deleted] == true {
mutate {
add_field => { "[@metadata][action]" => "delete" }
}
} else {
mutate {
add_field => { "[@metadata][action]" => "index" }
}
}
# Nettoyer les champs internes
mutate {
remove_field => ["@version", "@timestamp", "is_deleted"]
}
}
output {
if [@metadata][action] == "delete" {
elasticsearch {
hosts => ["http://elasticsearch:9200"]
index => "products"
document_id => "%{[@metadata][document_id]}"
action => "delete"
}
} else {
elasticsearch {
hosts => ["http://elasticsearch:9200"]
index => "products"
document_id => "%{[@metadata][document_id]}"
action => "index"
}
}
}
Gestion des suppressions
PostgreSQL utilise un soft delete (is_deleted = true). Quand un produit est "supprime", le trigger met updated_at à jour, Logstash détecté le changement, et le filtre envoie un action => "delete" a Elasticsearch.
C'est le seul pattern fiable. Un vrai DELETE FROM products ne peut pas etre détecté par le JDBC input (la ligne n'existe plus dans le SELECT).
Le compose.yaml
yamlservices:
postgres:
image: postgres:16
container_name: postgres
environment:
- POSTGRES_DB=shop
- POSTGRES_USER=app
- POSTGRES_PASSWORD=secret
ports:
- "5432:5432"
volumes:
- ./sql/:/docker-entrypoint-initdb.d/:ro
- pg-data:/var/lib/postgresql/data
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.17.0
container_name: es
environment:
- discovery.type=single-node
- xpack.security.enabled=false
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
ports:
- "9200:9200"
volumes:
- es-data:/usr/share/elasticsearch/data
healthcheck:
test: ["CMD-SHELL", "curl -fs http://localhost:9200/_cluster/health || exit 1"]
interval: 10s
timeout: 5s
retries: 10
logstash:
image: docker.elastic.co/logstash/logstash:8.17.0
container_name: logstash
environment:
- "LS_JAVA_OPTS=-Xms256m -Xmx256m"
- PG_PASSWORD=secret
volumes:
- ./logstash/pipeline/:/usr/share/logstash/pipeline/
- ./logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml:ro
- ./logstash/config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
- ./logstash/drivers/:/usr/share/logstash/drivers/:ro
- ./logstash/templates/:/usr/share/logstash/templates/:ro
- logstash-data:/usr/share/logstash/data
depends_on:
elasticsearch:
condition: service_healthy
postgres:
condition: service_started
kibana:
image: docker.elastic.co/kibana/kibana:8.17.0
container_name: kibana
environment:
- ELASTICSEARCH_HOSTS=http://elasticsearch:9200
ports:
- "5601:5601"
depends_on:
elasticsearch:
condition: service_healthy
volumes:
es-data:
pg-data:
logstash-data:
Tester la synchronisation
bash# Demarrer
docker compose up -d
# Attendre 30 secondes (premier polling JDBC)
# Verifier que les 10 produits sont indexes
curl -s "http://localhost:9200/products/_count" | python3 -m json.tool
# Chercher un produit
curl -s "http://localhost:9200/products/_search?q=casque+bluetooth" | python3 -m json.tool
# Modifier un produit dans PostgreSQL
docker exec postgres psql -U app -d shop -c \
"UPDATE products SET price = 2599.00 WHERE sku = 'MBP16-M3-512';"
# Attendre 30 secondes, verifier la mise a jour
curl -s "http://localhost:9200/products/_doc/1" | python3 -c "
import json, sys
doc = json.load(sys.stdin)
print(f'Price: {doc[\"_source\"][\"price\"]}')"
# Soft-delete un produit
docker exec postgres psql -U app -d shop -c \
"UPDATE products SET is_deleted = true WHERE sku = 'K8P-BROWN';"
# Attendre 30 secondes, verifier la suppression
curl -s "http://localhost:9200/products/_count" | python3 -m json.tool
# count devrait etre 9
Sur paltemps.fr, ce pattern sert pour synchroniser les catalogues de produits et les annuaires. La base relationnelle gere les transactions et la coherence, Elasticsearch gere la recherche. Chacun fait ce qu'il fait de mieux.
Résumé
- Logstash + JDBC transforme Logstash en outil ETL pour synchroniser SQL -> Elasticsearch
- La synchronisation incrementale utilise
sql_last_valueet une colonneupdated_at - Le trigger PostgreSQL
update_timestamp()metupdated_atà jour automatiquement - L'upsert se fait avec
document_idetaction => "index" - Les suppressions passent par un soft delete (
is_deleted) pour etre detectees par le polling - Le schedule
*/30 * * * * *polle toutes les 30 secondes (ajuste selon le besoin)
Precedent : 27 - Cas pratique : logs applicatifs | Suivant : 29 - Cas pratique : enrichissement