Skip to content
Snippets Groups Projects
Commit bc3db2a1 authored by Illya Shapoval's avatar Illya Shapoval
Browse files

CPUCruncher extended to replace part of crunching time with plain sleeping...

CPUCruncher extended to replace part of crunching time with plain sleeping (needed to simulate I/O-bound behaviourof an algorithm)
parent e57cdf3f
No related branches found
No related tags found
1 merge request!178A prototype of a scheduler for efficient handling of heterogeneous tasks
......@@ -15,63 +15,59 @@ DECLARE_ALGORITHM_FACTORY(CPUCruncher)
//------------------------------------------------------------------------------
CPUCruncher::CPUCruncher(const std::string& name, // the algorithm instance name
ISvcLocator*pSvc) :
GaudiAlgorithm(name, pSvc), m_avg_runtime(1.), m_var_runtime(.01), m_shortCalib(
false) {
// For Concurrent run
m_inputHandles.resize(MAX_INPUTS);
for (uint i = 0; i < MAX_INPUTS; ++i){
m_inputHandles[i] = new DataObjectHandle<DataObject>();
declareInput("input_" + std::to_string(i), *m_inputHandles[i]);
}
CPUCruncher::CPUCruncher(const std::string& name, ISvcLocator*pSvc)
: GaudiAlgorithm(name, pSvc),
m_avg_runtime(1.),
m_var_runtime(.01),
m_shortCalib(false),
m_sleepFraction(0.0f)
{
m_outputHandles.resize(MAX_OUTPUTS);
for (uint i = 0; i < MAX_OUTPUTS; ++i){
m_outputHandles[i] = new DataObjectHandle<DataObject>();
declareOutput("output_" + std::to_string(i), *m_outputHandles[i]);
}
// For Concurrent run
m_inputHandles.resize(MAX_INPUTS);
for (uint i = 0; i < MAX_INPUTS; ++i){
m_inputHandles[i] = new DataObjectHandle<DataObject>();
declareInput("input_" + std::to_string(i), *m_inputHandles[i]);
}
m_outputHandles.resize(MAX_OUTPUTS);
for (uint i = 0; i < MAX_OUTPUTS; ++i){
m_outputHandles[i] = new DataObjectHandle<DataObject>();
declareOutput("output_" + std::to_string(i), *m_outputHandles[i]);
}
declareProperty("avgRuntime", m_avg_runtime,
"Average runtime of the module.");
declareProperty("varRuntime", m_var_runtime,
"Variance of the runtime of the module.");
declareProperty("localRndm", m_local_rndm_gen = true,
"Decide if the local random generator is to be used");
declareProperty("NIterationsVect", m_niters_vect,
"Number of iterations for the calibration.");
declareProperty("NTimesVect", m_times_vect,
"Number of seconds for the calibration.");
declareProperty("shortCalib", m_shortCalib = false,
"Enable coarse grained calibration");
declareProperty("RwRepetitions", m_rwRepetitions = 1,
"Increase access to the WB");
declareProperty("SleepyExecution", m_sleepyExecution = false,
"Sleep during execution instead of crunching");
// Register the algo in the static concurrent hash map in order to
// monitor the # of copies
CHM::accessor name_ninstances;
m_name_ncopies_map.insert(name_ninstances, name);
name_ninstances->second += 1;
declareProperty("avgRuntime" , m_avg_runtime, "Average runtime of the module.");
declareProperty("varRuntime" , m_var_runtime, "Variance of the runtime of the module.");
declareProperty("localRndm" , m_local_rndm_gen = true, "Decide if the local random generator is to be used");
declareProperty("NIterationsVect", m_niters_vect, "Number of iterations for the calibration.");
declareProperty("NTimesVect" , m_times_vect, "Number of seconds for the calibration.");
declareProperty("shortCalib" , m_shortCalib = false, "Enable coarse grained calibration");
declareProperty("RwRepetitions" , m_rwRepetitions = 1, "Increase access to the WB");
declareProperty("SleepFraction" , m_sleepFraction = 0.0f, "Fraction of time, between 0 and 1, when "
"an algorithm is actually sleeping instead of crunching");
// Register the algo in the static concurrent hash map in order to
// monitor the # of copies
CHM::accessor name_ninstances;
m_name_ncopies_map.insert(name_ninstances, name);
name_ninstances->second += 1;
}
CPUCruncher::~CPUCruncher() {
for (uint i = 0; i < MAX_INPUTS; ++i) {
delete m_inputHandles[i];
}
for (uint i = 0; i < MAX_INPUTS; ++i)
delete m_inputHandles[i];
m_outputHandles.resize(MAX_OUTPUTS);
for (uint i = 0; i < MAX_OUTPUTS; ++i) {
delete m_outputHandles[i];
}
for (uint i = 0; i < MAX_OUTPUTS; ++i)
delete m_outputHandles[i];
}
StatusCode CPUCruncher::initialize(){
if (m_times_vect.size()==0){
if (m_times_vect.size() == 0)
calibrate();
}
// if an algorithm was setup to sleep, for whatever period, it effectively becomes I/O-bound
if (m_sleepFraction != 0.0f)
setIOBound(true);
return StatusCode::SUCCESS ;
}
......@@ -115,7 +111,7 @@ void CPUCruncher::calibrate(){
if (!m_shortCalib){
m_niters_vect.push_back(100000);
m_niters_vect.push_back(200000);
}
}
m_times_vect.resize(m_niters_vect.size());
......@@ -196,12 +192,12 @@ void CPUCruncher::findPrimes (const unsigned long int n_iterations) {
// Check if it can be divided by the smaller ones
is_prime = true;
for (unsigned long j=2;j<i && is_prime;++j){
if (i%j == 0){
for (unsigned long j=2;j<i && is_prime;++j) {
if (i%j == 0)
is_prime = false;
}
}// end loop on numbers < than tested one
if (is_prime){
if (is_prime) {
// copy the array of primes (INEFFICIENT ON PURPOSE!)
unsigned int new_primes_size = 1 + primes_size;
unsigned long* new_primes = new unsigned long[new_primes_size];
......@@ -236,24 +232,7 @@ StatusCode CPUCruncher::execute () // the execution of the algorithm
MsgStream logstream(msgSvc(), name());
if (isIOBound()) {
//logstream << MSG::DEBUG << "Going to dream for: "<< m_avg_runtime << endmsg;
std::chrono::duration<double> dreamtime( m_avg_runtime );
//tbb::tick_count starttbb=tbb::tick_count::now();
std::this_thread::sleep_for(dreamtime);
//tbb::tick_count endtbb=tbb::tick_count::now();
// actual sleeping time can be longer due to scheduling or resource contention delays
//const double actualDreamTime=(endtbb-starttbb).seconds();
//logstream << MSG::DEBUG << "Actual dreaming time was: "<< actualDreamTime << "s" << endmsg;
//return StatusCode::SUCCESS;
}
float runtime;
//const std::chrono::duration<double> dreamtime( m_avg_runtime + m_avg_runtime/2.);
//std::this_thread::sleep_for(dreamtime);
float crunchtime;
if (m_local_rndm_gen){
/* This will disappear with a thread safe random number generator svc
......@@ -287,21 +266,50 @@ StatusCode CPUCruncher::execute () // the execution of the algorithm
const double normal = sqrt(-2.*log(unif1))*cos(2*M_PI*unif2);
return normal*sigma + mean;
};
runtime = fabs(getGausRandom( m_avg_runtime , m_var_runtime ));
crunchtime = fabs(getGausRandom( m_avg_runtime * (1. - m_sleepFraction), m_var_runtime ));
//End Of temp block
} else {
// Should be a member.
HiveRndm::HiveNumbers rndmgaus(randSvc(), Rndm::Gauss( m_avg_runtime , m_var_runtime ));
runtime = std::fabs(rndmgaus());
HiveRndm::HiveNumbers rndmgaus(randSvc(), Rndm::Gauss( m_avg_runtime * (1. - m_sleepFraction), m_var_runtime ));
crunchtime = std::fabs(rndmgaus());
}
// Prepare to sleep (even if we won't enter the following if clause for sleeping).
// This is needed to distribute evenly among all algorithms the overhead (around sleeping) which is harmful when
// trying to achieve uniform distribution of algorithm timings.
const double dreamtime = m_avg_runtime * m_sleepFraction;
const std::chrono::duration<double> dreamtime_duration( dreamtime );
tbb::tick_count startSleeptbb;
tbb::tick_count endSleeptbb;
// Start to measure the total time here, together with the dreaming process straight ahead
tbb::tick_count starttbb=tbb::tick_count::now();
logstream << MSG::DEBUG << "Runtime will be: "<< runtime << endmsg;
// If the algorithm was set as I/O-bound, we will replace requested part of crunching with plain sleeping
if (isIOBound()) {
// in this block (and not in other places around) msgLevel is checked for the same reason as above, when
// preparing to sleep several lines above: to reduce as much as possible the overhead around sleeping
if (msgLevel(MSG::DEBUG))
logstream << MSG::DEBUG << "Dreaming time will be: " << dreamtime << endmsg;
if (msgLevel(MSG::DEBUG))
startSleeptbb=tbb::tick_count::now();
std::this_thread::sleep_for(dreamtime_duration);
if (msgLevel(MSG::DEBUG))
endSleeptbb=tbb::tick_count::now();
// actual sleeping time can be longer due to scheduling or resource contention delays
if (msgLevel(MSG::DEBUG)) {
const double actualDreamTime=(endSleeptbb-startSleeptbb).seconds();
logstream << MSG::DEBUG << "Actual dreaming time was: " << actualDreamTime << "s" << endmsg;
}
} // end of "sleeping block"
logstream << MSG::DEBUG << "Crunching time will be: " << crunchtime << endmsg;
if (getContext())
logstream << MSG::DEBUG << "Start event " << getContext()->evt()
<< " in slot " << getContext()->slot()
<< " on pthreadID " << std::hex << pthread_self() << std::dec
<< endmsg;
<< " in slot " << getContext()->slot()
<< " on pthreadID " << std::hex << pthread_self() << std::dec << endmsg;
for (auto & inputHandle: m_inputHandles){
if(!inputHandle->isValid())
......@@ -315,24 +323,23 @@ StatusCode CPUCruncher::execute () // the execution of the algorithm
logstream << MSG::ERROR << "A read object was a null pointer." << endmsg;
}
const unsigned long n_iters= getNCaliIters(runtime);
const unsigned long n_iters= getNCaliIters(crunchtime);
findPrimes( n_iters );
for (auto & outputHandle: m_outputHandles){
for (auto & outputHandle: m_outputHandles) {
if(!outputHandle->isValid())
continue;
outputHandle->put(new DataObject());
}
for (auto & inputHandle: m_inputHandles){
if(!inputHandle->isValid())
continue;
for (auto & inputHandle: m_inputHandles) {
if(!inputHandle->isValid())
continue;
DataObject* obj = nullptr;
for (unsigned int i=1; i<m_rwRepetitions;++i){
for (unsigned int i=1; i<m_rwRepetitions;++i)
obj = inputHandle->get();
}
}
tbb::tick_count endtbb=tbb::tick_count::now();
......@@ -341,13 +348,14 @@ StatusCode CPUCruncher::execute () // the execution of the algorithm
if (getContext())
logstream << MSG::DEBUG << "Finish event " << getContext()->evt()
// << " on pthreadID " << getContext()->m_thread_id
<< " in " << actualRuntime << " seconds" << endmsg;
// << " on pthreadID " << getContext()->m_thread_id
<< " in " << actualRuntime << " seconds" << endmsg;
logstream << MSG::DEBUG << "Timing: ExpectedRuntime= " << runtime
<< " ActualRuntime= " << actualRuntime
<< " Ratio= " << runtime/actualRuntime
<< " Niters= " << n_iters << endmsg;
logstream << MSG::DEBUG << "Timing: ExpectedCrunchtime= " << crunchtime
<< " ExpectedDreamtime= " << dreamtime
<< " ActualTotalRuntime= " << actualRuntime
<< " Ratio= " << (crunchtime + dreamtime)/actualRuntime
<< " Niters= " << n_iters << endmsg;
return StatusCode::SUCCESS ;
......@@ -369,48 +377,41 @@ StatusCode CPUCruncher::finalize () // the finalization of the algorithm
constexpr double s2ms=1000.;
// do not show repetitions
if (ninstances!=0){
log << MSG::INFO << "Summary: name= "<< name()
<<"\t avg_runtime= " << m_avg_runtime*s2ms
if (ninstances!=0) {
log << MSG::INFO << "Summary: name= " << name()
<< "\t avg_runtime= " << m_avg_runtime*s2ms
<< "\t n_clones= " << ninstances << endmsg;
CHM::accessor name_ninstances;
m_name_ncopies_map.find(name_ninstances,name());
name_ninstances->second=0;
}
}
return GaudiAlgorithm::finalize () ;
}
//------------------------------------------------------------------------------
const std::vector<std::string>
CPUCruncher::get_inputs()
{
const std::vector<std::string> CPUCruncher::get_inputs() {
std::vector<std::string> di;
for (auto & h: m_inputHandles){
if(h->isValid())
di.push_back(h->dataProductName());
}
for (auto & h: m_inputHandles)
if(h->isValid())
di.push_back(h->dataProductName());
return di;
}
//------------------------------------------------------------------------------
const std::vector<std::string>
CPUCruncher::get_outputs()
{
std::vector<std::string> di;
for (auto & h: m_outputHandles){
if(h->isValid())
di.push_back(h->dataProductName());
}
const std::vector<std::string> CPUCruncher::get_outputs() {
return di;
std::vector<std::string> di;
for (auto & h: m_outputHandles)
if(h->isValid())
di.push_back(h->dataProductName());
return di;
}
//------------------------------------------------------------------------------
......@@ -38,8 +38,6 @@
double get_runtime() const { return m_avg_runtime; };
virtual bool isIOBound() const {return m_sleepyExecution;};
protected:
CPUCruncher
......@@ -85,7 +83,7 @@
static CHM m_name_ncopies_map;
// Sleep during execution instead of real CPU crunching
bool m_sleepyExecution;
// Fraction of total execution time, during which an algorithm is actually sleeping instead of crunching CPU
float m_sleepFraction;
};
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment