Stream

When moving a lot of data into and out of memory, it's rare for all of it to be moved in a single piece. When reading a file from disk, or when retrieving data from a remote web service, it's common to receive one chunk at a time and then reassemble the chunks once all the pieces have arrived. In most cases, this process is handled for you automatically.

However, it can be useful to operate on these chunks as they come in. If you're reading a compressed file from disk, decompressing the chunks as they arrive lets you make use of the pauses between receiving them, improving the perceived performance of the operation. It also saves memory, since you don't have to hold the entire compressed file in memory before beginning the decompression process.

Streams are the abstraction that allows us to work on data that is in transit. They can also serve as a tool for communication between different parts of your codebase.

We have three kinds of streams: readable streams, writable streams, and transformation streams.

Readable Streams

type Readable value

A source of data. You can only read data out of a Readable stream, not write data into it.

fromArray : Array a -> Task Error (Readable a)

Create a Readable stream that delivers the values in the provided Array before closing.

read : Readable value -> Task Error value

Read a value off the stream. The Task will not succeed until a value can be read.
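As a minimal sketch of how fromArray and read compose, the snippet below creates a stream from an Array and reads one value from it. It assumes this module is imported as `Stream` and that `Task.andThen` from the core `Task` module is available; the module name `Example` and the name `firstValue` are illustrative only.

```gren
module Example exposing (firstValue)

import Stream
import Task exposing (Task)


-- Create a stream that delivers 1, 2 and 3, then read a single value off it.
-- Both steps can fail with a Stream.Error, so they chain with Task.andThen.
firstValue : Task Stream.Error Int
firstValue =
    Stream.fromArray [ 1, 2, 3 ]
        |> Task.andThen Stream.read
```

Reading the remaining values would mean calling read again on the same Readable stream.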

readBytesAsString : Readable Bytes -> Task Error (Maybe String)

Reads Bytes off the stream and attempts to convert them into a String.

cancelReadable : String -> Readable value -> Task Error {}

Cancels the stream. This indicates a fatal error, and the given String should explain in a human-readable way what that error is. If the stream contains a buffer, the buffer is dropped. It will not be possible to read another value out of this stream.

Writable Streams

type Writable value

A destination for data. You can only write data into a Writable stream, not read data out of it.

write : value -> Writable value -> Task Error (Writable value)

Write a value into the stream. The returned Task will only succeed when the written value is accepted, meaning it is read from a Readable stream, or stored in some stream buffer.

writeStringAsBytes : String -> Writable Bytes -> Task Error (Writable Bytes)

Converts the given String to Bytes and writes it to the stream.

writeLineAsBytes : String -> Writable Bytes -> Task Error (Writable Bytes)

Same as writeStringAsBytes except a newline character is appended to the String before conversion.

enqueue : value -> Writable value -> Task Error (Writable value)

Queue a value to be written into the stream. The returned Task will succeed when the value is in the stream's buffer.

The difference between this and write is when the Task succeeds. Because the Task from this function succeeds once the value is in a buffer, we won't be able to detect if the stream is cancelled before the value is passed on to somewhere else. On the other hand, we can assume the Task succeeds as long as there is room in the buffer, even if someone isn't actively reading from the stream.

In general you should prefer write, and reach for this function if you experience problems.

closeWritable : Writable value -> Task Error {}

Closes the stream. This indicates that no new values will be added to the stream after this point.
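The following sketch chains several writes and then closes the stream. Because write and enqueue both return the Writable stream on success, they can be piped into each other with `Task.andThen`. It assumes the module is imported as `Stream`; the names `greet` and `greetBuffered` are hypothetical.

```gren
module Example exposing (greet, greetBuffered)

import Stream
import Task exposing (Task)


-- Write two values, waiting for each one to be accepted, then close the stream.
greet : Stream.Writable String -> Task Stream.Error {}
greet output =
    Stream.write "hello" output
        |> Task.andThen (Stream.write "world")
        |> Task.andThen Stream.closeWritable


-- The same sequence using enqueue: each step only waits until the value
-- has been placed in the stream's buffer.
greetBuffered : Stream.Writable String -> Task Stream.Error {}
greetBuffered output =
    Stream.enqueue "hello" output
        |> Task.andThen (Stream.enqueue "world")
        |> Task.andThen Stream.closeWritable
```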

cancelWritable : String -> Writable value -> Task Error {}

Cancels the stream. This indicates a fatal error, and the given String should explain in a human-readable way what that error is. If the stream contains a buffer, the buffer is dropped.

Error Handling

type Error
= Closed
| Cancelled String
| Locked

The different kinds of errors that can happen when operating on a stream.

  • Closed: The stream will never accept or produce another value.
  • Cancelled: The stream has been terminated, possibly because something went wrong. The associated String contains a human-readable error message.
  • Locked: The stream is already being read from or written to. You might have to retry the operation.

errorToString : Error -> String

Gives a human-readable description of an error.
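One way to use errorToString is to convert a stream failure into a plain message before reporting it. This is a sketch, assuming the module is imported as `Stream` and that `Task.mapError` from the core `Task` module is available; `readWithMessage` is a hypothetical helper name.

```gren
module Example exposing (readWithMessage)

import Stream
import Task exposing (Task)


-- Read one value; if the read fails, replace the Stream.Error
-- with its human-readable description.
readWithMessage : Stream.Readable value -> Task String value
readWithMessage input =
    Stream.read input
        |> Task.mapError Stream.errorToString
```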

Transformation Streams

type Transformation read write

A readable-writable stream pair. Whatever is written to the writable stream can be retrieved from the readable stream. After data is written, and before it is placed on the readable stream, it goes through a transformation function. This function can alter the data, or even drop it entirely.

identityTransformation : Task x (Transformation data data)

A Transformation that doesn't actually transform the data written to the writable stream. This can be useful as a communication primitive. You can pass on the readable stream, allowing one-way communication with some other part of your codebase.
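To illustrate the communication use case, the sketch below creates an identity Transformation and splits it into its two ends, using the readable and writable functions documented further down. The record field names `sender` and `receiver`, and the name `makeChannel`, are assumptions made for this example.

```gren
module Example exposing (makeChannel)

import Stream
import Task exposing (Task)


-- Create a one-way channel: keep `sender` where values are produced,
-- and hand `receiver` to the part of the codebase that consumes them.
makeChannel : Task x { sender : Stream.Writable String, receiver : Stream.Readable String }
makeChannel =
    Stream.identityTransformation
        |> Task.map
            (\channel ->
                { sender = Stream.writable channel
                , receiver = Stream.readable channel
                }
            )
```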

identityTransformationWithOptions :
{ readCapacity : Int
, writeCapacity : Int
}
-> Task x (Transformation data data)

Same as identityTransformation, but allows you to set the capacity of the streams. The capacity decides how many chunks a stream will store in its buffer. When a buffer is full, the stream will stop accepting new chunks until it has room.

If you attempt to write to a Writable stream with a full buffer, the write will only succeed when there's room in the buffer again. If the stream has a capacity of 0, the write will only succeed when the value has been read on the other end.

Attempting to read from a Readable stream that has values in its buffer will succeed immediately. If the buffer is empty or the capacity is 0, the read will succeed once there's a value to be read.
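A small sketch of how capacity interacts with enqueue and read: with room for one chunk on each side, a value can be queued before anyone reads, and then read back from the other end. This assumes the module is imported as `Stream` and relies on the buffering behaviour described above; `pingPong` is a hypothetical name, and the exact timing has not been verified against a running program.

```gren
module Example exposing (pingPong)

import Stream
import Task exposing (Task)


-- Queue a value into a buffered identity Transformation,
-- then read it back from the readable end.
pingPong : Task Stream.Error String
pingPong =
    Stream.identityTransformationWithOptions
        { readCapacity = 1
        , writeCapacity = 1
        }
        |> Task.andThen
            (\channel ->
                Stream.enqueue "ping" (Stream.writable channel)
                    |> Task.andThen (\_ -> Stream.read (Stream.readable channel))
            )
```

With a capacity of 0 there would be no buffer for the value to wait in, so the writing side and the reading side would have to run concurrently instead of one after the other.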

type CustomTransformationAction state value
= UpdateState state
| Send ({ state : state, send : Array value })
| Close (Array value)
| Cancel String

When defining a custom Transformation, you need to specify how the data coming in is handled.

  • UpdateState: Update the internal state of the Transformation; no values are passed to the readable stream.
  • Send: Update the internal state and make chunks available for reading.
  • Close: Make the given chunks available for reading, and close the streams.
  • Cancel: Cancel both streams with a human-readable error message.

customTransformation :
(state -> input -> CustomTransformationAction state output)
-> state
-> Task x (Transformation input output)

Create your very own Transformation. The stream-pair holds state and is free to alter, batch, combine or even drop whatever data is coming in.
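As an illustration, the sketch below defines a Transformation that prefixes every non-empty line with a running line number and drops empty lines. The state is the next line number to use. It assumes the module is imported as `Stream` with the constructors of CustomTransformationAction exposed; `numberLines` is a hypothetical name.

```gren
module Example exposing (numberLines)

import Stream
import Task exposing (Task)


-- State: the number to assign to the next non-empty line.
numberLines : Task x (Stream.Transformation String String)
numberLines =
    Stream.customTransformation
        (\lineNumber line ->
            if String.isEmpty line then
                -- Drop the chunk: keep the state and send nothing.
                Stream.UpdateState lineNumber

            else
                -- Send one numbered chunk and advance the counter.
                Stream.Send
                    { state = lineNumber + 1
                    , send = [ String.fromInt lineNumber ++ ": " ++ line ]
                    }
        )
        1
```

Returning several values in `send` would split one incoming chunk into many, while returning UpdateState repeatedly and then a single Send would batch many chunks into one.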

customTransformationWithOptions :
(state -> input -> CustomTransformationAction state output)
-> { initialState : state, readCapacity : Int, writeCapacity : Int }
-> Task x (Transformation input output)

Same as customTransformation, except you can define the capacity for each stream.

readable : Transformation read write -> Readable read

Retrieve the Readable stream of a Transformation.

writable : Transformation read write -> Writable write

Retrieve the Writable stream of a Transformation.

pipeThrough :
Transformation input output
-> Readable input
-> Task Error (Readable output)

When data becomes available on a Readable stream, immediately write that data to the Transformation. This will lock both streams, and closing one will close the other.

On success, the Readable stream of the Transformation is returned.

awaitAndPipeThrough :
Task Error (Transformation input output)
-> Readable input
-> Task Error (Readable output)

Same as pipeThrough, except the Transformation is resolved from a Task.

pipeTo : Writable data -> Readable data -> Task Error {}

When data becomes available on a Readable stream, immediately write that data to the Writable stream. This will lock both streams, and closing one will close the other. The Task will resolve once the Writable stream is closed.
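The piping functions take the source stream as their last argument, so they compose with `|>` and `Task.andThen`. The sketch below sends a stream of Strings through textEncoder (documented under Useful Transformation Streams) and pipes the resulting Bytes into a destination. It assumes the module is imported as `Stream` and that `Bytes` comes from the core `Bytes` module; `encodeInto` is a hypothetical name.

```gren
module Example exposing (encodeInto)

import Bytes exposing (Bytes)
import Stream
import Task exposing (Task)


-- Encode every String from `source` as Bytes and write the result
-- to `destination`. The Task resolves once `destination` is closed.
encodeInto : Stream.Writable Bytes -> Stream.Readable String -> Task Stream.Error {}
encodeInto destination source =
    source
        |> Stream.awaitAndPipeThrough Stream.textEncoder
        |> Task.andThen (Stream.pipeTo destination)
```

awaitAndPipeThrough is used here because textEncoder is a Task; with a Transformation already in hand, pipeThrough would take its place.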

Useful Transformation Streams

textEncoder : Task x (Transformation String Bytes)

Transforms String to Bytes.

textDecoder : Task x (Transformation Bytes String)

Transforms Bytes to String. If the conversion fails, the streams will be cancelled.

gzipCompression : Task x (Transformation Bytes Bytes)

Compress Bytes using the gzip algorithm.

deflateCompression : Task x (Transformation Bytes Bytes)

Compress Bytes using the deflate algorithm.

deflateRawCompression : Task x (Transformation Bytes Bytes)

Compress Bytes using the deflate algorithm, without leading headers.

gzipDecompression : Task x (Transformation Bytes Bytes)

Decompress Bytes using the gzip algorithm.

deflateDecompression : Task x (Transformation Bytes Bytes)

Decompress Bytes using the deflate algorithm.

deflateRawDecompression : Task x (Transformation Bytes Bytes)

Decompress Bytes using the deflate algorithm, without leading headers.
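Since each of these Transformations is a Task, they chain naturally with awaitAndPipeThrough. The sketch below compresses a stream of text with gzip and reverses the process. It assumes the module is imported as `Stream` and that `Bytes` comes from the core `Bytes` module; `gzipText` and `gunzipText` are hypothetical names.

```gren
module Example exposing (gunzipText, gzipText)

import Bytes exposing (Bytes)
import Stream
import Task exposing (Task)


-- Encode text as Bytes, then compress the Bytes with gzip.
gzipText : Stream.Readable String -> Task Stream.Error (Stream.Readable Bytes)
gzipText text =
    text
        |> Stream.awaitAndPipeThrough Stream.textEncoder
        |> Task.andThen (Stream.awaitAndPipeThrough Stream.gzipCompression)


-- Decompress gzip data, then decode the Bytes back into text.
gunzipText : Stream.Readable Bytes -> Task Stream.Error (Stream.Readable String)
gunzipText compressed =
    compressed
        |> Stream.awaitAndPipeThrough Stream.gzipDecompression
        |> Task.andThen (Stream.awaitAndPipeThrough Stream.textDecoder)
```

Each chunk flows through the whole chain as it arrives, so the complete input never has to be held in memory at once.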