Stream in Node JS by Gautam Kumar Samal on October 22, 2020 185 views
Want to discuss the concepts and usages for a stream in general, and not the full technical aspects of it. The examples and terminologies are inherited from NodeJS. I am sure you’ll find alternative variants in other languages. Although streaming isn’t a new concept, there is quite some confusion when we think about implementing one.
What is a stream? Is it a data type?
Well, no! Some common data types that we are aware of are int, string, char, boolean etc. We also have collections like Array, Object that deal with them. All the data types share a common property that they store data in memory and have a fixed/dynamic size based on that.
The stream, on the other hand, isn’t a data type. It instead can be called as an abstraction on interface for data handling. As the name suggests it can be imagined as flowing of the data.
How does the data flow? What if there is no one at the receiving end?
Take an example of a river or a stream. It flows continuously until the source is exhausted (which we hope shouldn’t happen based on climate change). But the river at any point of time also contains water or think of it as a dam/reservoir holding some amount of water and releasing it as it gets full. Similar to a reservoir the stream has a buffer that holds some data and based on the receiver and consumer, the buffer releases data. You can also imagine if the receiver is slow or doesn’t accept the data and the consumer continues to feed on, the buffer will let go of data that threatens its capacity, similar to a dam.
When do we need to use streams and is it really necessary?
- Let’s start small. I wish to download a file of 10GB.
- I wish to analyse server logs over the past month. The log files also keep redundant data that spreads over 100 GB. But we just want insights, let’s say – most active users based on log.
- I wish to export 20 tables each containing over million records to csv files and compress them into a .zip file. Then I want to upload it to a remote storage.
Right! This seems intriguing now. Let’s talk streams!
How does it really work?
Let’s think of the first problem, to download a file. A person not knowing this or any other shortcut would read the file first. If the file exists he will get the data of the file and then transfer it across the HTTP response to the client or any similar way to another location. In NodeJS, there is an in-build module to handle the files called “fs”. If you look into it, there are 3 things that can help you with this.
- fs.readFileSync(path[, options])
- fs.readFile(path[, options], callback)
- fs.createReadStream(path[, options])
The first option is quite clear. We would read the whole file and the method would return us the data. The second method is callback based. So, we won’t keep the program waiting for the file reading to be finished, instead we’ll receive a callback with the same file data as first when the file read is complete. The third option uses a stream to read. Now thinking of the first two options, even if the second one holds an advantage of being asynchronous, it still returns the whole data of the file. And based on our example case, it would be somewhat near 10 GB. That’s huge. Practically every server or app runs with limited resources and even if you can afford 10GB, it will affect the performance of the other requests into the same server.
That’s where the third option comes in handy because we never need to store the data, we just need to pass them along. Here one would create a stream over a file and register Response or something similar as receiver. Now the stream will start to transfer the data. Remember the stream itself has only events and a buffer with a max size of 16 MB by default. It is configurable but that’s how much it can store at the maximum.
How do we push or pull data?
Stream is all about producing and consuming data. Another benefit would be a stream can be piped to multiple streams and similarly multiple streams can be piped to one, like when we want to compress the file, we would very much like to push multiple file streams into a single zip stream, all without having the actual effort to store and compress data.
As far as consumption or production is concerned, the stream interface in NodeJS extends EventEmitter. It’s self-explanatory that such operations would need to be event based. So, when we push data into a stream, it emits an event “data” and the consumer has to listen to it. The process goes on until the producer ends the stream (one way would be to pass null) and the consumer will get an “end” event notification. If both ends are streams, we can use “pipe” that will do the flow automatically without us needing to listen to those events.
This would explain our second use case. We would want to create multiple streams and listen to their “data” events. Or we can create a new stream and have all the read streams to be linked to a single stream via “pipe”.
Is utility streams like “fs.createReadStream” enough for us?
Unfortunately no! I am sure many of us have used “createReadStream” or “createWriteStream” without having to go through stream features and functionality. That’s because the module abstracts it for a simple use case and what could be so trivial than a file.
So let’s talk about custom streams. The streams come in all forms – Read, Write, Duplex, Transform. At the end the basics would be, we need a readable stream to be linked to a writable stream. The transform stream can come in between to transform the outgoing data as it goes. The duplex streams are bi-directional and thus use two buffers for read and write respectively.
Looking back to our scenario – 3, I think we would need some custom streams, don’t we? We are going to read the data from the database. Let’s assume our database utility doesn’t provide inbuilt stream but provides callback. For each record, we would receive the data under the “data” event. So, to work with that we would create a Read stream that is going to be the source.
Notice the line “this.push(chunk)”. That’s where we are pushing the data to the stream and it’s important to stop/pause the stream if the method returns false. That basically says that the Buffer is full.
Similarly an example of a custom writable stream would be something like the following. Ideally we won’t need it much as we can directly send data to a file stream or HTTP response. We could do whatever we needed before calling the callback. Empty callback would mean the write action was successful.
While all of this sounds great, what should we do about errors? In the practical world an error can occur at any time and some of them are intentional. So, how do we best handle them and where?
Since stream extends EventEmitter, then have an “error” handler by default, which will be emitted when error occurs with the error message and stack. So, we should always listen to “error ” event if we are handling the streams by ourselves.
If the streams are linked by “pipe”, the error event will propagate till the end of the link. But if we handle it ourselves (listening to the “data” event) and push the packets manually, we need to emit errors manually. On the read stream example, something like the following can be configured, in case the source emits any error.
Ideally we would want to destroy the stream in the event of an error and can perform additional actions based on the need. The write stream example shows that the callback can be called with an error in case something isn’t expected. That’s definitely one way to do validation on the write end. As another option, a transform stream can also be used.
It provides an option to transform a chunk into something else as we go. The transform stream can emit error as well.
I think we got most of the things covered. Once you get a grasp of this, the streams provide a nice way to solve a lot of practical problems. The streaming process is completely asynchronous, running with low memory footprint. So, handling errors can not be emphasized enough.
Primarily the stream operates with string or buffer (bytes) but there has been a new addition called “ObjectMode” that allows each chunk to be a JSON object. It’s quite handy when it comes to processing JSON records.
However newer techniques keep coming with every passing day, one better than the next.