Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
acts-framework
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Requirements
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Locked files
Build
Pipelines
Jobs
Pipeline schedules
Test cases
Artifacts
Deploy
Releases
Package Registry
Container Registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Code review analytics
Issue analytics
Insights
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
This is an archived project. Repository and other project resources are read-only.
Show more breadcrumbs
Gabriel Farrugia
acts-framework
Commits
aba656b1
Commit
aba656b1
authored
7 years ago
by
Hadrien G
Committed by
Hadrien Benjamin Grasland
7 years ago
Browse files
Options
Downloads
Patches
Plain Diff
Please clang-format and clean up docs along the way
parent
f6e6990c
No related branches found
No related tags found
No related merge requests found
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
Core/include/ACTFW/Concurrency/parallel_for.hpp
+145
-153
145 additions, 153 deletions
Core/include/ACTFW/Concurrency/parallel_for.hpp
Core/src/Framework/Sequencer.cpp
+26
-34
26 additions, 34 deletions
Core/src/Framework/Sequencer.cpp
with
171 additions
and
187 deletions
Core/include/ACTFW/Concurrency/parallel_for.hpp
+
145
−
153
View file @
aba656b1
...
...
@@ -18,184 +18,176 @@
#ifndef ACTFW_CONCURRENCY_PARALLEL_FOR_H
#define ACTFW_CONCURRENCY_PARALLEL_FOR_H 1
#include
<exception>
#include
<boost/optional.hpp>
#include
<boost/variant/get.hpp>
#include
<boost/variant/variant.hpp>
#include
<exception>
#include
"ACTFW/Framework/ProcessCode.hpp"
// Let's keep as much as possible inside of namespaces
namespace
FW
{
namespace
Details
{
// A C++ for loop iteration may either run normally, break, continue, return,
// or throw. Scenarios which do not emit data can be expressed via an enum...
enum
class
LoopFlow
{
NormalIteration
,
Continue
,
Break
};
// ...from which one can derive a Boost variant which covers the complete set
// of look iteration outcomes, including early returns and exceptions.
template
<
typename
ReturnT
>
using
LoopOutcome
=
boost
::
variant
<
LoopFlow
,
ReturnT
,
std
::
exception_ptr
>
;
// Tell whether a certain loop iteration outcome requires exiting the loop
template
<
typename
T
>
bool
loop_outcome_is_fatal
(
const
LoopOutcome
<
T
>&
outcome
)
{
// Any outcome which is not representable by LoopFlow is fatal
if
(
outcome
.
which
()
!=
0
)
return
true
;
// Among the outcomes representable by LoopFlow, only Break is fatal
return
(
outcome
==
LoopOutcome
<
T
>
(
LoopFlow
::
Break
));
}
// A C++ for loop iteration may either run normally, break, continue, return, or
// throw an exception. Scenarios which do not emit data are handled via an enum.
enum
class
LoopFlow
{
NormalIteration
,
Continue
,
Break
};
// The complete set of possible loop outcomes is handled via a Boost variant,
// including all LoopFlow and adding support for early returns and exceptions.
template
<
typename
T
>
using
LoopOutcome
=
boost
::
variant
<
LoopFlow
,
T
,
std
::
exception_ptr
>
;
// Tell whether a certain loop iteration outcome requires exiting the loop
template
<
typename
T
>
bool
loop_outcome_is_fatal
(
const
LoopOutcome
<
T
>&
outcome
)
{
// Any outcome which is not representable by LoopFlow is fatal
if
(
outcome
.
which
()
!=
0
)
return
true
;
// Among the outcomes representable by LoopFlow, only Break is fatal
return
(
outcome
==
LoopOutcome
<
T
>
(
LoopFlow
::
Break
));
}
// This macro wraps a user-provided for loop iteration into a functor which
// returns a LoopOutcome. It handles all loop iteration outcomes which require
// a macro to be handled, namely everything but exceptions.
//
// The RETURN_TYPE parameter indicates the return type of the function inside
// which the for loop is located, and the INDEX type is the name of the expected
// loop variable.
//
// I would like the return type of the functor to be inferred, but that does not
// seem possible in C++14. In C++17, replacing RETURN_TYPE with "auto" in the
// lambda's return type should be legal and do the expected thing.
//
#define _ACTFW_WRAPPED_LOOP_ITERATION(RETURN_TYPE, INDEX, ...) \
[&](size_t INDEX) -> FW::Details::LoopOutcome<RETURN_TYPE> { \
/* We set up a fake for loop environment to catch all user actions */
\
for(int _dummy_##INDEX = 0; _dummy_##INDEX < 2; ++_dummy_##INDEX) { \
if(_dummy_##INDEX == 0) { \
/* The user's loop iteration code is pasted here. It may return */
\
/* or throw an exception, that is handled in higher-level C++ code */
\
__VA_ARGS__ \
// This macro wraps a user-provided for loop iteration code into a functor
// which returns a LoopOutcome. It handles all loop iteration outcomes which
// can only be handled by a macro, that is, everything but exceptions.
//
// The RETURN_TYPE parameter indicates the return type of the function inside
// which the underlying for loop is located, and INDEX is the name of the loop
// variable expected by the user's iteration code.
//
// Ideally, RETURN_TYPE should be inferred instead of caller-provider, but
// that does not seem possible in C++14. I think C++17's flavor of auto is
// powerful enough for such inference, so maybe something for the future...
//
#define _ACTFW_WRAPPED_LOOP_ITERATION(RETURN_TYPE, INDEX, ...) \
/* To produce a functor in a random context, we use a catch-all lambda */
\
[&](size_t INDEX) -> FW::Details::LoopOutcome<RETURN_TYPE> { \
/* We set up a fake for loop environment to catch all user actions */
\
for (int _dummy_##INDEX = 0; _dummy_##INDEX < 2; ++_dummy_##INDEX) { \
if (_dummy_##INDEX == 0) { \
/* The user's loop iteration code is pasted here. It may return */
\
/* or throw an exception, that is handled in higher-level code */
\
__VA_ARGS__ \
\
/* If control reaches this point, the loop iteration code finished */
\
/* normally without continuing, breaking, or returning */
\
return FW::Details::LoopFlow::NormalIteration; \
} else { \
/* If control reaches this point, the loop iteration was skipped */
\
/* using the "continue" control flow keyword */
\
return FW::Details::LoopFlow::Continue; \
/* If control reaches this point, the loop iteration code */
\
/* finished normally without continuing, breaking, or returning */
\
return FW::Details::LoopFlow::NormalIteration; \
} else { \
/* If control reaches this point, the loop iteration was skipped */
\
/* using the "continue" control flow keyword */
\
return FW::Details::LoopFlow::Continue; \
} \
} \
} \
\
/* It control reaches this point, the loop was aborted using the */
\
/* "break" control flow keyword */
\
return FW::Details::LoopFlow::Break; \
}
/* It control reaches this point, the loop was aborted using the */
\
/* "break" control flow keyword */
\
return FW::Details::LoopFlow::Break; \
}
// Thanks to the WRAPPED_LOOP_ITERATION macro, most of the ACTSFW parallel loop
// wrapper can now be written as real C++ code, rather than macro black magic.
//
// This function runs a parallel for loop, with loop iterations going from
// "start" to "end", and a per-iteration behavior specified by a functor taking
// the current parallel loop index as a parameter, and returning a loop
// iteration outcome. The functor is expected to be generated via the
// _ACTFW_WRAPPED_LOOP_ITERATION macro.
//
// If there was an early return from the loop, this function propagates it.
// Otherwise, it returns an empty boost::optional.
//
template
<
typename
T
>
boost
::
optional
<
T
>
parallel_for_impl
(
size_t
start
,
size_t
end
,
std
::
function
<
LoopOutcome
<
T
>
(
size_t
)
>
iteration
)
{
// These control variables are used to tell OpenMP to exit the parallel loop
// early and to record why we had to do it.
// Thanks to the loop iteration wrapper above, most of the ACTSFW parallel
// for loop can now be written as real C++ code, instead of a macro.
//
// TODO: Once we can assume good OpenMP 4.0 support from the host compiler,
// break out of the loop more efficiently using #pragma omp cancel
// This function runs a parallel for loop, with loop iterations going from
// "start" to "end", and a per-iteration behavior specified by a functor
// taking the current parallel loop index as a parameter, and returning a loop
// iteration outcome. That functor is expected to be generated via the
// previously defined _ACTFW_WRAPPED_LOOP_ITERATION macro.
//
bool
exit_loop_early
=
false
;
LoopOutcome
<
T
>
exit_reason
=
LoopFlow
::
NormalIteration
;
// Our parallel for loop is implemented using OpenMP
#pragma omp parallel for
for
(
size_t
index
=
start
;
index
<
end
;
++
index
)
{
// Skip remaining loop iterations if asked to exit the loop early
#pragma omp flush(exit_loop_early)
if
(
exit_loop_early
)
continue
;
// Run this loop iteration and record the outcome, exceptions included
LoopOutcome
<
T
>
outcome
=
LoopFlow
::
NormalIteration
;
try
{
outcome
=
iteration
(
index
);
}
catch
(...)
{
outcome
=
std
::
current_exception
();
}
// If there was an early return from the loop, this function propagates it up.
// Otherwise, it returns an empty boost::optional.
//
template
<
typename
T
>
boost
::
optional
<
T
>
parallel_for_impl
(
size_t
start
,
size_t
end
,
std
::
function
<
LoopOutcome
<
T
>
(
size_t
)
>
iteration
)
{
// These control variables are used to tell OpenMP to exit the parallel loop
// early if needed, and to record why we had to do it.
//
// TODO: Once we can assume good OpenMP 4.0 support from the host compiler,
// break out of the loop more efficiently using #pragma omp cancel
//
bool
exit_loop_early
=
false
;
LoopOutcome
<
T
>
exit_reason
=
LoopFlow
::
NormalIteration
;
// Our parallel for loop is implemented using OpenMP
#pragma omp parallel for
for
(
size_t
index
=
start
;
index
<
end
;
++
index
)
{
// Skip remaining loop iterations if asked to exit the loop early
#pragma omp flush(exit_loop_early)
if
(
exit_loop_early
)
continue
;
// Run this loop iteration and record the outcome, exceptions included
LoopOutcome
<
T
>
outcome
=
LoopFlow
::
NormalIteration
;
try
{
outcome
=
iteration
(
index
);
}
catch
(...)
{
outcome
=
std
::
current_exception
();
}
// Abort the loop if the iteration's outcome states that we should do so
if
(
loop_outcome_is_fatal
(
outcome
))
{
#pragma omp critical
{
exit_reason
=
std
::
move
(
outcome
);
exit_loop_early
=
true
;
#pragma omp flush(exit_loop_early)
// Abort the loop if the iteration's outcome states that we should do so
if
(
loop_outcome_is_fatal
(
outcome
))
{
#pragma omp critical
{
exit_reason
=
std
::
move
(
outcome
);
exit_loop_early
=
true
;
#pragma omp flush(exit_loop_early)
}
}
}
}
// Analyze the loop termination cause and react accordingly
switch
(
exit_reason
.
which
())
{
// The loop exited normally or via break, no need to do anything
case
0
:
return
boost
::
optional
<
T
>
();
// Analyze the loop termination cause and react accordingly
switch
(
exit_reason
.
which
())
{
// The loop exited normally or via break, no need to do anything
case
0
:
return
boost
::
optional
<
T
>
();
// The loop was exited due to an early return, propagate it up the stack
case
1
:
return
boost
::
get
<
T
>
(
std
::
move
(
exit_reason
));
// The loop was exited due to an early return, propagate it up the stack
case
1
:
return
boost
::
get
<
T
>
(
std
::
move
(
exit_reason
));
// The loop was exited because an exception was thrown. Rethrow it.
case
2
:
auto
exception
=
boost
::
get
<
std
::
exception_ptr
>
(
std
::
move
(
exit_reason
));
std
::
rethrow_exception
(
std
::
move
(
exception
));
// The loop was exited because an exception was thrown. Rethrow it.
case
2
:
auto
exception
=
boost
::
get
<
std
::
exception_ptr
>
(
std
::
move
(
exit_reason
));
std
::
rethrow_exception
(
std
::
move
(
exception
));
}
}
}
/// Out-of-order multithreaded equivalent of the following serial loop
/// (macro arguments capitalized for emphasis):
///
/// for(size_t INDEX = START; index < END; ++index) {
/// ...
/// }
///
/// Unlike raw OpenMP 3.1, we also support breaks, early returns, and exception
/// propagation, in order to allow for more idiomatic C++ code. On the other
/// hand, due to the way the C preprocessor handles macro arguments, the loop
/// iteration code should contain no preprocessor directive.
///
/// Due to limitations of C++14's type inference, this macro may only be called
/// in a function which returns FW::ProcessCode.
///
/// @param index must be unique within the enclosing scope.
///
#define ACTFW_PARALLEL_FOR(INDEX, START, END, ...) \
/* This dummy do-while makes sure that the macro's output is a statement */
\
do { \
/* Execute the parallel for loop */
\
auto optional_early_return = \
FW::Details::parallel_for_impl<FW::ProcessCode>( \
(START), \
(END), \
_ACTFW_WRAPPED_LOOP_ITERATION(FW::ProcessCode, \
INDEX, \
__VA_ARGS__) \
); \
/// The following macro is the out-of-order multithreaded equivalent of the
/// following serial loop (macro arguments capitalized for emphasis):
///
/// for(size_t INDEX = START; index < END; ++index) {
/// ...
/// }
///
/// Unlike raw OpenMP 3.1, we also support breaks, early returns, and
/// exception propagation, to allow for more idiomatic C++ code. On the other
/// hand, due to the way the C preprocessor handles macro arguments, the loop
/// iteration code should not contain any preprocessor directive.
///
/// Due to limitations of C++14's type inference, this macro may currently
/// only be called in a function which returns FW::ProcessCode. This
/// restriction may be lifted in the future if C++ type inference improves.
///
#define ACTFW_PARALLEL_FOR(INDEX, START, END, ...) \
/* This dummy do-while asserts that the macro's output is a statement */
\
do { \
/* Execute the parallel for loop */
\
auto optional_early_return \
= FW::Details::parallel_for_impl<FW::ProcessCode>( \
(START), \
(END), \
_ACTFW_WRAPPED_LOOP_ITERATION( \
FW::ProcessCode, INDEX, __VA_ARGS__)); \
\
/* Return early from the host function if asked to do so */
\
if(optional_early_return) {
\
return *optional_early_return;
\
}
\
} while(false);
/* Return early from the host function if asked to do so */
\
if
(optional_early_return) { \
return *optional_early_return; \
}
\
} while
(false);
}
}
...
...
This diff is collapsed.
Click to expand it.
Core/src/Framework/Sequencer.cpp
+
26
−
34
View file @
aba656b1
...
...
@@ -147,41 +147,33 @@ FW::Sequencer::run(boost::optional<size_t> events, size_t skip)
// Execute the event loop
ACTS_INFO
(
"Run the event loop"
);
ACTFW_PARALLEL_FOR
(
ievent
,
0
,
numEvents
,
{
const
size_t
event
=
skip
+
ievent
;
ACTS_INFO
(
"start event "
<<
event
);
// Setup the event and algorithm context
WhiteBoard
eventStore
(
Acts
::
getDefaultLogger
(
"EventStore#"
+
std
::
to_string
(
event
),
m_cfg
.
eventStoreLogLevel
));
size_t
ialg
=
0
;
// read everything in
for
(
auto
&
rdr
:
m_readers
)
{
if
(
rdr
->
read
({
ialg
++
,
event
,
eventStore
})
!=
ProcessCode
::
SUCCESS
)
return
ProcessCode
::
ABORT
;
}
// process all algorithms
for
(
auto
&
alg
:
m_algorithms
)
{
if
(
alg
->
execute
({
ialg
++
,
event
,
eventStore
})
!=
ProcessCode
::
SUCCESS
)
return
ProcessCode
::
ABORT
;
}
// write out results
for
(
auto
&
wrt
:
m_writers
)
{
if
(
wrt
->
write
({
ialg
++
,
event
,
eventStore
})
!=
ProcessCode
::
SUCCESS
)
return
ProcessCode
::
ABORT
;
}
ACTS_INFO
(
"event "
<<
event
<<
" done"
);
ACTFW_PARALLEL_FOR
(
ievent
,
0
,
numEvents
,
{
const
size_t
event
=
skip
+
ievent
;
ACTS_INFO
(
"start event "
<<
event
);
// Setup the event and algorithm context
WhiteBoard
eventStore
(
Acts
::
getDefaultLogger
(
"EventStore#"
+
std
::
to_string
(
event
),
m_cfg
.
eventStoreLogLevel
));
size_t
ialg
=
0
;
// read everything in
for
(
auto
&
rdr
:
m_readers
)
{
if
(
rdr
->
read
({
ialg
++
,
event
,
eventStore
})
!=
ProcessCode
::
SUCCESS
)
return
ProcessCode
::
ABORT
;
}
)
// process all algorithms
for
(
auto
&
alg
:
m_algorithms
)
{
if
(
alg
->
execute
({
ialg
++
,
event
,
eventStore
})
!=
ProcessCode
::
SUCCESS
)
return
ProcessCode
::
ABORT
;
}
// write out results
for
(
auto
&
wrt
:
m_writers
)
{
if
(
wrt
->
write
({
ialg
++
,
event
,
eventStore
})
!=
ProcessCode
::
SUCCESS
)
return
ProcessCode
::
ABORT
;
}
ACTS_INFO
(
"event "
<<
event
<<
" done"
);
})
// Call endRun() for writers and services
ACTS_INFO
(
"Running end-of-run hooks of writers and services"
);
...
...
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment