Commit 0b7fd840 authored by Nils Erik Krumnack's avatar Nils Erik Krumnack
Browse files

Merge branch 'cherry-pick-8ffd72f8-21.2' into '21.2'

Sweeping !48132 from master to 21.2.
allow running multiple parallel jobs in LocalDriver

See merge request atlas/athena!48175
parents c00ffbdf cb51afc5
......@@ -390,6 +390,16 @@ namespace EL
public:
static const std::string optLocalNoUnsetup;
/// \brief the option to specify the number of parallel jobs in
/// \ref LocalDriver (0 = number of hardware cores) (default=1)
///
/// As an intermediate between running a single job locally and
/// running in batch, this allows to run multiple processes in
/// parallel locally. It is still recommended to just run in
/// batch instead, but sometimes this is more practical.
public:
static const std::string optNumParallelProcs;
/// \brief the option to do processing in a background process in PROOF
public:
......
......@@ -60,6 +60,7 @@ namespace EL
const std::string Job::optDisableMetrics = "nc_disable_metrics";
const std::string Job::optResetShell = "nc_reset_shell";
const std::string Job::optLocalNoUnsetup = "nc_local_no_unsetup";
const std::string Job::optNumParallelProcs = "nc_num_parallel_processes";
const std::string Job::optBackgroundProcess = "nc_background_process";
const std::string Job::optOutputSampleName = "nc_outputSampleName";
const std::string Job::optGridDestSE = "nc_destSE";
......
......@@ -20,6 +20,8 @@
#include <RootCoreUtils/Assert.h>
#include <RootCoreUtils/ShellExec.h>
#include <RootCoreUtils/ThrowMsg.h>
#include <mutex>
#include <thread>
//
// method implementations
......@@ -68,6 +70,13 @@ namespace EL
data.options.castString(Job::optDockerImage)};
const std::string dockerOptions {
data.options.castString(Job::optDockerOptions)};
int numParallelProcs
= data.options.castDouble (Job::optNumParallelProcs, 1);
if (numParallelProcs < 0)
{
ANA_MSG_ERROR ("invalid number of parallel processes: " << numParallelProcs);
return StatusCode::FAILURE;
}
std::ostringstream basedirName;
basedirName << data.submitDir << "/tmp";
......@@ -76,19 +85,71 @@ namespace EL
if (gSystem->MakeDirectory (basedirName.str().c_str()) != 0)
RCU_THROW_MSG ("failed to create directory " + basedirName.str());
}
for (std::size_t index : data.batchJobIndices)
auto submitSingle = [&, this] (std::size_t index) noexcept -> StatusCode
{
try
{
std::ostringstream dirName;
dirName << basedirName.str() << "/" << index;
if (gSystem->MakeDirectory (dirName.str().c_str()) != 0)
{
ANA_MSG_ERROR ("failed to create directory " + dirName.str());
return StatusCode::FAILURE;
}
std::ostringstream cmd;
cmd << "cd " << dirName.str() << " && ";
if (!dockerImage.empty())
cmd << "docker run --rm -v " << RCU::Shell::quote (data.submitDir) << ":" << RCU::Shell::quote (data.submitDir) << " " << dockerOptions << " " << dockerImage << " ";
cmd << RCU::Shell::quote (data.submitDir) << "/submit/run " << index;
RCU::Shell::exec (cmd.str());
} catch (std::exception& e)
{
ANA_MSG_ERROR ("exception in job " << index << ": " << e.what());
return StatusCode::FAILURE;
}
return StatusCode::SUCCESS;
};
if (numParallelProcs == 1)
{
for (std::size_t index : data.batchJobIndices)
{
if (submitSingle (index).isFailure())
return StatusCode::FAILURE;
}
} else
{
std::ostringstream dirName;
dirName << basedirName.str() << "/" << index;
if (gSystem->MakeDirectory (dirName.str().c_str()) != 0)
RCU_THROW_MSG ("failed to create directory " + dirName.str());
std::ostringstream cmd;
cmd << "cd " << dirName.str() << " && ";
if (!dockerImage.empty())
cmd << "docker run --rm -v " << RCU::Shell::quote (data.submitDir) << ":" << RCU::Shell::quote (data.submitDir) << " " << dockerOptions << " " << dockerImage << " ";
cmd << RCU::Shell::quote (data.submitDir) << "/submit/run " << index;
RCU::Shell::exec (cmd.str());
if (numParallelProcs == 0)
numParallelProcs = std::thread::hardware_concurrency();
if (numParallelProcs > int (data.batchJobIndices.size()))
numParallelProcs = data.batchJobIndices.size();
std::vector<std::thread> threads;
std::mutex mutex;
auto indexIter = data.batchJobIndices.begin();
bool abort = false;
while (threads.size() < unsigned (numParallelProcs))
{
threads.emplace_back ([&,this] () noexcept
{
std::unique_lock<std::mutex> lock (mutex);
while (indexIter != data.batchJobIndices.end() && !abort)
{
auto myindex = *indexIter;
++ indexIter;
lock.unlock();
if (submitSingle (myindex).isFailure())
{
abort = true;
return;
}
lock.lock ();
}
});
}
for (auto& thread : threads)
thread.join();
if (abort)
return StatusCode::FAILURE;
}
data.submitted = true;
}
......
/*
Copyright (C) 2002-2021 CERN for the benefit of the ATLAS collaboration
*/
//
// includes
//
#include <EventLoop/Global.h>
#include <EventLoop/LocalDriver.h>
#include <EventLoop/Job.h>
#include <EventLoopTest/UnitTest.h>
//
// main program
//
using namespace EL;
int main ()
{
LocalDriver driver;
driver.options()->setDouble (Job::optNumParallelProcs, 0u);
UnitTest ut ("local");
ut.cleanup = false;
return ut.run (driver);
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment