Streams are one of the critical pieces of Node that make it so powerful. When working with large amounts of data, they should be your go-to solution. Streams are collections of data that might not be available all at once and don't have to fit in memory. With streams, data is read and written in chunks, which is why all of the data doesn't have to be in memory at once.
Think about streaming services like Netflix or Spotify, for example: you never have to download the entire video or playlist before watching or listening. Instead, the browser receives the data in a continuous flow of chunks, allowing the file to download or “buffer” as it plays.
The response object is actually a writable stream. If we have a big file as a readable stream, we can pipe one into the other and avoid filling up the memory. The fs module can give us a readable stream for any file using the createReadStream method; we then simply pipe this readable stream into the response writable stream. This avoids buffering in memory.
const fs = require('fs');
const server = require('http').createServer();

server.on('request', (req, res) => {
  const file = fs.createReadStream('./big-file.txt');
  file.pipe(res);
});

server.listen(8000);
There are four fundamental stream types:
- Readable Streams (e.g. fs.createReadStream)
- Writable Streams (e.g. fs.createWriteStream)
- Duplex Streams
- Transform Streams (e.g. zlib.createGzip)
All streams are event emitters. This means that we can listen for events like data, end, error, close, pipe, unpipe, finish, drain, pause, and resume.
For readable streams, the important events are data and end. The data event is emitted when there is data to be read, and the end event is emitted when there is no more data to be read. For writable streams, the important events are drain and finish. The drain event is emitted when the writable stream is ready to accept more data, and the finish event is emitted when the writable stream has finished writing all the data it has received and everything has been flushed.
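As a minimal sketch, reusing the big-file.txt from the earlier example, listening for these readable-stream events might look like this:

const fs = require('fs');

const readable = fs.createReadStream('./big-file.txt', { encoding: 'utf8' });

// 'data' is emitted for every chunk that becomes available
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} characters`);
});

// 'end' is emitted once there is no more data to read
readable.on('end', () => {
  console.log('Done reading.');
});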
Good things to know about Readable Streams:
Simple Writable Stream:
const { Writable } = require('stream');

const ws = new Writable({
  write(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  }
});
We can then consume this stream by piping a readable stream into it, for example by adding process.stdin.pipe(ws); to the end of the script.
Simple Readable Stream:
const { Readable } = require('stream');

const rs = new Readable({
  read(size) {
    this.push(String.fromCharCode(this.currentCharCode++));
    if (this.currentCharCode > 90) {
      this.push(null);
    }
  }
});

rs.currentCharCode = 65;
rs.pipe(process.stdout);
In the past, you had to be careful not to mix async functions with EventEmitter because there was no way to catch a rejection when it was emitted within an event handler. The solution was always to wrap the async function body in a try/catch block. Instead, you can now set EventEmitter.captureRejections = true and a catch() handler will be added every time a Promise is returned from an event handler. If a rejection occurs, the 'error' event will be emitted, avoiding an 'unhandledRejection'.
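A minimal sketch of how this works (the event name and error message here are arbitrary):

const EventEmitter = require('events');

// Opt in before creating emitters: new emitters will capture rejections by default
EventEmitter.captureRejections = true;

const ee = new EventEmitter();

// An async handler whose Promise rejects
ee.on('something', async () => {
  throw new Error('kaboom');
});

// The rejection is routed to the 'error' event instead of
// becoming an unhandledRejection
ee.on('error', (err) => {
  console.error('Caught:', err.message);
});

ee.emit('something');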
When data must be processed in multiple steps, streams can be connected to each other, sending the data through a “pipeline” of transformations. With piping, input is received from a readable stream, each step is handled by a transform stream, and the result is sent on to the next step. For the last step, we can write the data from the most recent readable stream into a writable stream, or process it by some other means.
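For instance, a sketch of such a pipeline using the zlib.createGzip() transform stream mentioned above (the output file name is just a placeholder):

const fs = require('fs');
const zlib = require('zlib');

// Readable -> Transform -> Writable: each chunk is gzipped as it flows through
fs.createReadStream('./big-file.txt')
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('./big-file.txt.gz'));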
When piping a readable stream into a writable stream, the readable stream will emit the data event when it has data to be read. The writable stream will then emit the drain event when it is ready to accept more data.
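As a rough illustration of what pipe() handles for us (the copy.txt destination is just a placeholder), the same backpressure handling can be written by hand with pause(), resume(), and the drain event:

const fs = require('fs');

const readable = fs.createReadStream('./big-file.txt');
const writable = fs.createWriteStream('./copy.txt'); // placeholder destination

readable.on('data', (chunk) => {
  // write() returns false when the internal buffer is full
  if (!writable.write(chunk)) {
    readable.pause(); // stop reading for now
    writable.once('drain', () => readable.resume()); // resume once drained
  }
});

readable.on('end', () => writable.end());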
Perhaps a more elegant method of working with streams in a transform pipeline is through asynchronous iteration. This retrieves the contents of a data container asynchronously, which means the current task may be paused before retrieving an item. The iteration should be done on the readable event.
const fs = require('fs');

async function logChunks(readable) {
  for await (const chunk of readable) {
    console.log(chunk);
  }
}

const readable = fs.createReadStream('./big-file.txt', {encoding: 'utf8'});
logChunks(readable);
The idea behind asynchronous iteration is that we can pause the iteration while we're working on the next item. It is a more elegant alternative to transform streams for processing streamed data in multiple steps. One of the characteristics of asynchronous iteration is that we can create a readable stream with Readable.from(), which can then be piped into a writable stream later, or we can use an async function to process it.

import {Readable} from 'stream';

async function* chunksToLines(chunkIterable) {
  let previous = '';
  for await (const chunk of chunkIterable) {
    let startSearch = previous.length;
    previous += chunk;
    while (true) {
      const eolIndex = previous.indexOf('\n', startSearch);
      if (eolIndex < 0) break;
      // line includes the EOL
      const line = previous.slice(0, eolIndex + 1);
      yield line;
      previous = previous.slice(eolIndex + 1);
      startSearch = 0;
    }
  }
  if (previous.length > 0) {
    yield previous;
  }
}

async function* numberLines(lineIterable) {
  let lineNumber = 1;
  for await (const line of lineIterable) {
    yield lineNumber + ' ' + line;
    lineNumber++;
  }
}

async function logLines(lineIterable) {
  for await (const line of lineIterable) {
    console.log(line);
  }
}

const chunks = Readable.from(
  'Text with\nmultiple\nlines.\n', {encoding: 'utf8'});
logLines(numberLines(chunksToLines(chunks)));
In this example, chunkIterable is an async or sync iterable over chunks of data. The chunksToLines function returns an async iterable over “lines”. lineIterable is an async iterable over lines of data, and logLines is an async function that logs the lines.
There are three primary approaches to writing to a writable stream:
- Write to the writable stream directly with its write() method.
- Use the pipe() method to pipe a readable stream into the writable stream.
- Use pipeline() from the stream module to pipe a readable stream into a writable stream.
We can use the scenario of writing a stream to a file to demonstrate the three approaches.
First, we’ll use an async function to write directly to a writable stream.
import * as util from 'util';
import * as stream from 'stream';
import * as fs from 'fs';
import {once} from 'events';

const finished = util.promisify(stream.finished); // convert to a promise

async function writeIterableToFile(iterable, filePath) {
  const writable = fs.createWriteStream(filePath, {encoding: 'utf8'});
  for await (const chunk of iterable) {
    if (!writable.write(chunk)) {
      // if we can't write, pause the stream
      await once(writable, 'drain'); // wait for drain event to resume writing
    }
  }
  writable.end();
  await finished(writable); // wait for the stream to finish
}

await writeIterableToFile(
  ['One', ' line of text.\n'], 'tmp/log.txt');
Notice that we promisified stream.finished. Normally stream.finished() uses a callback, but we can use this util function to convert it to a Promise. Next, take a look at the if condition. By calling await once(writable, 'drain'), we are waiting for the drain event to resume writing. We then close the writable stream and wait until writing is done.
Using pipeline(readable, writable):
import * as util from 'util';
import * as stream from 'stream';
import * as fs from 'fs';

const pipeline = util.promisify(stream.pipeline);

async function writeIterableToFile(iterable, filePath) {
  const readable = stream.Readable.from(
    iterable, {encoding: 'utf8'});
  const writable = fs.createWriteStream(filePath);
  await pipeline(readable, writable);
}

await writeIterableToFile(
  ['One', ' line of text.\n'], 'tmp/log.txt');
// ···
This method uses Readable.from() to create a readable stream from the iterable. Then, we use pipeline() to pipe the readable stream into the writable stream. We can also use the Readable.pipe() method to pipe a readable stream into a writable stream, but the only issue with this method is that if the readable stream emits an error, the writable stream will not be closed automatically.
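As a rough sketch of that caveat (the file names here are placeholders), each stream in a pipe() chain needs its own error handler and manual cleanup, which is exactly what pipeline() takes care of for us:

import * as fs from 'fs';

const readable = fs.createReadStream('tmp/source.txt'); // placeholder source
const writable = fs.createWriteStream('tmp/log.txt');

// pipe() does not forward errors between streams, so each one needs
// its own handler, and the writable has to be cleaned up manually.
readable
  .on('error', (err) => {
    console.error('Read failed:', err);
    writable.destroy(); // close the writable ourselves
  })
  .pipe(writable)
  .on('error', (err) => console.error('Write failed:', err));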