Hefnawi

Disco: Erlang/Python MapReduce #2

Disco Project

Please check Part 1 here if you have not already.

Part 2: The Payload (Python Jobs)

Preparing the Data (files)

Disco Distributed Filesystem (DDFS) is a great low-level component of Disco. DDFS is designed with huge data in mind, so it made more sense to use it in my experiment as opposed to any other type of storage, for example, HDFS. Moreover, we can even store job results in DDFS, which we are going to do below.

The Disco documentation on (DDFS) is very informative and thorough, however, I will provide just a rough idea about the two main concepts under DDFS which are blobs and tags. Blobs are the data itself, the files or objects that are being stored into DDFS. Tags are the labels for these blobs, also tags contain metadata about the blobs, you can simply use it to organize your data (blobs) or even to add timestamps to your data.

Using the ddfs command to split the data into chunks:

ddfs chunk data:batch ./batch.txt

The above command creates a tag data:batch and pushes the chunked data from the file batch.txt into DDFS.

Now that the chunks have been created (distributed and replicated across DDFS) we can examine the blobs using this command:

# shows info about blobs of data
ddfs blobs data:batch

Moreover, we can also view the contents of the blobs using the following command:

Note from the docs: Chunks are stored in Disco’s internal compressed format, thus we use ddfs xcat instead of ddfs cat to view them.

# similar to cat command but for DDFS's compressed raw data
ddfs xcat data:batch | less

Now we have our data stored into DDFS and ready to be utilized by Disco.

Creating the Python jobs (functions)

We know that it’s possible to run Python jobs for the map and reduce functions in Disco. So let’s experiment with a simple map and reduce functions in Python.

Starting with the map function, it has two arguments: line, which is a single entry from the input source (this is normally a text file, however, in our case this would be a custom input stream from the DDFS blob that we created earlier — more on this below) and params, which is any additional input that might be required by your map function but I don’t use it in my example.

The map function below takes an entry from the source (input), it splits the line into a list of sub-items separated by the \n newline character. Finally, it returns an iterator of key-value pairs of each sub-item, where the key is the sub-item itself and the value is always 1 denoting the count of this sub-item or the number of occurrences in our dataset.

def map(line, params):
    for word in line.split("\n"):
        yield word, 1

Next, let’s have a look on the reduce function. This function also has two arguments, the iter which is an iterator over the key-value produced by the map function that was assigned to this reduce instance and params which is also used for additional input for the reduce function.

In this function, I am simply using the disco.util.kvgroup function to retrieve each entry in addition to the sum of its count. And this is how you count the occurrences of specific items in parallel using map and reduce functions under Disco.

def reduce(iter, params):
    from disco.util import kvgroup
    for word, counts in kvgroup(sorted(iter)):
        yield word, sum(counts)

Writing our Python job

For our simple job, we will be happy to just print out the results. We will make use of the disco.core.Job object and the disco.core.result_iterator function.

Using the wait() method of the Job object which returns the results from the job once it’s finished running. The wait() method return value is then passed to the result_iterator function which in turn iterates over all of the key-value pairs in the results from our job.

We run the job using the run() function which takes the input stream as an argument pointing to the DDFS blob identified by its tag (label): ["tag://data:batch"] in addition to map and reduce functions and in our case we added another argument the map_reader which defines a custom function for parsing the input stream (DDFS in our case) using the disco.worker.task_io.chain_reader() function.

Finally, after our job is finished and digested by the result_iterator() function it gets printed out.

if __name__ == '__main__':
    job = Job().run(input=["tag://data:batch"],
                    map_reader=chain_reader,
                    map=map,
                    reduce=reduce)
    for word, count in result_iterator(job.wait(show=True)):
        print(word, count)

Below is the complete code snippet that I used in my experiment with Disco

from disco.worker.task_io import chain_reader
from disco.core import Job, result_iterator

def map(line, params):
    for word in line.split("\n"):
        yield word, 1

def reduce(iter, params):
    from disco.util import kvgroup
    for word, counts in kvgroup(sorted(iter)):
        yield word, sum(counts)

if __name__ == '__main__':
    job = Job().run(input=["tag://data:batch"],
                    map_reader=chain_reader,
                    map=map,
                    reduce=reduce)
    for word, count in result_iterator(job.wait(show=True)):
        print(word, count)

Monitoring the progress of the job

Part 1 of this experiment showed how to setup the Web UI which can be accessed afterwards on http://localhost:8989

The Web UI allows us to monitor the progress of the Python job in real-time as well as monitor the resources used by our job.

You will be able to see each job by its ID, the execution time for each phase (map, map_shuffle, reduce), the total time for the job and also the DDFS addresses of the inputs and the output of the job as shown in the screenshot below (click on it for a larger view).

Disco job monitor

Bonus: Writing results into a DDFS blob

Depending on our data size and use case, we might need to save the results into a DDFS blob that is also distributed across the cluster. This can be easily done by adding the save_results=True argument to the run() function, for example:

job = Job().run(input=["tag://data:batch"],
                    map_reader=chain_reader,
                    map=map,
                    reduce=reduce,
                    save_results=True)

Moreover, one could also dramatically enhance the map and reduce functions by omitting the sorting phase using sort=None argument in the run() function, for example:

job = Job().run(input=["tag://data:batch"],
                    map_reader=chain_reader,
                    map=map,
                    reduce=reduce,
                    save_results=True,
                    sort=None)

Export results from DDFS to other formats (plain text)

Now we can get a backup of the data if required directly from DDFS using the ddfs command using the job ID from the Web UI Job@61c:5bf95:406fd and the DDFS tag of disco:results like so:

ddfs xcat disco:results:Job@61c:5bf95:406fd > results_output.txt

The above command dumps the results into a text file results_output.txt. That’s all for now.

I hope you enjoyed reading through part 1 and part 2 of my experiment with the Disco project.

References

As always this little box below is your own space to express your ideas and ask me anything, feel free to use, you just need your Github account 😉