Node.js Streams: The Complete Guide

Streams are the reason Node.js can handle giant files without running out of memory. Most tutorials explain the theory and skip the parts that actually trip you up. This one doesn't.

Most tutorials on this topic get it wrong. Not the code samples -- those are usually fine. The problem is they show you how to create a readable stream, how to create a writable stream, and then say "now you know streams!" as if the hard part were the syntax. The hard part is backpressure. The hard part is knowing when .pipe() silently fails. The hard part is figuring out why your transform stream buffers 400MB of data before writing anything.

So instead of walking through the four stream types like a textbook, I'm going to focus on the stuff that actually matters when you're processing real data in production.

What Streams Give You (And What They Cost)

The pitch for streams is memory efficiency. You've got a 4GB log file. If you use fs.readFile, Node loads the whole thing into memory before your callback even fires. With fs.createReadStream, you get it in chunks -- 64KB at a time by default -- and you process each chunk before moving on. Your app uses a few megabytes instead of 4 gigabytes. That's a real difference, not a micro-optimization.

But there's a cost. Streams are harder to reason about than loading everything into memory and working with it. Error handling is trickier. Debugging is worse. The API surface is large and, honestly, parts of it are kind of ugly. Readable streams have two modes (flowing and paused) that interact in ways that confuse even experienced Node developers. You'll hit edge cases that make you question your career choices.

Worth it? For files over a few hundred MB, absolutely. For HTTP responses you're proxying, yes. For a 50KB JSON config file, no. Use readFile and move on with your life.

Node has four kinds of streams: Readable (source of data), Writable (destination for data), Transform (modifies data passing through), and Duplex (readable and writable independently, like a TCP socket). You'll use the first three constantly. Duplex streams are more of a "you'll know when you need them" thing.

Reading Data Without Blowing Up Memory

Here's the basic pattern. You'll see it everywhere:

const fs = require('fs');

const readable = fs.createReadStream('large-log-file.txt', {
  encoding: 'utf8',
  highWaterMark: 16 * 1024 // 16KB chunks
});

readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} characters of data`);
});

readable.on('end', () => {
  console.log('No more data to read');
});

readable.on('error', (err) => {
  console.error('Error reading file:', err.message);
});

The highWaterMark controls how much data Node buffers internally before your 'data' handler gets it. File streams default to 64KB; generic readable streams default to 16KB. Bump it up if you're processing large files and want fewer, bigger chunks. Lower it if memory is tight.

You can also build your own readable stream. Implement _read -- Node calls it whenever the consumer wants more data -- and push chunks from inside it. Push null when you're done.

const { Readable } = require('stream');

class CounterStream extends Readable {
  constructor(max) {
    super({ objectMode: true });
    this.max = max;
    this.current = 0;
  }

  _read() {
    if (this.current <= this.max) {
      this.push({ number: this.current });
      this.current++;
    } else {
      this.push(null); // Signal end of stream
    }
  }
}

const counter = new CounterStream(5);
counter.on('data', (obj) => console.log(obj));
// Output: { number: 0 }, { number: 1 }, ... { number: 5 }

Yeah, the API for custom streams is kind of verbose. objectMode: true means you're pushing JavaScript objects instead of strings/buffers. The _read method name with the underscore is a Node convention for "this is a method the framework calls, not one you call directly." You get used to it.

Writing Data and the Drain Problem

Writing to a stream looks straightforward until it isn't. .write() returns a boolean: true means the internal buffer has room, false means it's full and you should stop writing until you hear 'drain'.

const fs = require('fs');

const writable = fs.createWriteStream('output.txt');

for (let i = 0; i < 1000000; i++) {
  const canContinue = writable.write(`Line ${i}\n`);
  if (!canContinue) {
    // Buffer is full, we should wait for drain
    // (simplified here for clarity)
  }
}

writable.end('Final line\n');
writable.on('finish', () => {
  console.log('All data has been flushed to the file');
});

That code "works" but it ignores backpressure. If the disk is slow, you're stuffing data into the buffer faster than it can flush, and memory usage climbs. The right way to do this -- and I'm not going to pretend the code is pretty -- is a drain loop:

const fs = require('fs');

function writeLotsOfData(filePath, totalLines) {
  const writable = fs.createWriteStream(filePath);
  let i = 0;

  function write() {
    let ok = true;
    while (i < totalLines && ok) {
      const data = `Line ${i}: ${Date.now()}\n`;
      i++;
      if (i === totalLines) {
        writable.write(data); // Last write
        writable.end();
      } else {
        ok = writable.write(data);
      }
    }
    if (i < totalLines) {
      // Buffer is full, wait for drain
      writable.once('drain', write);
    }
  }

  write();

  writable.on('finish', () => {
    console.log(`Wrote ${totalLines} lines to ${filePath}`);
  });
}

writeLotsOfData('output.txt', 10000000);

It's a recursive-looking pattern where write() calls itself after each 'drain' event. Not the cleanest code in the world. But it keeps memory usage flat even when writing ten million lines. If you skip this and just loop, you might get away with it for small files but eventually something large will eat all your RAM.

Custom writable streams follow the same underscore convention -- implement _write() and call the callback when you're done processing each chunk:

const { Writable } = require('stream');

class DatabaseWriter extends Writable {
  constructor(db) {
    super({ objectMode: true });
    this.db = db;
  }

  async _write(record, encoding, callback) {
    try {
      await this.db.collection('logs').insertOne(record);
      callback(); // Signal success
    } catch (err) {
      callback(err); // Signal error
    }
  }
}

// Usage:
// readableStream.pipe(new DatabaseWriter(mongoClient.db('myapp')));

Always call that callback. Always. If you don't, the stream hangs forever and you'll spend an hour trying to figure out why nothing is happening.

Transform Streams: Where It Gets Useful

Transform streams sit in the middle of a pipeline. Data comes in one side, gets changed, comes out the other. Think of them as a .map() for streaming data.

Here's one that converts CSV text into JSON objects:

const { Transform } = require('stream');

class CsvToJson extends Transform {
  constructor() {
    // Only the readable side is in object mode: we receive raw
    // text chunks but emit parsed objects
    super({ readableObjectMode: true });
    this.headers = null;
    this.buffer = '';
  }

  _transform(chunk, encoding, callback) {
    this.buffer += chunk.toString();
    const lines = this.buffer.split('\n');
    this.buffer = lines.pop(); // Keep incomplete line in buffer

    for (const line of lines) {
      if (!line.trim()) continue;
      const values = line.split(',');

      if (!this.headers) {
        this.headers = values;
        continue;
      }

      const obj = {};
      this.headers.forEach((header, i) => {
        obj[header.trim()] = values[i]?.trim();
      });
      this.push(obj);
    }
    callback();
  }

  _flush(callback) {
    // Process any remaining data in buffer
    if (this.buffer.trim() && this.headers) {
      const values = this.buffer.split(',');
      const obj = {};
      this.headers.forEach((header, i) => {
        obj[header.trim()] = values[i]?.trim();
      });
      this.push(obj);
    }
    callback();
  }
}

// Usage:
fs.createReadStream('data.csv')
  .pipe(new CsvToJson())
  .on('data', (record) => console.log(record));

That _flush method runs when the input stream ends. It's your last chance to push out any data sitting in your internal buffer. If the last line of your CSV didn't end with a newline, _flush catches it. Skip it and you silently lose data. Fun.

Duplex streams are the ones where the readable side and writable side are independent. A TCP socket is the textbook example: what you write to it doesn't come back out the other side locally. It goes to the network. What you read from it is what the other end sent. If you're not building network protocols, you probably won't build custom duplex streams often.

pipe() Is Broken. Use pipeline() Instead.

Okay, .pipe() isn't broken broken. But it has a real problem: it doesn't propagate errors. If the source stream errors, the destination stream just sits there, open, leaking resources. You'd have to attach error handlers to every stream in the chain manually.

const fs = require('fs');
const zlib = require('zlib');

// This works but errors are not handled across the chain
fs.createReadStream('input.txt')
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('input.txt.gz'));

Node added stream.pipeline() to fix this. It destroys all streams in the chain if any one of them errors. Use it. Every time.

const { pipeline } = require('stream/promises');
const fs = require('fs');
const zlib = require('zlib');

async function compressFile(input, output) {
  await pipeline(
    fs.createReadStream(input),
    zlib.createGzip(),
    fs.createWriteStream(output)
  );
}

compressFile('access.log', 'access.log.gz')
  .then(() => console.log('Done'))
  .catch(console.error);

The stream/promises version gives you async/await support so you don't need promisify. Available since Node 15. If you're on something older, there's the callback version too:

const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');

pipeline(
  fs.createReadStream('access.log'),
  zlib.createGzip(),
  fs.createWriteStream('access.log.gz'),
  (err) => {
    if (err) console.error('Pipeline failed:', err.message);
    else console.log('Compression complete');
  }
);

Here's a more involved example -- a file processing pipeline that reads a CSV, filters rows, reformats the output, and writes to a new file. All in a single streaming pass, constant memory:

const { pipeline } = require('stream/promises');
const fs = require('fs');
const { Transform } = require('stream');

// Filter rows where sales > 1000
class FilterRows extends Transform {
  constructor() {
    super({ objectMode: true });
  }
  _transform(row, encoding, callback) {
    // CSV values arrive as strings, so coerce before comparing
    if (Number(row.sales) > 1000) {
      this.push(row);
    }
    callback();
  }
}

// Format as output line
class FormatOutput extends Transform {
  constructor() {
    // Reads objects in, writes plain strings out
    super({ writableObjectMode: true });
  }
  _transform(row, encoding, callback) {
    const line = `${row.product}: $${row.sales} revenue\n`;
    this.push(line);
    callback();
  }
}

async function processReport() {
  await pipeline(
    fs.createReadStream('sales.csv'),
    new CsvToJson(),       // Reusing our earlier transform
    new FilterRows(),
    new FormatOutput(),
    fs.createWriteStream('high-sales-report.txt')
  );
  console.log('Report generated successfully');
}

processReport().catch(console.error);

Backpressure: The Thing Nobody Explains Well

Backpressure is what keeps a fast producer from overwhelming a slow consumer. If your readable stream can produce data at 1GB/s but the writable stream can only flush 100MB/s to disk, something has to give. Without backpressure, the internal buffers grow until your process runs out of memory and crashes.

Here's how it actually works, step by step:

  1. The writable stream has an internal buffer. highWaterMark sets the threshold.
  2. When you call .write() and the buffer exceeds the threshold, .write() returns false.
  3. The readable stream sees this and pauses -- it stops pushing data.
  4. The writable stream works through its buffer. Once it's back under the threshold, it emits 'drain'.
  5. The readable stream resumes, and data flows again.

When you use pipeline(), this whole dance happens automatically. You don't touch it. That's one of the main reasons to use pipeline() over manually writing to streams.

But if you're doing manual writes -- like that drain loop from earlier -- you're responsible for respecting backpressure yourself. Ignore it and your memory usage graph looks like a hockey stick right before the OOM killer shows up.

A good way to sanity-check your stream code: run it against a large file and watch process.memoryUsage().heapUsed. If it stays roughly flat, backpressure is working. If it climbs linearly with the input size, something's wrong. Usually it means you're pushing faster than the consumer can handle and not waiting for drain.
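The sanity check itself can be a throwaway interval; the interval length and units here are arbitrary:

```javascript
// Log heap usage once a second while a stream job runs.
// Roughly flat numbers = backpressure working; a steady climb = a leak.
const timer = setInterval(() => {
  const mb = process.memoryUsage().heapUsed / 1024 / 1024;
  console.log(`heapUsed: ${mb.toFixed(1)} MB`);
}, 1000);
timer.unref(); // don't keep the process alive just for the logger
```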

Async Iterators: The Modern Way to Read Streams

Since Node 10, readable streams are async iterables. That means you can use for await...of instead of event listeners. This is genuinely nicer for a lot of use cases:

const fs = require('fs');
const readline = require('readline');

async function processLogFile(filePath) {
  const fileStream = fs.createReadStream(filePath);
  const rl = readline.createInterface({
    input: fileStream,
    crlfDelay: Infinity
  });

  let errorCount = 0;
  let lineNumber = 0;

  for await (const line of rl) {
    lineNumber++;
    if (line.includes('ERROR')) {
      errorCount++;
      console.log(`Line ${lineNumber}: ${line}`);
    }
  }

  console.log(`Found ${errorCount} errors in ${lineNumber} lines`);
}

processLogFile('app.log').catch(console.error);

The for await...of loop handles backpressure automatically. When your processing is slower than the read speed, the loop pauses the stream. No manual drain handling, no event listener spaghetti. I switched a log parser at work from event-based to async iteration and cut the code by about 40 percent.

One caveat: error handling with async iterators is different. If the stream errors, the for await...of loop throws. You need a try/catch around it, which is arguably cleaner than attaching an error listener but easy to forget. I had a production script that silently exited because an unhandled rejection from a stream error wasn't being caught -- Node just printed a warning and moved on. Always wrap your async iteration in try/catch.

You can also consume transform streams this way, which makes data processing pipelines read almost like synchronous code:

async function sumLargeDataset(readable) {
  let total = 0;
  for await (const record of readable) {
    total += record.value;
  }
  return total;
}

Debugging Streams in Practice

I want to mention a few things that have cost me real debugging time, because they're the kind of stuff that doesn't show up in tutorials.

First: the "stream already read" problem. If you pipe a readable stream to two writable streams, only one of them gets the data. Streams are consumed once. If you need to send the same data to two destinations, you need to use something like PassThrough streams or manually tee the data. I once had a file upload handler that was supposed to both save to disk and compute a hash. The hash came back wrong because the second pipe got an empty stream. The fix was to create two PassThrough streams and pipe the source to both:

const { PassThrough } = require('stream');
const crypto = require('crypto');
const fs = require('fs');

function saveAndHash(inputStream, outputPath) {
  const pass1 = new PassThrough();
  const pass2 = new PassThrough();

  inputStream.pipe(pass1);
  inputStream.pipe(pass2);

  const hash = crypto.createHash('sha256');
  pass2.pipe(hash);

  // Wait for BOTH destinations -- resolving on the hash alone
  // could hand back a digest before the file is fully flushed
  const fileDone = new Promise((resolve, reject) => {
    const out = fs.createWriteStream(outputPath);
    pass1.pipe(out);
    out.on('finish', resolve);
    out.on('error', reject);
  });

  const hashDone = new Promise((resolve, reject) => {
    hash.on('finish', () => resolve(hash.digest('hex')));
    hash.on('error', reject);
  });

  return Promise.all([fileDone, hashDone]).then(([, digest]) => digest);
}

Second: watch out for highWaterMark mismatches in object mode. In buffer mode, highWaterMark is measured in bytes (default 16KB). In object mode, it's measured in number of objects (default 16). If your transform stream reads buffers and outputs objects, these two different units interact in confusing ways. I've seen pipelines where setting highWaterMark: 1 on the transform stream fixed a mysterious memory issue -- the default of 16 objects was buffering too much when each object was a multi-megabyte parsed JSON document.

Third: destroyed streams. If a stream errors partway through, downstream streams might not get their end event. They just hang. This is one more reason to use pipeline() -- it destroys the whole chain on error. But if you're doing manual piping for some reason, always listen for both 'error' and 'close' events on every stream in the chain.

Streams aren't pretty. The API has layers of history baked in -- old-style streams, new-style streams, object mode vs buffer mode, flowing vs paused. But for handling data that doesn't fit in memory, there's no alternative in Node. Get comfortable with pipeline(), learn the transform stream pattern, respect backpressure, and you'll be fine for most real-world cases.

Written by Anurag Kumar

Full-stack developer passionate about Node.js and building fast, scalable web applications. Writing about what I learn every day.
