Commit cfbd2bca authored by Elvin Sindrilaru's avatar Elvin Sindrilaru

COMMON: Properly handle scale up/down of thread pool and catch exceptions from...

COMMON: Properly handle scale up/down of thread pool and catch exceptions from std::async. Fixes EOS-3240
parent 10827498
......@@ -90,6 +90,13 @@ EOSCOMMONNAMESPACE_BEGIN
(LOG_INFO), __VA_ARGS__); \
}
#define eos_err_lite(...) \
if ((LOG_MASK(LOG_ERR) & eos::common::Logging::GetInstance().GetLogMask()) != 0) { \
eos::common::Logging::GetInstance().log(__FUNCTION__,__FILE__, __LINE__, \
this->logId, vid, this->cident, \
(LOG_ERR), __VA_ARGS__); \
}
//------------------------------------------------------------------------------
//! Log Macros usable in objects inheriting from the logId Class
//------------------------------------------------------------------------------
......
......@@ -85,13 +85,18 @@ public:
};
for (auto i = 0u; i < std::max(mThreadsMin.load(), 1u); ++i) {
mThreadPool.emplace_back(
std::async(std::launch::async, threadPoolFunc)
);
try {
mThreadPool.emplace_back(std::async(std::launch::async, threadPoolFunc));
} catch (const std::exception& e) {
std::cerr << "error: std::async couldn't start a new thread "
<< "and threw an exception: " << e.what() << std::endl;
continue;
}
++mThreadCount;
}
mPoolSize = mThreadPool.size();
mThreadCount += mThreadsMin;
if (mThreadsMax > mThreadsMin) {
auto maintainerThreadFunc = [this, threadPoolFunc, samplingInterval,
......@@ -112,48 +117,52 @@ public:
// Check first if we have finished, removable threads/futures and remove them
mThreadPool.erase(
std::remove_if(
mThreadPool.begin(),
mThreadPool.end(),
std::remove_if(mThreadPool.begin(), mThreadPool.end(),
[](std::future<void>& future) {
return future.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
}
),
mThreadPool.end()
);
return (future.wait_for(std::chrono::seconds(0)) ==
std::future_status::ready);
}),
mThreadPool.end());
sumQueueSize += mTasks.size();
if (++rounds == samplingNumber) {
auto averageQueueSize = (double) sumQueueSize / rounds;
if (averageQueueSize > mThreadCount) {
if ((averageQueueSize > mThreadCount) && (mThreadCount <= mThreadsMax)) {
auto threadsToAdd =
std::min((unsigned int) floor(averageQueueSize /
averageWaitingJobsPerNewThread),
mThreadsMax - mThreadCount);
try {
for (auto i = 0u; i < threadsToAdd; i++) {
while (threadsToAdd > 0) {
try {
mThreadPool.emplace_back(std::async(std::launch::async,
threadPoolFunc));
} catch (const std::exception& e) {
std::cerr << "error: std::async couldn't start a new thread "
<< "and threw an exception: " << e.what() << std::endl;
continue;
}
mThreadCount += threadsToAdd;
} catch (const std::system_error& e) {
std::cerr << "error: std::async couldn't start a new thread "
<< "and threw an exception: " << e.what() << std::endl;
++mThreadCount;
--threadsToAdd;
}
} else {
unsigned int threadsToRemove =
mThreadCount - std::max((unsigned int) floor(averageQueueSize),
mThreadsMin.load());
unsigned int threadsToRemove = 0ull;
// Push in fake tasks for each threads to be stopped so threads can wake up and
if (mThreadCount > mThreadsMax) {
threadsToRemove = mThreadCount - mThreadsMax;
} else {
threadsToRemove = mThreadCount -
std::max((unsigned int) floor(averageQueueSize), mThreadsMin.load());
}
// Push in fake tasks for each thread to be stopped so threads can wake up and
// notice that they should terminate. Termination is signalled with false.
for (auto i = 0u; i < threadsToRemove; i++) {
auto fakeTask = std::make_pair(false,
std::make_shared<std::function<void(void)>>([] {}));
mTasks.push(fakeTask);
for (auto i = 0u; i < threadsToRemove; ++i) {
auto fake_task = std::make_pair(false, std::make_shared
<std::function<void(void)>> ([] {}));
mTasks.push(fake_task);
}
mThreadCount -= threadsToRemove;
......@@ -184,14 +193,11 @@ public:
std::future<Ret> PushTask(std::function<Ret(void)> func)
{
auto task = std::make_shared<std::packaged_task<Ret(void)>>(func);
auto taskFunc = std::make_pair(
true,
std::make_shared<std::function<void(void)>>(
[task] {
auto taskFunc =
std::make_pair(true,
std::make_shared<std::function<void(void)>>([task] {
(*task)();
}
)
);
}));
mTasks.push(taskFunc);
return task->get_future();
}
......@@ -209,10 +215,10 @@ public:
// Push in fake tasks for each threads so all waiting can wake up and
// notice that running is over. Termination is signalled with false.
for (auto i = 0u; i < mThreadPool.size(); i++) {
auto fakeTask = std::make_pair(false,
std::make_shared<std::function<void(void)>>([] {}));
mTasks.push(fakeTask);
for (auto i = 0u; i < mThreadPool.size(); ++i) {
auto fake_task = std::make_pair(false, std::make_shared
<std::function<void(void)>> ([] {}));
mTasks.push(fake_task);
}
for (auto& future : mThreadPool) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment