Logstash pour les devs - 28 - Cas pratique : ETL avec Logstash et PostgreSQL

Utiliser Logstash comme outil ETL pour synchroniser des donnees PostgreSQL vers Elasticsearch. Sync incrementale et full.

  1. 01 Logstash pour les devs - 00 - Pourquoi Logstash existe encore en 2026
  2. 02 Logstash pour les devs - 01 - L'Elastic Stack de A a Z
  3. 03 Logstash pour les devs - 02 - Installer Logstash avec Docker en 5 minutes
  4. 04 Logstash pour les devs - 03 - Anatomie d'un pipeline Logstash
  5. 05 Logstash pour les devs - 04 - Inputs stdin et file : lire des donnees locales
  6. 06 Logstash pour les devs - 05 - Input Beats : recevoir des logs de Filebeat
  7. 07 Logstash pour les devs - 06 - Inputs HTTP, TCP et UDP : recevoir des donnees réseau
  8. 08 Logstash pour les devs - 07 - Inputs Kafka et JDBC : sources avancees
  9. 09 Logstash pour les devs - 08 - Les codecs : decoder et encoder les donnees
  10. 10 Logstash pour les devs - 09 - Le filtre Grok : parser n'importe quel log
  11. 11 Logstash pour les devs - 10 - Le filtre Dissect : parser sans regex
  12. 12 Logstash pour les devs - 11 - Le filtre Mutate : transformer les champs
  13. 13 Logstash pour les devs - 12 - Filtres Date et GeoIP : temps et geolocalisation
  14. 14 Logstash pour les devs - 13 - Filtres KV, JSON et XML : parser les formats structures
  15. 15 Logstash pour les devs - 14 - Le filtre Ruby : quand les autres ne suffisent pas
  16. 16 Logstash pour les devs - 15 - Filtres Aggregate et Metrics : correler les événements
  17. 17 Logstash pour les devs - 16 - Conditionnels et contrôle de flux
  18. 18 Logstash pour les devs - 17 - Output Elasticsearch : envoyer les donnees
  19. 19 Logstash pour les devs - 18 - Outputs file, stdout et les autres
  20. 20 Logstash pour les devs - 19 - Gerer le multiline : stack traces et logs multi-lignes
  21. 21 Logstash pour les devs - 20 - Pipelines multiples et pipeline-to-pipeline
  22. 22 Logstash pour les devs - 21 - Performance et tuning Logstash
  23. 23 Logstash pour les devs - 22 - Monitoring Logstash : metriques et alertes
  24. 24 Logstash pour les devs - 23 - Dead Letter Queue : ne plus perdre d'événements
  25. 25 Logstash pour les devs - 24 - Sécurité Logstash : SSL, auth et secrets
  26. 26 Logstash pour les devs - 25 - Debugger un pipeline Logstash
  27. 27 Logstash pour les devs - 26 - Tester ses pipelines avant la prod
  28. 28 Logstash pour les devs - 27 - Cas pratique : centraliser des logs applicatifs
  29. 29 Logstash pour les devs - 28 - Cas pratique : ETL avec Logstash et PostgreSQL
  30. 30 Logstash pour les devs - 29 - Cas pratique : enrichir des donnees en temps réel
  31. 31 Logstash pour les devs - 30 - Logstash en production : architecture et bonnes pratiques
  32. 32 Logstash pour les devs - 31 - Glossaire Logstash de A a Z

28 - Cas pratique : ETL avec Logstash et PostgreSQL

Ce que tu vas apprendre

  • Monter un projet ETL complet : PostgreSQL -> Logstash -> Elasticsearch
  • Configurer la synchronisation incrementale (ne re-indexer que les changements)
  • Gerer la synchronisation full (re-indexation complète)
  • Faire de l'upsert dans Elasticsearch (créer ou mettre à jour)
  • Gerer les suppressions (soft delete)

Prerequisites

  • Avoir compris l'input JDBC (voir article 07)
  • Avoir compris l'output Elasticsearch (voir article 17)

Le use case

Tu as une base PostgreSQL avec un catalogue de produits. Tu veux que ces produits soient cherchables avec la recherche full-text d'Elasticsearch. Les utilisateurs tapent "casque bluetooth noise cancelling" et trouvent les bons produits.

PostgreSQL est la source de vérité. Elasticsearch est le moteur de recherche. Logstash synchronise les deux.

┌──────────────┐     ┌──────────┐     ┌──────────────┐     ┌──────────┐
│ PostgreSQL   │     │ Logstash │     │ Elasticsearch│     │  API     │
│              │────>│          │────>│              │<────│  Search  │
│ products     │     │ JDBC     │     │ products     │     │          │
│ categories   │     │ polling  │     │ (full-text)  │     │ /search  │
│              │     │ 30s      │     │              │     │          │
└──────────────┘     └──────────┘     └──────────────┘     └──────────┘
       ^                                                        │
       │                                                        │
       └─── Admin ecrit ici              Utilisateurs lisent ici┘

Structure du projet

etl-lab/
├── compose.yaml
├── sql/
│   ├── 01-schema.sql
│   └── 02-seed.sql
├── logstash/
│   ├── config/
│   │   ├── logstash.yml
│   │   └── pipelines.yml
│   ├── pipeline/
│   │   ├── products-incremental.conf
│   │   └── products-full.conf
│   ├── drivers/
│   │   └── postgresql-42.7.3.jar
│   └── templates/
│       └── products.json
└── scripts/
    ├── test-sync.sh
    └── trigger-full-sync.sh

Le schema SQL

sql-- sql/01-schema.sql

CREATE TABLE categories (
  id SERIAL PRIMARY KEY,
  name VARCHAR(100) NOT NULL,
  slug VARCHAR(100) UNIQUE NOT NULL
);

CREATE TABLE products (
  id SERIAL PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  description TEXT,
  price DECIMAL(10, 2) NOT NULL,
  category_id INTEGER REFERENCES categories(id),
  brand VARCHAR(100),
  sku VARCHAR(50) UNIQUE,
  in_stock BOOLEAN DEFAULT true,
  rating DECIMAL(2, 1) DEFAULT 0,
  review_count INTEGER DEFAULT 0,
  tags TEXT[],
  is_deleted BOOLEAN DEFAULT false,
  created_at TIMESTAMP DEFAULT NOW(),
  updated_at TIMESTAMP DEFAULT NOW()
);

CREATE INDEX idx_products_updated_at ON products(updated_at);
CREATE INDEX idx_products_is_deleted ON products(is_deleted);

-- Trigger pour mettre a jour updated_at automatiquement
CREATE OR REPLACE FUNCTION update_timestamp()
RETURNS TRIGGER AS $
BEGIN
  NEW.updated_at = NOW();
  RETURN NEW;
END;
$ LANGUAGE plpgsql;

CREATE TRIGGER products_updated_at
  BEFORE UPDATE ON products
  FOR EACH ROW EXECUTE FUNCTION update_timestamp();

Le trigger update_timestamp() met automatiquement updated_at à jour a chaque modification. C'est ce champ que Logstash utilise pour détecter les changements.

sql-- sql/02-seed.sql

INSERT INTO categories (name, slug) VALUES
  ('Laptops', 'laptops'),
  ('Smartphones', 'smartphones'),
  ('Audio', 'audio'),
  ('Moniteurs', 'monitors'),
  ('Peripheriques', 'peripherals');

