Streams, Buffers, Backpressure, and Binary Data

Source: Victor Bona's Obsidian Compendium snapshot, Knowledge base/nodejs-v8-runtime-engineering/09 Streams Buffers Backpressure and Binary Data.md.

Purpose: Make Node.js streams, buffers, backpressure, encodings, and binary data handling operationally predictable for services that move large data without exhausting memory.

Why streams matter

Streams are Node's pressure-aware abstraction for incremental data. They let a process handle data sets larger than memory, connect files to sockets, transform data chunk by chunk, and keep producers from outrunning consumers.

The default failure mode of non-streaming code is simple: buffer the world, then crash or stall.

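import { readFile } from "node:fs/promises";

// Bad for large objects or untrusted input: the whole file is held in memory.
// path and upload() stand in for the caller's own values.
const body = await readFile(path);
await upload(body);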

Streaming changes the shape:

import { pipeline } from "node:stream/promises";
import { createReadStream, createWriteStream } from "node:fs";
import { createGzip } from "node:zlib";

await pipeline(
  createReadStream("input.log"),
  createGzip(),
  createWriteStream("input.log.gz"),
);

pipeline() is the production default because it wires errors, closure, and backpressure across the chain.

Stream kinds

| Kind | Direction | Examples | Implementation method |
| --- | --- | --- | --- |
| Readable | Produces chunks | fs.createReadStream, HTTP request body | _read() |
| Writable | Consumes chunks | fs.createWriteStream, HTTP response | _write() |
| Duplex | Reads and writes independently | TCP socket, TLS socket | _read() and _write() |
| Transform | Reads input and writes transformed output | gzip, CSV parser, hash stream | _transform() |

Object mode streams move JavaScript values. Binary streams move Buffer, Uint8Array, or strings depending on encoding.
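
As a rough illustration of the Readable row and of object mode, the sketch below builds a small stream through the read() constructor option, which is shorthand for implementing _read(). The countTo helper and the { n } values it emits are hypothetical names chosen for this example.

```javascript
import { Readable } from "node:stream";

// Hypothetical helper: an object-mode Readable that emits { n: 1 } .. { n: limit }.
function countTo(limit) {
  let n = 1;
  return new Readable({
    objectMode: true,
    // read() is called whenever the stream wants more data.
    read() {
      // Pushing null signals the end of the stream.
      this.push(n <= limit ? { n: n++ } : null);
    },
  });
}

const seen = [];
for await (const item of countTo(3)) {
  seen.push(item.n);
}
console.log(seen); // [ 1, 2, 3 ]
```

Because the stream is in object mode, each push() delivers one JavaScript value and highWaterMark counts objects rather than bytes.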

Backpressure

Backpressure is the signal that the downstream consumer cannot safely accept more data right now.

For writable streams:

  • writable.write(chunk) returns true when the internal queue is below its threshold.
  • It returns false when the producer should pause.
  • The writable emits drain when it is ready for more.

import { once } from "node:events";

async function writeAll(writable, chunks) {
  for await (const chunk of chunks) {
    if (!writable.write(chunk)) {
      await once(writable, "drain");
    }
  }
  writable.end();
  await once(writable, "finish");
}

This is easy to get wrong. Prefer pipeline() unless you have a narrow reason to manually wire the flow.

High water marks

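highWaterMark is a threshold, not a hard maximum. A stream still accepts data beyond it; the value only determines when write() starts returning false and when a readable stops pulling from its source.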

| Context | Meaning | Risk if too low | Risk if too high |
| --- | --- | --- | --- |
| Binary readable | Buffered bytes before pausing pull | More syscalls, lower throughput | More memory per stream |
| Binary writable | Queued bytes before write() returns false | Underutilized sink | Memory growth under slow sink |
| Object mode | Buffered object count | Scheduling overhead | Retained object graphs |

Do not tune highWaterMark blindly. Measure throughput, memory, and latency with representative payload sizes.
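
To make the "threshold, not a hard maximum" point concrete, here is a minimal sketch with a deliberately tiny highWaterMark and an artificially slow sink. The 4-byte threshold and the 10 ms delay are arbitrary values picked for the demonstration, not recommendations.

```javascript
import { Writable } from "node:stream";
import { once } from "node:events";

// A sink that takes 10 ms per chunk and signals backpressure at 4 queued bytes.
const slowSink = new Writable({
  highWaterMark: 4,
  write(chunk, encoding, callback) {
    setTimeout(callback, 10);
  },
});

const first = slowSink.write(Buffer.alloc(3));  // 3 bytes queued: below the threshold
const second = slowSink.write(Buffer.alloc(3)); // 6 bytes queued: over the threshold
const queued = slowSink.writableLength;

// The second write was still accepted even though it crossed highWaterMark.
console.log(first, second, queued); // true false 6

// A well-behaved producer waits here before writing again.
await once(slowSink, "drain");
console.log(slowSink.writableLength); // 0
```

The second chunk is queued even though write() returned false, which is why a producer that ignores the return value can grow memory without limit.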

Pipeline and cancellation

Modern stream/promises.pipeline() supports AbortSignal. That makes it fit the cancellation model from 08 Async Programming Promises Async Await Timers and Cancellation.

import { pipeline } from "node:stream/promises";
import { createReadStream, createWriteStream } from "node:fs";
import { createGzip } from "node:zlib";

async function compressFile(input, output, { signal }) {
  await pipeline(
    createReadStream(input),
    createGzip(),
    createWriteStream(output),
    { signal },
  );
}

Rules:

  • Pass the same request or job signal through the whole pipeline.
  • If you use async generator transforms, accept and respect the { signal } argument.
  • Treat AbortError as cancellation, not as a mysterious I/O failure.
  • Clean up partial output files when cancellation or write errors occur.
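
The rules above can be sketched as follows. This is an illustrative setup only: endlessTicks, runUntilCancelled, and the 30 ms abort timer are hypothetical names and values, and the sink simply discards data.

```javascript
import { pipeline } from "node:stream/promises";
import { Readable, Writable } from "node:stream";
import { setTimeout as sleep } from "node:timers/promises";

// Hypothetical source that never ends on its own.
async function* endlessTicks() {
  let n = 0;
  while (true) {
    yield `tick ${n++}\n`;
    await sleep(5);
  }
}

async function runUntilCancelled(signal) {
  // Sink that discards every chunk.
  const sink = new Writable({
    write(chunk, encoding, callback) {
      callback();
    },
  });
  try {
    await pipeline(Readable.from(endlessTicks()), sink, { signal });
    return "completed";
  } catch (error) {
    // Cancellation is an expected outcome, not an I/O failure.
    if (error.name === "AbortError") return "cancelled";
    throw error;
  }
}

const controller = new AbortController();
setTimeout(() => controller.abort(), 30);

const outcome = await runUntilCancelled(controller.signal);
console.log(outcome);
```

Any cleanup of partial output would go in the catch branch, before the result is returned or the error is rethrown.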

Async iterator streams

Readable streams are async iterable. This is good for parsing protocols, line readers, and controlled processing.

for await (const chunk of readable) {
  await processChunk(chunk);
}

But writing to a sink inside a loop must still honor backpressure.

import { once } from "node:events";

for await (const chunk of readable) {
  if (!writable.write(transform(chunk))) {
    await once(writable, "drain");
  }
}
writable.end();

If the transform is mostly mechanical, pipeline() is clearer and less error-prone.
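
For comparison, a minimal sketch of the same kind of flow expressed with pipeline() and an async generator transform. The upperCase transform and the in-memory source and sink are hypothetical; the source is in object mode, so each chunk is a complete string and no byte-level decoding is involved.

```javascript
import { pipeline } from "node:stream/promises";
import { Readable, Writable } from "node:stream";

// pipeline() calls a generator function with the upstream iterable
// and an options object carrying an AbortSignal.
async function* upperCase(source, { signal }) {
  for await (const line of source) {
    signal.throwIfAborted();
    yield line.toUpperCase();
  }
}

const received = [];
const sink = new Writable({
  objectMode: true,
  write(line, encoding, callback) {
    received.push(line);
    callback();
  },
});

// Backpressure, error propagation, and cleanup are handled by pipeline().
await pipeline(Readable.from(["alpha", "beta"]), upperCase, sink);
console.log(received); // [ 'ALPHA', 'BETA' ]
```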

Buffers

Buffer is Node's binary byte container. It is a subclass of Uint8Array with Node-specific methods for encodings, integer reads and writes, slicing, and pooling.

| Constructor | Memory behavior | Use case |
| --- | --- | --- |
| Buffer.from(string, encoding) | Encodes known data | Text to bytes |
| Buffer.from(array) | Copies values | Small explicit bytes |
| Buffer.from(arrayBuffer) | Views existing memory | Interop with Web APIs |
| Buffer.alloc(size) | Zero-filled | Safe new output buffer |
| Buffer.allocUnsafe(size) | Not zero-filled | Performance path only when fully overwritten |
| Buffer.concat(buffers, totalLength) | Copies buffers into one | Protocol frames and final assembly |

Security rule: never expose or serialize bytes from Buffer.allocUnsafe() unless every byte has been overwritten.

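// 8-byte header: the payload length, then a second 32-bit field set to 0x01,
// both big-endian. payload is assumed to be a Buffer holding the message body.
const header = Buffer.alloc(8);
header.writeUInt32BE(payload.length, 0);
header.writeUInt32BE(0x01, 4);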
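
The difference between constructors that view memory and constructors that copy it can be checked directly. The following is a small sketch; the variable names are illustrative only.

```javascript
import { Buffer } from "node:buffer";

const backing = new ArrayBuffer(4);
const bytes = new Uint8Array(backing);

const shared = Buffer.from(backing); // views the same memory as `backing`
const copied = Buffer.from(bytes);   // copies the current byte values

bytes[0] = 0xff;
console.log(shared[0]); // 255: the view sees the later write
console.log(copied[0]); // 0: the copy does not

// Buffer.alloc() is always zero-filled.
const zeroed = Buffer.alloc(4);

// Buffer.allocUnsafe() may hold stale bytes, so overwrite every byte before use.
const scratch = Buffer.allocUnsafe(4);
scratch.fill(0x2a);
console.log(scratch.every((byte) => byte === 0x2a)); // true
```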

Slices, views, and copies

Buffer APIs often create views over the same memory rather than copies.

const packet = Buffer.from("abcdef");
const view = packet.subarray(0, 3);
view[0] = 0x5a;
console.log(packet.toString()); // "Zbcdef"

Production guidance:

  • Use subarray() for zero-copy parsing while the parent buffer is still owned and immutable.
  • Use Buffer.from(view) when data must outlive the parent buffer or be isolated from mutation.
  • Avoid retaining a tiny slice of a huge buffer in long-lived structures. It can keep the larger allocation alive.
  • Document ownership when passing buffers across module boundaries.
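
A short sketch of the borrowed-versus-owned distinction from the guidance above. The 1 MiB size and the variable names are arbitrary choices for illustration.

```javascript
import { Buffer } from "node:buffer";

// A large parent buffer filled with "a" bytes.
const large = Buffer.alloc(1024 * 1024, 0x61);

const borrowed = large.subarray(0, 4); // zero-copy view into `large`
const owned = Buffer.from(borrowed);   // independent 4-byte copy

large[0] = 0x7a; // "z"

console.log(borrowed.toString()); // "zaaa": the view follows the parent
console.log(owned.toString());    // "aaaa": the copy is isolated

// The view shares the parent's ArrayBuffer, so keeping `borrowed` alive
// keeps the full 1 MiB allocation alive. The copy does not.
console.log(borrowed.buffer === large.buffer); // true
console.log(owned.buffer === large.buffer);    // false
```

Storing owned rather than borrowed in a long-lived cache lets the large parent allocation be garbage collected once nothing else references it.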

Encodings

Binary and text are different domains. A Buffer has bytes; a string has characters. UTF-8 characters can span multiple bytes and can be split across chunks.

| Task | Correct tool | Footgun |
| --- | --- | --- |
| Decode whole known UTF-8 payload | buffer.toString("utf8") | Using wrong encoding |
| Decode streaming text | TextDecoder with streaming or StringDecoder | Splitting multibyte characters |
| Encode string | Buffer.from(text, "utf8") | Assuming length equals byte length |
| Count bytes | Buffer.byteLength(text, "utf8") | Using text.length |
| Parse binary protocol | readUInt*, readInt*, readBigUInt* | Endianness mismatch |

const text = "cafe";
const bytes = Buffer.from(text, "utf8");
console.log(text.length);  // 4 characters
console.log(bytes.length); // 4 bytes, equal only because the text is pure ASCII

Even when the numbers match for ASCII, build code as if they will not.
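
To show why chunk boundaries matter, the sketch below splits a two-byte UTF-8 character across two chunks and decodes it both ways. It uses StringDecoder from node:string_decoder; the variable names are illustrative.

```javascript
import { Buffer } from "node:buffer";
import { StringDecoder } from "node:string_decoder";

const encoded = Buffer.from("\u00e9", "utf8"); // "é" is two bytes: c3 a9
const head = encoded.subarray(0, 1);
const tail = encoded.subarray(1);

// Naive per-chunk decoding turns each partial sequence into U+FFFD.
const naive = head.toString("utf8") + tail.toString("utf8");

// A streaming decoder holds the incomplete byte until the rest arrives.
const decoder = new StringDecoder("utf8");
const decoded = decoder.write(head) + decoder.write(tail) + decoder.end();

console.log(naive === "\u00e9");   // false
console.log(decoded === "\u00e9"); // true
console.log(Buffer.byteLength("\u00e9", "utf8")); // 2, while "\u00e9".length is 1
```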

Transform stream example

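import { Transform } from "node:stream";

const NEWLINE = 0x0a;

class LinePrefixer extends Transform {
  constructor(prefix) {
    super();
    this.prefix = Buffer.from(prefix);
    this.leftover = Buffer.alloc(0);
  }

  _transform(chunk, encoding, callback) {
    try {
      // Split on the newline byte instead of decoding to text, so a multibyte
      // UTF-8 character cut by a chunk boundary is never corrupted.
      const data = Buffer.concat([this.leftover, chunk]);
      let start = 0;
      let end;
      while ((end = data.indexOf(NEWLINE, start)) !== -1) {
        // Emit the prefix plus one complete line, including its "\n".
        this.push(Buffer.concat([this.prefix, data.subarray(start, end + 1)]));
        start = end + 1;
      }
      // Copy the incomplete tail so it does not pin the larger buffer.
      this.leftover = Buffer.from(data.subarray(start));
      callback();
    } catch (error) {
      callback(error);
    }
  }

  _flush(callback) {
    // Emit a final line that had no trailing newline.
    if (this.leftover.length > 0) {
      this.push(Buffer.concat([this.prefix, this.leftover]));
    }
    callback();
  }
}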

The example is intentionally small, but it shows the mechanics: retain incomplete data, push only complete frames, and report errors through the callback.

File and process streaming

Streams are the natural boundary between this note and 10 Filesystem Processes Signals Workers Cluster and Child Processes.

| Source or sink | Stream API | Notes |
| --- | --- | --- |
| File read | fs.createReadStream(path) | Honors backpressure from downstream |
| File write | fs.createWriteStream(path) | Watch finish and error |
| Child stdout | child.stdout | Drain it or the child can block |
| Child stdin | child.stdin | Honor write() return value |
| HTTP request | req readable | Enforce size limits |
| HTTP response | res writable | Handle client disconnect |

Size limits

Streaming does not remove the need for explicit limits.

async function readLimited(readable, maxBytes) {
  const chunks = [];
  let total = 0;

  for await (const chunk of readable) {
    total += chunk.length;
    if (total > maxBytes) {
      throw new Error(`payload too large: ${total} bytes`);
    }
    chunks.push(chunk);
  }

  return Buffer.concat(chunks, total);
}

Use this pattern sparingly. If the next step can stream, keep streaming.
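
When the next step can stream, the same limit can be enforced without collecting chunks. The sketch below is one possible shape, assuming binary chunks; byteLimit and copyLimited are hypothetical helper names, and the in-memory source and sink exist only to exercise the limit.

```javascript
import { Readable, Transform, Writable } from "node:stream";
import { pipeline } from "node:stream/promises";

// Hypothetical pass-through Transform that fails once maxBytes is exceeded.
function byteLimit(maxBytes) {
  let total = 0;
  return new Transform({
    transform(chunk, encoding, callback) {
      total += chunk.length;
      if (total > maxBytes) {
        callback(new Error(`payload too large: ${total} bytes`));
        return;
      }
      callback(null, chunk);
    },
  });
}

// Demonstration helper: streams chunks through the limit into a counting sink.
async function copyLimited(chunks, maxBytes) {
  let written = 0;
  const sink = new Writable({
    write(chunk, encoding, callback) {
      written += chunk.length;
      callback();
    },
  });
  await pipeline(Readable.from(chunks), byteLimit(maxBytes), sink);
  return written;
}

const accepted = await copyLimited([Buffer.alloc(3), Buffer.alloc(3)], 10);
console.log(accepted); // 6

const rejected = await copyLimited([Buffer.alloc(3), Buffer.alloc(3)], 4).then(
  () => false,
  (error) => error.message.startsWith("payload too large"),
);
console.log(rejected); // true
```

Memory stays bounded by the stream buffers rather than by the payload size, and the error tears down the whole pipeline.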

Common footguns

| Footgun | Symptom | Safer pattern |
| --- | --- | --- |
| Ignoring write() return value | Memory grows under slow clients | Await drain or use pipeline() |
| Manual .pipe() chain without error handling | Hung or half-closed streams | Use pipeline() |
| Buffer.allocUnsafe() for response bytes | Data leak risk | Buffer.alloc() or fully overwrite |
| chunk.toString() per chunk for UTF-8 protocol | Corrupted multibyte characters | Streaming decoder |
| text.length as byte count | Truncated or oversized frames | Buffer.byteLength() |
| Retaining subarray of huge buffer | Unexpected memory retention | Copy small retained slice |
| Unbounded Buffer.concat() loop | Quadratic copying and memory spikes | Track chunks, concat once, or stream |
| No request body limit | Memory or disk exhaustion | Limit bytes before parsing |
| Transform ignores callback errors | Pipeline never fails correctly | callback(error) or destroy stream |

Troubleshooting stream incidents

| Symptom | First checks | Likely cause |
| --- | --- | --- |
| RSS grows with slow clients | Writable queue lengths, write return values | Backpressure ignored |
| Pipeline hangs after abort | Async generator signal handling | Transform did not observe signal |
| Output file is corrupt after failure | Partial file cleanup | Missing atomic write pattern |
| Child process appears stuck | stdout and stderr consumers | Pipe buffer full |
| Unicode replacement characters | Chunk boundary decoding | Split multibyte sequence |
| High CPU in transform | Per-byte JavaScript loops | Batch chunks or move CPU work |
| High latency on upload | highWaterMark and downstream speed | Sink pressure or remote bottleneck |

Production patterns

Atomic streaming write

import { pipeline } from "node:stream/promises";
import { createWriteStream } from "node:fs";
import { rename, rm } from "node:fs/promises";

async function writeStreamAtomically(source, destination, { signal }) {
  const temp = `${destination}.${process.pid}.tmp`;
  try {
    await pipeline(source, createWriteStream(temp), { signal });
    await rename(temp, destination);
  } catch (error) {
    await rm(temp, { force: true }).catch(() => {});
    throw error;
  }
}

Bounded frame parser

function readFrame(buffer) {
  if (buffer.length < 4) return null;
  const length = buffer.readUInt32BE(0);
  if (length > 1_000_000) throw new Error("frame too large");
  if (buffer.length < 4 + length) return null;
  return {
    frame: buffer.subarray(4, 4 + length),
    rest: buffer.subarray(4 + length),
  };
}
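
A possible way to drive the parser is to call it in a loop and keep whatever bytes remain for the next read. In this sketch readFrame is repeated from the text so the example runs on its own, while encodeFrame and drainFrames are hypothetical helpers added for illustration.

```javascript
import { Buffer } from "node:buffer";

// Same parser as in the text: 4-byte big-endian length, then the payload.
function readFrame(buffer) {
  if (buffer.length < 4) return null;
  const length = buffer.readUInt32BE(0);
  if (length > 1_000_000) throw new Error("frame too large");
  if (buffer.length < 4 + length) return null;
  return {
    frame: buffer.subarray(4, 4 + length),
    rest: buffer.subarray(4 + length),
  };
}

// Hypothetical encoder matching that frame layout.
function encodeFrame(payload) {
  const header = Buffer.alloc(4);
  header.writeUInt32BE(payload.length, 0);
  return Buffer.concat([header, payload]);
}

// Hypothetical driver: extract every complete frame, return the unparsed tail.
function drainFrames(buffer) {
  const frames = [];
  let rest = buffer;
  for (let parsed = readFrame(rest); parsed !== null; parsed = readFrame(rest)) {
    frames.push(parsed.frame);
    rest = parsed.rest;
  }
  return { frames, rest };
}

const wire = Buffer.concat([
  encodeFrame(Buffer.from("hi")),
  encodeFrame(Buffer.from("there")),
  Buffer.from([0x00, 0x00]), // first half of the next length header
]);

const { frames, rest } = drainFrames(wire);
console.log(frames.map((frame) => frame.toString())); // [ 'hi', 'there' ]
console.log(rest.length); // 2
```

The returned frames are views into wire, so a caller that keeps them beyond the lifetime of the receive buffer would copy them first.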

Streamed child process

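import { spawn } from "node:child_process";
import { once } from "node:events";
import { pipeline } from "node:stream/promises";

async function gzipFile(inputStream, outputStream, { signal }) {
  const gzip = spawn("gzip", ["-c"], {
    stdio: ["pipe", "pipe", "inherit"],
    signal,
  });

  // once() rejects if the child emits "error" (spawn failure or abort),
  // so that event never goes unhandled.
  const closed = once(gzip, "close");

  const [[code]] = await Promise.all([
    closed,
    pipeline(inputStream, gzip.stdin, { signal }),
    pipeline(gzip.stdout, outputStream, { signal }),
  ]);

  // A non-zero (or null, if killed by a signal) exit code means failure.
  if (code !== 0) throw new Error(`gzip exited with code ${code}`);
}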

Production checklist

  • Use pipeline() for multi-stream flows.
  • Pass cancellation signals through pipelines.
  • Enforce byte limits at trust boundaries.
  • Treat buffers as owned or borrowed and document which.
  • Avoid allocUnsafe() unless overwrite is mechanically guaranteed.
  • Decode streaming text with a streaming decoder.
  • Respect write() and drain when manually writing.
  • Consume child process stdout and stderr.
  • Clean partial files after failed pipelines.
  • Measure memory per concurrent stream, not just total throughput.