Mémoire et performance JS/TS - 14 - Streams et backpressure

Traiter des fichiers de plusieurs Go sans exploser la mémoire grace aux streams Node.js et Web Streams. Backpressure, pipeline et traitement ligne par ligne.

  1. 01 Mémoire et performance JS/TS - 00 - Pourquoi la mémoire compte meme avec un garbage collector
  2. 02 Mémoire et performance JS/TS - 01 - Stack vs Heap
  3. 03 Mémoire et performance JS/TS - 02 - Le cycle de vie de la mémoire
  4. 04 Mémoire et performance JS/TS - 03 - Le garbage collector
  5. 05 Mémoire et performance JS/TS - 04 - V8 en profondeur
  6. 06 Mémoire et performance JS/TS - 05 - Les 6 fuites mémoire classiques
  7. 07 Mémoire et performance JS/TS - 06 - Closures et mémoire
  8. 08 Mémoire et performance JS/TS - 07 - WeakRef, WeakMap et WeakSet
  9. 09 Mémoire et performance JS/TS - 08 - FinalizationRegistry : savoir quand le GC passe
  10. 10 Mémoire et performance JS/TS - 09 - DevTools Memory : investiguer dans Chrome
  11. 11 Mémoire et performance JS/TS - 10 - Profiling mémoire en Node.js
  12. 12 Mémoire et performance JS/TS - 11 - Détecter et corriger les fuites mémoire
  13. 13 Mémoire et performance JS/TS - 12 - ArrayBuffer et TypedArrays
  14. 14 Mémoire et performance JS/TS - 13 - Workers et mémoire partagee
  15. 15 Mémoire et performance JS/TS - 14 - Streams et backpressure
  16. 16 Mémoire et performance JS/TS - 15 - Fuites mémoire en React
  17. 17 Mémoire et performance JS/TS - 16 - Serveurs Node.js et mémoire
  18. 18 Mémoire et performance JS/TS - 17 - Mémoire et Docker
  19. 19 Mémoire et performance JS/TS - 18 - Optimisations mémoire
  20. 20 Mémoire et performance JS/TS - 19 - Comparaison avec d'autres langages
  21. 21 Mémoire et performance JS/TS - 20 - Tester la mémoire
  22. 22 Mémoire et performance JS/TS - 21 - Glossaire

14 - Streams et backpressure

Ce que tu vas apprendre

  • Pourquoi lire un fichier entier en mémoire est un problème
  • Les 4 types de streams Node.js : Readable, Writable, Transform, Duplex
  • pipeline() et pourquoi c'est mieux que pipe()
  • La backpressure : ce que c'est, pourquoi c'est critique, highWaterMark
  • L'API Web Streams (ReadableStream, WritableStream)
  • Traitement CSV/JSON ligne par ligne
  • Comparaison mémoire : buffered vs streaming

Prerequisites

Avoir lu l'article 13 sur les Workers et la mémoire partagee.


Imaginons que tu doives traiter un fichier CSV de 2 Go. La première idee qui vient :

typescriptimport fs from "node:fs";

// Ne fais JAMAIS ca avec un fichier de 2 Go
const content = fs.readFileSync("data.csv", "utf-8");
const lines = content.split("\n");
// -> 2 Go en memoire pour le string
// -> 2 Go de plus pour le tableau de lignes
// -> boom: JavaScript heap out of memory

Meme avec readFile (la version async), le problème est le meme : tout le fichier est charge en mémoire d'un coup. Pour un fichier de 50 Mo, ca passe. Pour 2 Go, ton process crash.

Les streams resolvent ca en traitant les donnees morceau par morceau.

Les 4 types de streams Node.js

  • Readable : source de donnees (fichier, requête HTTP, stdin)
  • Writable : destination (fichier, réponse HTTP, stdout)
  • Transform : lit et écrit, transforme les donnees au passage (compression, parsing)
  • Duplex : lit et écrit indépendamment (socket TCP)
typescriptimport fs from "node:fs";

// Readable : lecture morceau par morceau
const readable = fs.createReadStream("data.csv", {
  encoding: "utf-8",
  highWaterMark: 64 * 1024, // 64 Ko par chunk
});

// Writable : ecriture morceau par morceau
const writable = fs.createWriteStream("output.csv");