INSERT INTO products (name, description, price, category_id, brand, sku, in_stock, rating, review_count, tags) VALUES
  ('MacBook Pro 16"', 'Laptop Apple M3 Pro, 18 Go RAM, 512 Go SSD. Ecran Liquid Retina XDR.', 2799.00, 1, 'Apple', 'MBP16-M3-512', true, 4.8, 342, ARRAY['laptop', 'apple', 'pro', 'retina']),
  ('ThinkPad X1 Carbon Gen 11', 'Ultrabook Lenovo, Intel i7-1365U, 16 Go RAM, 512 Go SSD.', 1649.00, 1, 'Lenovo', 'TP-X1C-G11', true, 4.5, 189, ARRAY['laptop', 'lenovo', 'ultrabook', 'business']),
  ('Galaxy S24 Ultra', 'Smartphone Samsung, Snapdragon 8 Gen 3, 256 Go, 5G, S Pen.', 1299.00, 2, 'Samsung', 'GS24U-256', true, 4.6, 567, ARRAY['smartphone', 'samsung', '5g', 's-pen']),
  ('iPhone 15 Pro', 'Smartphone Apple A17 Pro, 256 Go, USB-C, titane.', 1229.00, 2, 'Apple', 'IP15P-256', true, 4.7, 891, ARRAY['smartphone', 'apple', 'usb-c', 'titane']),
  ('Sony WH-1000XM5', 'Casque audio sans fil, noise cancelling adaptatif, 30h autonomie.', 349.00, 3, 'Sony', 'WH1000XM5', true, 4.7, 1203, ARRAY['casque', 'bluetooth', 'noise-cancelling', 'sony']),
  ('AirPods Pro 2', 'Ecouteurs Apple, USB-C, noise cancelling adaptatif, audio spatial.', 279.00, 3, 'Apple', 'APP2-USBC', true, 4.6, 2341, ARRAY['ecouteurs', 'apple', 'bluetooth', 'noise-cancelling']),
  ('Dell U2723QE', 'Moniteur 27 pouces, 4K UHD, USB-C hub 90W, IPS Black.', 479.00, 4, 'Dell', 'U2723QE', true, 4.4, 156, ARRAY['moniteur', '4k', 'usb-c', 'dell']),
  ('LG UltraWide 34WP85C', 'Moniteur 34 pouces, 3440x1440, courbe, USB-C 90W.', 599.00, 4, 'LG', '34WP85C', false, 4.3, 98, ARRAY['moniteur', 'ultrawide', 'courbe', 'lg']),
  ('Keychron K8 Pro', 'Clavier mecanique, Gateron Brown, sans fil Bluetooth, retroeclaire.', 109.00, 5, 'Keychron', 'K8P-BROWN', true, 4.5, 423, ARRAY['clavier', 'mecanique', 'bluetooth', 'keychron']),
  ('Logitech MX Master 3S', 'Souris ergonomique, multi-device, USB-C, silencieuse.', 99.00, 5, 'Logitech', 'MXM3S', true, 4.6, 1567, ARRAY['souris', 'ergonomique', 'bluetooth', 'logitech']);

Le pipeline de synchronisation incrementale

# logstash/pipeline/products-incremental.conf

input {
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/drivers/postgresql-42.7.3.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_connection_string => "jdbc:postgresql://postgres:5432/shop"
    jdbc_user => "app"
    jdbc_password => "${PG_PASSWORD}"

    # Requete avec JOIN pour inclure la categorie
    statement => "
      SELECT
        p.id,
        p.name,
        p.description,
        p.price,
        p.brand,
        p.sku,
        p.in_stock,
        p.rating,
        p.review_count,
        p.tags,
        p.is_deleted,
        p.created_at,
        p.updated_at,
        c.name AS category_name,
        c.slug AS category_slug
      FROM products p
      LEFT JOIN categories c ON p.category_id = c.id
      WHERE p.updated_at > :sql_last_value
      ORDER BY p.updated_at ASC
    "

    schedule => "*/30 * * * * *"
    use_column_value => true
    tracking_column => "updated_at"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "/usr/share/logstash/data/.products_last_run"
    jdbc_paging_enabled => true
    jdbc_page_size => 500
  }
}

filter {
  # Stocker l'ID pour l'upsert
  mutate {
    add_field => { "[@metadata][document_id]" => "%{id}" }
  }

  # Convertir les types
  mutate {
    convert => {
      "price" => "float"
      "rating" => "float"
      "review_count" => "integer"
    }
  }

  # Structurer la categorie dans un objet
  mutate {
    rename => {
      "category_name" => "[category][name]"
      "category_slug" => "[category][slug]"
    }
  }

  # Convertir le tableau PostgreSQL en tableau JSON
  if [tags] {
    ruby {
      code => '
        tags_str = event.get("tags")
        if tags_str.is_a?(String) && tags_str.start_with?("{")
          tags_array = tags_str.gsub(/[{}]/, "").split(",")
          event.set("tags", tags_array)
        end
      '
    }
  }

  # Gerer les soft deletes
  if [is_deleted] == true {
    mutate {
      add_field => { "[@metadata][action]" => "delete" }
    }
  } else {
    mutate {
      add_field => { "[@metadata][action]" => "index" }
    }
  }

  # Nettoyer les champs internes
  mutate {
    remove_field => ["@version", "@timestamp", "is_deleted"]
  }
}

output {
  if [@metadata][action] == "delete" {
    elasticsearch {
      hosts => ["http://elasticsearch:9200"]
      index => "products"
      document_id => "%{[@metadata][document_id]}"
      action => "delete"
    }
  } else {
    elasticsearch {
      hosts => ["http://elasticsearch:9200"]
      index => "products"
      document_id => "%{[@metadata][document_id]}"
      action => "index"
    }
  }
}

Gestion des suppressions

PostgreSQL utilise un soft delete (is_deleted = true). Quand un produit est "supprime", le trigger met updated_at à jour, Logstash détecté le changement, et le filtre envoie un action => "delete" a Elasticsearch.

C'est le seul pattern fiable. Un vrai DELETE FROM products ne peut pas etre détecté par le JDBC input (la ligne n'existe plus dans le SELECT).

Le compose.yaml

yamlservices:
  postgres:
    image: postgres:16
    container_name: postgres
    environment:
      - POSTGRES_DB=shop
      - POSTGRES_USER=app
      - POSTGRES_PASSWORD=secret
    ports:
      - "5432:5432"
    volumes:
      - ./sql/:/docker-entrypoint-initdb.d/:ro
      - pg-data:/var/lib/postgresql/data

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.17.0
    container_name: es
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ports:
      - "9200:9200"
    volumes:
      - es-data:/usr/share/elasticsearch/data
    healthcheck:
      test: ["CMD-SHELL", "curl -fs http://localhost:9200/_cluster/health || exit 1"]
      interval: 10s
      timeout: 5s
      retries: 10

  logstash:
    image: docker.elastic.co/logstash/logstash:8.17.0
    container_name: logstash
    environment:
      - "LS_JAVA_OPTS=-Xms256m -Xmx256m"
      - PG_PASSWORD=secret
    volumes:
      - ./logstash/pipeline/:/usr/share/logstash/pipeline/
      - ./logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml:ro
      - ./logstash/config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
      - ./logstash/drivers/:/usr/share/logstash/drivers/:ro
      - ./logstash/templates/:/usr/share/logstash/templates/:ro
      - logstash-data:/usr/share/logstash/data
    depends_on:
      elasticsearch:
        condition: service_healthy
      postgres:
        condition: service_started

  kibana:
    image: docker.elastic.co/kibana/kibana:8.17.0
    container_name: kibana
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    ports:
      - "5601:5601"
    depends_on:
      elasticsearch:
        condition: service_healthy

volumes:
  es-data:
  pg-data:
  logstash-data:

Tester la synchronisation

bash# Demarrer
docker compose up -d

# Attendre 30 secondes (premier polling JDBC)

# Verifier que les 10 produits sont indexes
curl -s "http://localhost:9200/products/_count" | python3 -m json.tool

# Chercher un produit
curl -s "http://localhost:9200/products/_search?q=casque+bluetooth" | python3 -m json.tool

# Modifier un produit dans PostgreSQL
docker exec postgres psql -U app -d shop -c \
  "UPDATE products SET price = 2599.00 WHERE sku = 'MBP16-M3-512';"

# Attendre 30 secondes, verifier la mise a jour
curl -s "http://localhost:9200/products/_doc/1" | python3 -c "
import json, sys
doc = json.load(sys.stdin)
print(f'Price: {doc[\"_source\"][\"price\"]}')"

# Soft-delete un produit
docker exec postgres psql -U app -d shop -c \
  "UPDATE products SET is_deleted = true WHERE sku = 'K8P-BROWN';"

# Attendre 30 secondes, verifier la suppression
curl -s "http://localhost:9200/products/_count" | python3 -m json.tool
# count devrait etre 9

Sur paltemps.fr, ce pattern sert pour synchroniser les catalogues de produits et les annuaires. La base relationnelle gere les transactions et la coherence, Elasticsearch gere la recherche. Chacun fait ce qu'il fait de mieux.

Résumé

  • Logstash + JDBC transforme Logstash en outil ETL pour synchroniser SQL -> Elasticsearch
  • La synchronisation incrementale utilise sql_last_value et une colonne updated_at
  • Le trigger PostgreSQL update_timestamp() met updated_at à jour automatiquement
  • L'upsert se fait avec document_id et action => "index"
  • Les suppressions passent par un soft delete (is_deleted) pour etre detectees par le polling
  • Le schedule */30 * * * * * polle toutes les 30 secondes (ajuste selon le besoin)

Precedent : 27 - Cas pratique : logs applicatifs | Suivant : 29 - Cas pratique : enrichissement

Sources

Réservez un audit gratuit de 30 minutes. Je vous montre concrètement ce qu'on peut automatiser.