25 April 2018

Funflow is a system for building and running workflows. We've been working on it for the last few months with a small number of clients. In this blog post we'll talk about what workflows are, why we built Funflow, and what we'd like to do with it in the future.
At its core, a workflow takes some inputs and produces some outputs, possibly producing some side effects along the way. Of course, this basically describes any program. Workflow systems distinguish themselves in a few ways: workflows are often long-running; their steps can be heterogeneous, written in different languages or running on different machines; and individual steps can be expected to fail, in ways the system should help to handle.
Workflow systems have perhaps seen the most use in the business process space, where they allow non-programmers to automate business processes (such as authorising a purchase order) which may involve multiple steps, some of which are automatic, and others which might involve human intervention.
Another area heavily involving workflows, and one in which we are particularly interested, is scientific data processing. For example, in bioinformatics one might develop a workflow (also often called a "pipeline" in this space) to perform some in-silico analysis of sequenced gene data. Such a workflow will be customised for a particular task, and may then be run for each sequenced sample.
There are innumerable workflow systems already out there, with a myriad of different features. Tools like Apache Taverna provide a very mature solution for building enterprise workflows, with integrations for working with systems such as Hadoop and powerful GUI editors for composing workflows. At the other end of the scale, Luigi and Airflow are libraries for composing workflows in Python code.
So why build a new one? It came down to three things:
Most of the simple workflow systems we looked at were designed to be run from a command line, by a person or at best by a tool like cron or at. We find, however, that we often want to run a workflow as part of a larger application.
For example, when doing data analysis one might wish to run classifiers on
different subsets of data, and then visualise the results. We still want to be
able to use classifiers written in any language or running on a different
machine, but we should be able to track the progress in a surrounding program,
and easily get the results back. When we tried to do this with Luigi, we found
ourselves needing to parse the log output just to find out the ID of the job
we'd triggered! So integration, for us, means being able to start a workflow from inside a program, track its progress as it runs, and get the results back directly.
Funflow's workflows are just Haskell programs, built using arrow syntax. With Funflow, we can easily intermix steps done inside our Haskell process and steps done outside, for example by another program. Arrows give us two things. The first is generality: a pure function a -> b is an arrow, as is a monadic function Monad m => a -> m b, or a stream transformer Stream a -> Stream b. The second is explicitness: unlike monadic do notation, arrows must have all their arguments passed in explicitly; they cannot close over arbitrary variables in their environment. This means that we cannot accidentally pull unneeded dependencies into our results.
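As a small illustration, here is a sketch of a flow that mixes an in-process pure step with an IO step, using the step and stepIO combinators as we recall them from the funflow library; treat the exact names as assumptions and see the repository for the current API.

{-# LANGUAGE Arrows #-}

import Control.Arrow (returnA)
import Control.Funflow (SimpleFlow, step, stepIO)
import Data.Char (toUpper)

-- A flow mixing an in-process pure step with an effectful IO step.
shout :: SimpleFlow String String
shout = proc name -> do
  upper <- step (map toUpper) -< name          -- a pure function, lifted
  stepIO putStrLn -< "shouting at " ++ upper   -- an IO action, lifted
  returnA -< upper ++ "!"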
There are two hard problems in programming: naming things, cache invalidation, and off by one errors.
Consider the following two workflows, expressed as Haskell functions:
flow1 :: Foo -> Bar
flow1 = extractBar . someExpensiveComputation . someTransformation
flow2 :: Foo -> Baz
flow2 = extractBaz . someExpensiveComputation . someTransformation
What we notice is that the first part of the computation is shared between flow1 and flow2. If we feed both the same input Foo, we would particularly like not to repeat someExpensiveComputation.
This is not an idle example: we often see workflows in which an early stage preprocesses a reference data set, and that preprocessing may be repeated, either by different users or across multiple runs of a pipeline. Perhaps more importantly, it is often desirable to tweak the parameters of some late stage of processing and rerun the pipeline, again without rerunning the unchanged earlier parts.
In order to address this issue, Funflow borrows a couple of ideas from the Nix package manager. The first is to remove the notion that the user has any control over where and how the outputs of intermediate steps are stored. Instead of the user choosing where files are written, Funflow manages a section of the file system known as the store. Entries inside the store are addressed by a unique hash (the second idea borrowed from Nix), computed from both the inputs to a step and the definition of the step itself. When Funflow executes a step in a workflow, it first hashes the inputs and the step definition to derive the output path. If this path already exists, then, since store items are immutable once written, we can skip the computation and use the result from the cache.
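To make the idea concrete, here is a toy in-memory analogue of that lookup-or-execute logic. This is not Funflow's actual implementation; the Store type, cachedRun, and the keying scheme are purely illustrative.

import qualified Data.Map.Strict as Map
import Data.Hashable (hash)  -- from the hashable package
import Data.IORef (IORef, readIORef, modifyIORef')

-- A toy in-memory stand-in for Funflow's on-disk store, keyed by a
-- hash of the step definition and its input.
type Store = IORef (Map.Map Int String)

cachedRun :: Store -> String -> (String -> IO String) -> String -> IO String
cachedRun store stepDef runStep input = do
  let key = hash (stepDef, input)  -- hash both step definition and inputs
  cache <- readIORef store
  case Map.lookup key cache of
    Just out -> pure out           -- entry exists: skip the computation
    Nothing  -> do
      out <- runStep input                      -- cache miss: run the step...
      modifyIORef' store (Map.insert key out)   -- ...and record the result
      pure out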
Crucially, Funflow goes further, caching more aggressively than Nix. Whilst the hash of the inputs and step definition determines the path to which a step writes its output, upon completion of the step these outputs are moved into another path determined by their own hash: in other words, the store also works as content-addressable storage. What's the benefit of this? Firstly, it ensures that when multiple steps produce the same output, that output is stored only once on disk. But it also solves the problem suggested by the following flows:
flow1 :: Int -> Bar
flow1 = extractBar . someExpensiveComputation . (* 2)
flow2 :: String -> Bar
flow2 = extractBar . someExpensiveComputation . length
In this example, the tails of the two flows are identical. If we provide 4 to the first flow and "workflow" to the second, then these computations converge after their first steps, before someExpensiveComputation. If a Nix derivation were used in this case, two outputs would be produced and someExpensiveComputation would run twice, because ultimately the inputs differ. Funflow, on the other hand, allows the computations to converge.
Doing sensible caching might seem like a poor reason to build our own workflow system. In reality, though, caching is a large part of what a workflow system is. Going back to our concepts above on what characterises a workflow as different from any other program, Funflow's system for managing inputs and outputs addresses them pretty directly: long-running workflows can resume from where they left off rather than restarting from scratch, and steps written in any language exchange data through the store.
What's more, the same approach that lets us cache correctly also provides solutions to a number of other important problems that arise in workflow management. For example, an important part of scientific research is that other teams be able to reproduce the results underlying a paper. Funflow makes it possible to capture the closure for a given output, which can then be distributed easily to other locations. By using Docker containers for the processing of individual steps, and by strictly controlling their inputs, we can also ensure that computation is isolated from other parts of the system, reducing the "noise" which can sometimes change results.
As mentioned in the introduction, workflows can often be expected to fail. When they do, we'd like to control exactly what the results of that are. For example, sometimes those failures may be expected and we want to cause the workflow to take a different path when they occur. On other occasions, we want to completely abort.
Funflow supports exception handling within the workflow, so we can handle these situations correctly. It goes somewhat further: Funflow supports an effects system to allow you to define your own universe of actions which can happen during flows, and even allows you to swap out the interpreter entirely if needed.
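For a flavour of what that looks like, the sketch below defines a tiny effect universe and an interpreter for it. The wrap combinator and the shape of the Flow type match our memory of funflow's API at the time of writing, so treat them as assumptions; the database-flavoured types and the stub interpreter are purely illustrative.

{-# LANGUAGE GADTs #-}

import Control.Exception (SomeException)
import Control.Funflow (Flow, wrap)  -- 'wrap' lifts an effect into a flow

newtype RecordId = RecordId Int
newtype Record   = Record String

-- Our own universe of actions that may occur during a flow.
data DbEffect i o where
  ReadRecord  :: DbEffect RecordId Record
  WriteRecord :: DbEffect Record RecordId

-- Lifting an action into a flow; the action itself stays abstract here.
storeResult :: Flow DbEffect SomeException Record RecordId
storeResult = wrap WriteRecord

-- An interpreter gives each action a concrete meaning. Swapping in a
-- different interpreter (say, an in-memory stub for tests) changes how
-- the very same flow runs.
runDbEffect :: DbEffect i o -> i -> IO o
runDbEffect ReadRecord  (RecordId n) = pure (Record ("record " ++ show n)) -- stub
runDbEffect WriteRecord (Record _)   = pure (RecordId 0)                   -- stub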
We've written in the past about how we use Nix at Tweag, and mentioned above how various features of Funflow were inspired by it. Likewise, Tweag has worked on adapting Bazel to build polyglot Haskell projects. So one might be tempted to ask - why not just use one of these?
One certainly could build such a workflow using Nix or Bazel. But Nix doesn't support the same notion of content-addressable storage as Funflow does (although it might be getting it soon). Both systems are untyped, meaning a week-long workflow could fail halfway through because one step's outputs don't match the next step's expected inputs, a type of bug that could have been caught by a type checker. Furthermore, in neither system do we have precise control over the flow graph evaluator: we can't easily program custom error recovery strategies, for example.
Beyond this, there's a wider point about the difference in composition between Funflow (and most workflow systems) and build systems. In a build system one builds a tower of targets, each one explicitly depending on the targets below it. In a workflow system, one builds a series of steps with inputs and outputs, and then wires them together to create a pipeline. Each step has no explicit knowledge of the steps before or after it. In software development folklore, this is called dependency inversion.
These approaches are dual to each other, and it's certainly possible to translate between them. But taking this approach makes it easier to do things like swap steps in the middle of a pipeline in and out - something we often wish to do in a workflow.
To conclude this post, here's an example of using Funflow to compile and run a small C program. It demonstrates a number of Funflow's features: composing flows with arrow notation, writing intermediate files to the content-addressed store, and running external steps inside Docker containers.
The example is shortened in favour of readability. The full code is available in the Funflow repository.
-- | This flow takes a number, builds a C program to process that number,
-- and returns the program's output.
mainFlow :: SimpleFlow Int String
mainFlow = proc n -> do
  moduleDouble <- compileModule -< "int times2(int n) { return 2*n; }\n"
  moduleSquare <- compileModule -< "int square(int n) { return n*n; }\n"
  moduleMain <- compileModule -<
    "#include <stdio.h>\n\
    \#include <stdlib.h>\n\
    \int times2(int n);\
    \int square(int n);\
    \int main(int argc, char **argv) {\
    \  int n = atoi(argv[1]);\
    \  int r = times2(n) + square(n);\
    \  printf(\"%d\\n\", r);\
    \}"
  exec <- compileExec -< [moduleDouble, moduleSquare, moduleMain]
  out <- runExec -< (exec, [show n])
  readString_ -< out
-- | This flow takes a string which is assumed to be the source code
-- for a 'C' module. It writes this to a file, then uses an external
-- step to compile the module.
compileModule :: SimpleFlow String (Content File)
compileModule = proc csrc -> do
    cInput <- writeString -< (csrc, [relfile|out.c|])
    scriptInput <- writeExecutableString -< (compileScript, [relfile|compile.sh|])
    compiled <- compileDocker -< (cInput, scriptInput)
    returnA -< compiled :</> [relfile|out.o|]
  where
    compileScript =
      "#!/usr/bin/env bash\n\
      \gcc -c -o $2 $1\n"
    compileDocker = docker $ \(cInput, scriptInput) -> Docker.Config
      { Docker.image = "gcc"
      , Docker.optImageID = Just "7.3.0"
      , Docker.input = Docker.MultiInput
          $ Map.fromList [ ("script", IPItem $ CS.contentItem scriptInput)
                         , ("data", IPItem $ CS.contentItem cInput)
                         ]
      , Docker.command = "/input/script/compile.sh"
      , Docker.args = ["/input/data/out.c", "/output/out.o"]
      }
-- | This flow takes a list of files which are assumed to be 'C' modules.
-- It uses an external step to compile those modules into an executable.
compileExec :: SimpleFlow [Content File] (Content File)
-- | This flow takes a file which is assumed to be an executable,
-- and a list of strings that are arguments for the executable.
-- It uses an external step to run the executable with the given arguments.
-- The output is stored in a file in the returned item.
runExec :: SimpleFlow (Content File, [String]) CS.Item
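Finally, to run the flow we hand it to an executor along with a store directory. The driver below is a sketch: withSimpleLocalRunner is the entry point as we remember it from the Funflow examples, so check the repository for the exact current signature.

main :: IO ()
main = withSimpleLocalRunner [absdir|/tmp/funflow-store|] $ \run -> do
  -- Run the flow on the input 3; the result is either an exception
  -- raised by a step or the compiled program's output.
  result <- run mainFlow 3
  case result of
    Left err  -> print err
    Right out -> putStr out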
We think Funflow is a useful addition to the space of workflow management tools, and we're looking forward to applying it to more use cases. So far we have used it for build and deployment, for data visualisation, and for running a bioinformatics pipeline. We're increasingly focusing on making it easier to use, including working towards graphical composition of workflows, so that one can simply drag and drop steps into place.
If you have a problem you'd like to apply Funflow to, then please check out the repo or get in touch.