Distributed Computation on HPC Clusters
As discussed in some internal meetings and showed at the Hgg kickoff meeting, we would like to offer a more Pythonic way of distribute the computation on HPC clusters that use batch queuing systems like HTCondor, SLURM, etc. This means having something like
def foo(arg):
...
args = [...]
futures = map(foo, args)
where the application of foo
to different elements of args
is performed in different nodes of the cluster, rather than the procedure that consists in:
- writing a Python script that performs some operations on a data sample;
- writing a specific job submission script (different for each batch queuing system we are using) that allocates resources and runs the previously mentioned Python script;
- submit the job submission script for each data sample we are working with.
The main Python package on the market that allows to achieve this goal is probably Dask, more specifically with the Jobqueue package that provides interfaces to many different job queuing systems. Other packages like Ray or Celery can be (more or less easily) interfaces to job queuing systems as well.
There is however a drawback in interfacing these kinds of packages with HPC job scheduling systems that I will try to summarize in the following.
Dask (and Ray, Celery, etc.) all have the same way of working: the main actors are a client, one (or more) task queues and a set of workers. The client puts the tasks to perform on the tasks queue, while the workers continuously interrogate the task queue to check if there are tasks to perform. Ideally, every worker runs on a different node of the cluster, so the workload is distributed.
This kind of workflow is ideal when the developer has admin rights on the whole cluster and no job scheduling system stands in the way. When, on the other hand, there is already a scheduling system running on the cluster (which is pretty much always the case in the HPC facilities managed by universities that we usually use) the way Dask interfaces with it consists in using the above mentioned job submission scripts to run workers on different nodes of the cluster rather than Python scripts with specific tasks. In this way it is possible to keep the interactivity of the workflow also when we scale from local parallelization (multiprocessing, multithreading) to cluster parallelization (multiple nodes). However, this comes with a not negligible drawback: since usually HPC scheduling systems are usually configured to optimize throughput (i.e. favor short, with less resources required jobs rather than long ones), the fact of starting arbitrarily long workers that run for an indefinite amount of time can lead to:
- inefficient use of resources (workers that are running but we're not using them since we are working interactively);
- faster drop of priority.
Dask seems to be able to prevent this situation using Cluster.adapt
rather than Cluster.scale
(needs deeper investigation). The same solution doesn't seem to be present for Ray and Celery (or, if it exists, I didn't find it).
Another possible way to implement the API mentioned at the beginning (map(foo, args)
) is the one used by htmap for HTCondor (https://htmap.readthedocs.io/en/latest/) (see also this issue concerning how to run htmap on Lxplus). In this case, no task queue and set of workers are set; when running something like htmap.map(foo, args)
, internally cloudpickle is used to serialize the arguments with which map
is called and internally submit jobs that instruct how to apply foo
to each arg
in args
and serialize again the result of the computation. These results are then deserialized and returned to the user as proper Python objects.
I was already able to implement a very simple prototype of this behavior using for SLURM (for which a package like htmap for HTCondor doesn't seem to exist).