20 June 2016
Maintaining a compute cluster for batch or stream data processing is hard work. Connecting it to storage facilities and time-sharing its resources across multiple demands is harder still. Fortunately, cloud service providers these days typically go beyond infrastructure as a service (IaaS) and also offer fully baked platforms (PaaS) on top, tailored to specific verticals or workflows. Amazon offers Elastic MapReduce for creating on-demand Hadoop clusters with just a few clicks. Google, Microsoft and others provide similar services for creating cookie-cutter clusters on demand. These services make the common case easy: running a distributed Java/Python/R batch job, written using Hadoop APIs, sourcing data from S3. In this post we'll show you how to turn these robust existing PaaS offerings into interactive distributed Java/Python/R/Haskell platforms.
To illustrate, we ran a topic analysis Haskell/Scala app applied to all of English Wikipedia on 16 nodes (64 cores and 480GB RAM in total) simultaneously.
We'll use topic modeling with Latent Dirichlet Allocation (LDA) from our last post as a running example. The full code for this distributed application is available here. It's a short 35-line Haskell program, with a bona fide main entry point, leveraging the Spark API for transparently distributing data parallel operations over as many nodes in the cluster as are currently available. Much of the heavy lifting is performed by canned natural language processing code, also part of Spark, in its "machine learning" library.
In this post, like in the original Scala version, we'll perform topic modeling on all of the English language Wikipedia. The nice thing about building a distributed app using the Spark API is that it can run anywhere. You can try performing topic modeling over such a large dataset on your laptop, like we did in the last post using a much smaller dataset, but you'll likely run out of patience long before it finishes. So instead, let's run this in the Cloud, where we can rent one-off clusters of any size on-demand for as long as we need one.
There are many Cloud services available to create such on-demand clusters easily, but the Databricks managed Spark service is particularly convenient to use. First, let's build and package our Haskell app as a JAR archive that can be uploaded to Databricks:
$ git clone https://github.com/tweag/sparkle.git
$ cd sparkle
$ stack --nix build
$ stack --nix exec -- sparkle package sparkle-example-lda
As always, the --nix flag to all Stack commands means the build will use a locally provisioned, auto-downloaded JVM and Spark class files. This sequence of commands produces a file called sparkle-example-lda.jar, a self-contained archive containing our Haskell application as well as all of the Haskell dependencies and system shared libraries, plus some Java glue code to allow Haskell and the JVM to interoperate nicely. You can think of a .jar as a universal file format for running our application anywhere.
In the next section, we'll run the app on a cluster using the Databricks paid service, but read on for how to do that on plain old Amazon EMR too.
Think of Databricks as an IDE for analytics. You get,
Databricks currently only supports Scala, Python and R in notebooks. But for full-fledged batch or streaming "jobs" (i.e. standalone apps), Haskell is supported too, via the sparkle bindings. Let's walk through the process of running a distributed Haskell app in Databricks.
We'll want to,
.jar we created in the previous section,
When you get to the upload step, just make sure to specify io.tweag.sparkle.SparkMain as the "main" class for your app (Amazon EMR recognizes the main class automatically but Databricks doesn't).
When a job run starts, Databricks creates a new cluster specially for that job, which it tears down once the run is over. For the Wikipedia dataset, which we can represent (together with an integer index identifying each article) as a Haskell value using,
do docs <- textFile sc "dbfs:/databricks-datasets/wiki/part-*"
   return (zipWithIndex docs)
we created a large 16-node cluster to process all 4M+ pages that make up the English language Wikipedia in 27 minutes. And the results are in!
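To make the "integer index identifying each article" concrete, here is a minimal local sketch using plain Haskell lists rather than a distributed dataset. The name zipWithIndexLocal is hypothetical and is not part of sparkle or Spark. It only mirrors the idea that each element is paired with a 64-bit index counted from 0, as Spark's zipWithIndex does for an RDD.

```haskell
module Main where

import Data.Int (Int64)

-- | Hypothetical, list-based analogue of pairing each document with an
-- index. This is NOT the sparkle/Spark function: it runs locally on a
-- list and only illustrates the shape of the result.
zipWithIndexLocal :: [a] -> [(a, Int64)]
zipWithIndexLocal xs = zip xs [0 ..]

main :: IO ()
main =
  -- The article titles below are placeholders chosen for illustration.
  mapM_ print (zipWithIndexLocal ["Anarchism", "Autism", "Albedo"])
```

In the distributed setting the same pairing is computed across partitions, so the index stays unique over the whole dataset rather than per node.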
Alternatively, you can run the same .jar on Amazon Elastic MapReduce (EMR). Getting started with Amazon's EMR just requires an AWS account. With those credentials in hand, the process is a matter of:
.jar to the cluster as a new "step" (click Add step in the task list and paste in the S3 location where you uploaded the JAR).
What we've shown in this post is that by packaging Haskell applications as standalone .jar files binding to industry-standard JVM-based APIs for distributed computing, we can now leverage a wealth of existing tools, environments and cloud services to transparently run Haskell on a very large scale. The key enabler here is that standalone .jar files are the basic unit of distribution that enterprise and data analytics tools expect, so that's the form factor we targeted for our Haskell app.
While the 4 million pages of Wikipedia stand at just 51 GB to process, 50 iterations of LDA topic modeling represent close to 30 hours of single-core processing time, which we were able to bring down to just 27 minutes by provisioning a bunch of worker nodes at the click of a button. We didn't have to build any of the infrastructure to support that feat ourselves: we got to reuse it for free.
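As a rough sanity check on these figures, the sketch below computes the implied speedup and per-core efficiency. It assumes that the 64 cores quoted at the top of the post all took part in the run, and it treats "close to 30 hours" as exactly 30, so the result is an estimate and not a measurement.

```haskell
module Main where

-- Figures quoted in the post. Treating "close to 30 hours" as exactly
-- 30 is an assumption made for this estimate.
singleCoreHours :: Double
singleCoreHours = 30

clusterMinutes :: Double
clusterMinutes = 27

-- Assumes all 64 cores of the 16-node cluster did useful work.
cores :: Double
cores = 64

-- | Ratio of single-core time to cluster wall-clock time.
speedup :: Double
speedup = singleCoreHours * 60 / clusterMinutes

-- | Speedup per core; 1.0 would mean perfect linear scaling.
efficiency :: Double
efficiency = speedup / cores

main :: IO ()
main = do
  putStrLn ("speedup:    " ++ show speedup)
  putStrLn ("efficiency: " ++ show efficiency)
```

With these inputs the speedup comes out at roughly 67x, or about 1.04 per core. A value slightly above 1 most likely reflects the rounding in the quoted figures, not superlinear scaling.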
In a future post, we'll explore some of the technical details that made it possible to ship unmodified GHC-compiled Haskell as a .jar that looks like any other Java program to the JVM.
In the meantime, there's plenty to do to fully realize the vision of
seamless, statically typed, purely functional data analytics in the
cloud. In particular, while in this post we focused on standalone
Haskell apps, it would be great if support for Haskell extended to
interactive notebooks with mixed Java/R/Haskell as well.