In our first blog post about streaming, we discussed how streaming libraries strengthen types to catch more errors. However, when we strengthen types, we need to be careful not to hinder program composition. After all, precise type information can make it more onerous to convince the compiler that a particular program is well formed. In this post, we show that streaming libraries handle this issue well, as they allow programs to be composed conveniently. To illustrate this point, we discuss the most important forms of composition for streaming programs and illustrate each concept using four concrete streaming libraries, namely pipes, conduit, streaming, and io-streams. This ensures that the discussed concepts are of a general nature and not specific to just one particular library.

Composition forms

Generally speaking, streaming libraries offer two flavors of composition: appending and pipelining. When appending, the values produced by one program are followed by the values produced by another. When pipelining, the values produced by one streaming program are processed by another while the first program still has more values to produce.

Different libraries implement these forms of composition in different ways. Some streaming libraries have a first-class notion of stream processors rather than streams. Generally, a stream processor SP i o m r consumes a stream of values of type i, yields a stream of values of type o, and finally yields a value of type r when it terminates. The parameter m represents the monad in which the effects of the processors are sequenced. For example, we have

conduit:    ConduitM i o m r
pipes:      Pipe i o m r

pipes and conduit define the monad bind operation (>>=) as an appending composition. In p >>= \r -> q, the input stream is first passed to p, and when p completes, the unconsumed elements of the input stream are passed to q. As a result, the output stream of p is followed by the output stream of q.
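The appending behavior described above can be sketched with conduit as follows. This is a minimal illustration, assuming the conduit package and its Conduit module; the names firstThenSecond and appended are made up for this example.

```haskell
import Conduit

-- Appending with (>>): takeC forwards the first two inputs unchanged and
-- then terminates; mapC takes over the rest of the same input stream.
-- (firstThenSecond is an illustrative name, not part of conduit.)
firstThenSecond :: Monad m => ConduitT Int Int m ()
firstThenSecond = takeC 2 >> mapC (* 10)

appended :: [Int]
appended = runConduitPure $ yieldMany [1 .. 5] .| firstThenSecond .| sinkList

main :: IO ()
main = print appended  -- [1,2,30,40,50]
```

The output of the first processor (1, 2) is followed by the output of the second (30, 40, 50), and the second only sees the inputs the first left unconsumed.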

In contrast to pipes and conduit, the streaming and io-streams packages manipulate streams directly. Consequently, there is no notion of an input stream. Still, monadic composition works in a similar manner in the case of the streaming package. The stream p >>= \r -> q starts with the output stream of values produced by p, followed by the output stream of values produced by q. The involved types are

streaming:  Stream (Of a) m r
io-streams: InputStream a

Values of type Stream (Of a) m r are streams of values of type a, which might be produced by performing effectful computations in the monad m. When there are no more elements of type a, a value of type r is produced. Values of type InputStream a are also streams of values of type a, and values might be produced by performing computations in the monad IO.
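As a small sketch of monadic appending in the streaming package, the following example passes the return value of the first stream to the function that builds the second. It assumes the streaming package; firstPart, secondPart, and appended are illustrative names.

```haskell
import Data.Functor.Identity (runIdentity)
import Streaming (Of, Stream)
import qualified Streaming.Prelude as S

-- Produces three elements, then returns how many it produced.
firstPart :: Monad m => Stream (Of Int) m Int
firstPart = S.each [1, 2, 3] >> pure 3

-- Builds the second stream from the first stream's return value.
secondPart :: Monad m => Int -> Stream (Of Int) m ()
secondPart n = S.each [n * 10, n * 20]

appended :: [Int]
appended = runIdentity (S.toList_ (firstPart >>= secondPart))

main :: IO ()
main = print appended  -- [1,2,3,30,60]
```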

Although the package io-streams does not define a monad instance (and hence does not overload (>>=)) for InputStream, it provides a separate function for concatenating streams:

appendInputStream :: InputStream a -> InputStream a -> IO (InputStream a)
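A minimal usage sketch, assuming the io-streams package and its System.IO.Streams module (the name appendedList is made up for this example):

```haskell
import qualified System.IO.Streams as Streams

-- Concatenate two input streams and drain the result into a list.
appendedList :: IO [Int]
appendedList = do
  first  <- Streams.fromList [1, 2, 3]
  second <- Streams.fromList [4, 5]
  both   <- Streams.appendInputStream first second
  Streams.toList both

main :: IO ()
main = appendedList >>= print  -- [1,2,3,4,5]
```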

The second form of composition, namely pipelining, is realised by the following two combinators in the libraries based on stream processors:

conduit:    (.|)  :: Monad m => ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
pipes:      (>->) :: Monad m => Pipe a b m r      -> Pipe b c m r     -> Pipe a c m r

In p .| q, the output stream of p is fed into q as its input stream, and the same holds for p >-> q. But there is a difference in the types of the left operand. In conduit, the left operand yields () when it terminates, and the right operand is free to continue producing values in the output stream for as long as it wants. Not so in pipes, where the termination of the left operand produces a value of type r, which causes the whole composition to terminate and yield that same value.
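The difference in termination behavior can be observed with a small experiment. The sketch below assumes both the conduit and pipes packages are available; viaConduit and viaPipes are illustrative names. In both pipelines the right operand tries to yield one extra value after its input is exhausted.

```haskell
import Conduit (mapC, runConduitPure, sinkList, yieldMany, (.|))
import qualified Conduit as C
import Pipes (each, (>->))
import qualified Pipes as P
import qualified Pipes.Prelude as PP

-- conduit: once the left operand is done, mapC returns and the right
-- operand goes on to yield 100.
viaConduit :: [Int]
viaConduit = runConduitPure $
  yieldMany [1, 2, 3] .| (mapC (+ 1) >> C.yield 100) .| sinkList

-- pipes: when the left operand terminates, the whole composition
-- terminates, so the trailing yield is never reached.
viaPipes :: [Int]
viaPipes = PP.toList (each [1, 2, 3] >-> (PP.map (+ 1) >> P.yield 100))

main :: IO ()
main = do
  print viaConduit  -- [2,3,4,100]
  print viaPipes    -- [2,3,4]
```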

In streaming and io-streams, pipelines are constructed by composing functions which transform streams. For instance

streaming:  map        :: Monad m => (a -> b) -> Stream (Of a) m r -> Stream (Of b) m r
io-streams: decompress :: InputStream ByteString -> IO (InputStream ByteString)

The function decompress takes an input stream of bytestrings carrying compressed data in the zlib format and outputs a stream of bytestrings carrying the result of decompression.
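To make the idea of pipelines as composed functions concrete, here is a minimal sketch using the streaming package. The names doubledEvens and result are made up for this example.

```haskell
import Data.Functor.Identity (runIdentity)
import Streaming (Of, Stream)
import qualified Streaming.Prelude as S

-- A pipeline is ordinary function composition of stream transformers.
doubledEvens :: Monad m => Stream (Of Int) m r -> Stream (Of Int) m r
doubledEvens = S.map (* 2) . S.filter even

result :: [Int]
result = runIdentity (S.toList_ (doubledEvens (S.each [1 .. 6])))

main :: IO ()
main = print result  -- [4,8,12]
```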

Leftovers and parsing

Most streaming libraries offer the ability to push back a value from the input. The returned inputs are sometimes called leftovers. This facilitates looking ahead in the input stream to peek at some elements needed to decide on alternative behaviors of the program.

To illustrate the need for looking ahead, consider the following conduit program, which acts as a parser. It yields the number of 'a' characters in the input followed by the number of 'b' characters.

abConduit :: Monad m => Consumer Char m (Int, Int)
abConduit =
    (,) <$> (takeWhileC (== 'a') .| lengthC)
        <*> (takeWhileC (== 'b') .| lengthC)

As conduit implements leftovers, the first takeWhileC can return an element to the input stream upon discovering that it isn't an 'a' character. Then the following takeWhileC has the opportunity to examine that same element again.
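The following sketch runs the same parser on a small input. It assumes a recent conduit release and therefore spells the type with ConduitT instead of the Consumer synonym; countAB and counts are illustrative names.

```haskell
import Conduit

-- Same definition as the parser above, with the type written out.
countAB :: Monad m => ConduitT Char o m (Int, Int)
countAB =
  (,) <$> (takeWhileC (== 'a') .| lengthC)
      <*> (takeWhileC (== 'b') .| lengthC)

counts :: (Int, Int)
counts = runConduitPure (yieldMany "aaabbc" .| countAB)

main :: IO ()
main = print counts  -- (3,2)
```

The first 'b' is read by the first takeWhileC, pushed back as a leftover, and then counted by the second one.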

However, it is important to note that pipelining composition only allows the left operand to put values back. That is, in p .| q, only p can return values to the input stream; q's leftovers are ignored. This is a constraint stemming from the type of the composition:

(.|) :: Monad m => ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r

The input stream provides values of type a, but the leftovers of the right operand are of type b. To address this, conduit offers the following composition operator that resolves the type mismatch with a conversion function.

fuseLeftovers :: Monad m => ([b] -> [a]) -> ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
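As a hedged illustration of the difference, the sketch below uses an identity stage (mapC id) as the left operand, so that the conversion function can simply be id. The names dropped and restored are made up for this example, and the expected results follow from our reading of how conduit handles leftovers.

```haskell
import Conduit
import Data.Conduit (fuseLeftovers)

-- With (.|), the element pushed back by takeWhileC is a leftover of the
-- right operand and is therefore discarded.
dropped :: ([Int], [Int])
dropped = runConduitPure $ yieldMany [1 .. 5] .|
  ((,) <$> (mapC id .| (takeWhileC (< 3) .| sinkList)) <*> sinkList)

-- With fuseLeftovers, the same leftover is returned to the input stream,
-- where the second sinkList can see it.
restored :: ([Int], [Int])
restored = runConduitPure $ yieldMany [1 .. 5] .|
  ((,) <$> fuseLeftovers id (mapC id) (takeWhileC (< 3) .| sinkList) <*> sinkList)

main :: IO ()
main = do
  print dropped   -- ([1,2],[4,5])
  print restored  -- ([1,2],[3,4,5])
```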

In contrast to conduit, the pipes package does not implement leftovers. Instead, the additional package pipes-parse defines Parsers in terms of the core Pipes abstraction. Parsers in turn use lenses on the input to push values back. This is what an implementation of our example looks like with pipes-parse:

abPipes :: Monad m => Pipes.Parse.Parser Text m (Int, Int)
abPipes =
   (,) <$> zoom (Pipes.Text.span (== 'a')) lengthP
       <*> zoom (Pipes.Text.span (== 'b')) lengthP

lengthP :: (Monad m, Num n) => Pipes.Parse.Parser Text m n
lengthP =
    Pipes.Parse.foldAll (\n txt -> n + fromIntegral (Text.length txt)) 0 id

The function zoom is a primitive offered by packages implementing lenses. It changes the state of a stateful computation in a given scope, and a lens explains how the state is changed.

zoom :: Monad m => Lens' a b -> StateT b m c -> StateT a m c

In the case of zoom (Pipes.Text.span (== 'a')) p, it restricts the input fed to the parser p to the longest prefix consisting only of 'a' characters. The rest of the input, including the first character failing the predicate, is left for the next parser.

The parser lengthP counts the number of characters in the input. Thus

zoom (Pipes.Text.span (== 'a')) lengthP :: Parser Text m Int

stands for the same computation as

takeWhileC (== 'a') .| lengthC :: Consumer Char m Int

Turning our attention to the packages directly manipulating streams, Streams from streaming provide a function cons to push values back into a stream. The package io-streams provides unRead, for the same purpose. While the cons function just constructs a new stream, the function unRead modifies the input stream. We have,

cons   :: Monad m => a -> Stream (Of a) m r -> Stream (Of a) m r
unRead :: a -> InputStream a -> IO ()
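With the streaming package, looking ahead by one element can be sketched by combining next with cons. This is a minimal example, not a library function; peekFirst and peeked are illustrative names.

```haskell
import Data.Functor.Identity (runIdentity)
import Streaming (Of, Stream)
import qualified Streaming.Prelude as S

-- Inspect the first element (if any) and return a stream that still
-- contains it, by consing it back onto the remainder.
peekFirst :: Monad m => Stream (Of a) m r -> m (Maybe a, Stream (Of a) m r)
peekFirst str = do
  step <- S.next str
  pure $ case step of
    Left r          -> (Nothing, pure r)
    Right (x, rest) -> (Just x, S.cons x rest)

peeked :: (Maybe Char, String)
peeked = runIdentity $ do
  (firstChar, str) <- peekFirst (S.each "abc")
  contents <- S.toList_ str
  pure (firstChar, contents)

main :: IO ()
main = print peeked  -- (Just 'a',"abc")
```

Note that the original stream must not be used again after the call; only the returned stream should be consumed.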

More generally, you may wonder whether streaming libraries should try to do the job of parsers at all, given that the Haskell ecosystem already provides plenty of alternatives. To answer this question, consider that parsing libraries like parsec or attoparsec do not place any bounds on their memory consumption. In particular, the parsers of neither library return a result value before the entire input has been parsed. For instance, if a parser is parsing a list, it can't yield the first elements of the list as they are discovered. Instead, the list is handed back to the caller only when all of it resides in memory. The attoparsec package is even more problematic, as it retains all of the input, no matter what the parser does.

More sophisticated parsing libraries can do better; e.g., uu-parsinglib. This package, however, returns the parser results lazily, and so it is a challenge to test whether specific parts of the result are available without blocking the program. Overall, this means that to parse unbounded streams of input in a bounded amount of memory, we do actually require the streaming libraries to provide the discussed parser functionality. After all, they provide a crucial resource usage guarantee that we cannot obtain from conventional parser libraries.

Folds and non-linear uses of streams

Finally, we need to discuss reductions, where we consume a stream to compute a non-stream result. For example, consider the following naive and incorrect attempt at computing the average of a stream of values:

average0 :: Stream (Of Double) IO () -> IO Double
average0 xs = (/) <$> Streaming.Prelude.sum_ xs
                  <*> (fromIntegral <$> Streaming.Prelude.length_ xs)

The problem with this program is that the same stream is traversed twice. Even if the stream is not effectful, this program does not run in bounded memory. If the stream is effectful, the situation is even worse, as each traversal may produce different results, leading to an inconsistent computation. To fix these problems, we need to compute the sum and the length in a single pass.

-- The bang patterns below require the BangPatterns extension.
average1 :: Stream (Of Double) IO () -> IO Double
average1 xs = uncurry (/) <$>
    Streaming.Prelude.fold_ (\(!s, !c) d -> (s + d, c + 1)) (0, 0) id xs

With conduit and pipes, both the mistake and the required fix are similar. Thus, streaming libraries seem to require that the programmer takes care to consume streaming sources only in a linear manner, that is, at most once in a program.

Interestingly, conduit offers an alternative solution, where existing stream processors can be reused and combined using ZipSinks instead of explicitly using a fold. A ZipSink is a combination of stream processors, all of which are fed the same input stream, and a final result is produced from the results of all the combined processors. This results in the following solution to our example, which is quite close to the original naive attempt at solving the problem:

average2 :: Monad m => Consumer Double m Double
average2 = toConsumer $
    getZipSink ((/) <$> ZipSink sumC <*> fmap fromIntegral (ZipSink lengthC))
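For illustration, the same sink can be run on a short input as follows. This sketch assumes a recent conduit release and writes the type with ConduitT and Void instead of the Consumer synonym; averageSink and average are illustrative names.

```haskell
import Conduit
import Data.Void (Void)

-- Both sinks receive every input element; their results are combined
-- with (/) once the input is exhausted.
averageSink :: Monad m => ConduitT Double Void m Double
averageSink = getZipSink $
  (/) <$> ZipSink sumC
      <*> fmap (fromIntegral :: Int -> Double) (ZipSink lengthC)

average :: Double
average = runConduitPure (yieldMany [1, 2, 3, 4] .| averageSink)

main :: IO ()
main = print average  -- 2.5
```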

Facilities such as this are one of the advantages of libraries based on stream processors as opposed to those manipulating streams directly. ZipSinks are an incarnation of the ideas in the foldl package for stream processors, while the ecosystem of pipes offers the pipes-transduce package.
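For comparison, the underlying idea from the foldl package can be sketched on its own: two folds are combined applicatively and then run in a single pass. The example assumes the foldl and streaming packages; averageFold, averageStream, and average are illustrative names.

```haskell
import qualified Control.Foldl as L
import Data.Functor.Identity (runIdentity)
import Streaming (Of, Stream)
import qualified Streaming.Prelude as S

-- Two folds combined applicatively; the input is traversed only once.
averageFold :: L.Fold Double Double
averageFold = (/) <$> L.sum <*> L.genericLength

-- L.purely unpacks the fold into the step, initial, and extraction
-- arguments expected by Streaming.Prelude.fold_.
averageStream :: Monad m => Stream (Of Double) m r -> m Double
averageStream = L.purely S.fold_ averageFold

average :: Double
average = runIdentity (averageStream (S.each [1, 2, 3, 4]))

main :: IO ()
main = print average  -- 2.5
```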

Summary

While strengthening types to catch more errors, streaming libraries are still sufficiently expressive to allow for many data flow patterns of streaming programs. The basic compositional forms are complemented by additional concepts to cover even more patterns. Leftovers, for instance, support parsing challenges that cannot be easily delegated to standard parsing libraries while still keeping an upper bound on the consumed memory. In turn, ZipSinks simplify the sharing of a single input stream by multiple stream processors.