// Chaque chunk fait 64 Ko max, pas 2 Go
readable.on("data", (chunk) => {
  writable.write(processChunk(chunk));
});
readable.on("end", () => {
  writable.end();
});

Avec cette approche, la mémoire utilisee est proportionnelle a la taille du chunk (64 Ko), pas a la taille du fichier (2 Go).

pipeline() : la bonne facon de connecter des streams

L'ancienne méthode .pipe() ne gere pas bien les erreurs. Si le stream de destination crash, le stream source continue a lire et la mémoire explose. pipeline() gere tout ca proprement :

typescriptimport { pipeline } from "node:stream/promises";
import fs from "node:fs";
import zlib from "node:zlib";

// Lire un fichier, le compresser, l'ecrire
await pipeline(
  fs.createReadStream("data.csv"),
  zlib.createGzip(),
  fs.createWriteStream("data.csv.gz")
);

pipeline :

  • Connecte les streams entre eux
  • Propage les erreurs correctement
  • Detruit tous les streams si l'un d'eux echoue
  • Retourne une Promise (avec stream/promises)

C'est la méthode recommandee depuis Node 10+.

La backpressure

C'est le concept le plus sous-estime des streams. La backpressure, c'est quand le consommateur (Writable) est plus lent que le producteur (Readable).

Sans backpressure : le Readable lit a pleine vitesse, les donnees s'accumulent dans un buffer intermediaire, et la mémoire explose. C'est exactement le problème qu'on voulait éviter.

Avec backpressure : quand le buffer intermediaire atteint le highWaterMark, le Readable est mis en pause automatiquement. Quand le Writable a rattrape son retard, le Readable reprend.

typescriptconst readable = fs.createReadStream("big-file.csv", {
  highWaterMark: 16 * 1024, // 16 Ko
});

const writable = fs.createWriteStream("output.csv", {
  highWaterMark: 16 * 1024,
});

// pipe() et pipeline() gerent la backpressure automatiquement
// En mode manuel, il faut la gerer soi-meme :
readable.on("data", (chunk) => {
  const canContinue = writable.write(chunk);
  if (!canContinue) {
    // Le buffer du writable est plein, on met en pause
    readable.pause();
    writable.once("drain", () => {
      // Le writable a vidange son buffer, on reprend
      readable.resume();
    });
  }
});

highWaterMark est le seuil en octets. Quand le buffer interne dépassé cette valeur, write() retourne false. C'est le signal de backpressure.

Avec pipeline(), tu n'as pas a gerer ca manuellement. C'est une raison de plus de l'utiliser.

Traitement CSV ligne par ligne

Le problème avec les streams bruts : un chunk ne correspond pas forcement a une ligne complète. Un chunk de 64 Ko peut se terminer au milieu d'une ligne. Il faut un Transform stream qui decoupe par lignes :

typescriptimport { Transform, TransformCallback } from "node:stream";
import { pipeline } from "node:stream/promises";
import fs from "node:fs";

class LineSplitter extends Transform {
  private remainder = "";

  _transform(chunk: Buffer, encoding: string, callback: TransformCallback): void {
    const text = this.remainder + chunk.toString();
    const lines = text.split("\n");
    this.remainder = lines.pop() || "";

    for (const line of lines) {
      if (line.trim()) {
        this.push(line + "\n");
      }
    }
    callback();
  }

  _flush(callback: TransformCallback): void {
    if (this.remainder.trim()) {
      this.push(this.remainder + "\n");
    }
    callback();
  }
}

await pipeline(
  fs.createReadStream("data.csv"),
  new LineSplitter(),
  fs.createWriteStream("cleaned.csv")
);

_flush est appele quand le stream source est termine. Il traite le reste qui n'a pas ete suivi d'un \n.

Pour un vrai projet, utilise une lib comme csv-parse qui gere les guillemets, les virgules dans les valeurs, les headers, etc.

Streaming JSON

Parser un fichier JSON de 500 Mo d'un coup, meme problème que le CSV. Mais JSON n'est pas un format ligne par ligne. Il faut un parser incrementiel :

typescript// Avec la lib stream-json
import { parser } from "stream-json";
import { streamArray } from "stream-json/streamers/StreamArray";
import { pipeline } from "node:stream/promises";
import fs from "node:fs";

