Technical groupsOpen sourceCareersResearchBlogContactConsulting Platform
Global consulting partnerEuropean product partnerGlobal Atlassian transformation leader
The Exodus to Streamgard, an epic poem

1 November 2017 — by Yves Parès

If Haskell was a god, often would he be depicted with the ravens Modularity and Abstraction flying above him, hovering the world and reporting to him every detail of our whereabouts. Haskell would sit on the Throne of Purity and look upon the world with an eye full of wisdom. And in his hand, the mighty Haskell would wield the Spear of Lazy Lists, which is said to have the power to tackle each and every problem the world might have to face. And to honour him, we would code and abstract everything with lazy lists. For millennia would lists be used to map, filter, separate, merge, group, and so forth.

But, one day, the Real-World Serpent, son of the wicked Foldr, would come. And the Real-World Serpent carries an eternal hatred towards lazy lists. Oh, that dreaded Serpent, that will throw everything it can muster to prevent us from staying within the warm comfort of abstraction and laziness. The Serpent will assemble its minions, Early-close and Strictness of effects, and unleash its wrath upon our world. Foldl, son of Haskell and brother of Foldr, would lead humanity to its last bastion, Streamgard, and organize the final fight…

So, long story short, streaming is a library that allows you to leverage the insights you have gained while manipulating lazy lists in Haskell to handle effectful streams of data. We already talked about streaming on this blog, with this post discussing the IO part and this one comparing it to pipes and conduit. Here, we will be using streaming for highly efficient data processing and filtering. To this effect, we will use it conjointly with another library, foldl, which gives us an Applicative interface to the usual list functions. In this blog post we will apply them to the task of computing some statistics about a distribution of data. We want to be able to:

  • process the input data stream only once (aka in one pass),
  • never repeat the effects that were used to produce that data stream,
  • maintain the possibility to use the input stream as if it were a list, for instance by splitting it into two subparts, sending each subpart to be processed by a specific function.

So lets imagine that the statistics I want to compute on my input data distributions take the shape of a simple summary. This is what I want to obtain in the end:

data Summary v a = Summary
  { summaryLength :: Int
  , summaryMins :: [a]
  , summaryMaxes :: [a]
  , summaryMean :: v
  , summaryStdDev :: v
  }
  deriving (Show)

Nothing too fancy here, I just want to be able to compute the length, the n smallest elements, the n' biggest elements, the mean and the standard deviation of my distribution. We distinguish the types a and v here because our input distribution does not have to be numerical, as long as we have a projection a -> v available. This way, we can compute a summary of a stream of (Double, String) tuples, for instance, if the projection is just fst.

So let’s have a little reminder of our conditions. We want to be able to read the input data only once. But, we still want modularity and reusability. We do not want to have to recode our Summary-computing function every time we want to add a new field, and we would like to reuse already existing functions computing these statistics. And this is where the foldl package comes in.

This package defines a type Fold as follows:

data Fold a b = forall acc. Fold (acc -> a -> acc) acc (acc -> b)

You might recognize here the typical arguments of the classical foldl function of the Prelude: a is the type of each element of the input stream we consume, the first field (acc -> a -> acc) is an accumulation function and the second field acc is the initial value of the accumulator. The new component is the b type parameter and the last field (acc -> b). This one is called extract. It is used to extract the final value out of the accumulator. This is necessary so that Fold a can be a Functor and therefore an Applicative. See the original blog post and this talk by Gabriel Gonzalez for more detail, though be aware that Fold had a different shape back then.

One of the central ideas of the foldl library is that Fold implements the Applicative type class:

instance Applicative (Fold a)

Crucially, this instance combines two Folds, into a guaranteed one-pass traversal of the data. Therefore we can safely decompose the computation of a Summary as follows:

import qualified Control.Foldl as L
import Data.Function (on)

summarizeBy :: (Floating v, Ord v)
            => (a -> v) -> Int -> Int -> L.Fold a (Summary v a)
summarizeBy f nMins nMaxes = Summary
  <$> L.length
  <*> collect ((>=) `on` f) nMins
  <*> collect ((<=) `on` f) nMaxes
  <*> L.premap f L.mean
  <*> L.premap f L.std

What’s happening here? We are using a few of the functions already present in the foldl package and a new one, so let’s delve into it a bit. The function summarizeBy takes a projection f, which we talked about earlier, the number of smallest elements we want to collect and the number of biggest elements. Then our five statistics are computed:

  • L.length :: L.Fold a Int gives us the number of elements in the input.
  • collect, which we will define a bit later, accumulates either the mins or the maxes given a comparison function.
  • L.mean gives us the average. We use L.premap f to turn it into a fold that will work on our projection f.
  • L.std gives us the standard deviation.

