Commit 7ce4323d authored by Georgios Bitzes's avatar Georgios Bitzes

FST: Use AssistedThread in Health class

parent 9be3733f
Pipeline #705920 failed with stages
in 38 minutes and 33 seconds
......@@ -5,7 +5,7 @@
/************************************************************************
* quarkdb - a redis-like highly available key-value store *
* Copyright (C) 2016 CERN/Switzerland *
* Copyright (C) 2019 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
......@@ -21,13 +21,15 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.*
************************************************************************/
#ifndef COMMON_ASSISTED_THREAD_H__
#define COMMON_ASSISTED_THREAD_H__
#ifndef EOS_ASSISTED_THREAD_H
#define EOS_ASSISTED_THREAD_H
#include <atomic>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <vector>
//------------------------------------------------------------------------------
// C++ threads offer no easy way to stop a thread once it's started. Signalling
......@@ -60,123 +62,191 @@ class AssistedThread;
//------------------------------------------------------------------------------
//! Class ThreadAssistant
//------------------------------------------------------------------------------
class ThreadAssistant
{
class ThreadAssistant {
public:
ThreadAssistant(bool flag) : stopFlag(flag) {}
void reset()
{
void reset() {
stopFlag = false;
terminationCallbacks.clear();
}
void requestTermination() {
std::lock_guard<std::mutex> lock(mtx);
if(!stopFlag) {
stopFlag = true;
notifier.notify_all();
for(size_t i = 0; i < terminationCallbacks.size(); i++) {
terminationCallbacks[i]();
}
}
}
void registerCallback(std::function<void()> callable) {
std::lock_guard<std::mutex> lock(mtx);
terminationCallbacks.emplace_back(std::move(callable));
if(stopFlag) {
//------------------------------------------------------------------------
// Careful here.. This is a race condition where thread termination has
// already been requested, even though we're not done yet registering
// callbacks, apparently.
//
// Let's simply call the callback ourselves.
//------------------------------------------------------------------------
(terminationCallbacks.back())();
}
}
void requestTermination()
{
void dropCallbacks() {
std::lock_guard<std::mutex> lock(mtx);
stopFlag = true;
notifier.notify_all();
terminationCallbacks.clear();
}
bool terminationRequested()
{
bool terminationRequested() {
return stopFlag;
}
template<typename T>
void wait_for(T duration)
{
void wait_for(T duration) {
std::unique_lock<std::mutex> lock(mtx);
if (stopFlag) {
return;
}
if(stopFlag) return;
notifier.wait_for(lock, duration);
}
template<typename T>
void wait_until(T duration)
{
void wait_until(T duration) {
std::unique_lock<std::mutex> lock(mtx);
if (stopFlag) {
return;
}
if(stopFlag) return;
notifier.wait_until(lock, duration);
}
//----------------------------------------------------------------------------
// Ok, this is a bit weird: Consider an AssistedThread which "owns" or
// coordinates a bunch of other threads:
//
// void Coordinator(ThreadAssistant &assistant) {
// AssistedThread worker1( ... );
// AssistedThread worker2( ... );
// AssistedThread worker3( ... );
//
// worker1.blockUntilThreadJoins();
// worker2.blockUntilThreadJoins();
// worker3.blockUntilThreadJoins();
// }
//
// We would like that any requests to shut down Coordinator propagate to all
// workers. Otherwise, since Coordinator blocks waiting for the workers to
// terminate, its own early termination signal would get ignored.
//
// propagateTerminationSignal does just this. In the above example, call:
// assistant.propagateTerminationSignal(worker1);
// assistant.propagateTerminationSignal(worker2);
// assistant.propagateTerminationSignal(worker3);
//
// And the moment Coordinator is asked to terminate, all registered threads
// will, too.
//
// NOTE: assistant object must belong to a different thread!
//----------------------------------------------------------------------------
void propagateTerminationSignal(AssistedThread &thread);
private:
// Private constructor - only AssistedThread can create such an object.
ThreadAssistant(bool flag) : stopFlag(flag) {}
friend class AssistedThread;
std::atomic<bool> stopFlag;
std::mutex mtx;
std::condition_variable notifier;
std::vector<std::function<void()>> terminationCallbacks;
};
//------------------------------------------------------------------------------
//! Class AssistedThread
//------------------------------------------------------------------------------
class AssistedThread
{
class AssistedThread {
public:
//----------------------------------------------------------------------------
//! null constructor, no underlying thread
//----------------------------------------------------------------------------
AssistedThread() : assistant(true), joined(true) { }
AssistedThread() : assistant(new ThreadAssistant(true)), joined(true) { }
//----------------------------------------------------------------------------
// universal references, perfect forwarding, variadic template
// (C++ is intensifying)
//----------------------------------------------------------------------------
template<typename... Args>
AssistedThread(Args&& ... args) :
assistant(false), joined(false),
th(std::forward<Args>(args)..., std::ref(assistant))
{
AssistedThread(Args&&... args) : assistant(new ThreadAssistant(false)), joined(false), th(std::forward<Args>(args)..., std::ref(*assistant)) {
}
// No assignment!
// No assignment, no copying
AssistedThread& operator=(const AssistedThread&) = delete;
AssistedThread& operator=(AssistedThread&& src) = delete;
// Moving is allowed.
AssistedThread(AssistedThread&& other) {
assistant = std::move(other.assistant);
joined = other.joined;
th = std::move(other.th);
other.joined = true;
}
template<typename... Args>
void reset(Args&& ... args)
{
void reset(Args&&... args) {
join();
assistant.reset();
assistant.get()->reset();
joined = false;
th = std::thread(std::forward<Args>(args)..., std::ref(assistant));
th = std::thread(std::forward<Args>(args)..., std::ref(*assistant));
}
virtual ~AssistedThread()
{
virtual ~AssistedThread() {
join();
}
void stop()
{
if (joined) {
return;
}
void stop() {
if(joined) return;
assistant->requestTermination();
}
assistant.requestTermination();
void join() {
if(joined) return;
stop();
blockUntilThreadJoins();
}
void join()
{
if (joined) {
return;
}
// Different meaning than join, which explicitly asks the thread to
// terminate. Here, we simply wait until the thread exits on its own.
void blockUntilThreadJoins() {
if(joined) return;
stop();
th.join();
joined = true;
}
void registerCallback(std::function<void()> callable) {
assistant->registerCallback(std::move(callable));
}
void dropCallbacks() {
assistant->dropCallbacks();
}
//----------------------------------------------------------------------------
//! Set thread name. Useful to have in GDB traces, for example.
//----------------------------------------------------------------------------
void setName(const std::string &threadName) {
pthread_setname_np(th.native_handle(), threadName.c_str());
}
private:
ThreadAssistant assistant;
std::unique_ptr<ThreadAssistant> assistant;
bool joined;
std::thread th;
};
inline void ThreadAssistant::propagateTerminationSignal(AssistedThread &thread) {
registerCallback(std::bind(&AssistedThread::stop, &thread));
}
#endif
......@@ -249,22 +249,11 @@ std::string DiskHealth::smartctl(const char* device)
// **** Class Health ****
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
// Static helper function for starting the health monitoring thread
//------------------------------------------------------------------------------
void*
Health::StartHealthThread(void* pp)
{
Health* health = (Health*) pp;
health->Measure();
return 0;
}
//------------------------------------------------------------------------------
// Constructor
//------------------------------------------------------------------------------
Health::Health(unsigned int ival_minutes):
mSkip(false), mTid(0), mIntervalMin(ival_minutes)
mSkip(false), mIntervalMin(ival_minutes)
{
if (mIntervalMin == 0) {
mIntervalMin = 1;
......@@ -276,41 +265,30 @@ Health::Health(unsigned int ival_minutes):
//------------------------------------------------------------------------------
Health::~Health()
{
if (mTid) {
XrdSysThread::Cancel(mTid);
XrdSysThread::Join(mTid, 0);
mTid = 0;
}
}
//------------------------------------------------------------------------------
// Method starting the health monitoring thread
//------------------------------------------------------------------------------
bool
void
Health::Monitor()
{
if (XrdSysThread::Run(&mTid, Health::StartHealthThread,
static_cast<void*>(this),
XRDSYSTHREAD_HOLD, "Health-Monitor")) {
return false;
} else {
return true;
}
monitoringThread.reset(&Health::Measure, this);
monitoringThread.setName("Health-Monitor");
}
//------------------------------------------------------------------------------
// Loop run by the monitoring thread to keep updated the disk health info.
//------------------------------------------------------------------------------
void
Health::Measure()
Health::Measure(ThreadAssistant &assistant)
{
while (1) {
XrdSysThread::SetCancelOff();
while(!assistant.terminationRequested()) {
mDiskHealth.Measure();
XrdSysThread::SetCancelOn();
for (unsigned int i = 0; i < mIntervalMin; i++) {
sleep(60);
if(assistant.terminationRequested()) return;
assistant.wait_for(std::chrono::seconds(60));
if (mSkip) {
mSkip = false;
......
......@@ -28,6 +28,7 @@
#include <mutex>
#include <map>
#include <atomic>
#include "common/AssistedThread.hh"
EOSFSTNAMESPACE_BEGIN
......@@ -111,15 +112,13 @@ public:
//----------------------------------------------------------------------------
//! Method starting the health monitoring thread
//!
//! @return true if thread started succesfully, otherwise false
//----------------------------------------------------------------------------
bool Monitor();
void Monitor();
//----------------------------------------------------------------------------
//! Loop run by the monitoring thread to keep updated the disk health info.
//----------------------------------------------------------------------------
void Measure();
void Measure(ThreadAssistant &assistant);
//----------------------------------------------------------------------------
//! Get disk health information for a specific device. If no measurements
......@@ -134,7 +133,7 @@ public:
private:
///< Trigger update thread without waiting for the whole interval to elapse
std::atomic<bool> mSkip;
pthread_t mTid; ///< Monitoring thread id
AssistedThread monitoringThread; ///< Monitoring thread
unsigned int mIntervalMin; ///< Minutes interval when monitoring thread runs
DiskHealth mDiskHealth; ///< Objecting collecting disk health information
};
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment