Disco: a Map-Reduce framework in Python and Erlang

December 21st, 2008

Disco is a Python-friendly, open-source Map-Reduce framework for distributed computing with the slogan “massive data – minimal code”. Disco’s core is written in Erlang, a functional language designed for concurrent programming, and users typically write Disco map and reduce jobs in Python. So what’s wrong with using Hadoop? Nothing, according to the Disco site, but…

“We see that platforms for distributed computing will be of such high importance in the future that it is crucial to have a wide variety of different approaches which produces healthy competition and co-evolution between the projects. In this respect, Hadoop and Disco can be seen as complementary projects, similar to Apache, Lighttpd and Nginx.

It is a matter of taste whether Erlang and Python are more suitable for the task than Java. We feel much more productive with Python than with Java. We also feel that Erlang is a perfect match for the Disco core that needs to handle tens of thousands of tasks in parallel.

Thanks to Erlang, the Disco core is remarkably compact, currently less than 2000 lines of code. It is relatively easy to understand how the core works, and to start experimenting with it or adapting it to new environments. Thanks to Python, it is easy to add new features around the core, which ensures that Disco can respond quickly to real-world needs.”

The Disco tutorial uses the standard word counting task to show how to set up and use Disco on both a local cluster and Amazon EC2. There is also homedisco, which lets programmers develop, debug, profile and test Disco functions on one local machine before running on a cluster. The word counting example from the tutorial is certainly nicely compact:

from disco.core import Disco, result_iterator

def fun_map(e, params):
    return [(w, 1) for w in e.split()]

def fun_reduce(iter, out, params):
    s = {}
    for w, f in iter:
        s[w] = s.get(w, 0) + int(f)
    for w, f in s.iteritems():
        out.add(w, f)

results = Disco("disco://localhost").new_job(
        name = "wordcount",
        input = ["http://discoproject.org/chekhov.txt"],
        map = fun_map,
        reduce = fun_reduce).wait()

for word, frequency in result_iterator(results):
    print word, frequency
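Before submitting anything to a cluster (or even to homedisco), it can be handy to sanity-check the map and reduce functions in plain Python. The sketch below is not part of the Disco API: the `Output` class is a hypothetical stand-in for the reduce output object (`out.add`), and the manual grouping step merely mimics the shuffle Disco performs between map and reduce.

```python
def fun_map(e, params):
    # Same map function as the tutorial: one (word, 1) pair per word.
    return [(w, 1) for w in e.split()]

class Output(object):
    """Hypothetical stand-in for Disco's reduce output object."""
    def __init__(self):
        self.pairs = []
    def add(self, k, v):
        self.pairs.append((k, v))

def fun_reduce(iter, out, params):
    # Same reduce logic as the tutorial; .items() here
    # (the tutorial's .iteritems() is Python 2 only).
    s = {}
    for w, f in iter:
        s[w] = s.get(w, 0) + int(f)
    for w, f in s.items():
        out.add(w, f)

# Simulate the job locally: map every input line, concatenate the
# intermediate pairs (standing in for the shuffle), then reduce.
lines = ["to be or not to be", "that is the question"]
pairs = []
for line in lines:
    pairs.extend(fun_map(line, None))

out = Output()
fun_reduce(pairs, out, None)
counts = dict(out.pairs)
```

On this toy input, `counts` maps each word to its frequency ("to" and "be" appear twice, the rest once), which is exactly what the real job would yield via `result_iterator` for the same text.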