The combination of the above gives us a Fold a (Summary v a), something that will consume a stream of a’s and output a summary. At this point, nothing is consumed, we have only composed folds together, and a Fold is agnostic of the exact nature of the input. Running it on any Foldable datatype for instance is just a matter of calling:

L.fold (summarizeBy id 3 3) [1..100]

The only function not provided by the foldl package is the collect function. Defining it as a brand new Fold is simple:

import Data.Sequence as Seq

collect :: (a -> a -> Bool) -> Int -> L.Fold a [a]
collect skipPred n = L.Fold insertPop Seq.empty (L.fold L.list)
  where
    insertPop acc x
      | Seq.length acc < n = insert x acc
      | otherwise          = pop (insert x acc)
    insert x s = let (before, after) = Seq.spanl (skipPred x) s
                 in before <> Seq.singleton x <> after
    pop s = case viewr s of
              s' :> _ -> s'
              _ -> s

Here we manually defined a new Fold from the three elements we mentioned earlier: an accumulation function (insertPop), an initial accumulator value (Seq.empty) and an extract function ((L.fold L.list), which also uses a Fold to turn the final sequence into a plain list).

Now, the astute reader will notice we left streaming aside. Let’s get back to it. Let’s use as an input the classic Titanic dataset:

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
1,0,3,"Braund, Mr. Owen Harris",male,22,1,0,A/5 21171,7.25,,S
2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Thayer)",female,38,1,0,PC 17599,71.2833,C85,C
3,1,3,"Heikkinen, Miss. Laina",female,26,0,0,STON/O2. 3101282,7.925,,S
4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35,1,0,113803,53.1,C123,S
...

We want to get two different summaries for the fares: one for the passengers that survived and one for those who did not. First, let’s load the CSV into a stream by using the streaming-cassava and streaming-bytestring packages:

{-# LANGUAGE OverloadedStrings #-}
import Control.Monad (mzero)
import qualified Data.ByteString.Streaming as BS
import Streaming
import Streaming.Cassava

data Passenger { name :: !String, fare :: !Double, survived :: !Bool }
  deriving (Show)

instance FromNamedRecord Passenger where
  parsedNamedRecord m =
    Person <$> m .: "Name" <*> m .: "Fare" <*> (toBool =<< (m .: "Survived"))
  where toBool 0 = return False
        toBool 1 = return True
        toBool _ = mzero

streamCsv :: (MonadResource m) => Stream (Of Passenger) m ()
streamCsv = decodeByName (BS.readFile ".../titanic.csv")

Nothing too fancy here, just a bit of required boilerplate to be able to read Passengers from the CSV file. MonadResource is necessary to track the files opened by our program. The type Stream (Of Passenger) m () means that we will be manipulating a stream whose elements are Passengers, that will run some effects in a monad m and return no result in the end.

Now, lets split that input in two different substreams:

import qualified Streaming.Prelude as S

aliveDead :: Stream (Of Passenger) (Stream (Of Passenger) m) ()
aliveDead = S.partition survived streamCsv

Let’s look at the type of aliveDead: it is a Stream over another Stream. Stream (Of a) is actually a monad transformer, the way the partitioning happens is by creating two layers: one for the live passengers and one for the dead ones. It’s not exactly a tuple of two streams (as it would be with Data.List.partition), but is has the same advantages: each layer can be processed by different functions which don’t have to know where the stream they process lies in the monad stack. Therefore, each one of these functions can be expressed as:

summarizePassengers
  :: (Monad m) => Stream (Of Passenger) m a -> m (Of (Summary Double Passenger) a)
summarizePassengers = L.purely S.fold (summarizeBy fare 3 3)

where m can be any monad. This can be the bottom MonadResource or another Stream, summarizePassengers does not mind and does not have to! Of behaves like a tuple, so it simply means that we return both the newly computed Summary and an a (a may just be (), but here we have to be a little more general). S.fold is the basic folding function for streams. L.purely fn f “unpacks” a Fold f and calls a folding function fn. So now, getting our summaries is just a matter of

runAll = runResourceT $ do
  (summaryAlive :> summaryDead :> ()) <-
    summarizePassengers $ summarizePassengers aliveDead
  ...

So in the end, we split the input file in two substreams, we computed various statistics twice, and despite all this streaming and foldl guarantee that the input will be read only once in bounded memory.

These techniques are currently being applied by Tweag I/O in the context of a project with Novadiscovery. Novadiscovery is a consulting company for in silico clinical trials, namely simulation of virtual patients through biomodeling. Parts of this blog post are actual code from the tools we develop with them.

About the authors
Yves Parès
If you enjoyed this article, you might be interested in joining the Tweag team.
This article is licensed under a Creative Commons Attribution 4.0 International license.

Company

AboutOpen SourceCareersContact Us

Connect with us

© 2023 Modus Create, LLC

Privacy PolicySitemap