diff --git a/Core/include/ACTFW/Concurrency/parallel_for.hpp b/Core/include/ACTFW/Concurrency/parallel_for.hpp index 32e6d590d3ce336c995c4552564a3bac84e9fe2b..75b437dc0df0575ea813aa561f8f5d9a4fd63c58 100644 --- a/Core/include/ACTFW/Concurrency/parallel_for.hpp +++ b/Core/include/ACTFW/Concurrency/parallel_for.hpp @@ -18,85 +18,178 @@ #ifndef ACTFW_CONCURRENCY_PARALLEL_FOR_H #define ACTFW_CONCURRENCY_PARALLEL_FOR_H 1 +#include <boost/optional.hpp> +#include <boost/variant/get.hpp> +#include <boost/variant/variant.hpp> #include <exception> +#include <functional> #include "ACTFW/Framework/ProcessCode.hpp" -/// We will use this simple helper around the _Pragma operator of C99 -/// and C++11, if it is not defined elsewhere -/// -#ifndef ACTFW_PRAGMA -#define ACTFW_PRAGMA(x) _Pragma(#x) -#endif +// Let's keep as much as possible inside of namespaces +namespace FW { +namespace Details { -/// Out-of-order multithreaded equivalent of the following serial loop -/// (macro arguments capitalized for emphasis): -/// -/// for(size_t INDEX = START; index < END; ++index) { -/// ... -/// } -/// -/// One difference with a serial loop is that you cannot easily abort -/// the computation using break and return statements. This is a -/// limitation of the OpenMP 3.1 tasking backend that we currently use. -/// -/// Replacements for the common use cases of break and return are -/// provided in ACTFW_PARALLEL_FOR_BREAK and ACTFW_PARALLEL_FOR_ABORT. -/// -/// Unlike raw OpenMP 3.1, we also support exception propagation. -/// -/// One difference with a serial loop is that code which is passed as -/// an argument to this macro should contain no preprocessor directive. -/// -/// @param index must be unique within the enclosing scope. -/// -#define ACTFW_PARALLEL_FOR(index, start, end, ...) 
\ - { \ - bool actfw_parallel_break_##index = false; \ - bool actfw_parallel_abort_##index = false; \ - std::exception_ptr actfw_parallel_exptr_##index; \ - \ - ACTFW_PRAGMA(omp parallel for) \ - for (size_t index = (start); index < (end); ++index) { \ - try { \ - ACTFW_PRAGMA(omp flush(actfw_parallel_break_##index)) \ - if (actfw_parallel_break_##index) continue; \ + // A C++ for loop iteration may either run normally, break, continue, return, + // or throw. Scenarios which do not emit data can be expressed via an enum... + enum class LoopFlow { Normal, Continue, Break }; + + // ...from which one can derive a Boost variant which covers the complete set + // of loop iteration outcomes, including early returns and exceptions. + template <typename ReturnT> + using LoopOutcome = boost::variant<LoopFlow, ReturnT, std::exception_ptr>; + + // Tell whether a certain loop iteration outcome requires exiting the loop + template <typename T> + bool + loop_outcome_is_fatal(const LoopOutcome<T>& outcome) + { + // Any outcome which is not representable by LoopFlow is fatal + if (outcome.which() != 0) return true; + + // Among LoopFlow outcomes only Break is fatal (no operator== needed on T) + return boost::get<LoopFlow>(outcome) == LoopFlow::Break; + } + +// This macro wraps a user-provided for loop iteration code into a functor +// which returns a LoopOutcome. It handles all loop iteration outcomes which +// can only be handled by a macro, that is, everything but exceptions. +// +// The RETURN_TYPE parameter indicates the return type of the function inside +// which the underlying for loop is located, and INDEX is the name of the loop +// variable expected by the user's iteration code. +// +// Ideally, RETURN_TYPE should be inferred instead of caller-provided, but +// that does not seem possible in C++14. I think C++17's flavor of auto is +// powerful enough for such inference, so maybe something for the future... +// +#define _ACTFW_WRAPPED_LOOP_ITERATION(RETURN_TYPE, INDEX, ...) 
\ + /* To produce a functor in an arbitrary context, we use a [&] lambda */ \ + [&](size_t INDEX) -> FW::Details::LoopOutcome<RETURN_TYPE> { \ + /* We set up a fake for loop environment to catch all user actions */ \ + for (int _dummy_##INDEX = 0; _dummy_##INDEX < 2; ++_dummy_##INDEX) { \ + if (_dummy_##INDEX == 0) { \ + /* The user's loop iteration code is pasted here. It may return */ \ + /* or throw an exception, that is handled in higher-level code */ \ __VA_ARGS__ \ - } catch (...) { \ - actfw_parallel_exptr_##index = std::current_exception(); \ - ACTFW_PARALLEL_FOR_BREAK(index) \ + \ + /* If control reaches this point, the loop iteration code */ \ + /* finished normally without continuing, breaking, or returning */ \ + return FW::Details::LoopFlow::Normal; \ + } else { \ + /* If control reaches this point, the loop iteration was skipped */ \ + /* using the "continue" control flow keyword */ \ + return FW::Details::LoopFlow::Continue; \ } \ } \ \ - if (actfw_parallel_abort_##index) return ProcessCode::ABORT; \ - if (actfw_parallel_exptr_##index) { \ - std::rethrow_exception(actfw_parallel_exptr_##index); \ - } \ + /* If control reaches this point, the loop was aborted using the */ \ + /* "break" control flow keyword */ \ + return FW::Details::LoopFlow::Break; \ } -/// Abort an enclosing ACTFW_PARALLEL_FOR construct and return to the -/// enclosing scope. -/// -/// @param index must match the index parameter given for that loop. -/// -#define ACTFW_PARALLEL_FOR_BREAK(index) \ - { \ - actfw_parallel_break_##index = true; \ - ACTFW_PRAGMA(omp flush(actfw_parallel_break_##index)) \ - continue; \ + // Thanks to the loop iteration wrapper above, most of the ACTFW parallel + // for loop can now be written as real C++ code, instead of a macro. 
+ // + // This function runs a parallel for loop, with loop iterations going from + // "start" (inclusive) to "end" (exclusive), and a per-iteration behavior specified by a functor + // taking the current parallel loop index as a parameter, and returning a loop + // iteration outcome. That functor is expected to be generated via the + // previously defined _ACTFW_WRAPPED_LOOP_ITERATION macro. + // + // If there was an early return from the loop, this function propagates it up. + // Otherwise it returns an empty boost::optional, or rethrows an exception. + // + template <typename T> + boost::optional<T> + parallel_for_impl(size_t start, + size_t end, + std::function<LoopOutcome<T>(size_t)> iteration) + { + // These control variables are used to tell OpenMP to exit the parallel loop + // early if needed, and to record why we had to do it. + // + // TODO: Once we can assume good OpenMP 4.0 support from the host compiler, + // break out of the loop more efficiently using #pragma omp cancel + // + bool exit_loop_early = false; + LoopOutcome<T> exit_reason = LoopFlow::Normal; + +// Our parallel for loop is implemented using OpenMP +#pragma omp parallel for + for (size_t index = start; index < end; ++index) { +// Skip remaining loop iterations if asked to exit the loop early +#pragma omp flush(exit_loop_early) + if (exit_loop_early) continue; + + // Run this loop iteration and record the outcome, exceptions included + LoopOutcome<T> outcome = LoopFlow::Normal; + try { + outcome = iteration(index); + } catch (...) 
{ + outcome = std::current_exception(); + } + + // On a fatal outcome, stop the loop; never let a break mask a return/throw + if (loop_outcome_is_fatal(outcome)) { +#pragma omp critical + { + if (exit_reason.which() == 0) exit_reason = std::move(outcome); + exit_loop_early = true; +#pragma omp flush(exit_loop_early) + } + } + } + + // Analyze the loop termination cause and react accordingly + switch (exit_reason.which()) { + // The loop exited normally or via break, no need to do anything + case 0: + return boost::optional<T>(); + + // The loop was exited due to an early return, propagate it up the stack + case 1: + return boost::get<T>(std::move(exit_reason)); + + // Only an exception is left. Rethrow it; no path falls off the function. + default: + auto exception = boost::get<std::exception_ptr>(std::move(exit_reason)); + std::rethrow_exception(std::move(exception)); + } } -/// Abort an enclosing ACTFW_PARALLEL_FOR and exit the host function by -/// returning ProcessCode::ABORT. +/// The following macro is the out-of-order multithreaded equivalent of the +/// following serial loop (macro arguments capitalized for emphasis): /// -/// @param index must match the index parameter given for that loop. +/// for(size_t INDEX = START; INDEX < END; ++INDEX) { +/// ... +/// } /// -#define ACTFW_PARALLEL_FOR_ABORT(index) \ - { \ - actfw_parallel_abort_##index = true; \ - ACTFW_PARALLEL_FOR_BREAK(index) \ - } - -// TODO: Once we can assume OpenMP 4.0 support from the host compiler, -// break out of the loop using #pragma omp cancel +/// Unlike raw OpenMP 3.1, we also support breaks, early returns, and +/// exception propagation, to allow for more idiomatic C++ code. On the other +/// hand, due to the way the C preprocessor handles macro arguments, the loop +/// iteration code should not contain any preprocessor directive. +/// +/// Due to limitations of C++14's type inference, this macro may currently +/// only be called in a function which returns FW::ProcessCode. 
This +/// restriction may be lifted in the future if C++ type inference improves. +/// +#define ACTFW_PARALLEL_FOR(INDEX, START, END, ...) \ + /* do-while makes this one statement; the expansion already ends in ';' */ \ + do { \ + /* Run the loop; the optional is engaged only on an early return */ \ + auto _optional_early_return_##INDEX \ + = FW::Details::parallel_for_impl<FW::ProcessCode>( \ + (START), \ + (END), \ + _ACTFW_WRAPPED_LOOP_ITERATION( \ + FW::ProcessCode, INDEX, __VA_ARGS__)); \ + \ + /* Return early from the host function if asked to do so */ \ + if (_optional_early_return_##INDEX) { \ + return *_optional_early_return_##INDEX; \ + } \ + } while (false); +} +} #endif // ACTFW_CONCURRENCY_PARALLEL_FOR_H diff --git a/Core/src/Framework/Sequencer.cpp b/Core/src/Framework/Sequencer.cpp index 5ed865fabf0495aff98319aa2df2497f4635489d..8bfa20fcc6db909e3ae61cbb69e50875f8251d5e 100644 --- a/Core/src/Framework/Sequencer.cpp +++ b/Core/src/Framework/Sequencer.cpp @@ -147,35 +147,33 @@ FW::Sequencer::run(boost::optional<size_t> events, size_t skip) // Execute the event loop ACTS_INFO("Run the event loop"); - ACTFW_PARALLEL_FOR( - ievent, 0, numEvents, const size_t event = skip + ievent; - ACTS_INFO("start event " << event); - - // Setup the event and algorithm context - WhiteBoard eventStore(Acts::getDefaultLogger( - "EventStore#" + std::to_string(event), m_cfg.eventStoreLogLevel)); - size_t ialg = 0; - - // read everything in - for (auto& rdr - : m_readers) { - if (rdr->read({ialg++, event, eventStore}) != ProcessCode::SUCCESS) - ACTFW_PARALLEL_FOR_ABORT(ievent); - } - // process all algorithms - for (auto& alg - : m_algorithms) { - if (alg->execute({ialg++, event, eventStore}) != ProcessCode::SUCCESS) - ACTFW_PARALLEL_FOR_ABORT(ievent); - } - // write out results - for (auto& wrt - : m_writers) { - if (wrt->write({ialg++, event, eventStore}) != ProcessCode::SUCCESS) - ACTFW_PARALLEL_FOR_ABORT(ievent); - } - - ACTS_INFO("event " << event << " done");) + ACTFW_PARALLEL_FOR(ievent, 0, numEvents, { 
+ const size_t event = skip + ievent; + ACTS_INFO("start event " << event); + + // Set up the event and algorithm context + WhiteBoard eventStore(Acts::getDefaultLogger( + "EventStore#" + std::to_string(event), m_cfg.eventStoreLogLevel)); + size_t ialg = 0; + + // read everything in + for (auto& rdr : m_readers) { + if (rdr->read({ialg++, event, eventStore}) != ProcessCode::SUCCESS) + return ProcessCode::ABORT; + } + // process all algorithms + for (auto& alg : m_algorithms) { + if (alg->execute({ialg++, event, eventStore}) != ProcessCode::SUCCESS) + return ProcessCode::ABORT; + } + // write out results + for (auto& wrt : m_writers) { + if (wrt->write({ialg++, event, eventStore}) != ProcessCode::SUCCESS) + return ProcessCode::ABORT; + } + + ACTS_INFO("event " << event << " done"); + }) // Call endRun() for writers and services ACTS_INFO("Running end-of-run hooks of writers and services");