Three ways to process Node.js readable streams

Streams are part of core Node.js and provide a great way to start processing data as soon as a chunk is available. Today I'll discuss three ways to process readable streams and provide examples for each of them.

Readable streams are one of the four stream types Node.js offers: readable, writable, duplex and transform.

If you are not familiar with the concept of streams, I wrote an introductory post on them, and it might be a good idea to read that one first.

Readable streams

When we need to work with a large file, we can choose to handle it in one piece, as is, or slice it up into smaller chunks and start processing those chunks as soon as they are available.

Processing large amounts of data at once has some disadvantages.

We have to wait for the whole thing to download before we can use it, and this might require extensive use of memory depending on the size of the data. Because our application has to store the data somewhere before they are used, we might run into buffer issues.

In contrast, when data arrive in small pieces (or chunks), we can start processing them immediately. Data can be transformed or even forwarded to the client, and although it still takes time for everything to download, it's much better to process data chunk by chunk than to wait for them to accumulate and work with them all at once.
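To make the difference tangible, here is a minimal sketch that contrasts the two approaches. The file name big-file.txt is just a placeholder; any reasonably large file on your machine will do.

const fs = require('fs');

// Option 1: load the whole file into memory before we can touch a single byte
fs.readFile('big-file.txt', (err, data) => {
  if (err) throw err;
  console.log(`Loaded ${data.length} bytes in one piece`);
});

// Option 2: stream the same file and handle each chunk as soon as it arrives
fs.createReadStream('big-file.txt')
  .on('data', (chunk) => console.log(`Received a chunk of ${chunk.length} bytes`))
  .on('end', () => console.log('Finished reading'));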

Streams are all around

Streams are cool and, as I mentioned in the excerpt, they are part of core Node.js.

Readable streams represent the incoming data that flow into our application; they are the data chunks that need to be processed. Writable streams, in turn, are where we send the processed data.

Examples of readable streams are the standard input (the terminal) and file system read streams (fs.createReadStream), but incoming HTTP requests are also readable streams.
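For instance, a web server can treat an incoming request as a readable stream and the response as a writable one. Here is a quick sketch of my own (the port and file name are made up, and it uses pipe, which is covered in the next section):

const http = require('http');
const fs = require('fs');

http.createServer((req, res) => {
  // req is a readable stream, res is a writable stream;
  // stream a file to the client instead of loading it into memory first
  fs.createReadStream('some-file.txt').pipe(res);
}).listen(3000);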

As such, I’ll use both readable and writable streams in the examples below.

Processing readable streams

Let's now look at three ways of processing readable streams. I'll use the standard input (process.stdin) as the readable stream in each example.

Use pipe

This is the recommended way to process readable streams. It’s easy to use and automatically manages backpressure as well.

Backpressure is when data build up on the input side because they are read faster than they are processed. It can be a problem because the accumulated data must be kept in memory, which has finite capacity.

pipe is easy to use, but it provides the least control of the three approaches discussed here.

The syntax for pipe is the following:

source.pipe(destination)

The source must be a readable stream, while the destination must be writable (a writable, duplex or transform stream).

pipe is very similar to the pipe operator used in Linux shells or in ReactiveX. We receive chunks of data and tunnel (pipe) them through some sort of transformation process.

But because an example is worth a hundred (or even a thousand) words, let's have a look at how it works.

Create a file called readable-streams-example1.js in a folder. There’s no need to install anything except Node.js. Write the following few lines in the file:

const fs = require('fs');

const writable = fs.createWriteStream('example1.txt'); // the destination
const readable = process.stdin; // the source

readable.pipe(writable); // forward everything typed in the terminal to example1.txt

This little piece of code (and the other two as well) takes the input from the console and keeps writing whatever we type into a file called example1.txt, which will be located in the same folder as readable-streams-example1.js.

The readable stream (the source) is the standard input (process.stdin) and it’s piped to writable (the destination), which is in this case a writable stream of the file system (fs.createWriteStream).

To make it work, i.e. to switch the stream from paused mode to flowing mode, we use pipe.

Let’s now type node readable-streams-example1.js in the terminal. The process starts and we can also start typing.

We can create a chunk of data by pressing Enter. When the readable stream encounters a newline character, it turns the data into a chunk and forwards it for processing.

You can see this in example1.txt: words or sentences that you type after hitting Enter will appear on a new line. In fact, pressing Enter multiple times will create multiple empty lines in example1.txt.

This way we have an infinite stream and it will keep going until Ctrl + C is pressed.

Once the process is terminated, the example1.txt file will contain everything we typed. Cool!
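One more thing worth knowing about pipe is that it returns its destination, so calls can be chained. This is how the "transformation process" mentioned earlier usually looks in practice. Below is a minimal sketch of my own (not part of the example above) that upper-cases whatever we type before printing it back to the terminal:

const { Transform } = require('stream');

// a tiny transform stream that upper-cases every chunk flowing through it
const upperCase = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  },
});

// stdin -> transform -> stdout; chaining works because pipe() returns its destination
process.stdin.pipe(upperCase).pipe(process.stdout);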

Event emitters

Streams inherit from EventEmitter, which means that we can attach listeners to the events readable streams emit.

This reading mode provides more control over the data but it’s easier to lose them, too.

The stream can be switched into flowing mode by listening to the data event.

We can refactor the previous example to use events. In a new file called readable-streams-example2.js we can write the following:

const fs = require('fs');

const writable = fs.createWriteStream('example2.txt');
const readable = process.stdin;

readable.on('data', (chunk) => { // data event is listened to, so the stream is now in flowing mode
  writable.write(chunk);
});

Readable streams (readable in our example) have a method called on, which makes it possible to listen to the events they emit.

One of these events is data, and we need to listen to it if we want to catch chunk (the data we enter). If the listener is removed while the stream is flowing, the data will be lost.

We can use the write method on the writable stream to forward our words to example2.txt.
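Note that, unlike pipe, this version does not handle backpressure for us. If backpressure matters, we have to deal with it ourselves, for example by checking the return value of write and using pause, resume and the writable drain event. A sketch of how that could look (my own variation on the example above, not something the basic version requires):

const fs = require('fs');

const writable = fs.createWriteStream('example2.txt');
const readable = process.stdin;

readable.on('data', (chunk) => {
  // write() returns false when the writable's internal buffer is full
  const canWriteMore = writable.write(chunk);
  if (!canWriteMore) {
    readable.pause(); // stop the flow until the buffer has been flushed
    writable.once('drain', () => readable.resume());
  }
});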

Use the read() method

Readable streams perform a read operation when data are processed. Similarly to writable streams, which have a write method, readable streams come with a built-in read method.

read can also be used to start the data flow. This time we need to listen to the readable event.

Let's see how our example works with read in the readable-streams-example3.js file:

// example 3: using the read() method
const fs = require('fs');

const writable = fs.createWriteStream('example3.txt');
const readableStream = process.stdin;

readableStream.on('readable', () => { // the stream stays paused; chunks are pulled manually with read()
  let chunk;
  while (null !== (chunk = readableStream.read())) {
    writable.write(chunk);
  }
});

This code is probably the most verbose of the three. When the readable event fires, we call read to pull in the available data (chunk) and process it.

When there's no more data left to process, readableStream.read() will return null. This is why we can create the while loop: while data are available, process and write them to example3.txt.

If you run this little script, the result will be the same as in the first two cases.
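One extra bit of control read gives us is an optional size argument: we can pull a fixed number of bytes at a time. Here is a small sketch of my own, reading from a file instead of the standard input (the file name is just a placeholder):

const fs = require('fs');

const readable = fs.createReadStream('example3.txt');

readable.on('readable', () => {
  let chunk;
  // pull 16 bytes per call; read() returns null when fewer bytes are buffered,
  // except at the very end of the stream, where the remainder is returned
  while (null !== (chunk = readable.read(16))) {
    console.log(`Read ${chunk.length} bytes`);
  }
});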

Use one way

One should choose one of the above processing methods when managing streams. Mixing them up can lead to funky behaviour in the application.

pipe is the easiest method of all to use but it provides the least control over the stream.

data and read() need more code and, in return, give us more influence over how the data are processed.

Conclusion

Readable streams are an excellent way to bring large amounts of data into the application without waiting for them to be read entirely. We can use the data immediately, which leads to better performance and a better user experience.

Thanks for reading and see you next time.