Commit 21fb4450 authored by Illya Shapoval's avatar Illya Shapoval

migrate SerialTaskQueue from tbb::task to tbb::task_group API

parent 4f837e56
Pipeline #1651817 passed with stages
in 28 minutes and 35 seconds
......@@ -31,6 +31,8 @@ class TBBMessageSvc : public MessageSvc {
public:
using MessageSvc::MessageSvc;
~TBBMessageSvc() noexcept override {}
using MessageSvc::reportMessage;
/// Implementation of IMessageSvc::reportMessage()
......
......@@ -22,7 +22,7 @@
#include <memory>
#include <tbb/concurrent_queue.h>
#include <tbb/task.h>
#include <tbb/task_group.h>
namespace Gaudi {
......@@ -66,66 +66,18 @@ namespace Gaudi {
/// Block until all the currently enqueued tasks are completed.
/// @note while a thread is waiting for the tasks to be completes, another
/// may still enqueue other tasks.
void wait() const;
void wait();
private:
/// Wrapper for the WorkItem class for internal concurrency bookkeeping.
class SerialWorkItem {
public:
/// Initialize the instance from the WorkiItem and the SerialTaskQueue
/// (for synchronization).
/// @note the object takes ownership of the WorkItem pointer.
SerialWorkItem( WorkItem* item, SerialTaskQueue* serializer ) : m_item( item ), m_serializer( serializer ) {}
/// Execute the WorkItem and notify the SerialTaskQueue of the completion.
void run();
private:
/// Pointer to the WorkItem to run.
std::unique_ptr<WorkItem> m_item;
/// Pointer to the SerialTaskQueue used for the synchronization.
SerialTaskQueue* m_serializer;
};
/// Helper class to wrap a SerialWorkItem in a tbb::task.
class SerialWorkItemRunner : public tbb::task {
public:
/// Initialize the instance.
SerialWorkItemRunner( SerialWorkItem* item ) : m_item( item ) {}
/// Call the run method of the work item.
tbb::task* execute() override {
m_item->run();
return NULL;
}
private:
/// Pointer to the work item to be executed.
/// @note we do not own the work item (it deletes itself)
SerialWorkItem* m_item;
};
void i_startNextItem();
/// Counter of the currently running tasks.
std::atomic<int> m_count;
/// Queue of the tasks to be executed.
tbb::concurrent_queue<SerialWorkItem*> m_queue;
tbb::concurrent_queue<std::unique_ptr<WorkItem>> m_queue;
/// TBB task group to submit work items to.
tbb::task_group m_taskGroup;
};
inline void SerialTaskQueue::SerialWorkItem::run() {
// run the wrapped task
m_item->run();
// We need to keep the pointer on the stack because we are going to delete
// ourselves.
SerialTaskQueue* serializer = m_serializer;
// We call the delete before returning the control to the serialized so that
// possible complex code in the task destructor is executed serially.
delete this;
// Notify the queue of the completion, so that it can schedule the next task.
serializer->noteCompletion();
}
} /* namespace Gaudi */
#endif /* SERIALTASKQUEUE_H_ */
......@@ -26,7 +26,7 @@ namespace Gaudi {
SerialTaskQueue::WorkItem::~WorkItem() {}
void SerialTaskQueue::add( WorkItem* item ) {
m_queue.push( new SerialWorkItem( item, this ) );
m_queue.push( std::unique_ptr<WorkItem>( item ) );
if ( ++m_count == 1 ) i_startNextItem();
}
......@@ -34,15 +34,22 @@ namespace Gaudi {
if ( --m_count != 0 ) i_startNextItem();
}
void SerialTaskQueue::wait() const {
void SerialTaskQueue::wait() {
// wait until the queue is empty and all the tasks are completed
while ( ( !m_queue.empty() ) || m_count ) {}
// ensure there are no items in the TBB task group
m_taskGroup.wait();
}
void SerialTaskQueue::i_startNextItem() {
SerialWorkItem* item = 0;
std::unique_ptr<WorkItem> item;
m_queue.try_pop( item );
tbb::task::enqueue( *new ( tbb::task::allocate_root() ) SerialWorkItemRunner( item ) );
m_taskGroup.run( [this, _item = std::move( item )]() {
// run the task
_item->run();
// Notify the queue of the completion, so that it can schedule the next task.
this->noteCompletion();
} );
}
} /* namespace Gaudi */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment