Stream
When moving a lot of data out and into memory, it's rare for the entire chunk of data to be moved in a single piece. When reading data from a file on disk, or when retrieving data from a remote web service, it's common to retrieve one chunk of data at a time and then reassemble those chunks when we have all the pieces. In most cases, this process is handled for you automaticly.
However, it can be useful to operate on these chunks as they come in. If you're reading a compressed file from disk, decompressing the chunks as they come in allows you utilize the pause between receiving them, increasing the percieved performance of the operation. It also allows you to save memory by not having to store the entire compressed file in memory before beginning the decompression process.
Streams is the abstraction that allows us to work on data that is in transit. They can also serve as a tool for communication between different parts of your code base.
We have three kinds of streams: readable streams, writable streams, and transformation streams.
Readable Streams
A source of data. You can only read data out of a Readable stream, not write data into it.
Create a Readable stream that delivers the values in the provided Array
before
closing.
Read a value off the stream. The Task
will not succeed until a value can be read.
Reads Bytes
off the stream and attempt to convert it into String
.
Cancels the stream. This indicates a fatal error, and the given String
should explain in a human-readable
way what that error is. If the stream contains a buffer, the buffer is dropped. It will not be possible to read
another value out of this stream.
Writable Streams
A destination for data. You can only write data into a Writable stream, not read data out of it.
Write a value into the stream. The returned Task
will only succeed when the written value is accepted, meaning
it is read from a Readable stream, or stored in some stream buffer.
Converts the given String
to Bytes
and writes it to the stream.
Same as writeStringAsBytes except a newline character is appended to the String
before conversion.
Queue a value to be written into the stream. The returned Task
will succeed when the value is on the stream's buffer.
The difference between this and write is when the Task
succeeds. Because the Task
from this function
succeeds once the value is in a buffer, we won't be able to detect if the stream is cancelled before the value is
passed on to somewhere else. On the other hand, we can assume the Task
succeeds as long as there is room in the
buffer, even if someone isn't actively reading from the stream.
In general you should prefer write, and reach for this function if you experience problems.
Closes the stream. This indicates that no new values will be added to the stream after this point.
Cancels the stream. This indicates a fatal error, and the given String
should explain in a human-readable
way what that error is. If the stream contains a buffer, the buffer is dropped.
Error Handling
Different kind of errors that can happen when operating on a stream.
- Closed: The stream never accept/produce another value.
- Cancelled: The stream has been terminated, possibly because something went wrong. The associated
String
contains a human readable error message. - Locked: The stream is already being read or written to. You might have to retry the operation.
Give a human readable description of an error.
Transformation Streams
A readable-writable stream pair. Whatever is written to the writable stream, can be retrieved from the readable stream. After data is written, and before it is placed on the readable stream, it goes through a transformation function. This function can alter the data, or even drop it entirely.
A Transformation that doesn't actually transform the data written to the writable stream. This can be useful as a communication primitive. You can pass the on the readable stream, allowing a one-way communication to some other part of your codebase.
Same as identityTransformation, but allows you set the capacity of the streams. The capacity decides how many chunks a stream will store in its buffer. When a buffer is full, the stream will stop accepting new chunks until it has room.
If you attempt to write to a Writable stream with a full buffer, the write will only succeed when there's room in the buffer again. If the stream has a capacity of 0, the write will only succeed when the value has been read on the other end.
Attempting to read from a Readable stream with a full buffer will succeed instantly. If the buffer is empty or capacity is 0, the read will succeed once there's a value to be read.
When defining a custom Transformation, you need to specify how the data coming in is handled.
- UpdateState: Update the internal state of the Transformation, no values are passed to the readable stream.
- Send: Update the internal state and make chunks available for reading.
- Close: Make the given chunks available for reading, and close the streams.
- Cancel: Cancel both streams with a human-readable error message.
Create your very own Transformation. The stream-pair holds state and is free to alter, batch, combine or even drop whatever data is coming in.
Same as customTransformation, except you can define the capacity for each stream.
Retrieve the Readable stream of a Transformation.
Retrieve the Writable stream of a Transformation.
When data becomes available on a Readable stream, immediatly write that data to the Transformation. This will lock both streams, and closing one will close the other.
On success, the Readable stream of the Transformation is returned.
Same as pipeThrough, except the Transformation is resolved from a
Task
.
When data becomes available on a Readable stream, immediatly write that data to
the Writable stream. This will lock both streams, and closing one will close
the other. The Task
will resolve once the Writable stream is closed.
Useful Transformation Streams
Transforms String
to Bytes
Transforms Bytes
to String
. If the conversion fails, the streams will be cancelled.
Compress Bytes
using the gzip
algorithm.
Compress Bytes
using the deflate
algorithm.
Compress Bytes
using the deflate
algorithm, without leading headers.
Decompress Bytes
using the gzip
algorithm.
Decompress Bytes
using the deflate
algorithm.
Decompress Bytes
using the deflate
algorithm, without leading headers.