From f6e6990c47dca7047d70c5964a3fb747b8bfbff1 Mon Sep 17 00:00:00 2001
From: Hadrien Grasland <grasland@lal.in2p3.fr>
Date: Tue, 7 Nov 2017 17:52:27 +0100
Subject: [PATCH] Make ACTFW_PARALLEL_FOR easier to use and to maintain

---
 .../ACTFW/Concurrency/parallel_for.hpp        | 225 +++++++++++++-----
 Core/src/Framework/Sequencer.cpp              |  16 +-
 2 files changed, 174 insertions(+), 67 deletions(-)

diff --git a/Core/include/ACTFW/Concurrency/parallel_for.hpp b/Core/include/ACTFW/Concurrency/parallel_for.hpp
index 32e6d590..9c50b3c0 100644
--- a/Core/include/ACTFW/Concurrency/parallel_for.hpp
+++ b/Core/include/ACTFW/Concurrency/parallel_for.hpp
@@ -19,14 +19,147 @@
 #define ACTFW_CONCURRENCY_PARALLEL_FOR_H 1
 
 #include <exception>
+#include <boost/optional.hpp>
+#include <boost/variant/get.hpp>
+#include <boost/variant/variant.hpp>
 #include "ACTFW/Framework/ProcessCode.hpp"
 
-/// We will use this simple helper around the _Pragma operator of C99
-/// and C++11, if it is not defined elsewhere
-///
-#ifndef ACTFW_PRAGMA
-#define ACTFW_PRAGMA(x) _Pragma(#x)
-#endif
+
+// Let's keep as much as possible inside of namespaces
+namespace FW {
+namespace Details {
+
+
+// A C++ for loop iteration may either run normally, break, continue, return, or
+// throw an exception. Scenarios which do not emit data are handled via an enum.
+enum class LoopFlow { NormalIteration, Continue, Break };
+
+
+// The complete set of possible loop outcomes is handled via a Boost variant,
+// including all LoopFlow and adding support for early returns and exceptions.
+template <typename T>
+using LoopOutcome = boost::variant<LoopFlow, T, std::exception_ptr>;
+
+
+// Tell whether a certain loop iteration outcome requires exiting the loop
+template <typename T>
+bool
+loop_outcome_is_fatal(const LoopOutcome<T>& outcome) {
+  // Any outcome which is not representable by LoopFlow is fatal
+  if(outcome.which() != 0) return true;
+
+  // Among the outcomes representable by LoopFlow, only Break is fatal
+  return (outcome == LoopOutcome<T>(LoopFlow::Break));
+}
+
+
+// This macro wraps a user-provided for loop iteration into a functor which
+// returns a LoopOutcome. It handles all loop iteration outcomes which require
+// a macro to be handled, namely everything but exceptions.
+//
+// The RETURN_TYPE parameter indicates the return type of the function inside
+// which the for loop is located, and the INDEX type is the name of the expected
+// loop variable.
+//
+// I would like the return type of the functor to be inferred, but that does not
+// seem possible in C++14. In C++17, replacing RETURN_TYPE with "auto" in the
+// lambda's return type should be legal and do the expected thing.
+//
+#define _ACTFW_WRAPPED_LOOP_ITERATION(RETURN_TYPE, INDEX, ...)                 \
+  [&](size_t INDEX) -> FW::Details::LoopOutcome<RETURN_TYPE> {                 \
+    /* We set up a fake for loop environment to catch all user actions */      \
+    for(int _dummy_##INDEX = 0; _dummy_##INDEX < 2; ++_dummy_##INDEX) {        \
+      if(_dummy_##INDEX == 0) {                                                \
+        /* The user's loop iteration code is pasted here. It may return */     \
+        /* or throw an exception, that is handled in higher-level C++ code */  \
+        __VA_ARGS__                                                            \
+                                                                               \
+        /* If control reaches this point, the loop iteration code finished */  \
+        /* normally without continuing, breaking, or returning */              \
+        return FW::Details::LoopFlow::NormalIteration;                         \
+      } else {                                                                 \
+        /* If control reaches this point, the loop iteration was skipped */    \
+        /* using the "continue" control flow keyword */                        \
+        return FW::Details::LoopFlow::Continue;                                \
+      }                                                                        \
+    }                                                                          \
+                                                                               \
+    /* It control reaches this point, the loop was aborted using the */        \
+    /* "break" control flow keyword */                                         \
+    return FW::Details::LoopFlow::Break;                                       \
+  }
+
+
+// Thanks to the WRAPPED_LOOP_ITERATION macro, most of the ACTSFW parallel loop
+// wrapper can now be written as real C++ code, rather than macro black magic.
+//
+// This function runs a parallel for loop, with loop iterations going from
+// "start" to "end", and a per-iteration behavior specified by a functor taking
+// the current parallel loop index as a parameter, and returning a loop
+// iteration outcome. The functor is expected to be generated via the
+// _ACTFW_WRAPPED_LOOP_ITERATION macro.
+//
+// If there was an early return from the loop, this function propagates it.
+// Otherwise, it returns an empty boost::optional.
+//
+template <typename T>
+boost::optional<T>
+parallel_for_impl(size_t start,
+                  size_t end,
+                  std::function<LoopOutcome<T>(size_t)> iteration)
+{
+  // These control variables are used to tell OpenMP to exit the parallel loop
+  // early and to record why we had to do it.
+  //
+  // TODO: Once we can assume good OpenMP 4.0 support from the host compiler,
+  //       break out of the loop more efficiently using #pragma omp cancel
+  //
+  bool exit_loop_early = false;
+  LoopOutcome<T> exit_reason = LoopFlow::NormalIteration;
+
+  // Our parallel for loop is implemented using OpenMP
+  #pragma omp parallel for
+  for(size_t index = start; index < end; ++index) {
+    // Skip remaining loop iterations if asked to exit the loop early
+    #pragma omp flush(exit_loop_early)
+    if(exit_loop_early) continue;
+
+    // Run this loop iteration and record the outcome, exceptions included
+    LoopOutcome<T> outcome = LoopFlow::NormalIteration;
+    try {
+      outcome = iteration(index);
+    } catch(...) {
+      outcome = std::current_exception();
+    }
+
+    // Abort the loop if the iteration's outcome states that we should do so
+    if(loop_outcome_is_fatal(outcome)) {
+      #pragma omp critical
+      {
+        exit_reason = std::move(outcome);
+        exit_loop_early = true;
+        #pragma omp flush(exit_loop_early)
+      }
+    }
+  }
+
+  // Analyze the loop termination cause and react accordingly
+  switch(exit_reason.which()) {
+    // The loop exited normally or via break, no need to do anything
+    case 0:
+      return boost::optional<T>();
+
+    // The loop was exited due to an early return, propagate it up the stack
+    case 1:
+      return boost::get<T>(std::move(exit_reason));
+
+    // The loop was exited because an exception was thrown. Rethrow it.
+    case 2:
+      auto exception = boost::get<std::exception_ptr>(std::move(exit_reason));
+      std::rethrow_exception(std::move(exception));
+  }
+}
+
 
 /// Out-of-order multithreaded equivalent of the following serial loop
 /// (macro arguments capitalized for emphasis):
@@ -35,68 +168,36 @@
 ///      ...
 ///   }
 ///
-/// One difference with a serial loop is that you cannot easily abort
-/// the computation using break and return statements. This is a
-/// limitation of the OpenMP 3.1 tasking backend that we currently use.
+/// Unlike raw OpenMP 3.1, we also support breaks, early returns, and exception
+/// propagation, in order to allow for more idiomatic C++ code. On the other
+/// hand, due to the way the C preprocessor handles macro arguments, the loop
+/// iteration code should contain no preprocessor directive.
 ///
-/// Replacements for the common use cases of break and return are
-/// provided in ACTFW_PARALLEL_FOR_BREAK and ACTFW_PARALLEL_FOR_ABORT.
-///
-/// Unlike raw OpenMP 3.1, we also support exception propagation.
-///
-/// One difference with a serial loop is that code which is passed as
-/// an argument to this macro should contain no preprocessor directive.
+/// Due to limitations of C++14's type inference, this macro may only be called
+/// in a function which returns FW::ProcessCode.
 ///
 /// @param index must be unique within the enclosing scope.
 ///
-#define ACTFW_PARALLEL_FOR(index, start, end, ...)                             \
-  {                                                                            \
-    bool               actfw_parallel_break_##index = false;                   \
-    bool               actfw_parallel_abort_##index = false;                   \
-    std::exception_ptr actfw_parallel_exptr_##index;                           \
-                                                                               \
-    ACTFW_PRAGMA(omp parallel for)                                             \
-    for (size_t index = (start); index < (end); ++index) {                     \
-      try {                                                                    \
-        ACTFW_PRAGMA(omp flush(actfw_parallel_break_##index))                  \
-        if (actfw_parallel_break_##index) continue;                            \
-        __VA_ARGS__                                                            \
-      } catch (...) {                                                          \
-        actfw_parallel_exptr_##index = std::current_exception();               \
-        ACTFW_PARALLEL_FOR_BREAK(index)                                        \
-      }                                                                        \
-    }                                                                          \
+#define ACTFW_PARALLEL_FOR(INDEX, START, END, ...)                             \
+  /* This dummy do-while makes sure that the macro's output is a statement */  \
+  do {                                                                         \
+    /* Execute the parallel for loop */                                        \
+    auto optional_early_return =                                               \
+      FW::Details::parallel_for_impl<FW::ProcessCode>(                         \
+        (START),                                                               \
+        (END),                                                                 \
+        _ACTFW_WRAPPED_LOOP_ITERATION(FW::ProcessCode,                         \
+                                      INDEX,                                   \
+                                      __VA_ARGS__)                             \
+      );                                                                       \
                                                                                \
-    if (actfw_parallel_abort_##index) return ProcessCode::ABORT;               \
-    if (actfw_parallel_exptr_##index) {                                        \
-      std::rethrow_exception(actfw_parallel_exptr_##index);                    \
+    /* Return early from the host function if asked to do so */                \
+    if(optional_early_return) {                                                \
+      return *optional_early_return;                                           \
     }                                                                          \
-  }
-
-/// Abort an enclosing ACTFW_PARALLEL_FOR construct and return to the
-/// enclosing scope.
-///
-/// @param index must match the index parameter given for that loop.
-///
-#define ACTFW_PARALLEL_FOR_BREAK(index)                                        \
-  {                                                                            \
-    actfw_parallel_break_##index = true;                                       \
-    ACTFW_PRAGMA(omp flush(actfw_parallel_break_##index))                      \
-    continue;                                                                  \
-  }
-
-/// Abort an enclosing ACTFW_PARALLEL_FOR and exit the host function by
-/// returning ProcessCode::ABORT.
-///
-/// @param index must match the index parameter given for that loop.
-///
-#define ACTFW_PARALLEL_FOR_ABORT(index)                                        \
-  {                                                                            \
-    actfw_parallel_abort_##index = true;                                       \
-    ACTFW_PARALLEL_FOR_BREAK(index)                                            \
-  }
+  } while(false);
 
-// TODO: Once we can assume OpenMP 4.0 support from the host compiler,
-//       break out of the loop using #pragma omp cancel
+}
+}
 
 #endif  // ACTFW_CONCURRENCY_PARALLEL_FOR_H
diff --git a/Core/src/Framework/Sequencer.cpp b/Core/src/Framework/Sequencer.cpp
index 5ed865fa..7f531c5a 100644
--- a/Core/src/Framework/Sequencer.cpp
+++ b/Core/src/Framework/Sequencer.cpp
@@ -148,7 +148,11 @@ FW::Sequencer::run(boost::optional<size_t> events, size_t skip)
   // Execute the event loop
   ACTS_INFO("Run the event loop");
   ACTFW_PARALLEL_FOR(
-      ievent, 0, numEvents, const size_t event = skip + ievent;
+    ievent,
+    0,
+    numEvents,
+    {
+      const size_t event = skip + ievent;
       ACTS_INFO("start event " << event);
 
       // Setup the event and algorithm context
@@ -160,22 +164,24 @@ FW::Sequencer::run(boost::optional<size_t> events, size_t skip)
       for (auto& rdr
            : m_readers) {
         if (rdr->read({ialg++, event, eventStore}) != ProcessCode::SUCCESS)
-          ACTFW_PARALLEL_FOR_ABORT(ievent);
+          return ProcessCode::ABORT;
       }
       // process all algorithms
       for (auto& alg
            : m_algorithms) {
         if (alg->execute({ialg++, event, eventStore}) != ProcessCode::SUCCESS)
-          ACTFW_PARALLEL_FOR_ABORT(ievent);
+          return ProcessCode::ABORT;
       }
       // write out results
       for (auto& wrt
            : m_writers) {
         if (wrt->write({ialg++, event, eventStore}) != ProcessCode::SUCCESS)
-          ACTFW_PARALLEL_FOR_ABORT(ievent);
+          return ProcessCode::ABORT;
       }
 
-      ACTS_INFO("event " << event << " done");)
+      ACTS_INFO("event " << event << " done");
+    }
+  )
 
   // Call endRun() for writers and services
   ACTS_INFO("Running end-of-run hooks of writers and services");
-- 
GitLab