Please check Part 1 here if you have not already.
Part 2: The Payload (Python Jobs)
Preparing the Data (files)
The Disco Distributed Filesystem (DDFS) is a great low-level component of Disco. It is designed with huge datasets in mind, so it made more sense to use it in my experiment than another type of storage such as HDFS. Moreover, we can even store job results in DDFS, which we are going to do below.
The Disco documentation on DDFS is very informative and thorough, so I will only give a rough idea of the two main concepts behind DDFS: blobs and tags. Blobs are the data itself, the files or objects being stored in DDFS. Tags are labels for these blobs and also hold metadata about them; you can use tags simply to organize your data (blobs) or even to attach timestamps to it.
Using the ddfs command to split the data into chunks:
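The original snippet is not embedded here, so this is a reconstruction of the chunking command; the path to batch.txt is an assumption and should be whatever you used locally:

```bash
# Push batch.txt into DDFS as chunks under the tag data:batch
# (the file path is an assumption; use the path to your own data file).
ddfs chunk data:batch ./batch.txt
```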
The above command creates a tag data:batch and pushes the chunked data from the file batch.txt into DDFS.
Now that the chunks have been created (distributed and replicated across DDFS), we can examine the blobs using this command:
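A sketch of that command, assuming the data:batch tag created above:

```bash
# List the blobs (replica URLs) stored behind the tag data:batch
ddfs blobs data:batch
```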
Moreover, we can also view the contents of the blobs using the following command. Note from the docs: chunks are stored in Disco’s internal compressed format, thus we use ddfs xcat instead of ddfs cat to view them.
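Again a sketch, using the same tag:

```bash
# Print the contents of the compressed chunks in readable form
ddfs xcat data:batch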
Now our data is stored in DDFS and ready to be utilized by Disco.
Creating the Python jobs (functions)
We know that it’s possible to write the map and reduce functions of a Disco job in Python, so let’s experiment with simple map and reduce functions.
Starting with the map function: it has two arguments, line, which is a single entry from the input source (normally a text file, although in our case it is a custom input stream reading the DDFS blob we created earlier; more on this below), and params, which carries any additional input your map function might need. I don’t use params in my example.
The map function below takes an entry from the input source, splits it into a list of sub-items on the \n newline character, and returns an iterator of key-value pairs, one per sub-item, where the key is the sub-item itself and the value is always 1, denoting one occurrence of that sub-item in our dataset.
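A minimal sketch of such a map function (the variable names are my own, since the original gist is not embedded here):

```python
def map(line, params):
    # Split the entry on newlines and emit (sub_item, 1) for each sub-item;
    # params is unused in this example.
    for sub_item in line.split('\n'):
        yield sub_item, 1
```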
Next, let’s have a look at the reduce function. This function also has two arguments: iter, which is an iterator over the key-value pairs produced by the map function and assigned to this reduce instance, and params, which again carries any additional input for the reduce function.
In this function, I simply use disco.util.kvgroup to retrieve each entry along with the sum of its counts. And this is how you count the occurrences of specific items in parallel using map and reduce functions under Disco.
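A sketch of the matching reduce function, following the pattern from the Disco tutorial:

```python
def reduce(iter, params):
    # kvgroup groups consecutive pairs with the same key, so sort first;
    # summing the 1s gives the occurrence count per item.
    from disco.util import kvgroup
    for item, counts in kvgroup(sorted(iter)):
        yield item, sum(counts)
```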
Writing our Python job
For our simple job, we will be happy to just print out the results. We will make use of the disco.core.Job object and the disco.core.result_iterator function.
The wait() method of the Job object returns the results once the job has finished running. Its return value is then passed to the result_iterator function, which in turn iterates over all of the key-value pairs in the results of our job.
We start the job with the run() method, which takes an input argument pointing to the DDFS data identified by its tag (label), ["tag://data:batch"], in addition to the map and reduce functions. In our case we add one more argument, map_reader, which defines a custom function for parsing the input stream (DDFS chunks in our case): the disco.worker.task_io.chain_reader function.
Finally, after our job has finished and its output has been digested by result_iterator(), the results are printed out.
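A sketch of that wiring, assuming the map and reduce functions defined above:

```python
from disco.core import Job, result_iterator
from disco.worker.task_io import chain_reader

# Run the job against the DDFS tag and print every resulting key-value pair.
job = Job().run(input=["tag://data:batch"],
                map=map,
                reduce=reduce,
                map_reader=chain_reader)

for item, count in result_iterator(job.wait(show=True)):
    print(item, count)
```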
Below is the complete code snippet that I used in my experiment with Disco:
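The original gist is not embedded in this page, so here is a reconstruction of what the complete script looks like, under the same assumptions as the pieces above:

```python
from disco.core import Job, result_iterator
from disco.worker.task_io import chain_reader


def map(line, params):
    # Emit (sub_item, 1) for every newline-separated sub-item in the entry.
    for sub_item in line.split('\n'):
        yield sub_item, 1


def reduce(iter, params):
    # Group the sorted pairs by key and sum the counts for each item.
    from disco.util import kvgroup
    for item, counts in kvgroup(sorted(iter)):
        yield item, sum(counts)


if __name__ == '__main__':
    job = Job().run(input=["tag://data:batch"],
                    map=map,
                    reduce=reduce,
                    map_reader=chain_reader)
    for item, count in result_iterator(job.wait(show=True)):
        print(item, count)
```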
Monitoring the progress of the job
Part 1 of this experiment showed how to set up the Web UI, which can afterwards be accessed at http://localhost:8989.
The Web UI allows us to monitor the progress of the Python job in real time as well as the resources used by our job.
You will be able to see each job by its ID, the execution time of each phase (map, map_shuffle, reduce), the total time for the job, and also the DDFS addresses of the inputs and the output of the job, as shown in the screenshot below (click on it for a larger view).
Bonus: Writing results into a DDFS blob
Depending on our data size and use case, we might need to save the results into a DDFS blob that is also distributed across the cluster. This can easily be done by adding the save_results=True argument to the run() method, for example:
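A sketch of that call; save_results is the keyword named in this article, so check your Disco version's job parameters if it differs:

```python
# Same job as before, but asking Disco to store the output in DDFS as well.
job = Job().run(input=["tag://data:batch"],
                map=map,
                reduce=reduce,
                map_reader=chain_reader,
                save_results=True)
```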
Moreover, one could also speed the job up considerably by skipping the sorting phase, using the sort=None argument of the run() method, for example:
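A sketch under the same assumptions as above:

```python
# Skip the intermediate sort phase. Our reduce already calls sorted(iter)
# in memory before kvgroup, so this is safe for a small dataset.
job = Job().run(input=["tag://data:batch"],
                map=map,
                reduce=reduce,
                map_reader=chain_reader,
                sort=None)
```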
Exporting results from DDFS to other formats (plain text)
Now, if required, we can get a backup of the data directly from DDFS with the ddfs command, using the job ID from the Web UI (Job@61c:5bf95:406fd) and the disco:results DDFS tag, like so:
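A sketch of the export; the exact tag name is an assumption here and should be taken from the Web UI (or a ddfs listing of the disco:results prefix):

```bash
# Dump the saved (compressed) results to a plain-text file.
# Assumes the results tag combines the disco:results prefix with the job ID.
ddfs xcat disco:results:Job@61c:5bf95:406fd > results_output.txt
```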
The above command dumps the results into a text file, results_output.txt. That’s all for now.
I hope you enjoyed reading through part 1 and part 2 of my experiment with the Disco project.
As always, the little box below is your own space to express your ideas and ask me anything. Feel free to use it; you just need your GitHub account 😉