// Fichier : [{"id": 1, ...}, {"id": 2, ...}, ... millions d'objets]
const jsonStream = fs.createReadStream("users.json")
  .pipe(parser())
  .pipe(streamArray());

let count = 0;
for await (const { value } of jsonStream) {
  // value est un objet JS, un par un
  await processUser(value);
  count++;
}
console.log(`${count} utilisateurs traites`);

Chaque objet est parse et traite individuellement. La mémoire reste constante, quelle que soit la taille du fichier.

Web Streams API

L'API Web Streams est le standard web, disponible dans les navigateurs et dans Node.js (depuis la v18) :

typescript// Lire un fichier en streaming dans le navigateur
const response = await fetch("/api/large-data");
const reader = response.body!.getReader();
const decoder = new TextDecoder();

let total = 0;
while (true) {
  const { done, value } = await reader.read();
  if (done) break;

  const text = decoder.decode(value, { stream: true });
  total += text.length;
  // Traiter le chunk
}

Les Web Streams sont interoperables avec les Node.js streams :

typescriptimport { Readable } from "node:stream";

// Convertir un Web ReadableStream en Node.js Readable
const nodeStream = Readable.fromWeb(webReadableStream);

// L'inverse
const webStream = Readable.toWeb(nodeReadableStream);

Streaming HTTP

Envoyer une réponse HTTP en streaming au lieu de construire le body entier en mémoire :

typescriptimport { pipeline } from "node:stream/promises";
import fs from "node:fs";

app.get("/api/export", async (req, res) => {
  res.setHeader("Content-Type", "text/csv");
  res.setHeader("Content-Disposition", "attachment; filename=export.csv");

  // Ecrire le header
  res.write("id,name,email\n");

  // Streamer depuis la base de donnees
  const cursor = db.collection("users").find().stream();

  for await (const user of cursor) {
    res.write(`${user.id},${user.name},${user.email}\n`);
  }

  res.end();
});

Le client recoit les donnees au fur et a mesure. La mémoire du serveur reste constante.

Comparaison mémoire : buffered vs streaming

J'ai mesure la mémoire pour traiter un fichier CSV de 1 Go (10 millions de lignes) :

Méthode Mémoire pic Temps
readFileSync + split 2.1 Go (crash a 1.5 Go par défaut) N/A
readFile + split 2.1 Go 12s
createReadStream + pipeline 28 Mo 14s
createReadStream avec highWaterMark: 16384 18 Mo 15s

La version streaming utilise 100x moins de mémoire. Le temps est comparable (un peu plus lent a cause du traitement chunk par chunk, mais negligeable).

Sur paltemps.fr, tous les exports de donnees passent par des streams. Que ce soit 100 lignes ou 100 000 lignes, la mémoire du serveur ne bouge pas. C'est le genre de décision architecturale qui évité les surprises en production quand le volume de donnees augmente.

Les erreurs classiques

Ne pas attendre le drain event :

typescript// MAUVAIS : ignore la backpressure
for (const item of millionItems) {
  writable.write(JSON.stringify(item) + "\n");
  // Le buffer interne grossit sans limite
}

// BON : respecter la backpressure
for (const item of millionItems) {
  const ok = writable.write(JSON.stringify(item) + "\n");
  if (!ok) {
    await new Promise((resolve) => writable.once("drain", resolve));
  }
}

Ne pas détruire les streams en cas d'erreur (utiliser pipeline resout ca). Ne pas appeler end() sur le writable quand c'est fini. Ces oublis causent des fuites de file descriptors et de mémoire.


Résumé

  • Lire un gros fichier en entier explose la mémoire, les streams traitent morceau par morceau
  • Quatre types de streams Node.js : Readable, Writable, Transform, Duplex
  • pipeline() est la facon recommandee de connecter des streams (gestion des erreurs et backpressure)
  • La backpressure empeche le producteur de submerger le consommateur (highWaterMark)
  • L'API Web Streams est le standard navigateur, interoperable avec les streams Node.js
  • Un fichier de 1 Go se traite en 18-28 Mo de mémoire avec les streams

Article précédent : 13 - Workers et mémoire partagee Article suivant : 15 - React et la mémoire

Sources

Réservez un audit gratuit de 30 minutes. Je vous montre concrètement ce qu'on peut automatiser.