Commit de692560 authored by Marco Clemencic

Drop unused GaudiMTTools

parent 323cbc1d
Showing with 0 additions and 2202 deletions
gaudi_subdir(GaudiMTTools)
gaudi_depends_on_subdirs(GaudiAlg)
find_package(Boost REQUIRED COMPONENTS thread system)
find_package(TBB)
find_package(AIDA)
# Without AIDA available, this package can't function.
if( NOT AIDA_FOUND )
return()
endif()
# Hide some TBB and Boost compile time warnings
include_directories(SYSTEM ${Boost_INCLUDE_DIRS} ${TBB_INCLUDE_DIRS})
gaudi_add_module(GaudiMTTools *.cpp
LINK_LIBRARIES GaudiAlgLib Boost TBB
INCLUDE_DIRS Boost TBB)
!-----------------------------------------------------------------------------
! Package : GaudiMTTools
! Responsible : Illya SHAPOVAL
! Purpose : The package for Gaudi multithreading tools and machinery
!-----------------------------------------------------------------------------
================================ Gaudi v28r3 =================================
! 2017-07-18 - Sebastien Ponce (commit f96f51c0)
- Merge branch 'ignoreBoostWarnings' into 'master'
Ignore some warnings coming from the Boost library
See merge request !354
================================ Gaudi v28r2 =================================
! 2017-02-08 - Charles Leggett (commit 09859bd)
- add sysExecute(EventContext) and extract errorCount from Algorithm base class
We need to extract the `m_errorCount` variable from the Algorithm base class,
as it's not important on a per-instance basis, but rather on a job level.
This is moved to the AlgExecStateSvc.
This merge request also adds an explicit `IAlgorithm::sysExecute(const
EventContext&)`, which should become the standard entry point to
`Algorithm::execute`. This sets the local m_event_context variable, so
derived classes can get quick access to the current context instead of going
through the slower thread-local `Gaudi::Hive::currentContext()`.
Note that `IAlgorithm::sysExecute()` has been removed, to avoid "overloaded
virtual" compiler errors, and should be replaced in all clients with
`IAlgorithm::sysExecute(EventContext)`, or if the context is not immediately
available, with `IAlgorithm::sysExecute(Gaudi::Hive::currentContext())`.
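A minimal sketch of the client-side change described above; the helper function names are illustrative, only `IAlgorithm::sysExecute(const EventContext&)` and `Gaudi::Hive::currentContext()` come from the merge request itself.
#include "GaudiKernel/IAlgorithm.h"
#include "GaudiKernel/EventContext.h"
#include "GaudiKernel/StatusCode.h"
#include "GaudiKernel/ThreadLocalContext.h"
// Preferred: the caller already owns an EventContext and passes it explicitly.
StatusCode runWithContext( IAlgorithm& alg, const EventContext& ctx )
{
  return alg.sysExecute( ctx );
}
// Fallback: no context at hand, use the (slower) thread-local one.
StatusCode runWithCurrentContext( IAlgorithm& alg )
{
  return alg.sysExecute( Gaudi::Hive::currentContext() );
}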
All this is being done in preparation to re-introduce the ReEntrantAlgorithm
class from merge !177 (see also !274)
See merge request !273
================================ Gaudi v28r1 =================================
! 2016-12-14 - Attila Krasznahorkay (commit a42ff2a)
- CMake configuration changes to build Gaudi with optional externals missing
In the last few days I experimented a bit with teaching Gaudi how to build when
only ROOT, Boost and TBB are available to it. This is the result. As far as I
can tell the code builds in the same way as the current master branch when
all possible externals are available. (All the ones available in a full ATLAS
offline build at least.) And I'm also able to build the project in these
modes:
* On Ubuntu 16.04 with the system provided Boost and TBB versions, against
a privately built ROOT 6 version.
* On macOS Sierra against privately built Boost, ROOT and TBB versions.
Both when building the project completely on its own, and when building it
against an ATLAS externals project.
Some notes:
* The code uses C++14 expressions in quite a few places by now. So I raised
the default C++ standard to C++14. This has been the default in ATLAS builds for
a while; I only saw it as an issue when building Gaudi "on its own" with a
very minimal configuration command.
* While the code advertises that it still has support for ROOT 5, it has not
been able to build against it for a while, since the updates I put in for TLS
variables on macOS. (The macros used there are ROOT 6 only.) And this update
makes things even worse, as the code now relies on not providing the
definition of some classes to the dictionary generator in GaudiPython that
are not available. (Because AIDA or XercesC is missing.) While ROOT 6 handles
this with just a build warning, ROOT 5's genreflex treats this with an ERROR
message that I was unable to get rid of.
So, extended testing is clearly necessary, and the configuration code could
definitely be fine-tuned (I probably turned off the build of more source
files than absolutely necessary), but I think we should move ahead with such a
configuration organisation.
See merge request !241
================================ Gaudi v28r0 =================================
! 2016-10-27 - Gerhard Raven, Marco Clemencic (commit b111898)
- modernization of Property
This is a major rewrite of the system of `Property` classes. Started as the
implementation of GAUDI-1214, it continued as a review of
`IProperty` and `PropertyMgr`, to end up in a complete rewrite
(and optimization) of `PropertyWithValue`.
* Fixes GAUDI-1214
- added missing `declareProperty` signature
- added `PropertyHolder` (an updated `PropertyMgr`)
- adapted all properties in Gaudi
* improved use of `PropertyHolder`
- use inheritance instead of composition
- removed ~200 (duplicated) lines of code in GaudiKernel
* optimization of Property (fixes GAUDI-1229)
- use templates and automatic code generation to handle efficiently
both value and reference properties
- avoid creation of values on the heap (see GAUDI-1229)
- removed the *owned* boolean flag
- improved usability of properties (e.g. begin/end wrappers for
C++11 loops, increment/decrement operators...)
- deprecated use of C arrays as properties
- merged features of `PropertyWithVerifier`, `SimpleProperty` and
`SimplePropertyRef` into `PropertyWithValue`
- deduplication of name and doc strings via
[`boost::string_ref`](http://www.boost.org/doc/libs/1_61_0/libs/utility/doc/html/string_ref.html)
and a static storage (see GAUDI-1229)
* Fixes GAUDI-1205
* add deprecation warnings in service accessors of `Algorithm` (@graven)
* renamed `Property` base class to `Gaudi::Details::PropertyBase` and
`PropertyWithValue` to `Gaudi::Property`
- added backward compatibility type aliases (but not compatible with
forward declarations of `class Property`, which should be replaced
by `#include "GaudiKernel/PropertyFwd.h"`)
* added macro `GAUDI_PROPERTY_v2` to allow easy implementation of
backward compatible changes in derived projects (e.g. when user code
relied on `DoubleProperty` having a verifier)
* Fixes GAUDI-1268
The changes are as backward compatible as possible (except if you
explicitly inherit from `SimpleProperty`, or you forward declared
`class Property`, which are now typedefs), but they must be validated in the
experiment frameworks.
See merge request !182
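As an illustration of the new property style from the entry above (a minimal sketch; the class name, property names and the `GaudiKernel/Property.h` include are assumptions, compare the real usage in GaudiParallelizer.h further down this page):
#include "GaudiAlg/GaudiAlgorithm.h"
#include "GaudiKernel/Property.h"
#include <string>
#include <vector>
class MyAlg : public GaudiAlgorithm {
public:
  using GaudiAlgorithm::GaudiAlgorithm;
  StatusCode execute() override { return StatusCode::SUCCESS; }
private:
  // properties declare themselves against their owner; no declareProperty call needed
  Gaudi::Property<double> m_cut{this, "Cut", 1.0, "illustrative threshold"};
  Gaudi::Property<std::vector<std::string>> m_names{this, "Members", {}, "illustrative list of names"};
};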
! 2016-10-24 - Marco Clemencic (commit a8d6605)
- hidden/fixed "missing override" warnings exposed after !192
- warnings from external headers are hidden declaring the include directories
as `-system`
- warnings from a couple of new files have been fixed
- warnings in `HistogramSvc` are hidden because unavoidable (see f83c3d8e)
- warnings related to CLHEP-136 have been hidden with a special trick (see
0a238135)
See merge request !205
! 2016-10-12 - Marco Clemencic (commit b5e05a1)
- improved handling of package version in CMake configuration
use project version if a package version is not provided
See GAUDI-1215.
See merge request !175
! 2016-10-01 - Marco Clemencic (commit 99b1001)
- enable missing override warnings (gcc >= 5.1)
Fixes GAUDI-1241
See merge request !192
! 2016-07-25 - Marco Clemencic (commit e3d4b07)
- remove CMT configuration and related files
* removed CMT configuration files
* adapted scripts not to use CMT
Fixes GAUDI-1216 Fixes GAUDI-979
See merge request !186
! 2016-05-18 - Sami Kama (commit 9eeef96)
- allow build of Gaudi without LCG and fix ThreadPoolSvc
Allow building of Gaudi with local externals.
It also contains a fix to an issue introduced with merge request !160
(GAUDI-1212).
Fixes GAUDI-1211, GAUDI-1212.
See merge request !166
! 2016-04-07 - Hadrien Grasland (commit 0a4087c)
- Explicitly mark the destructor of GaudiParallelizer as noexcept(true)
Fixes GAUDI-1187, Gaudi failing to build against recent releases of Intel
TBB.
See merge request !132
============================= GaudiMTTools v1r2 ==============================
! 2016-02-11 - commit e2c585c
- Remove globalTimeOffset getter and GlobalTimeOffset property from GaudiCommon
GaudiCommon adds a property GlobalTimeOffset and a corresponding getter
globalTimeOffset to every algorithm/tool which uses GaudiCommon, regardless
of whether this property is used.
As there is no code anymore that actually uses globalTimeOffset, this patch
removes this accessor and the corresponding property.
Fixes GAUDI-1122.
See merge request !61
============================= GaudiMTTools v1r1 ==============================
! 2016-01-12 - commit 4dbdc8e
- fixed clang 3.7 compilation warnings
Fixes GAUDI-1083.
See merge request !89
! 2015-11-02 - commit 57f356c
- Merge branch 'hive' into 'master'
Fixes GAUDI-978.
See merge request !65
!======================= GaudiMTTools v1r0 2012-01-27 ============================
! 2012-01-27 - Illya Shapoval
- Requirements file is fixed: an explicit Boost dependency and linking option are added.
! 2012-01-26 - Illya Shapoval
- GaudiParallelizer prototype is created. This is an algorithm manager similar
to GaudiSequencer, with the difference that it runs its algorithms in parallel
for each event. At the moment, algorithms executed in parallel have to be
independent (i.e. no input/output data dependencies). Internally
GaudiParallelizer uses the Boost-based Threadpool 0.2.5 library to manage
thread and task pools.
// Include files
// From Gaudi
#include "GaudiAlg/ISequencerTimerTool.h"
#include "GaudiKernel/IAlgManager.h"
#include "GaudiKernel/IJobOptionsSvc.h"
#include "GaudiKernel/SerializeSTL.h"
#include "GaudiParallelizer.h"
// ----------------------------------------------------------------------------
// Implementation file for class: GaudiParallelizer
//
// 09/12/2011: Illya Shapoval
// ----------------------------------------------------------------------------
DECLARE_ALGORITHM_FACTORY( GaudiParallelizer )
// ============================================================================
// Standard constructor, initializes variables
// ============================================================================
GaudiParallelizer::GaudiParallelizer( const std::string& name, ISvcLocator* pSvcLocator )
: GaudiAlgorithm( name, pSvcLocator )
{
m_names.declareUpdateHandler( &GaudiParallelizer::membershipHandler, this );
}
// ============================================================================
// Initialization
// ============================================================================
StatusCode GaudiParallelizer::initialize()
{
if ( msgLevel( MSG::DEBUG ) ) debug() << "==> Initialize" << endmsg;
StatusCode sc = GaudiAlgorithm::initialize(); // must be executed first
if ( sc.isFailure() ) return sc; // error printed already by GaudiAlgorithm
StatusCode status = decodeNames();
if ( !status.isSuccess() ) return status;
m_timerTool = tool<ISequencerTimerTool>( "SequencerTimerTool" );
if ( m_timerTool->globalTiming() ) m_measureTime = true;
if ( m_measureTime ) {
m_timer = m_timerTool->addTimer( name() );
m_timerTool->increaseIndent();
} else {
release( m_timerTool );
m_timerTool = 0;
}
//== Initialize the algorithms
std::vector<AlgorithmEntry>::iterator itE;
for ( itE = m_entries.begin(); m_entries.end() != itE; itE++ ) {
Algorithm* myAlg = itE->algorithm();
if ( m_measureTime ) {
itE->setTimer( m_timerTool->addTimer( myAlg->name() ) );
}
status = myAlg->sysInitialize();
if ( !status.isSuccess() ) {
return Error( "Can not initialize " + myAlg->name(), status );
}
}
if ( m_measureTime ) m_timerTool->decreaseIndent();
if ( m_nthreads != 0 ) {
// Construct the TBB task scheduler with m_nthreads threads
tbb::task_scheduler_init init( m_nthreads );
} else {
m_nthreads = tbb::task_scheduler_init::default_num_threads();
}
if ( msgLevel( MSG::DEBUG ) )
debug() << "Number of threads set to be used in the TBB thread pool is " << m_nthreads << endmsg;
return StatusCode::SUCCESS;
}
// ============================================================================
// Main execution
// ============================================================================
StatusCode GaudiParallelizer::execute()
{
if ( m_measureTime ) m_timerTool->start( m_timer );
if ( msgLevel( MSG::DEBUG ) ) debug() << "==> Execute algorithms in parallel" << endmsg;
for ( std::vector<AlgorithmEntry>::iterator itE = m_entries.begin(); m_entries.end() != itE; ++itE ) {
Algorithm* myAlg = itE->algorithm();
if ( !myAlg->isEnabled() ) continue;
if ( !myAlg->isExecuted() ) {
m_task_group.run( boost::bind( &AlgorithmEntry::run, boost::ref( *itE ), boost::ref( *this ) ) );
}
}
m_task_group.wait();
if ( msgLevel( MSG::DEBUG ) ) debug() << "==> Joining parallel algorithm tasks" << endmsg;
for ( std::vector<AlgorithmEntry>::const_iterator it = m_entries.begin(); it != m_entries.end(); ++it ) {
if ( msgLevel( MSG::DEBUG ) )
debug() << "Algorithm wrapper " << &*it << " around the algorithm " << it->algorithm()->name()
<< " received return status code " << it->m_returncode << endmsg;
}
for ( std::vector<AlgorithmEntry>::const_iterator it = m_entries.begin(); it != m_entries.end(); ++it )
if ( !( it->m_returncode.isSuccess() ) ) return it->m_returncode;
setExecuted( true );
if ( m_measureTime ) m_timerTool->stop( m_timer );
return StatusCode::SUCCESS;
}
// ============================================================================
// Finalize
// ============================================================================
StatusCode GaudiParallelizer::finalize()
{
if ( msgLevel( MSG::DEBUG ) ) debug() << "==> Finalize" << endmsg;
return GaudiAlgorithm::finalize(); // must be called after all other actions
}
// ============================================================================
StatusCode GaudiParallelizer::decodeNames()
{
StatusCode final = StatusCode::SUCCESS;
m_entries.clear();
//== Get the "Context" option if in the file...
auto jos = service<IJobOptionsSvc>( "JobOptionsSvc" );
bool addedContext = false; //= Have we added the context ?
bool addedRootInTES = false; //= Have we added the rootInTES ?
//= Get the Application manager, to see if the algorithm exists
auto appMgr = service<IAlgManager>( "ApplicationMgr" );
const std::vector<std::string>& nameVector = m_names.value();
std::vector<std::string>::const_iterator it;
for ( it = nameVector.begin(); nameVector.end() != it; it++ ) {
const Gaudi::Utils::TypeNameString typeName( *it );
const std::string& theName = typeName.name();
const std::string& theType = typeName.type();
//== Check whether the specified algorithm already exists. If not, create it
StatusCode result = StatusCode::SUCCESS;
SmartIF<IAlgorithm> myIAlg = appMgr->algorithm( typeName, false ); // do not create it now
if ( !myIAlg.isValid() ) {
//== Set the Context if not in the jobOptions list
if ( !context().empty() || !rootInTES().empty() ) {
bool foundContext = false;
bool foundRootInTES = false;
const auto properties = jos->getProperties( theName );
if ( properties ) {
// Iterate over the list to set the options
for ( const auto& p : *properties ) {
if ( "Context" == p->name() ) {
foundContext = true;
}
if ( "RootInTES" == p->name() ) {
foundRootInTES = true;
}
}
}
if ( !foundContext && !context().empty() ) {
Gaudi::Property<std::string> contextProperty( "Context", context() );
jos->addPropertyToCatalogue( theName, contextProperty ).ignore();
addedContext = true;
}
if ( !foundRootInTES && !rootInTES().empty() ) {
Gaudi::Property<std::string> rootInTESProperty( "RootInTES", rootInTES() );
jos->addPropertyToCatalogue( theName, rootInTESProperty ).ignore();
addedRootInTES = true;
}
}
Algorithm* myAlg = nullptr;
result = createSubAlgorithm( theType, theName, myAlg );
// (MCl) this should prevent bug #35199... even if I didn't manage to
// reproduce it with a simple test.
if ( result.isSuccess() ) myIAlg = myAlg;
} else {
Algorithm* myAlg = dynamic_cast<Algorithm*>( myIAlg.get() );
if ( myAlg ) {
subAlgorithms()->push_back( myAlg );
// when the algorithm is not created, the ref count is short by one, so we have to fix it.
myAlg->addRef();
}
}
//== Remove the property, in case this is not a GaudiAlgorithm...
if ( addedContext ) {
jos->removePropertyFromCatalogue( theName, "Context" ).ignore();
addedContext = false;
}
if ( addedRootInTES ) {
jos->removePropertyFromCatalogue( theName, "RootInTES" ).ignore();
addedRootInTES = false;
}
// if we are already initialized, bring the sub-algorithm to the INITIALIZED state too.
if ( result.isSuccess() && Gaudi::StateMachine::INITIALIZED <= FSMState() && myIAlg.isValid() &&
Gaudi::StateMachine::INITIALIZED > myIAlg->FSMState() ) {
StatusCode sc = myIAlg->sysInitialize();
if ( sc.isFailure() ) {
result = sc;
}
}
// if we are already running, bring the sub-algorithm to the RUNNING state too.
if ( result.isSuccess() && Gaudi::StateMachine::RUNNING <= FSMState() && myIAlg.isValid() &&
Gaudi::StateMachine::RUNNING > myIAlg->FSMState() ) {
StatusCode sc = myIAlg->sysStart();
if ( sc.isFailure() ) {
result = sc;
}
}
//== Is it an Algorithm ? Strange test...
if ( result.isSuccess() ) {
// TODO: (MCl) it is possible to avoid the dynamic_cast in most of the
// cases by keeping the result of createSubAlgorithm.
Algorithm* myAlg = dynamic_cast<Algorithm*>( myIAlg.get() );
if ( myAlg != 0 ) {
// Note: The reference counting is kept by the system of sub-algorithms
m_entries.push_back( AlgorithmEntry( myAlg ) );
if ( msgLevel( MSG::DEBUG ) ) debug() << "Added algorithm " << theName << endmsg;
} else {
warning() << theName << " is not an Algorithm - failed dynamic_cast" << endmsg;
final = StatusCode::FAILURE;
}
} else {
warning() << "Unable to find or create " << theName << endmsg;
final = result;
}
}
//== Print the list of algorithms
MsgStream& msg = info();
if ( m_modeOR ) msg << "OR ";
msg << "Member list: ";
using GaudiUtils::details::ostream_joiner;
ostream_joiner( msg, m_entries, ", ", []( MsgStream& msg, const AlgorithmEntry& entry ) -> MsgStream& {
Algorithm* myAlg = entry.algorithm();
auto myAlgType = System::typeinfoName( typeid( *myAlg ) );
if ( myAlg->name() != myAlgType ) {
msg << myAlgType << "/";
}
return msg << myAlg->name();
} );
if ( !context().empty() ) msg << ", with context '" << context() << "'";
if ( !rootInTES().empty() ) msg << ", with rootInTES '" << rootInTES() << "'";
msg << endmsg;
return final;
}
//=========================================================================
// Interface for the Property manager
//=========================================================================
void GaudiParallelizer::membershipHandler( Gaudi::Details::PropertyBase& /* p */ )
{
// no action for not-yet initialized sequencer
if ( Gaudi::StateMachine::INITIALIZED > FSMState() ) {
return;
} // RETURN
decodeNames().ignore();
if ( !m_measureTime ) {
return;
} // RETURN
// add the entries into timer table:
if ( 0 == m_timerTool ) {
m_timerTool = tool<ISequencerTimerTool>( "SequencerTimerTool" );
}
if ( m_timerTool->globalTiming() ) m_measureTime = true;
m_timer = m_timerTool->addTimer( name() );
m_timerTool->increaseIndent();
for ( std::vector<AlgorithmEntry>::iterator itE = m_entries.begin(); m_entries.end() != itE; ++itE ) {
itE->setTimer( m_timerTool->addTimer( itE->algorithm()->name() ) );
}
m_timerTool->decreaseIndent();
}
//=============================================================================
#ifndef LIB_GAUDIPARALLELIZER_H
#define LIB_GAUDIPARALLELIZER_H 1
// Include files
// from Gaudi
#include "GaudiAlg/GaudiAlgorithm.h"
#include "GaudiKernel/ThreadLocalContext.h"
#include <boost/bind.hpp>
#include <tbb/task_group.h>
#include <tbb/task_scheduler_init.h>
// Forward declarations
class ISequencerTimerTool;
/** @class GaudiParallelizer GaudiParallelizer.h
*
*
* @author Illya Shapoval
* @date 09/12/2011
*/
class GaudiParallelizer : public GaudiAlgorithm
{
public:
/// Standard constructor
GaudiParallelizer( const std::string& name, ISvcLocator* pSvcLocator );
/// Destructor. An explicit noexcept(true) is necessary for Gaudi to build (see GAUDI-1187)
~GaudiParallelizer() noexcept( true ) override {}
StatusCode initialize() override; ///< Algorithm initialization
StatusCode execute() override; ///< Algorithm execution
StatusCode finalize() override; ///< Algorithm finalization
/** for asynchronous changes in the list of algorithms */
void membershipHandler( Gaudi::Details::PropertyBase& theProp );
protected:
class AlgorithmEntry
{
public:
/// Standard constructor
AlgorithmEntry( Algorithm* alg )
{
m_algorithm = alg;
m_reverse = false;
m_timer = 0;
}
virtual ~AlgorithmEntry(){}; ///< Destructor
void setReverse( bool flag ) { m_reverse = flag; }
Algorithm* algorithm() const { return m_algorithm; }
bool reverse() const { return m_reverse; }
void setTimer( int nb ) { m_timer = nb; }
int timer() const { return m_timer; }
/// Thread task executor method that wraps the execution of an algorithm
void run( GaudiParallelizer& prlzr )
{
if ( prlzr.m_measureTime ) prlzr.m_timerTool->start( timer() );
m_returncode = m_algorithm->sysExecute( Gaudi::Hive::currentContext() );
if ( prlzr.m_measureTime ) prlzr.m_timerTool->stop( timer() );
}
StatusCode m_returncode; ///< StatusCode of an algorithm execution received from a thread
private:
Algorithm* m_algorithm; ///< Algorithm pointer
bool m_reverse; ///< Indicates that the flag has to be inverted
int m_timer; ///< Timer number for this algorithm
};
/** Decode a vector of strings. */
StatusCode decodeNames();
private:
Gaudi::Property<std::vector<std::string>> m_names{this, "Members", {}, "list of algorithms"};
Gaudi::Property<bool> m_modeOR{this, "ModeOR", false, "use OR logic instead of AND"};
Gaudi::Property<bool> m_measureTime{this, "MeasureTime", false, "measure time"};
Gaudi::Property<bool> m_returnOK{this, "ReturnOK", false, "forces the sequencer to return a good status"};
Gaudi::Property<unsigned short> m_nthreads{this, "NumberOfThreads", 0, "number of threads in the thread pool"};
std::vector<AlgorithmEntry> m_entries; ///< List of algorithms to process.
ISequencerTimerTool* m_timerTool = nullptr; ///< Pointer to the timer tool
int m_timer; ///< Timer number for the sequencer
tbb::task_group m_task_group; ///< TBB task group
};
#endif // LIB_GAUDIPARALLELIZER_H
Change Log
0.2.5
- Made threadpool compatible with boost::thread 1.37
- Fixed hang problem in shutdown method (Thanks to Sohail Somani)
- Adapted repository layout to boost (Thanks to Alex Ott)
0.2.4 (Stable)
- Made threadpool compatible with the boost::thread 1.35.x code base
- Fixed compiler warning in scope_guard.hpp
0.2.3 (Development)
- Implemented workaround for Sun C++ Pro compiler bug in pool_core
- Removed subtask implementation (there was no demand for this feature)
- Improved shutdown policies
0.2.2 (Development)
- Refactored SizePolicy and added SizePolicyController
- Moved policies into separate files
- Fixed some compiler problems (GCC)
- Implemented size_controller handling
- Implemented two size policies: static_size and fixed_size
- Refactored worker_thread handling, moved policies from pool_core to pool
- Specialized schedule function for usage with futures
- Added compile test project
- Improved constness in pool core class
- Fixed timed wait
- Implemented futures (in progress)
- Added result_type to all functors
0.2.1 (Development)
- Pool base class (thread_pool) has now reference semantics
- Large refactorings: Removed scoped_pool, reimplemented worker (now worker_thread)
- Fixed odd resize behaviour. Now resize changes the number of threads immediately
- Apply pimpl idiom on pool core class (to make the ugly scoped_pool class needless)
- Introduced scheduling policies
- Switched to policy-based design (PBD), inspired by Scott Meyers C++ workshop
- Cosmetic code change: Replaced keyword 'class' with 'typename' in template definitions
- Revised tutorials
- New requirements: task functions should not, and schedulers shall not, throw exceptions
0.2.0 (Development)
- Moved threadpool into the boost namespace: boost::threadpool
- Used keyword volatile to indicate thread-safe member functions
- Used volatile on primitive types where appropriate
- Moved worker to detail directory
- Fixed thread deletion
- Extended wait functionality (waiting for idle threads was implemented)
- Renamed 'join()' to 'wait()', as 'join' indicates the termination of a thread, which was not the case in pool::join
- Changed internal container of lifo and fifo schedulers to improve efficiency.
- Improved code reference documentation (source code browser)
- Renamed thread_func into task_func
- Added += operator to scoped_pool to ease scheduling of tasks
- Refactored file structures and class names
- Added a new task concept (named subtask) which allows the combination of sequential and parallel execution
- Added new task adaptor for looped or timed tasks: looped_task_func
- Introduced function clear() which can be used to remove all tasks from the pool and schedulers
- New function pool::active() which returns the number of active tasks
0.1.8 (Development)
- Fixed some compile errors which were reported by gcc
- Wrote tutorial "Prioritized Tasks"
0.1.7 (Development)
- Added Visual Studio 2005 project files for tutorial and examples
0.1.6 (Development)
- Replaced task adaptor 'task' with boost::bind
- Workers are unregistered from thread_group when they are terminated
- Working on example boost::iostreams packet_filter
0.1.5 (Development)
- Finished threadpool reference
- Class pool_adaptor was replaced by smart_pool
- Minor pool improvements
- First tutorial page online
0.1.4 (Development)
- Created the pool's homepage: http://threadpool.sourceforge.net
- Improved documentation
- Added build scripts
0.1.3 (Development)
- First public release
- Added mergesort example
- Added tutorial
- Implementation of threadpool core and related classes
Copyright (c) 2005-2007 Philipp Henkel
Use, modification, and distribution are subject to the
Boost Software License, Version 1.0. (See accompanying file
LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
use-project /boost
: $(BOOST_ROOT)
;
project threadpool
: requirements <include>.&&$(BOOST_ROOT)
# disable auto-linking for all targets here,
# primarily because it caused troubles with V2
<define>BOOST_ALL_NO_LIB=1
: usage-requirements <include>.&&$(BOOST_ROOT)
: build-dir bin.v2
;
project boost : $(BOOST_ROOT) ;
Boost Software License - Version 1.0 - August 17th, 2003
Permission is hereby granted, free of charge, to any person or organization
obtaining a copy of the software and accompanying documentation covered by
this license (the "Software") to use, reproduce, display, distribute,
execute, and transmit the Software, and to prepare derivative works of the
Software, and to permit third-parties to whom the Software is furnished to
do so, all subject to the following:
The copyright notices in the Software and this entire statement, including
the above license grant, this restriction and the following disclaimer,
must be included in all copies of the Software, in whole or in part, and
all derivative works of the Software, unless such copies or derivative
works are solely in the form of machine-executable object code generated by
a source language processor.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
threadpool
Copyright (c) 2005-2007 Philipp Henkel
threadpool is a cross-platform C++ thread pool library released under the Boost Software License.
See doc/index.html for information on:
- API documentation and a tutorial
- External dependencies
- Using threadpool
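A minimal usage sketch (it assumes the top-level header is reachable as "threadpool.hpp", matching the main include further down this page; the task function is illustrative):
#include "threadpool.hpp"
#include <iostream>
void hello_task()
{
  std::cout << "hello from the pool" << std::endl;
}
int main()
{
  boost::threadpool::pool tp(2);   // fifo pool with two worker threads
  tp.schedule(&hello_task);        // enqueue a nullary task
  tp.schedule(&hello_task);
  tp.wait();                       // block until all scheduled tasks have finished
  return 0;
}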
threadpool to-do items
======================
Documentation
--------------------------------------------
- Source code documentation
- Design rationale
- Tutorial
- Finish Quickstart Tutorial/Example
Functionality
--------------------------------------------
- Implement a size policy which dynamically
increases/decreases the pool's size:
- init with min/max threads
- auto increase
- auto decrease (using timed cleanup tasks)
- Add some kind of deadline scheduler
- Add futures to pool
Examples
--------------------------------------------
- Buffering Client Requests: Handle 'bursty' client traffic
Some applications need more buffering than is provided by the OS I/O subsystem
Working on an 'active' buffer for boost::iostreams:
a buffer_filter which provides a dynamic amount of buffer objects.
buffer_filter uses a threadpool with one thread which provides the buffers to
the consumer. (TODO Philipp)
2nd implementation step:
Flexible configuration: Buffer capacities can be configured according to
- maximum number of requests
- maximum number of bytes
/*! \file
* \brief Main include.
*
* This is the only file you have to include in order to use the
* complete threadpool library.
*
* Copyright (c) 2005-2007 Philipp Henkel
*
* Use, modification, and distribution are subject to the
* Boost Software License, Version 1.0. (See accompanying file
* LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*
* http://threadpool.sourceforge.net
*
*/
#ifndef THREADPOOL_HPP_INCLUDED
#define THREADPOOL_HPP_INCLUDED
#include "./threadpool/future.hpp"
#include "./threadpool/pool.hpp"
#include "./threadpool/pool_adaptors.hpp"
#include "./threadpool/task_adaptors.hpp"
#endif // THREADPOOL_HPP_INCLUDED
/*! \file
* \brief TODO.
*
* TODO.
*
* Copyright (c) 2005-2007 Philipp Henkel
*
* Use, modification, and distribution are subject to the
* Boost Software License, Version 1.0. (See accompanying file
* LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*
* http://threadpool.sourceforge.net
*
*/
#ifndef THREADPOOL_DETAIL_FUTURE_IMPL_HPP_INCLUDED
#define THREADPOOL_DETAIL_FUTURE_IMPL_HPP_INCLUDED
#include "locking_ptr.hpp"
#include <boost/smart_ptr.hpp>
#include <boost/optional.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
#include <boost/thread/xtime.hpp>
#include <boost/utility/result_of.hpp>
#include <boost/static_assert.hpp>
#include <boost/type_traits.hpp>
namespace boost { namespace threadpool { namespace detail
{
template<class Result>
class future_impl
{
public:
typedef Result const & result_type; //!< Indicates the functor's result type.
typedef Result future_result_type; //!< Indicates the future's result type.
typedef future_impl<future_result_type> future_type;
private:
volatile bool m_ready;
volatile future_result_type m_result;
mutable mutex m_monitor;
mutable condition m_condition_ready;
volatile bool m_is_cancelled;
volatile bool m_executing;
public:
future_impl()
: m_ready(false)
, m_is_cancelled(false)
{
}
bool ready() const volatile
{
return m_ready;
}
void wait() const volatile
{
const future_type* self = const_cast<const future_type*>(this);
mutex::scoped_lock lock(self->m_monitor);
while(!m_ready)
{
self->m_condition_ready.wait(lock);
}
}
bool timed_wait(boost::xtime const & timestamp) const
{
const future_type* self = const_cast<const future_type*>(this);
mutex::scoped_lock lock(self->m_monitor);
while(!m_ready)
{
if(!self->m_condition_ready.timed_wait(lock, timestamp)) return false;
}
return true;
}
result_type operator()() const volatile
{
wait();
/*
if( throw_exception_ != 0 )
{
throw_exception_( this );
}
*/
return *(const_cast<const future_result_type*>(&m_result));
}
void set_value(future_result_type const & r) volatile
{
locking_ptr<future_type, mutex> lockedThis(*this, m_monitor);
if(!m_ready && !m_is_cancelled)
{
lockedThis->m_result = r;
lockedThis->m_ready = true;
lockedThis->m_condition_ready.notify_all();
}
}
/*
template<class E> void set_exception() // throw()
{
m_impl->template set_exception<E>();
}
template<class E> void set_exception( char const * what ) // throw()
{
m_impl->template set_exception<E>( what );
}
*/
bool cancel() volatile
{
if(!m_ready || m_executing)
{
m_is_cancelled = true;
return true;
}
else
{
return false;
}
}
bool is_cancelled() const volatile
{
return m_is_cancelled;
}
void set_execution_status(bool executing) volatile
{
m_executing = executing;
}
};
template<
template <typename> class Future,
typename Function
>
class future_impl_task_func
{
public:
typedef void result_type; //!< Indicates the functor's result type.
typedef Function function_type; //!< Indicates the function's type.
typedef typename result_of<function_type()>::type future_result_type; //!< Indicates the future's result type.
typedef Future<future_result_type> future_type; //!< Indicates the future's type.
// The task is required to be a nullary function.
BOOST_STATIC_ASSERT(function_traits<function_type()>::arity == 0);
// The task function's result type is required not to be void.
BOOST_STATIC_ASSERT(!is_void<future_result_type>::value);
private:
function_type m_function;
shared_ptr<future_type> m_future;
public:
future_impl_task_func(function_type const & function, shared_ptr<future_type> const & future)
: m_function(function)
, m_future(future)
{
}
void operator()()
{
if(m_function)
{
m_future->set_execution_status(true);
if(!m_future->is_cancelled())
{
// TODO future exception handling
m_future->set_value(m_function());
}
m_future->set_execution_status(false); // TODO consider exceptions
}
}
};
} } } // namespace boost::threadpool::detail
#endif // THREADPOOL_DETAIL_FUTURE_IMPL_HPP_INCLUDED
/*! \file
* \brief The locking_ptr is a smart pointer with a scoped locking mechanism.
*
* The class is a wrapper for a volatile pointer. It enables synchronized access to the
* internal pointer by locking the passed mutex.
* locking_ptr is based on Andrei Alexandrescu's LockingPtr. For more information
* see the article "volatile - Multithreaded Programmer's Best Friend" by A. Alexandrescu.
*
*
* Copyright (c) 2005-2007 Philipp Henkel
*
* Use, modification, and distribution are subject to the
* Boost Software License, Version 1.0. (See accompanying file
* LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*
* http://threadpool.sourceforge.net
*
*/
#ifndef THREADPOOL_DETAIL_LOCKING_PTR_HPP_INCLUDED
#define THREADPOOL_DETAIL_LOCKING_PTR_HPP_INCLUDED
#include <boost/utility.hpp>
#include <boost/thread/mutex.hpp>
namespace boost { namespace threadpool { namespace detail
{
/*! \brief Smart pointer with a scoped locking mechanism.
*
* This class is a wrapper for a volatile pointer. It enables synchronized access to the
* internal pointer by locking the passed mutex.
*/
template <typename T, typename Mutex>
class locking_ptr
: private noncopyable
{
T* m_obj; //!< The instance pointer.
Mutex & m_mutex; //!< Mutex is used for scoped locking.
public:
/// Constructor.
locking_ptr(volatile T& obj, const volatile Mutex& mtx)
: m_obj(const_cast<T*>(&obj))
, m_mutex(*const_cast<Mutex*>(&mtx))
{
// Lock mutex
m_mutex.lock();
}
/// Destructor.
~locking_ptr()
{
// Unlock mutex
m_mutex.unlock();
}
/*! Returns a reference to the stored instance.
* \return The instance's reference.
*/
T& operator*() const
{
return *m_obj;
}
/*! Returns a pointer to the stored instance.
* \return The instance's pointer.
*/
T* operator->() const
{
return m_obj;
}
};
} } } // namespace boost::threadpool::detail
#endif // THREADPOOL_DETAIL_LOCKING_PTR_HPP_INCLUDED
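A minimal usage sketch of locking_ptr (the Counter struct is illustrative, and the include path assumes the header sits next to the calling code, as in pool_core.hpp further down this page):
#include "locking_ptr.hpp"
#include <boost/thread/mutex.hpp>
struct Counter
{
  int value;
  void bump() { ++value; }
};
void bump_safely(volatile Counter& counter, boost::mutex& monitor)
{
  // the constructor locks the mutex and casts volatile away;
  // the destructor unlocks it again
  boost::threadpool::detail::locking_ptr<Counter, boost::mutex> locked(counter, monitor);
  locked->bump();
}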
/*! \file
* \brief Thread pool core.
*
* This file contains the threadpool's core class: pool<Task, SchedulingPolicy>.
*
* Thread pools are a mechanism for asynchronous and parallel processing
* within the same process. The pool class provides a convenient way
* for dispatching asynchronous tasks as function objects. The scheduling
* of these tasks can be easily controlled by using customized schedulers.
*
* Copyright (c) 2005-2007 Philipp Henkel
*
* Use, modification, and distribution are subject to the
* Boost Software License, Version 1.0. (See accompanying file
* LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*
* http://threadpool.sourceforge.net
*
*/
#ifndef THREADPOOL_POOL_CORE_HPP_INCLUDED
#define THREADPOOL_POOL_CORE_HPP_INCLUDED
#include "locking_ptr.hpp"
#include "worker_thread.hpp"
#include "../task_adaptors.hpp"
#include <boost/thread.hpp>
#include <boost/thread/exceptions.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
#include <boost/smart_ptr.hpp>
#include <boost/bind.hpp>
#include <boost/static_assert.hpp>
#include <boost/type_traits.hpp>
#include <vector>
/// The namespace threadpool contains a thread pool and related utility classes.
namespace boost { namespace threadpool { namespace detail
{
/*! \brief Thread pool.
*
* Thread pools are a mechanism for asynchronous and parallel processing
* within the same process. The pool class provides a convenient way
* for dispatching asynchronous tasks as function objects. The scheduling
* of these tasks can be easily controlled by using customized schedulers.
* A task must not throw an exception.
*
* A pool_impl is DefaultConstructible and NonCopyable.
*
* \param Task A function object which implements the operator 'void operator() (void) const'. The operator () is called by the pool to execute the task. Exceptions are ignored.
* \param Scheduler A task container which determines how tasks are scheduled. It is guaranteed that this container is accessed only by one thread at a time. The scheduler shall not throw exceptions.
*
* \remarks The pool class is thread-safe.
*
* \see Tasks: task_func, prio_task_func
* \see Scheduling policies: fifo_scheduler, lifo_scheduler, prio_scheduler
*/
template <
typename Task,
template <typename> class SchedulingPolicy,
template <typename> class SizePolicy,
template <typename> class SizePolicyController,
template <typename> class ShutdownPolicy
>
class pool_core
: public enable_shared_from_this< pool_core<Task, SchedulingPolicy, SizePolicy, SizePolicyController, ShutdownPolicy > >
, private noncopyable
{
public: // Type definitions
typedef Task task_type; //!< Indicates the task's type.
typedef SchedulingPolicy<task_type> scheduler_type; //!< Indicates the scheduler's type.
typedef pool_core<Task,
SchedulingPolicy,
SizePolicy,
SizePolicyController,
ShutdownPolicy > pool_type; //!< Indicates the thread pool's type.
typedef SizePolicy<pool_type> size_policy_type; //!< Indicates the sizer's type.
//typedef typename size_policy_type::size_controller size_controller_type;
typedef SizePolicyController<pool_type> size_controller_type;
// typedef SizePolicy<pool_type>::size_controller size_controller_type;
typedef ShutdownPolicy<pool_type> shutdown_policy_type;//!< Indicates the shutdown policy's type.
typedef worker_thread<pool_type> worker_type;
// The task is required to be a nullary function.
BOOST_STATIC_ASSERT(function_traits<task_type()>::arity == 0);
// The task function's result type is required to be void.
BOOST_STATIC_ASSERT(is_void<typename result_of<task_type()>::type >::value);
private: // Friends
friend class worker_thread<pool_type>;
#if defined(__SUNPRO_CC) && (__SUNPRO_CC <= 0x580) // Tested with CC: Sun C++ 5.8 Patch 121018-08 2006/12/06
friend class SizePolicy;
friend class ShutdownPolicy;
#else
friend class SizePolicy<pool_type>;
friend class ShutdownPolicy<pool_type>;
#endif
private: // The following members may be accessed by _multiple_ threads at the same time:
volatile size_t m_worker_count;
volatile size_t m_target_worker_count;
volatile size_t m_active_worker_count;
private: // The following members are accessed only by _one_ thread at the same time:
scheduler_type m_scheduler;
scoped_ptr<size_policy_type> m_size_policy; // is never null
bool m_terminate_all_workers; // Indicates if termination of all workers was triggered.
std::vector<shared_ptr<worker_type> > m_terminated_workers; // List of workers which are terminated but not fully destructed.
private: // The following members are implemented thread-safe:
mutable recursive_mutex m_monitor;
mutable condition m_worker_idle_or_terminated_event; // A worker is idle or was terminated.
mutable condition m_task_or_terminate_workers_event; // Task is available OR total worker count should be reduced.
public:
/// Constructor.
pool_core()
: m_worker_count(0)
, m_target_worker_count(0)
, m_active_worker_count(0)
, m_terminate_all_workers(false)
{
pool_type volatile & self_ref = *this;
m_size_policy.reset(new size_policy_type(self_ref));
m_scheduler.clear();
}
/// Destructor.
~pool_core()
{
}
/*! Gets the size controller which manages the number of threads in the pool.
* \return The size controller.
* \see SizePolicy
*/
size_controller_type size_controller()
{
return size_controller_type(*m_size_policy, this->shared_from_this());
}
/*! Gets the number of threads in the pool.
* \return The number of threads.
*/
size_t size() const volatile
{
return m_worker_count;
}
// TODO is only called once
void shutdown()
{
ShutdownPolicy<pool_type>::shutdown(*this);
}
/*! Schedules a task for asynchronous execution. The task will be executed once only.
* \param task The task function object. It should not throw exceptions.
* \return true, if the task could be scheduled and false otherwise.
*/
bool schedule(task_type const & task) volatile
{
locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
if(lockedThis->m_scheduler.push(task))
{
lockedThis->m_task_or_terminate_workers_event.notify_one();
return true;
}
else
{
return false;
}
}
/*! Returns the number of tasks which are currently executed.
* \return The number of active tasks.
*/
size_t active() const volatile
{
return m_active_worker_count;
}
/*! Returns the number of tasks which are ready for execution.
* \return The number of pending tasks.
*/
size_t pending() const volatile
{
locking_ptr<const pool_type, recursive_mutex> lockedThis(*this, m_monitor);
return lockedThis->m_scheduler.size();
}
/*! Removes all pending tasks from the pool's scheduler.
*/
void clear() volatile
{
locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
lockedThis->m_scheduler.clear();
}
/*! Indicates that there are no tasks pending.
* \return true if there are no tasks ready for execution.
* \remarks This function is more efficient than the check 'pending() == 0'.
*/
bool empty() const volatile
{
locking_ptr<const pool_type, recursive_mutex> lockedThis(*this, m_monitor);
return lockedThis->m_scheduler.empty();
}
/*! The current thread of execution is blocked until the sum of all active
* and pending tasks is equal or less than a given threshold.
* \param task_threshold The maximum number of tasks in pool and scheduler.
*/
void wait(size_t const task_threshold = 0) const volatile
{
const pool_type* self = const_cast<const pool_type*>(this);
recursive_mutex::scoped_lock lock(self->m_monitor);
if(0 == task_threshold)
{
while(0 != self->m_active_worker_count || !self->m_scheduler.empty())
{
self->m_worker_idle_or_terminated_event.wait(lock);
}
}
else
{
while(task_threshold < self->m_active_worker_count + self->m_scheduler.size())
{
self->m_worker_idle_or_terminated_event.wait(lock);
}
}
}
/*! The current thread of execution is blocked until the timestamp is met
* or the sum of all active and pending tasks is equal or less
* than a given threshold.
* \param timestamp The time when function returns at the latest.
* \param task_threshold The maximum number of tasks in pool and scheduler.
* \return true if the task sum is equal or less than the threshold, false otherwise.
*/
bool wait(xtime const & timestamp, size_t const task_threshold = 0) const volatile
{
const pool_type* self = const_cast<const pool_type*>(this);
recursive_mutex::scoped_lock lock(self->m_monitor);
if(0 == task_threshold)
{
while(0 != self->m_active_worker_count || !self->m_scheduler.empty())
{
if(!self->m_worker_idle_or_terminated_event.timed_wait(lock, timestamp)) return false;
}
}
else
{
while(task_threshold < self->m_active_worker_count + self->m_scheduler.size())
{
if(!self->m_worker_idle_or_terminated_event.timed_wait(lock, timestamp)) return false;
}
}
return true;
}
private:
void terminate_all_workers(bool const wait) volatile
{
pool_type* self = const_cast<pool_type*>(this);
recursive_mutex::scoped_lock lock(self->m_monitor);
self->m_terminate_all_workers = true;
m_target_worker_count = 0;
self->m_task_or_terminate_workers_event.notify_all();
if(wait)
{
while(m_active_worker_count > 0)
{
self->m_worker_idle_or_terminated_event.wait(lock);
}
for(typename std::vector<shared_ptr<worker_type> >::iterator it = self->m_terminated_workers.begin();
it != self->m_terminated_workers.end();
++it)
{
(*it)->join();
}
self->m_terminated_workers.clear();
}
}
/*! Changes the number of worker threads in the pool. The resizing
* is handled by the SizePolicy.
* \param threads The new number of worker threads.
* \return true, if pool will be resized and false if not.
*/
bool resize(size_t const worker_count) volatile
{
locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
if(!m_terminate_all_workers)
{
m_target_worker_count = worker_count;
}
else
{
return false;
}
if(m_worker_count <= m_target_worker_count)
{ // increase worker count
while(m_worker_count < m_target_worker_count)
{
try
{
worker_thread<pool_type>::create_and_attach(lockedThis->shared_from_this());
m_worker_count++;
m_active_worker_count++;
}
catch(thread_resource_error)
{
return false;
}
}
}
else
{ // decrease worker count
lockedThis->m_task_or_terminate_workers_event.notify_all(); // TODO: Optimize number of notified workers
}
return true;
}
// worker died with unhandled exception
void worker_died_unexpectedly(shared_ptr<worker_type> worker) volatile
{
locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
m_worker_count--;
m_active_worker_count--;
lockedThis->m_worker_idle_or_terminated_event.notify_all();
if(m_terminate_all_workers)
{
lockedThis->m_terminated_workers.push_back(worker);
}
else
{
lockedThis->m_size_policy->worker_died_unexpectedly(m_worker_count);
}
}
void worker_destructed(shared_ptr<worker_type> worker) volatile
{
locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
m_worker_count--;
m_active_worker_count--;
lockedThis->m_worker_idle_or_terminated_event.notify_all();
if(m_terminate_all_workers)
{
lockedThis->m_terminated_workers.push_back(worker);
}
}
bool execute_task() volatile
{
function0<void> task;
{ // fetch task
pool_type* lockedThis = const_cast<pool_type*>(this);
recursive_mutex::scoped_lock lock(lockedThis->m_monitor);
// decrease number of threads if necessary
if(m_worker_count > m_target_worker_count)
{
return false; // terminate worker
}
// wait for tasks
while(lockedThis->m_scheduler.empty())
{
// decrease number of workers if necessary
if(m_worker_count > m_target_worker_count)
{
return false; // terminate worker
}
else
{
m_active_worker_count--;
lockedThis->m_worker_idle_or_terminated_event.notify_all();
lockedThis->m_task_or_terminate_workers_event.wait(lock);
m_active_worker_count++;
}
}
task = lockedThis->m_scheduler.top();
lockedThis->m_scheduler.pop();
}
// call task function
if(task)
{
task();
}
//guard->disable();
return true;
}
};
} } } // namespace boost::threadpool::detail
#endif // THREADPOOL_POOL_CORE_HPP_INCLUDED
/*! \file
* \brief TODO.
*
* TODO.
*
* Copyright (c) 2005-2007 Philipp Henkel
*
* Use, modification, and distribution are subject to the
* Boost Software License, Version 1.0. (See accompanying file
* LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*
* http://threadpool.sourceforge.net
*
*/
#ifndef THREADPOOL_DETAIL_SCOPE_GUARD_HPP_INCLUDED
#define THREADPOOL_DETAIL_SCOPE_GUARD_HPP_INCLUDED
#include <boost/function.hpp>
namespace boost { namespace threadpool { namespace detail
{
// TODO documentation
class scope_guard
: private boost::noncopyable
{
function0<void> const m_function;
bool m_is_active;
public:
scope_guard(function0<void> const & call_on_exit)
: m_function(call_on_exit)
, m_is_active(true)
{
}
~scope_guard()
{
if(m_is_active && m_function)
{
m_function();
}
}
void disable()
{
m_is_active = false;
}
};
} } } // namespace boost::threadpool::detail
#endif // THREADPOOL_DETAIL_SCOPE_GUARD_HPP_INCLUDED
/*! \file
* \brief Thread pool worker.
*
* The worker thread instance is attached to a pool
* and executes tasks of this pool.
*
* Copyright (c) 2005-2007 Philipp Henkel
*
* Use, modification, and distribution are subject to the
* Boost Software License, Version 1.0. (See accompanying file
* LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*
* http://threadpool.sourceforge.net
*
*/
#ifndef THREADPOOL_DETAIL_WORKER_THREAD_HPP_INCLUDED
#define THREADPOOL_DETAIL_WORKER_THREAD_HPP_INCLUDED
#include "scope_guard.hpp"
#include <boost/smart_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/thread/exceptions.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
namespace boost { namespace threadpool { namespace detail
{
/*! \brief Thread pool worker.
*
* A worker_thread represents a thread of execution. The worker is attached to a
* thread pool and processes tasks of that pool. The lifetime of the worker and its
* internal boost::thread is managed automatically.
*
* This class is a helper class and cannot be constructed or accessed directly.
*
* \see pool_core
*/
template <typename Pool>
class worker_thread
: public enable_shared_from_this< worker_thread<Pool> >
, private noncopyable
{
public:
typedef Pool pool_type; //!< Indicates the pool's type.
private:
shared_ptr<pool_type> m_pool; //!< Pointer to the pool which created the worker.
shared_ptr<boost::thread> m_thread; //!< Pointer to the thread which executes the run loop.
/*! Constructs a new worker.
* \param pool Pointer to its parent pool.
* \see function create_and_attach
*/
worker_thread(shared_ptr<pool_type> const & pool)
: m_pool(pool)
{
assert(pool);
}
/*! Notifies that an exception occurred in the run loop.
*/
void died_unexpectedly()
{
m_pool->worker_died_unexpectedly(this->shared_from_this());
}
public:
/*! Executes pool's tasks sequentially.
*/
void run()
{
scope_guard notify_exception(bind(&worker_thread::died_unexpectedly, this));
while(m_pool->execute_task()) {}
notify_exception.disable();
m_pool->worker_destructed(this->shared_from_this());
}
/*! Joins the worker's thread.
*/
void join()
{
m_thread->join();
}
/*! Constructs a new worker thread and attaches it to the pool.
* \param pool Pointer to the pool.
*/
static void create_and_attach(shared_ptr<pool_type> const & pool)
{
shared_ptr<worker_thread> worker(new worker_thread(pool));
if(worker)
{
worker->m_thread.reset(new boost::thread(bind(&worker_thread::run, worker)));
}
}
};
} } } // namespace boost::threadpool::detail
#endif // THREADPOOL_DETAIL_WORKER_THREAD_HPP_INCLUDED
/*! \file
* \brief TODO.
*
* TODO.
*
* Copyright (c) 2005-2007 Philipp Henkel
*
* Use, modification, and distribution are subject to the
* Boost Software License, Version 1.0. (See accompanying file
* LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*
* http://threadpool.sourceforge.net
*
*/
#ifndef THREADPOOL_FUTURE_HPP_INCLUDED
#define THREADPOOL_FUTURE_HPP_INCLUDED
#include "./detail/future.hpp"
#include <boost/utility/enable_if.hpp>
//#include "pool.hpp"
//#include <boost/utility.hpp>
//#include <boost/thread/mutex.hpp>
namespace boost { namespace threadpool
{
/*! \brief Experimental. Do not use in production code. TODO.
*
* TODO Future
*
* \see TODO
*
*/
template<class Result>
class future
{
private:
shared_ptr<detail::future_impl<Result> > m_impl;
public:
typedef Result const & result_type; //!< Indicates the functor's result type.
typedef Result future_result_type; //!< Indicates the future's result type.
public:
future()
: m_impl(new detail::future_impl<future_result_type>()) // TODO remove this
{
}
// only for internal usage
future(shared_ptr<detail::future_impl<Result> > const & impl)
: m_impl(impl)
{
}
bool ready() const
{
return m_impl->ready();
}
void wait() const
{
m_impl->wait();
}
bool timed_wait(boost::xtime const & timestamp) const
{
return m_impl->timed_wait(timestamp);
}
result_type operator()() // throw( thread::cancelation_exception, ... )
{
return (*m_impl)();
}
result_type get() // throw( thread::cancelation_exception, ... )
{
return (*m_impl)();
}
bool cancel()
{
return m_impl->cancel();
}
bool is_cancelled() const
{
return m_impl->is_cancelled();
}
};
template<class Pool, class Function>
typename disable_if <
is_void< typename result_of< Function() >::type >,
future< typename result_of< Function() >::type >
>::type
schedule(Pool& pool, const Function& task)
{
typedef typename result_of< Function() >::type future_result_type;
// create future impl and future
shared_ptr<detail::future_impl<future_result_type> > impl(new detail::future_impl<future_result_type>);
future <future_result_type> res(impl);
// schedule future impl
pool.schedule(detail::future_impl_task_func<detail::future_impl, Function>(task, impl));
// return future
return res;
/*
TODO
if(pool->schedule(bind(&Future::run, future)))
{
return future;
}
else
{
// construct empty future
return error_future;
}
*/
}
} } // namespace boost::threadpool
#endif // THREADPOOL_FUTURE_HPP_INCLUDED
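As an illustration of the experimental future interface above (a minimal sketch; compute_answer is illustrative):
#include "threadpool.hpp"
int compute_answer() { return 42; }
int main()
{
  boost::threadpool::pool tp(1);
  // schedule() wraps the task in a future_impl_task_func and returns a future
  boost::threadpool::future<int> answer = boost::threadpool::schedule(tp, &compute_answer);
  answer.wait();                       // or timed_wait() with a boost::xtime deadline
  return answer.get() == 42 ? 0 : 1;
}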
/*! \file
* \brief Thread pool core.
*
* This file contains the threadpool's core class: pool<Task, SchedulingPolicy>.
*
* Thread pools are a mechanism for asynchronous and parallel processing
* within the same process. The pool class provides a convenient way
* for dispatching asynchronous tasks as function objects. The scheduling
* of these tasks can be easily controlled by using customized schedulers.
*
* Copyright (c) 2005-2007 Philipp Henkel
*
* Use, modification, and distribution are subject to the
* Boost Software License, Version 1.0. (See accompanying file
* LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*
* http://threadpool.sourceforge.net
*
*/
#ifndef THREADPOOL_POOL_HPP_INCLUDED
#define THREADPOOL_POOL_HPP_INCLUDED
#include <boost/ref.hpp>
#include "./detail/pool_core.hpp"
#include "task_adaptors.hpp"
#include "./detail/locking_ptr.hpp"
#include "scheduling_policies.hpp"
#include "size_policies.hpp"
#include "shutdown_policies.hpp"
/// The namespace threadpool contains a thread pool and related utility classes.
namespace boost { namespace threadpool
{
/*! \brief Thread pool.
*
* Thread pools are a mechanism for asynchronous and parallel processing
* within the same process. The pool class provides a convenient way
* for dispatching asynchronous tasks as function objects. The scheduling
* of these tasks can be easily controlled by using customized schedulers.
* A task must not throw an exception.
*
* A pool is DefaultConstructible, CopyConstructible and Assignable.
* It has reference semantics; all copies of the same pool are equivalent and interchangeable.
* All operations on a pool except assignment are strongly thread safe or sequentially consistent;
* that is, the behavior of concurrent calls is as if the calls have been issued sequentially in an unspecified order.
*
* \param Task A function object which implements the operator 'void operator() (void) const'. The operator () is called by the pool to execute the task. Exceptions are ignored.
* \param SchedulingPolicy A task container which determines how tasks are scheduled. It is guaranteed that this container is accessed only by one thread at a time. The scheduler shall not throw exceptions.
*
* \remarks The pool class is thread-safe.
*
* \see Tasks: task_func, prio_task_func
* \see Scheduling policies: fifo_scheduler, lifo_scheduler, prio_scheduler
*/
template <
typename Task = task_func,
template <typename> class SchedulingPolicy = fifo_scheduler,
template <typename> class SizePolicy = static_size,
template <typename> class SizePolicyController = resize_controller,
template <typename> class ShutdownPolicy = wait_for_all_tasks
>
class thread_pool
{
typedef detail::pool_core<Task,
SchedulingPolicy,
SizePolicy,
SizePolicyController,
ShutdownPolicy> pool_core_type;
shared_ptr<pool_core_type> m_core; // pimpl idiom
shared_ptr<void> m_shutdown_controller; // If the last pool holding a pointer to the core is deleted the controller shuts the pool down.
public: // Type definitions
typedef Task task_type; //!< Indicates the task's type.
typedef SchedulingPolicy<task_type> scheduler_type; //!< Indicates the scheduler's type.
/* typedef thread_pool<Task,
SchedulingPolicy,
SizePolicy,
ShutdownPolicy > pool_type; //!< Indicates the thread pool's type.
*/
typedef SizePolicy<pool_core_type> size_policy_type;
typedef SizePolicyController<pool_core_type> size_controller_type;
public:
/*! Constructor.
* \param initial_threads The pool is immediately resized to the specified number of threads. The pool's actual number of threads depends on the SizePolicy.
*/
thread_pool(size_t initial_threads = 0)
: m_core(new pool_core_type)
, m_shutdown_controller(static_cast<void*>(0), bind(&pool_core_type::shutdown, m_core))
{
size_policy_type::init(*m_core, initial_threads);
}
/*! Gets the size controller which manages the number of threads in the pool.
* \return The size controller.
* \see SizePolicy
*/
size_controller_type size_controller()
{
return m_core->size_controller();
}
/*! Gets the number of threads in the pool.
* \return The number of threads.
*/
size_t size() const
{
return m_core->size();
}
/*! Schedules a task for asynchronous execution. The task will be executed once only.
* \param task The task function object. It should not throw exceptions.
* \return true, if the task could be scheduled and false otherwise.
*/
bool schedule(task_type const & task)
{
return m_core->schedule(task);
}
/*! Returns the number of tasks which are currently executed.
* \return The number of active tasks.
*/
size_t active() const
{
return m_core->active();
}
/*! Returns the number of tasks which are ready for execution.
* \return The number of pending tasks.
*/
size_t pending() const
{
return m_core->pending();
}
/*! Removes all pending tasks from the pool's scheduler.
*/
void clear()
{
m_core->clear();
}
/*! Indicates that there are no tasks pending.
* \return true if there are no tasks ready for execution.
* \remarks This function is more efficient than the check 'pending() == 0'.
*/
bool empty() const
{
return m_core->empty();
}
/*! The current thread of execution is blocked until the sum of all active
* and pending tasks is equal or less than a given threshold.
* \param task_threshold The maximum number of tasks in pool and scheduler.
*/
void wait(size_t task_threshold = 0) const
{
m_core->wait(task_threshold);
}
/*! The current thread of execution is blocked until the timestamp is met
* or the sum of all active and pending tasks is equal or less
* than a given threshold.
* \param timestamp The time when function returns at the latest.
* \param task_threshold The maximum number of tasks in pool and scheduler.
* \return true if the task sum is equal or less than the threshold, false otherwise.
*/
bool wait(xtime const & timestamp, size_t task_threshold = 0) const
{
return m_core->wait(timestamp, task_threshold);
}
};
/*! \brief Fifo pool.
*
* The pool's tasks are fifo scheduled task_func functors.
*
*/
typedef thread_pool<task_func, fifo_scheduler, static_size, resize_controller, wait_for_all_tasks> fifo_pool;
/*! \brief Lifo pool.
*
* The pool's tasks are lifo scheduled task_func functors.
*
*/
typedef thread_pool<task_func, lifo_scheduler, static_size, resize_controller, wait_for_all_tasks> lifo_pool;
/*! \brief Pool for prioritized tasks.
*
* The pool's tasks are prioritized prio_task_func functors.
*
*/
typedef thread_pool<prio_task_func, prio_scheduler, static_size, resize_controller, wait_for_all_tasks> prio_pool;
/*! \brief A standard pool.
*
* The pool's tasks are fifo scheduled task_func functors.
*
*/
typedef fifo_pool pool;
} } // namespace boost::threadpool
#endif // THREADPOOL_POOL_HPP_INCLUDED
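As an illustration of the prio_pool typedef above (a minimal sketch; it assumes the prio_task_func(priority, function) adaptor from the library's task_adaptors.hpp, which is not shown on this page):
#include "threadpool.hpp"
#include <iostream>
int main()
{
  boost::threadpool::prio_pool tp(1);
  // tasks still waiting in the queue are picked up in priority order,
  // higher priority values first
  tp.schedule(boost::threadpool::prio_task_func(1, []{ std::cout << "low priority\n"; }));
  tp.schedule(boost::threadpool::prio_task_func(5, []{ std::cout << "high priority\n"; }));
  tp.wait();
  return 0;
}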
/*! \file
* \brief Pool adaptors.
*
* This file contains an easy-to-use adaptor similar to a smart
* pointer for the pool class.
*
* Copyright (c) 2005-2007 Philipp Henkel
*
* Use, modification, and distribution are subject to the
* Boost Software License, Version 1.0. (See accompanying file
* LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*
* http://threadpool.sourceforge.net
*
*/
#ifndef THREADPOOL_POOL_ADAPTORS_HPP_INCLUDED
#define THREADPOOL_POOL_ADAPTORS_HPP_INCLUDED
#include <boost/smart_ptr.hpp>
namespace boost { namespace threadpool
{
// TODO convenience scheduling function
/*! Schedules a Runnable for asynchronous execution. A Runnable is an arbitrary class with a run()
* member function. This is a convenience shorthand for pool->schedule(bind(&Runnable::run, task_object)).
* \param pool The pool (or pool handle) used for scheduling.
* \param obj The Runnable object. The member function run() will be executed and should not throw exceptions.
* \return true, if the task could be scheduled and false otherwise.
*/
template<typename Pool, typename Runnable>
bool schedule(Pool& pool, shared_ptr<Runnable> const & obj)
{
return pool->schedule(bind(&Runnable::run, obj));
}
/*! Schedules a task for asynchronous execution. The task will be executed once only.
* \param task The task function object.
*/
template<typename Pool>
typename enable_if <
is_void< typename result_of< typename Pool::task_type() >::type >,
bool
>::type
schedule(Pool& pool, typename Pool::task_type const & task)
{
return pool.schedule(task);
}
template<typename Pool>
typename enable_if <
is_void< typename result_of< typename Pool::task_type() >::type >,
bool
>::type
schedule(shared_ptr<Pool> const pool, typename Pool::task_type const & task)
{
return pool->schedule(task);
}
} } // namespace boost::threadpool
#endif // THREADPOOL_POOL_ADAPTORS_HPP_INCLUDED
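As an illustration of the Runnable overload above (a minimal sketch; the Worker class is illustrative, and note that the overload calls pool->schedule(...), so it expects a pointer-like pool handle such as a shared_ptr):
#include "threadpool.hpp"
#include <boost/smart_ptr.hpp>
#include <iostream>
class Worker
{
public:
  void run() { std::cout << "running" << std::endl; }
};
int main()
{
  boost::shared_ptr<boost::threadpool::pool> tp(new boost::threadpool::pool(2));
  boost::shared_ptr<Worker> job(new Worker);
  boost::threadpool::schedule(tp, job);   // shorthand for tp->schedule(bind(&Worker::run, job))
  tp->wait();
  return 0;
}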