Skip to content

Add support for distributed RDataFrame

Sebastien Wertz requested to merge swertz/bamboo:feat-distRDF into master

This adds support for DistRDF, to distribute the computations dynamically to a cluster without having to manage the jobs in Bamboo ourselves. The backend handling the distribution is either Dask (distributed) (typically with jobqueue) or Spark. Through dask-jobqueue, "regular" batch systems based on Slurm or HTCondor are supported. The long-term goal is to deprecate the batch job management (driver mode) in Bamboo entirely, and rely only on DistRDF.

To use DistRDF, call bambooRun with the additional --distrdf-be <BE> argument, where <BE> can be one of dask_local (local-machine multiprocessing, useful for testing but functionally equivalent to --threads), dask_slurm, dask_condor, spark. This works with both default (sequential) mode, where one sample is processed after the other, and with --distributed parallel, where the graphs for all samples are first built, before processing all samples in parallel. Obviously, --distributed driver does not make sense here.

To configure the distributed backend, add a corresponding section in the environment file, e.g.:

[dask_slurm]
adapt_max = 100 ; submit max. 100 jobs
partitions_per_file = 4 ; split each file into 4 chunks

For dask-jobqueue, you will in addition need to configure the cluster using a jobqueue.yml file placed into ~/.config/dask/. Have a look at the documentation of dask-jobqueue.

A small backward-incompatible change will be needed in analysis modules: the prepareTree() needs an additional backend argument, as in:

class myAnalysisModule(NanoAODHistoModule):
    def prepareTree(self, tree, sample=None, sampleCfg=None, backend=None):
        tree,noSel,be,lumiArgs = super().prepareTree(tree, sample=sample, sampleCfg=sampleCfg, 
                                         description=myNanoAODDescription, backend=backend)

This features requires ROOT >= 6.26.

In practice, this adds a DataFrame "type", DistDataframeBackend, inheriting from a BaseDataframeBackend (that includes most features of the default DataframeBackend, that also inherits from BaseDataframeBackend), to accomodate the small changes needed for DistRDF. This backend type is choosen using the backend="distributed" argument in prepareTree, which is automatically selected when the --distrdf-be argument is used. The reason this is needed is to have a DistDataframeBackend.createRDF() method, called by the run_notworker function in workflow.py, to pass the Dask client or Spark session when creating the root DistRDF, avoiding the creation of the RDF in prepareTree. The Dask or Spark session can therefore not be created or modified by the user, except through the config-file options. Perhaps there is still a need to add a user hook for extra customization though, so feedback is welcome there.

The dependencies (libraries, headers, ...) added to the interpreter and the jitted symbols are kept track of in global variables, since they need to be propagated to the workers by DistRDF. This means all dependencies and symbols are always propagated to all workers, even if some may not be needed for all samples. This is slightly sub-efficient but much easier than keeping track of which graph needs which symbols, since we still want to share symbols across graphs in non-distributed mode.

Still TODO:

  • Add documentation
  • Add tests
Edited by Sebastien Wertz

Merge request reports