Commit 0870b6ee authored by Nils Krumnack's avatar Nils Krumnack
Browse files

allow running multiple parallel jobs in LocalDriver

As an intermediate between running a single job locally and running in
batch, this allows running multiple processes in parallel locally.  It
is still recommended to just run in batch instead, but sometimes this
is more practical.
parent cf063791
......@@ -390,6 +390,16 @@ namespace EL
public:
static const std::string optLocalNoUnsetup;
/// \brief the option to specify the number of parallel jobs in
/// \ref LocalDriver (0 = number of hardware cores) (default=1)
///
/// As an intermediate between running a single job locally and
/// running in batch, this allows running multiple processes in
/// parallel locally.  It is still recommended to just run in
/// batch instead, but sometimes this is more practical.
public:
static const std::string optNumParallelProcs;
/// \brief the option to do processing in a background process in PROOF
public:
......
......@@ -61,6 +61,7 @@ namespace EL
// out-of-line definitions for the option-name constants declared on
// Job; each string is the key under which the option is stored and
// looked up (e.g. via castDouble/castString on the options store)
const std::string Job::optDisableMetrics = "nc_disable_metrics";
const std::string Job::optResetShell = "nc_reset_shell";
const std::string Job::optLocalNoUnsetup = "nc_local_no_unsetup";
const std::string Job::optNumParallelProcs = "nc_num_parallel_processes";
const std::string Job::optBackgroundProcess = "nc_background_process";
const std::string Job::optOutputSampleName = "nc_outputSampleName";
const std::string Job::optGridDestSE = "nc_destSE";
......
......@@ -20,6 +20,8 @@
#include <RootCoreUtils/Assert.h>
#include <RootCoreUtils/ShellExec.h>
#include <RootCoreUtils/ThrowMsg.h>
#include <mutex>
#include <thread>
//
// method implementations
......@@ -68,6 +70,13 @@ namespace EL
data.options.castString(Job::optDockerImage)};
const std::string dockerOptions {
data.options.castString(Job::optDockerOptions)};
int numParallelProcs
= data.options.castDouble (Job::optNumParallelProcs, 1);
if (numParallelProcs < 0)
{
ANA_MSG_ERROR ("invalid number of parallel processes: " << numParallelProcs);
return StatusCode::FAILURE;
}
std::ostringstream basedirName;
basedirName << data.submitDir << "/tmp";
......@@ -76,19 +85,71 @@ namespace EL
if (gSystem->MakeDirectory (basedirName.str().c_str()) != 0)
RCU_THROW_MSG ("failed to create directory " + basedirName.str());
}
for (std::size_t index : data.batchJobIndices)
auto submitSingle = [&, this] (std::size_t index) noexcept -> StatusCode
{
try
{
std::ostringstream dirName;
dirName << basedirName.str() << "/" << index;
if (gSystem->MakeDirectory (dirName.str().c_str()) != 0)
{
ANA_MSG_ERROR ("failed to create directory " + dirName.str());
return StatusCode::FAILURE;
}
std::ostringstream cmd;
cmd << "cd " << dirName.str() << " && ";
if (!dockerImage.empty())
cmd << "docker run --rm -v " << RCU::Shell::quote (data.submitDir) << ":" << RCU::Shell::quote (data.submitDir) << " " << dockerOptions << " " << dockerImage << " ";
cmd << RCU::Shell::quote (data.submitDir) << "/submit/run " << index;
RCU::Shell::exec (cmd.str());
} catch (std::exception& e)
{
ANA_MSG_ERROR ("exception in job " << index << ": " << e.what());
return StatusCode::FAILURE;
}
return StatusCode::SUCCESS;
};
if (numParallelProcs == 1)
{
for (std::size_t index : data.batchJobIndices)
{
if (submitSingle (index).isFailure())
return StatusCode::FAILURE;
}
} else
{
std::ostringstream dirName;
dirName << basedirName.str() << "/" << index;
if (gSystem->MakeDirectory (dirName.str().c_str()) != 0)
RCU_THROW_MSG ("failed to create directory " + dirName.str());
std::ostringstream cmd;
cmd << "cd " << dirName.str() << " && ";
if (!dockerImage.empty())
cmd << "docker run --rm -v " << RCU::Shell::quote (data.submitDir) << ":" << RCU::Shell::quote (data.submitDir) << " " << dockerOptions << " " << dockerImage << " ";
cmd << RCU::Shell::quote (data.submitDir) << "/submit/run " << index;
RCU::Shell::exec (cmd.str());
if (numParallelProcs == 0)
numParallelProcs = std::thread::hardware_concurrency();
if (numParallelProcs > int (data.batchJobIndices.size()))
numParallelProcs = data.batchJobIndices.size();
std::vector<std::thread> threads;
std::mutex mutex;
auto indexIter = data.batchJobIndices.begin();
bool abort = false;
while (threads.size() < unsigned (numParallelProcs))
{
threads.emplace_back ([&,this] () noexcept
{
std::unique_lock<std::mutex> lock (mutex);
while (indexIter != data.batchJobIndices.end() && !abort)
{
auto myindex = *indexIter;
++ indexIter;
lock.unlock();
if (submitSingle (myindex).isFailure())
{
abort = true;
return;
}
lock.lock ();
}
});
}
for (auto& thread : threads)
thread.join();
if (abort)
return StatusCode::FAILURE;
}
data.submitted = true;
}
......
/*
Copyright (C) 2002-2021 CERN for the benefit of the ATLAS collaboration
*/
//
// includes
//
#include <EventLoop/Global.h>
#include <EventLoop/LocalDriver.h>
#include <EventLoop/Job.h>
#include <EventLoopTest/UnitTest.h>
//
// main program
//
using namespace EL;
int main ()
{
LocalDriver driver;
driver.options()->setDouble (Job::optNumParallelProcs, 0u);
UnitTest ut ("local");
ut.cleanup = false;
return ut.run (driver);
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment