Please check Part 1 here if you have not already.
Part 2: The Payload (Python Jobs)
Preparing the Data (files)
Disco Distributed Filesystem (DDFS) is a low-level storage component of Disco. DDFS is designed with huge data in mind, so it made more sense to use it in my experiment than any other type of storage such as HDFS. Moreover, we can even store job results in DDFS, which we are going to do below.
The Disco documentation on DDFS is very informative and thorough, so here I will only give a rough idea of the two main concepts in DDFS: blobs and tags. Blobs are the data itself, the files or objects being stored in DDFS. Tags are labels for these blobs and also contain metadata about them; you can use tags simply to organize your data (blobs) or even to add timestamps to it.
We use the `ddfs` command to split the data into chunks:

```
ddfs chunk data:batch ./batch.txt
```

The above command creates a tag `data:batch` and pushes the chunked data from the file `batch.txt` into DDFS.
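If you want to reproduce the experiment, `ddfs chunk` just needs a local file, and a small newline-separated word list is enough. Here is a quick way to generate one; the filename matches the command above, but the word list itself is only an illustrative assumption:

```python
# Generate a small sample input file for the "ddfs chunk" step.
# The words here are an arbitrary example, not data from the article.
words = ["apple", "banana", "apple", "cherry", "banana", "apple"]

with open("batch.txt", "w") as f:
    f.write("\n".join(words))
```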
Now that the chunks have been created (distributed and replicated across DDFS), we can examine the blobs using this command:
```
# shows info about blobs of data
ddfs blobs data:batch
```
Moreover, we can also view the contents of the blobs using the following command:
Note from the docs: chunks are stored in Disco's internal compressed format, thus we use `ddfs xcat` instead of `ddfs cat` to view them.
```
# similar to the cat command, but for DDFS's compressed raw data
ddfs xcat data:batch | less
```
Now we have our data stored in DDFS and ready to be utilized by Disco.
Creating the Python jobs (functions)
We know that it's possible to run Python jobs for the map and reduce functions in Disco, so let's experiment with simple map and reduce functions in Python.
Starting with the `map` function, it has two arguments: `line`, which is a single entry from the input source (normally a text file; in our case it will be a custom input stream from the DDFS blob we created earlier, more on this below), and `params`, which is any additional input your `map` function might require (unused in my example).

The `map` function below takes an entry from the input source and splits it into a list of sub-items separated by the `\n` newline character. It then yields a key-value pair for each sub-item, where the key is the sub-item itself and the value is always `1`, denoting one occurrence of this sub-item in our dataset.
```python
def map(line, params):
    for word in line.split("\n"):
        yield word, 1
```
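Because the `map` function is plain Python, we can sanity-check it locally before handing it to Disco. Here it is applied to a sample newline-separated entry (the sample input is my own, not from the DDFS data):

```python
def map(line, params):
    for word in line.split("\n"):
        yield word, 1

# Feed the mapper a sample entry containing newline-separated words.
pairs = list(map("apple\nbanana\napple", None))
print(pairs)  # [('apple', 1), ('banana', 1), ('apple', 1)]
```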
Next, let's have a look at the `reduce` function. This function also has two arguments: `iter`, an iterator over the key-value pairs produced by the `map` function assigned to this `reduce` instance, and `params`, which again carries any additional input for the `reduce` function.

In this function, I simply use `disco.util.kvgroup` to retrieve each key along with the sum of its counts. And this is how you count the occurrences of specific items in parallel using map and reduce functions under Disco.
```python
def reduce(iter, params):
    from disco.util import kvgroup
    for word, counts in kvgroup(sorted(iter)):
        yield word, sum(counts)
```
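`disco.util.kvgroup` groups consecutive pairs that share a key, which is why the input is sorted first. Without a Disco installation we can mimic it with `itertools.groupby` and run the whole map/reduce pipeline locally; this harness (including the `kvgroup` stand-in and the `_fn` names, renamed to avoid shadowing builtins) is my own sketch, not part of Disco:

```python
from itertools import groupby
from operator import itemgetter

def map_fn(line, params):
    for word in line.split("\n"):
        yield word, 1

def kvgroup(sorted_pairs):
    # Minimal stand-in for disco.util.kvgroup: groups *consecutive*
    # (key, value) pairs by key and yields (key, values_iterator).
    for key, group in groupby(sorted_pairs, key=itemgetter(0)):
        yield key, (value for _, value in group)

def reduce_fn(iter, params):
    for word, counts in kvgroup(sorted(iter)):
        yield word, sum(counts)

# Run the two phases back to back on a sample entry.
pairs = map_fn("apple\nbanana\napple", None)
print(dict(reduce_fn(pairs, None)))  # {'apple': 2, 'banana': 1}
```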
Writing our Python job
For our simple job, we will be happy to just print out the results. We will make use of the `disco.core.Job` object and its `wait()` method, which returns the results of the job once it has finished running. The return value of `wait()` is then passed to the `result_iterator` function, which in turn iterates over all of the key-value pairs in the results of our job.

We run the job using the `run()` method, which takes the input stream as an argument pointing to the DDFS blob identified by its tag (`["tag://data:batch"]`), in addition to the `map` and `reduce` functions. In our case we added another argument, `map_reader`, which defines a custom function for parsing the input stream (DDFS in our case) using `chain_reader`.

Finally, after our job is finished and digested by the `result_iterator()` function, the results get printed out.
```python
if __name__ == '__main__':
    job = Job().run(input=["tag://data:batch"],
                    map_reader=chain_reader,
                    map=map,
                    reduce=reduce)
    for word, count in result_iterator(job.wait(show=True)):
        print(word, count)
```
Below is the complete code snippet that I used in my experiment with Disco:
```python
from disco.worker.task_io import chain_reader
from disco.core import Job, result_iterator


def map(line, params):
    for word in line.split("\n"):
        yield word, 1


def reduce(iter, params):
    from disco.util import kvgroup
    for word, counts in kvgroup(sorted(iter)):
        yield word, sum(counts)


if __name__ == '__main__':
    job = Job().run(input=["tag://data:batch"],
                    map_reader=chain_reader,
                    map=map,
                    reduce=reduce)
    for word, count in result_iterator(job.wait(show=True)):
        print(word, count)
```
Monitoring the progress of the job
Part 1 of this experiment showed how to set up the Web UI, which can afterwards be accessed at http://localhost:8989
The Web UI allows us to monitor the progress of the Python job in real-time as well as monitor the resources used by our job.
You will be able to see each job by its ID, the execution time for each phase (map, map_shuffle, reduce), the total time for the job, and the DDFS addresses of the job's inputs and outputs, as shown in the screenshot below (click on it for a larger view).
Bonus: Writing results into a DDFS blob
Depending on our data size and use case, we might need to save the results into a DDFS blob that is also distributed across the cluster. This can easily be done by adding the `save_results=True` argument to the `run()` method, for example:

```python
job = Job().run(input=["tag://data:batch"],
                map_reader=chain_reader,
                map=map,
                reduce=reduce,
                save_results=True)
```
Moreover, one could also speed up the job considerably by omitting the sorting phase, using the `sort=None` argument in the `run()` method, for example:

```python
job = Job().run(input=["tag://data:batch"],
                map_reader=chain_reader,
                map=map,
                reduce=reduce,
                save_results=True,
                sort=None)
```
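One caveat when skipping Disco's sorting phase: `kvgroup` only merges *consecutive* identical keys, so the `reduce` function must still sort its own input (as ours does with `sorted(iter)`), otherwise counts for a repeated key get split. This small snippet (my own illustration, using `itertools.groupby`, which behaves like `kvgroup` in this respect) shows the difference:

```python
from itertools import groupby
from operator import itemgetter

pairs = [("apple", 1), ("banana", 1), ("apple", 1)]

# groupby (like kvgroup) only merges *consecutive* identical keys,
# so grouping unsorted pairs splits the count for "apple"...
unsorted_groups = [(k, sum(v for _, v in g))
                   for k, g in groupby(pairs, key=itemgetter(0))]
print(unsorted_groups)  # [('apple', 1), ('banana', 1), ('apple', 1)]

# ...while sorting first produces the correct totals.
sorted_groups = [(k, sum(v for _, v in g))
                 for k, g in groupby(sorted(pairs), key=itemgetter(0))]
print(sorted_groups)  # [('apple', 2), ('banana', 1)]
```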
Export results from DDFS to other formats (plain text)
Now, if required, we can get a backup of the results directly from DDFS with the `ddfs` command, using the job ID from the Web UI (`Job@61c:5bf95:406fd`) and the `disco:results` DDFS tag, like so:

```
ddfs xcat disco:results:Job@61c:5bf95:406fd > results_output.txt
```

The above command dumps the results into a text file, `results_output.txt`. That's all for now.
I hope you enjoyed reading through part 1 and part 2 of my experiment with the Disco project.
As always, this little box below is your own space to express your ideas and ask me anything; feel free to use it, you just need your GitHub account 😉