14 - Streams et backpressure
Ce que tu vas apprendre
- Pourquoi lire un fichier entier en mémoire est un problème
- Les 4 types de streams Node.js : Readable, Writable, Transform, Duplex
- pipeline() et pourquoi c'est mieux que pipe()
- La backpressure : ce que c'est, pourquoi c'est critique, highWaterMark
- L'API Web Streams (ReadableStream, WritableStream)
- Traitement CSV/JSON ligne par ligne
- Comparaison mémoire : buffered vs streaming
Prerequisites
Avoir lu l'article 13 sur les Workers et la mémoire partagee.
Imaginons que tu doives traiter un fichier CSV de 2 Go. La première idee qui vient :
typescriptimport fs from "node:fs";
// Ne fais JAMAIS ca avec un fichier de 2 Go
const content = fs.readFileSync("data.csv", "utf-8");
const lines = content.split("\n");
// -> 2 Go en memoire pour le string
// -> 2 Go de plus pour le tableau de lignes
// -> boom: JavaScript heap out of memory
Meme avec readFile (la version async), le problème est le meme : tout le fichier est charge en mémoire d'un coup. Pour un fichier de 50 Mo, ca passe. Pour 2 Go, ton process crash.
Les streams resolvent ca en traitant les donnees morceau par morceau.
Les 4 types de streams Node.js
- Readable : source de donnees (fichier, requête HTTP, stdin)
- Writable : destination (fichier, réponse HTTP, stdout)
- Transform : lit et écrit, transforme les donnees au passage (compression, parsing)
- Duplex : lit et écrit indépendamment (socket TCP)
typescriptimport fs from "node:fs";
// Readable : lecture morceau par morceau
const readable = fs.createReadStream("data.csv", {
encoding: "utf-8",
highWaterMark: 64 * 1024, // 64 Ko par chunk
});
// Writable : ecriture morceau par morceau
const writable = fs.createWriteStream("output.csv");
// Chaque chunk fait 64 Ko max, pas 2 Go
readable.on("data", (chunk) => {
writable.write(processChunk(chunk));
});
readable.on("end", () => {
writable.end();
});
Avec cette approche, la mémoire utilisee est proportionnelle a la taille du chunk (64 Ko), pas a la taille du fichier (2 Go).
pipeline() : la bonne facon de connecter des streams
L'ancienne méthode .pipe() ne gere pas bien les erreurs. Si le stream de destination crash, le stream source continue a lire et la mémoire explose. pipeline() gere tout ca proprement :
typescriptimport { pipeline } from "node:stream/promises";
import fs from "node:fs";
import zlib from "node:zlib";
// Lire un fichier, le compresser, l'ecrire
await pipeline(
fs.createReadStream("data.csv"),
zlib.createGzip(),
fs.createWriteStream("data.csv.gz")
);
pipeline :
- Connecte les streams entre eux
- Propage les erreurs correctement
- Detruit tous les streams si l'un d'eux echoue
- Retourne une Promise (avec
stream/promises)
C'est la méthode recommandee depuis Node 10+.
La backpressure
C'est le concept le plus sous-estime des streams. La backpressure, c'est quand le consommateur (Writable) est plus lent que le producteur (Readable).
Sans backpressure : le Readable lit a pleine vitesse, les donnees s'accumulent dans un buffer intermediaire, et la mémoire explose. C'est exactement le problème qu'on voulait éviter.
Avec backpressure : quand le buffer intermediaire atteint le highWaterMark, le Readable est mis en pause automatiquement. Quand le Writable a rattrape son retard, le Readable reprend.
typescriptconst readable = fs.createReadStream("big-file.csv", {
highWaterMark: 16 * 1024, // 16 Ko
});
const writable = fs.createWriteStream("output.csv", {
highWaterMark: 16 * 1024,
});
// pipe() et pipeline() gerent la backpressure automatiquement
// En mode manuel, il faut la gerer soi-meme :
readable.on("data", (chunk) => {
const canContinue = writable.write(chunk);
if (!canContinue) {
// Le buffer du writable est plein, on met en pause
readable.pause();
writable.once("drain", () => {
// Le writable a vidange son buffer, on reprend
readable.resume();
});
}
});
highWaterMark est le seuil en octets. Quand le buffer interne dépassé cette valeur, write() retourne false. C'est le signal de backpressure.
Avec pipeline(), tu n'as pas a gerer ca manuellement. C'est une raison de plus de l'utiliser.
Traitement CSV ligne par ligne
Le problème avec les streams bruts : un chunk ne correspond pas forcement a une ligne complète. Un chunk de 64 Ko peut se terminer au milieu d'une ligne. Il faut un Transform stream qui decoupe par lignes :
typescriptimport { Transform, TransformCallback } from "node:stream";
import { pipeline } from "node:stream/promises";
import fs from "node:fs";
class LineSplitter extends Transform {
private remainder = "";
_transform(chunk: Buffer, encoding: string, callback: TransformCallback): void {
const text = this.remainder + chunk.toString();
const lines = text.split("\n");
this.remainder = lines.pop() || "";
for (const line of lines) {
if (line.trim()) {
this.push(line + "\n");
}
}
callback();
}
_flush(callback: TransformCallback): void {
if (this.remainder.trim()) {
this.push(this.remainder + "\n");
}
callback();
}
}
await pipeline(
fs.createReadStream("data.csv"),
new LineSplitter(),
fs.createWriteStream("cleaned.csv")
);
_flush est appele quand le stream source est termine. Il traite le reste qui n'a pas ete suivi d'un \n.
Pour un vrai projet, utilise une lib comme csv-parse qui gere les guillemets, les virgules dans les valeurs, les headers, etc.
Streaming JSON
Parser un fichier JSON de 500 Mo d'un coup, meme problème que le CSV. Mais JSON n'est pas un format ligne par ligne. Il faut un parser incrementiel :
typescript// Avec la lib stream-json
import { parser } from "stream-json";
import { streamArray } from "stream-json/streamers/StreamArray";
import { pipeline } from "node:stream/promises";
import fs from "node:fs";
// Fichier : [{"id": 1, ...}, {"id": 2, ...}, ... millions d'objets]
const jsonStream = fs.createReadStream("users.json")
.pipe(parser())
.pipe(streamArray());
let count = 0;
for await (const { value } of jsonStream) {
// value est un objet JS, un par un
await processUser(value);
count++;
}
console.log(`${count} utilisateurs traites`);
Chaque objet est parse et traite individuellement. La mémoire reste constante, quelle que soit la taille du fichier.
Web Streams API
L'API Web Streams est le standard web, disponible dans les navigateurs et dans Node.js (depuis la v18) :
typescript// Lire un fichier en streaming dans le navigateur
const response = await fetch("/api/large-data");
const reader = response.body!.getReader();
const decoder = new TextDecoder();
let total = 0;
while (true) {
const { done, value } = await reader.read();
if (done) break;
const text = decoder.decode(value, { stream: true });
total += text.length;
// Traiter le chunk
}
Les Web Streams sont interoperables avec les Node.js streams :
typescriptimport { Readable } from "node:stream";
// Convertir un Web ReadableStream en Node.js Readable
const nodeStream = Readable.fromWeb(webReadableStream);
// L'inverse
const webStream = Readable.toWeb(nodeReadableStream);
Streaming HTTP
Envoyer une réponse HTTP en streaming au lieu de construire le body entier en mémoire :
typescriptimport { pipeline } from "node:stream/promises";
import fs from "node:fs";
app.get("/api/export", async (req, res) => {
res.setHeader("Content-Type", "text/csv");
res.setHeader("Content-Disposition", "attachment; filename=export.csv");
// Ecrire le header
res.write("id,name,email\n");
// Streamer depuis la base de donnees
const cursor = db.collection("users").find().stream();
for await (const user of cursor) {
res.write(`${user.id},${user.name},${user.email}\n`);
}
res.end();
});
Le client recoit les donnees au fur et a mesure. La mémoire du serveur reste constante.
Comparaison mémoire : buffered vs streaming
J'ai mesure la mémoire pour traiter un fichier CSV de 1 Go (10 millions de lignes) :
| Méthode | Mémoire pic | Temps |
|---|---|---|
readFileSync + split |
2.1 Go (crash a 1.5 Go par défaut) | N/A |
readFile + split |
2.1 Go | 12s |
createReadStream + pipeline |
28 Mo | 14s |
createReadStream avec highWaterMark: 16384 |
18 Mo | 15s |
La version streaming utilise 100x moins de mémoire. Le temps est comparable (un peu plus lent a cause du traitement chunk par chunk, mais negligeable).
Sur paltemps.fr, tous les exports de donnees passent par des streams. Que ce soit 100 lignes ou 100 000 lignes, la mémoire du serveur ne bouge pas. C'est le genre de décision architecturale qui évité les surprises en production quand le volume de donnees augmente.
Les erreurs classiques
Ne pas attendre le drain event :
typescript// MAUVAIS : ignore la backpressure
for (const item of millionItems) {
writable.write(JSON.stringify(item) + "\n");
// Le buffer interne grossit sans limite
}
// BON : respecter la backpressure
for (const item of millionItems) {
const ok = writable.write(JSON.stringify(item) + "\n");
if (!ok) {
await new Promise((resolve) => writable.once("drain", resolve));
}
}
Ne pas détruire les streams en cas d'erreur (utiliser pipeline resout ca). Ne pas appeler end() sur le writable quand c'est fini. Ces oublis causent des fuites de file descriptors et de mémoire.
Résumé
- Lire un gros fichier en entier explose la mémoire, les streams traitent morceau par morceau
- Quatre types de streams Node.js : Readable, Writable, Transform, Duplex
pipeline()est la facon recommandee de connecter des streams (gestion des erreurs et backpressure)- La backpressure empeche le producteur de submerger le consommateur (highWaterMark)
- L'API Web Streams est le standard navigateur, interoperable avec les streams Node.js
- Un fichier de 1 Go se traite en 18-28 Mo de mémoire avec les streams
Article précédent : 13 - Workers et mémoire partagee Article suivant : 15 - React et la mémoire