
Node.js Streams

By Dan Orlando
Published in Node.js
November 16, 2021
4 min read

Streams are one of the critical pieces of Node.js that make it so powerful. When working with large amounts of data, they should be your go-to solution. Streams are collections of data that might not be available all at once and don't have to fit in memory. With streams, data is read and written in chunks, so the whole data set never has to be held in memory at the same time.
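
To make the idea of chunks concrete, here is a minimal sketch that reads a small file a few bytes at a time. The helper name readChunkSizes, the sample file in the OS temp directory, and the 4-byte chunk size are assumptions made for illustration; real streams normally use a much larger chunk size.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Hypothetical helper: reads a file as a stream and records the size of each chunk.
function readChunkSizes(filePath, chunkSize) {
  return new Promise((resolve, reject) => {
    const sizes = [];
    fs.createReadStream(filePath, { highWaterMark: chunkSize })
      .on('data', (chunk) => sizes.push(chunk.length))
      .on('end', () => resolve(sizes))
      .on('error', reject);
  });
}

// Write a 10-byte sample file (assumed path), then read it 4 bytes at a time.
const samplePath = path.join(os.tmpdir(), 'stream-chunks-sample.txt');
fs.writeFileSync(samplePath, '0123456789');

const chunkSizes = readChunkSizes(samplePath, 4);
chunkSizes.then((sizes) => console.log(sizes)); // three chunks: 4, 4, and 2 bytes
```

At no point does the program hold more than one chunk of the file, which is the property that lets streams handle files larger than the available memory.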

If you think about streaming services like Netflix or Spotify, for example, you never have to download the entire video or playlist before watching or listening. Instead, the browser receives this data in a continuous flow of chunks, allowing the file to download or “buffer” as it plays.

In an HTTP server, the response object is actually a writable stream. If we have a big file as a readable stream, we can pipe one into the other and avoid filling up memory. The fs module can give us a readable stream for any file through its createReadStream method; we then simply pipe this readable stream into the writable response stream, so the file is never buffered in memory as a whole.

const fs = require('fs');
const server = require('http').createServer();

server.on('request', (req, res) => {
  const file = fs.createReadStream('./big-file.txt');
  file.pipe(res);
});
server.listen(8000);

There are 4 Fundamental Stream Types:

  • Readable Streams
    • This is an abstraction for a source from which data can be consumed (e.g. fs.createReadStream)
  • Writable Streams
    • This is an abstraction for a destination to which data can be written (e.g. fs.createWriteStream)
  • Duplex Streams
    • These streams are both readable and writable
  • Transform Streams
    • These are duplex streams that can modify or transform the data as it is read and written (e.g. zlib.createGzip)
    • As a writable stream, it receives pieces of data, transforms (changes or discards) them and then outputs them as a readable stream.
    • Sometimes referred to as a “through” stream

All streams are event emitters. This means that we can listen for events like data, end, error, close, pipe, unpipe, finish, drain, pause, and resume.

For readable streams, the important events are data and end. The data event is emitted whenever the stream hands a chunk of data to a consumer, and the end event is emitted when there is no more data to be read. For writable streams, the important events are drain and finish. The drain event is emitted when the writable stream is ready to accept more data after write() has returned false, and the finish event is emitted once end() has been called and all data has been flushed.
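
As a rough illustration of both ideas, the sketch below builds a small transform stream and listens for events on its readable and writable sides. The upper-casing logic and the variable names are assumptions chosen for the example, not part of any Node.js API.

```javascript
const { Transform, finished } = require('stream');

// A transform stream that upper-cases every chunk passing through it.
const upperCase = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  }
});

const output = [];
const fired = new Set();
upperCase.on('data', (chunk) => output.push(chunk.toString())); // readable side
upperCase.on('end', () => fired.add('end'));                    // readable side is done
upperCase.on('finish', () => fired.add('finish'));              // writable side is flushed

// finished() calls back once the stream is no longer readable or writable.
const done = new Promise((resolve, reject) => {
  finished(upperCase, (err) => {
    if (err) return reject(err);
    resolve({ text: output.join(''), fired: [...fired].sort() });
  });
});

upperCase.write('hello ');
upperCase.write('streams');
upperCase.end();

done.then((result) => console.log(result));
```

The same stream emits finish for its writable half and end for its readable half, which is a handy way to remember which event belongs to which kind of stream.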

Events and Functions on Readable and Writable Streams

Good things to know about Readable Streams:

  • 2 modes: Paused and Flowing (also referred to as pull vs. push)
  • All readable streams start in paused mode
  • Can be switched into flowing and back to paused where needed
  • In paused mode, use stream.read() to read from the stream
  • In flowing mode, must use events to consume the data
  • In flowing mode, data can be lost if no consumers are available to handle it, so a data event handler (or a pipe) must be attached
  • Adding a data event handler switches a paused stream into flowing mode
  • Removing the data handler does not automatically switch it back to paused mode; call pause() explicitly
  • Usually, to switch between these two modes, we use the resume() and pause() methods
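
The mode switches can be observed through the readableFlowing property of a readable stream. The following is a small sketch; the sample data and the modes array are only there for illustration.

```javascript
const { Readable } = require('stream');

const readable = Readable.from(['a', 'b', 'c']);
const modes = [];

modes.push(readable.readableFlowing); // null: no consumer attached yet
readable.on('data', (chunk) => console.log('chunk:', chunk));
modes.push(readable.readableFlowing); // true: the data handler switched it to flowing
readable.pause();
modes.push(readable.readableFlowing); // false: explicitly paused
readable.resume();
modes.push(readable.readableFlowing); // true: flowing again

console.log(modes); // [ null, true, false, true ]
```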

Simple Writable Stream:

const { Writable } = require('stream');
const ws = new Writable({
  write(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  }
});

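We can then feed data to this stream by piping a readable stream into it, for example by adding process.stdin.pipe(ws); to the end of the script. Every line typed into the terminal is then passed to the write() method and echoed back.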

Simple Readable Stream:

const { Readable } = require('stream');
const rs = new Readable({
  read(size) {
    this.push(String.fromCharCode(this.currentCharCode++));
    if (this.currentCharCode > 90) {
      this.push(null);
    }
  }
});

rs.currentCharCode = 65;
rs.pipe(process.stdout);

In the past, you had to be careful not to mix async functions with EventEmitter because there was no way to catch a rejection when it was emitted within an event handler. The usual workaround was to wrap the body of the async handler in a try/catch block. Instead, you can now set EventEmitter.captureRejections = true, and a catch() handler will be added every time a Promise is returned from an event handler. If a rejection occurs, the 'error' event is emitted, avoiding the 'unhandledRejection'.
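
Here is a minimal sketch of that behavior. It uses the per-instance captureRejections constructor option instead of the global setting; the 'job' event name and the helper function are hypothetical.

```javascript
const { EventEmitter } = require('events');

// Hypothetical helper: emits an event whose async handler rejects,
// and resolves with the message that reaches the 'error' handler.
function emitWithCapturedRejection() {
  return new Promise((resolve) => {
    const emitter = new EventEmitter({ captureRejections: true });

    emitter.on('job', async () => {
      throw new Error('job failed'); // becomes a rejected Promise
    });

    // The rejection is routed here instead of becoming an unhandled rejection.
    emitter.on('error', (err) => resolve(err.message));

    emitter.emit('job');
  });
}

const caught = emitWithCapturedRejection();
caught.then((message) => console.log('caught:', message));
```

Note that the 'error' handler itself is a plain synchronous function here; making it async would risk a rejection inside the error handler as well.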

Piping Streams

When data must be processed in multiple steps, streams can be connected to each other, sending the data through a “pipeline” of transformations. With piping, input is received from a readable stream, and each step is completed and passed on to the next step via a transform stream. In the last step, we can either write the data from the most recent readable stream to a writable stream or process it by some other means.

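When piping a readable stream into a writable stream, the readable stream emits the data event whenever it has a chunk available, and pipe() writes that chunk to the writable stream. If the writable stream's buffer fills up, pipe() pauses the readable stream until the writable stream emits the drain event to signal that it is ready to accept more data.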
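
As a sketch of a multi-step pipe chain, the example below sends text through a gzip transform and then a gunzip transform before collecting it in a writable stream. The roundTrip helper and the collecting sink are assumptions made for the example, and error handling on the intermediate streams is left out for brevity.

```javascript
const { Readable, Writable } = require('stream');
const zlib = require('zlib');

// Hypothetical helper: compresses and decompresses the chunks again,
// resolving with the text that arrives at the end of the chain.
function roundTrip(textChunks) {
  return new Promise((resolve, reject) => {
    const received = [];
    const sink = new Writable({
      write(chunk, encoding, callback) {
        received.push(chunk); // chunk is a Buffer produced by the gunzip step
        callback();
      }
    });
    sink.on('finish', () => resolve(Buffer.concat(received).toString('utf8')));
    sink.on('error', reject);

    Readable.from(textChunks)       // readable source
      .pipe(zlib.createGzip())      // transform: compress
      .pipe(zlib.createGunzip())    // transform: decompress
      .pipe(sink);                  // writable destination
  });
}

const restored = roundTrip(['hello, ', 'streams']);
restored.then((text) => console.log(text)); // hello, streams
```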

Perhaps a more elegant method of working with streams in a transform pipeline is asynchronous iteration. It retrieves the contents of a data container asynchronously, which means the current task may be paused before an item is retrieved. Readable streams are async iterables, so a for await...of loop can iterate over the stream directly, without attaching readable or data event handlers.

const fs = require('fs');
async function logChunks(readable) {
  for await (const chunk of readable) {
    console.log(chunk);
  }
}
const readable = fs.createReadStream('./big-file.txt', {encoding: 'utf8'});
logChunks(readable);

The idea behind asynchronous iteration is that we can pause the iteration while we’re working on the next item. It is a more elegant alternative to transform streams for processing streamed data in multiple steps. A pipeline built on asynchronous iteration has the following structure:

  1. The input is a readable stream
  2. The first transformation is done by an async generator function which iterates over the input stream and yields chunks of data.
  3. The second transformation is done by another async generator function, which consumes the yielded chunks and yields its own results. We can continue to transform further by chaining more async generators.
  4. At the end of the transformation pipeline, we have options for handling the async iterable that is returned by the final generator: we can convert it to a readable stream with Readable.from() which can then be piped into a writable stream later, or we can use an async function to process it.

import {Readable} from 'stream';

async function* chunksToLines(chunkIterable) {
  let previous = '';
  for await (const chunk of chunkIterable) {
    let startSearch = previous.length;
    previous += chunk;
    while (true) {
      const eolIndex = previous.indexOf('\n', startSearch);
      if (eolIndex < 0) break;
      // line includes the EOL
      const line = previous.slice(0, eolIndex+1);
      yield line;
      previous = previous.slice(eolIndex+1);
      startSearch = 0;
    }
  }
  if (previous.length > 0) {
    yield previous;
  }
}

async function* numberLines(lineIterable) {
  let lineNumber = 1;
  for await (const line of lineIterable) {
    yield lineNumber + ' ' + line;
    lineNumber++;
  }
}

async function logLines(lineIterable) {
  for await (const line of lineIterable) {
    console.log(line);
  }
}

const chunks = Readable.from(
  'Text with\nmultiple\nlines.\n',
  {encoding: 'utf8'});
logLines(numberLines(chunksToLines(chunks)));

In this example, chunkIterable is an async or sync iterable over chunks of data. The chunksToLines function returns an async iterable over “lines”. lineIterable is an async iterable over lines of data, which numberLines prefixes with a line number. logLines is an async function that logs the lines.
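
The other option for the end of the pipeline, converting the final async iterable back into a readable stream with Readable.from(), could look roughly like this. The upperCaseLines generator and the collectInto helper are hypothetical stand-ins for the last stage of a real pipeline.

```javascript
const { Readable, Writable } = require('stream');

// Hypothetical final stage of a generator pipeline: upper-cases each line.
async function* upperCaseLines(lineIterable) {
  for await (const line of lineIterable) {
    yield line.toUpperCase();
  }
}

// Hypothetical helper: pipes a readable stream into a writable stream
// that simply collects everything it receives.
function collectInto(readable) {
  return new Promise((resolve, reject) => {
    const lines = [];
    const sink = new Writable({
      write(chunk, encoding, callback) {
        lines.push(chunk.toString());
        callback();
      }
    });
    sink.on('finish', () => resolve(lines));
    sink.on('error', reject);
    readable.on('error', reject);
    readable.pipe(sink);
  });
}

// Wrap the async iterable in a readable stream so it can be piped.
const asReadable = Readable.from(upperCaseLines(['first\n', 'second\n']));
const collected = collectInto(asReadable);
collected.then((lines) => console.log(lines));
```

In a real program the collecting sink would typically be replaced by something like fs.createWriteStream() or process.stdout.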

Writing to Writable Streams

There are three primary approaches to writing to a writable stream:

  1. Write directly to the stream via the write() method.
  2. Use the pipe() method to pipe a readable stream into the writable stream.
  3. Use pipeline() from the stream module to pipe a readable stream into a writable stream.

We can use the scenario of writing a stream to file to demonstrate the three approaches.

First, we’ll use an async function to write directly to a writable stream.

import * as util from 'util';
import * as stream from 'stream';
import * as fs from 'fs';
import {once} from 'events';

const finished = util.promisify(stream.finished); // convert to a promise

async function writeIterableToFile(iterable, filePath) {
  const writable = fs.createWriteStream(filePath, {encoding: 'utf8'});
  for await (const chunk of iterable) {
    if (!writable.write(chunk)) {  // write() returns false when the internal buffer is full
      await once(writable, 'drain'); // wait for the drain event before writing more
    }
  }
  writable.end();
  await finished(writable); // wait for the stream to finish
}

// Note: the tmp/ directory must already exist
await writeIterableToFile(
  ['One', ' line of text.\n'], 'tmp/log.txt');

Notice that we promisified stream.finished. Normally stream.finished() uses a callback, but we can use this util function to convert it to a Promise. Next, take a look at the if condition. write() returns false when the stream's internal buffer is full; in that case, await once(writable, 'drain') waits for the drain event before writing continues. We then close the writable stream with end() and wait until writing is done.

Using pipeline(readable, writable):

import * as util from 'util';
import * as stream from 'stream';
import * as fs from 'fs';
const pipeline = util.promisify(stream.pipeline); // convert to a promise

async function writeIterableToFile(iterable, filePath) {
  const readable = stream.Readable.from(
    iterable, {encoding: 'utf8'});
  const writable = fs.createWriteStream(filePath);
  await pipeline(readable, writable); // resolves once all data has been written
}
await writeIterableToFile(
  ['One', ' line of text.\n'], 'tmp/log.txt');
// ···

This method uses Readable.from() to create a readable stream from the iterable. Then, we use pipeline() to pipe the readable stream into the writable stream. We can also call the readable stream's pipe() method to pipe it into a writable stream, but the issue with this approach is that if the readable stream emits an error, the writable stream is not closed automatically.
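
The difference in error handling can be sketched as follows. This example uses the promise-based pipeline() from the stream/promises module (available in recent Node.js versions) rather than the promisified version above; the failing source, the discarding sink, and the helper names are all assumptions made for the demonstration.

```javascript
const { Readable, Writable } = require('stream');
const { pipeline } = require('stream/promises');

// Demo helpers: a source that fails on its first read, and a sink that discards data.
const failingSource = () => new Readable({
  read() { this.destroy(new Error('read failed')); }
});
const discardSink = () => new Writable({
  write(chunk, encoding, callback) { callback(); }
});

// pipe(): the source error is not forwarded, so the destination stays open.
function viaPipe() {
  return new Promise((resolve) => {
    const source = failingSource();
    const sink = discardSink();
    source.on('error', () => {}); // without a handler the error would crash the process
    source.on('close', () => resolve(sink.destroyed));
    source.pipe(sink);
  });
}

// pipeline(): the error rejects the promise and the sink is destroyed as well.
async function viaPipeline() {
  const sink = discardSink();
  try {
    await pipeline(failingSource(), sink);
  } catch (err) {
    return { message: err.message, sinkDestroyed: sink.destroyed };
  }
}

const results = Promise.all([viaPipe(), viaPipeline()]);
results.then(([pipeDestroyedSink, pipelineResult]) => {
  console.log('pipe() destroyed the sink:', pipeDestroyedSink);
  console.log('pipeline():', pipelineResult);
});
```

With pipe(), cleaning up the destination after a source error is left to us; pipeline() takes care of it and reports the error in one place.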


Tags: Node.js, beginner