In school, we’re taught that I/O is a simple affair: read data in, write data out. Rinse, repeat. But then as practitioners we realize matters are often more complicated. For one, I/O is slow, so we probably want operations to overlap (i.e. be processed asynchronously), especially if we have to perform many I/O operations. In this post, we’ll talk about another topic that any functional programmer will stumble upon at some point along their infinite path to enlightenment: streaming resources. Did you ever wonder what they are about? This post is an attempt at explaining why you’d want to think about this topic.
Let’s say we want to write a small utility that truncates any input to its topmost line. We can start with a pure function from input to output:
    headLine :: String -> String
    headLine = unlines . take 1 . lines
Simple enough. We could hook this function up to an input source, possibly located somewhere on disk, and also to some output sink. This program would use resources like memory, CPU time, file descriptors and disk space.
If the amount of memory does not grow beyond a finite bound, for all possible inputs, we say that the program runs in bounded memory. More generally, we will say that a program is streaming if the usage of some resources considered scarce is bounded.
In our example we care about memory and file handles. It’s important to tame RAM usage, because the amount of fast volatile memory available in a computer is typically far smaller than the size of the program’s input. Likewise, file descriptors aren’t a commodity in infinite supply: operating systems impose aggressive per-process limits by default. Disk space, and sometimes even CPU time, are comparatively far less scarce, so we won’t worry about those here.
It can be hard to reconcile the constraints of resource scarcity with another imperative: don’t give up on writing programs from composable pieces that can be well understood in isolation from each other, lest you end up with unmaintainable spaghetti. This is where streaming libraries can help: the idea is to define once and for all common patterns that enable building streaming and compositional programs.
Resuming our running example, we could make a streaming program from
headLine provided that we satisfy the following conditions:
- the output string should not be forced into memory all at once by any caller of headLine, and
- the source of the input string needs to be closed soon enough to prevent open handles from accumulating.
Additionally, for the program to be a correct program,
- the source of the input string should not be closed before the output string has been fully evaluated.
These conditions embody the amount of thinking that the programmer
needs to do without help from the compiler. They present opportunities
for the programmer or the language’s runtime system to screw up, so
that we end up with a program that is either not streaming or is
incorrect. In Haskell, traditionally, people have been exploiting lazy
evaluation to build streaming programs: if we can somehow produce
a string that represents the entire contents of a file, we could plug
that string as an input into
headLine and hope that only the first
line will ever be evaluated and loaded in memory. But this is
a dangerous assumption. The type system does not distinguish whether
a String is a list of values, a computation which will
produce the values on demand, or a computation which requires a file
handle to complete successfully.
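On purely computed inputs, the hope does hold. The following minimal sketch shows headLine touching only the first line of its input; the names poisoned and endless are illustrative and not part of the original program.

```haskell
-- Same definition as in the text.
headLine :: String -> String
headLine = unlines . take 1 . lines

-- An input whose tail must never be evaluated: forcing it would crash.
poisoned :: String
poisoned = "first line\n" ++ error "rest of the input was forced"

-- An infinite input: it can only be processed lazily.
endless :: String
endless = cycle "ab\n"
```

Evaluating headLine poisoned yields "first line\n" without hitting the error, and headLine endless terminates with "ab\n". Nothing in the type String distinguishes these inputs from one backed by a file handle, which is where the trouble starts.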
Consider this attempt at a full program that uses headLine:
    import Control.Exception (bracket)
    import System.IO (hGetContents, hClose, openFile, IOMode(ReadMode))

    printHeadLine1 :: FilePath -> IO ()
    printHeadLine1 path = do
      contents <- bracket (openFile path ReadMode) hClose hGetContents
      putStr $ headLine contents
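The type checker is happy to let it go through. However, it never
prints the first line: depending on the GHC version, it either produces
an empty output or fails with an I/O error about a delayed read on
a closed handle. This is because what hGetContents returns
(something of type String) is really a computation that performs
I/O as a side effect, not a regular value, despite what the type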
suggests. As soon as we evaluate
contents, or any part of it, those side
effects will have to occur. But in the example above, due to laziness,
any evaluation of
contents will happen as part of the evaluation of
headLine, and by the time that happens, the file handle is already
closed, thus violating our third condition above. Here’s a fix:
    import Control.DeepSeq (($!!))
    import Control.Exception (bracket)
    import System.IO (hGetContents, hClose, openFile, IOMode(ReadMode))

    printHeadLine2 :: FilePath -> IO ()
    printHeadLine2 path = do
      str <- bracket (openFile path ReadMode) hClose $ \h -> do
        contents <- hGetContents h
        return $!! contents
      putStr $ headLine str
Now, ($!!) forces the side-effecting contents computation to run to
completion before the file handle is closed. The result
str is a fully evaluated string by the time it is consumed. Problem
solved? Not quite, because this time the whole file contents is loaded
into memory at once. What we really want is for the input of
headLine to be a computation that produces the values on demand.
A streaming version follows.
    import Control.Exception (bracket)
    import System.IO (hGetContents, hClose, openFile, IOMode(ReadMode))

    printHeadLine :: FilePath -> IO ()
    printHeadLine path =
      bracket (openFile path ReadMode) hClose $ \h ->
        hGetContents h >>= putStr . headLine
So it turns out that we can write a correct and streaming program. But it would be great if the type checker could help us verify the three conditions above.
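For comparison, here is a sketch of what avoiding lazy I/O by hand could look like for this one utility. It assumes strict line reads from base (hGetLine) are acceptable, and the name firstLine is made up for illustration.

```haskell
import System.IO (IOMode (ReadMode), hGetLine, hIsEOF, withFile)

-- Read only the first line of a file, strictly, while the handle is open.
-- Returns "" for an empty file and always terminates the line with '\n',
-- mirroring what headLine does on a String.
firstLine :: FilePath -> IO String
firstLine path =
  withFile path ReadMode $ \h -> do
    eof <- hIsEOF h
    if eof
      then pure ""
      else (++ "\n") <$> hGetLine h
```

Memory use is bounded by the length of the first line, and the handle is closed when withFile returns. The price is that the pure headLine is no longer reused: the traversal logic is now entangled with the I/O, which is the loss of compositionality that streaming libraries aim to avoid.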
Streaming libraries are a great help in writing correct streaming programs. There are many out there, but we’ll focus here on the streaming package. The argument would work just as well with other streaming libraries like conduit, pipes, enumerator or io-streams.
The streaming package, like other streaming libraries, helps to
discern whether a value is a list or a computation. It offers an
effectful Stream abstraction as a sequence of computations in some
monad m. Each computation can produce a part of
a potentially long list of values.
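To make the idea of "a sequence of computations in some monad" concrete, here is a deliberately simplified model of an effectful stream. It is not the definition used by the streaming package (whose Stream type is more general); the names takeS, toListS, countingFrom and demo are invented for this sketch.

```haskell
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)

-- A simplified model of an effectful stream (NOT the streaming package's
-- actual definition): elements of type a interleaved with effects in m,
-- ending with a result of type r.
data Stream a m r
  = Return r
  | Effect (m (Stream a m r))
  | Step a (Stream a m r)

-- Keep at most n elements; the remainder of the stream is never run.
takeS :: Monad m => Int -> Stream a m r -> Stream a m ()
takeS n _ | n <= 0 = Return ()
takeS _ (Return _) = Return ()
takeS n (Effect m) = Effect (fmap (takeS n) m)
takeS n (Step a rest) = Step a (takeS (n - 1) rest)

-- Run all effects of a stream and collect its elements.
toListS :: Monad m => Stream a m r -> m [a]
toListS (Return _) = pure []
toListS (Effect m) = m >>= toListS
toListS (Step a rest) = (a :) <$> toListS rest

-- An infinite stream that bumps a counter each time it produces an element.
countingFrom :: IORef Int -> Int -> Stream Int IO r
countingFrom ref n = Effect $ do
  modifyIORef' ref (+ 1)
  pure (Step n (countingFrom ref (n + 1)))

-- Take three elements and report how many effects were actually run.
demo :: IO ([Int], Int)
demo = do
  ref <- newIORef 0
  xs <- toListS (takeS 3 (countingFrom ref 10))
  effects <- readIORef ref
  pure (xs, effects)
```

Running demo gives ([10,11,12], 3): only the three effects needed for the requested elements are executed, and the type says explicitly that producing elements involves computation in IO rather than lazy evaluation.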
The streaming package has a companion package, streaming-bytestring,
which provides an effectful
ByteString abstraction. Similar to
Stream, an effectful ByteString is a sequence of computations, each of
which yields a part of a potentially long bytestring.
To be concrete, let us consider the headLine function rewritten
with these abstractions.
    import Data.ByteString.Streaming (ByteString)
    import qualified Data.ByteString.Streaming.Char8 as SB
    import qualified Streaming

    headLineStream :: Monad m => ByteString m r -> ByteString m ()
    headLineStream = SB.unlines . Streaming.takes 1 . SB.lines
This function transforms an effectful bytestring. It might not reside
fully in memory, but it may be produced in chunks as the bytestring is
consumed. In contrast to lazy
ByteStrings, the effectful bytestring
produces the chunks in the monad
m rather than through lazy
evaluation. Thus the type makes explicit that some computation happens
as the bytestring is consumed, and it becomes possible to reason about
the order in which resources are acquired, used, and released in terms
of the monad operations.
    SB.lines :: Monad m => ByteString m r -> Stream (ByteString m) m r
    Streaming.takes :: Monad m => Int -> Stream (ByteString m) m r -> Stream (ByteString m) m ()
    SB.unlines :: Monad m => Stream (ByteString m) m r -> ByteString m r
The function starts by creating a stream of lines. Each line is itself an effectful bytestring. When the first bytestring is fully consumed, the bytestring for the second line becomes available.
The conditions to ensure that the resulting program is streaming and correct are as follows:
- the output ByteString is not fed to any function that loads all of the output into memory, like SB.toStrict,
- the source of the input ByteString needs to be closed soon enough to prevent open handles from accumulating, and
- the output ByteString shall not be used after the source of the input ByteString is closed.
These might look similar to the conditions we had to satisfy previously, but now these conditions do not refer to some evaluation that may happen lazily. Programmers are no longer responsible for distinguishing values from effectful computations; the compiler does it for them. Thus, by using a streaming library, we reduce the amount of unaided bookkeeping that the programmer needs to do.
Let us consider the full program for the sake of completeness:
    import Control.Monad.Trans.Resource (runResourceT)

    printHeadLineStream :: FilePath -> IO ()
    printHeadLineStream fp =
      runResourceT $ SB.stdout $ headLineStream $ SB.readFile fp
The file is read with SB.readFile, which produces an effectful
ByteString stream with the contents of the file. The file is opened through the
MonadResource class to ensure that it is closed before
runResourceT returns.

    SB.stdout :: MonadIO m => ByteString m r -> m r

The call to SB.stdout consumes the effectful
ByteString returned by headLineStream
by printing it to the standard output.
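The ordering that such a resource scope is meant to guarantee can be pictured with plain bracket from base. This is only an analogy, not how resourcet is implemented; the "resource" is a placeholder rather than a real file, and logEvent and scopedDemo are hypothetical names.

```haskell
import Control.Exception (bracket)
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)

-- Append an event to a log kept in an IORef.
logEvent :: IORef [String] -> String -> IO ()
logEvent ref event = modifyIORef' ref (++ [event])

-- Record the order of acquisition, use, and release around a scope.
scopedDemo :: IO [String]
scopedDemo = do
  ref <- newIORef []
  bracket
    (logEvent ref "acquire")        -- stands in for opening the file
    (\_ -> logEvent ref "release")  -- stands in for closing the file
    (\_ -> logEvent ref "consume")  -- stands in for streaming to stdout
  logEvent ref "after scope"
  readIORef ref
```

scopedDemo returns ["acquire","consume","release","after scope"]: consumption happens strictly between acquisition and release, and the release has already run by the time the scope returns.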
Streaming libraries support writing composable streaming programs without relying on lazy I/O. This simplifies reasoning about the order in which resources are acquired, used, and released. However, no streaming library today guarantees that well-typed programs are always streaming. The programmer is still responsible for getting resource management right (but there are other libraries to help with that too, like resourcet).
In the next blog post in this series, we will delve in more detail into the features that streaming libraries provide and how they allow writing composable programs while keeping lazy I/O out of the equation.