Commit 4139166a authored by Elvin Sindrilaru's avatar Elvin Sindrilaru

MGM: Forbid running two ApplyMasterConfig in parallel as this can lead to

configuration corruption.

* Refine the master-slave setup so that we're immune to timing related issues.

* Add better synchronization between the booting phase and the supervisor thread.
parent cfbd2bca
Pipeline #699734 passed with stages
in 77 minutes and 41 seconds
......@@ -99,15 +99,14 @@ Messaging::Update(XrdAdvisoryMqMessage* advmsg)
// =========| LockWrite
eos::common::RWMutexWriteLock lock(FsView::gFsView.ViewMutex);
if (FsView::gFsView.RegisterNode(advmsg->kQueue.c_str())) {
if (FsView::gFsView.RegisterNode(nodequeue.c_str())) {
std::string nodeconfigname =
eos::common::GlobalConfig::gConfig.QueuePrefixName(
gOFS->NodeConfigQueuePrefix.c_str(),
advmsg->kQueue.c_str());
gOFS->NodeConfigQueuePrefix.c_str(), nodequeue.c_str());
if (!eos::common::GlobalConfig::gConfig.Get(nodeconfigname.c_str())) {
if (!eos::common::GlobalConfig::gConfig.AddConfigQueue(nodeconfigname.c_str(),
advmsg->kQueue.c_str())) {
nodequeue.c_str())) {
eos_static_crit("cannot add node config queue %s", nodeconfigname.c_str());
}
}
......@@ -130,9 +129,9 @@ Messaging::Update(XrdAdvisoryMqMessage* advmsg)
}
}
eos_static_info("Setting heart beat to %llu for node queue=%s\n",
(unsigned long long) advmsg->kMessageHeader.kSenderTime_sec,
nodequeue.c_str());
eos_info("msg=\"setting heart beat to %llu for node queue=%s\"",
(unsigned long long) advmsg->kMessageHeader.kSenderTime_sec,
nodequeue.c_str());
FsView::gFsView.mNodeView[nodequeue]->SetHeartBeat(
advmsg->kMessageHeader.kSenderTime_sec);
......@@ -165,9 +164,9 @@ Messaging::Update(XrdAdvisoryMqMessage* advmsg)
}
}
eos_static_debug("Setting heart beat to %llu for node queue=%s\n",
(unsigned long long) advmsg->kMessageHeader.kSenderTime_sec,
nodequeue.c_str());
eos_debug("msg=\"setting heart beat to %llu for nodequeue=%s\"",
(unsigned long long) advmsg->kMessageHeader.kSenderTime_sec,
nodequeue.c_str());
FsView::gFsView.mNodeView[nodequeue]->SetHeartBeat(
advmsg->kMessageHeader.kSenderTime_sec);
......
......@@ -40,7 +40,7 @@ std::chrono::milliseconds QdbMaster::sLeaseTimeout {10000};
//------------------------------------------------------------------------------
QdbMaster::QdbMaster(const eos::QdbContactDetails& qdb_info,
const std::string& host_port):
mIdentity(host_port), mMasterIdentity(),
mOneOff(true), mIdentity(host_port), mMasterIdentity(),
mIsMaster(false), mConfigLoaded(false),
mAcquireDelay(0)
{
......@@ -182,6 +182,12 @@ QdbMaster::BootNamespace()
}
gOFS->SetupProcFiles();
while (mOneOff) {
std::this_thread::sleep_for(std::chrono::seconds(1));
eos_info("msg=\"wait for the supervisor to run once\"");
}
return true;
}
......@@ -191,7 +197,6 @@ QdbMaster::BootNamespace()
void
QdbMaster::Supervisor(ThreadAssistant& assistant) noexcept
{
static bool one_off = true;
bool new_is_master = false;
std::string old_master;
eos_notice("%s", "msg=\"set up booting stall rule\"");
......@@ -219,11 +224,23 @@ QdbMaster::Supervisor(ThreadAssistant& assistant) noexcept
old_master.c_str(), GetMasterId().c_str());
// Run one-off after boot
if (one_off) {
one_off = false;
new_is_master ? SlaveToMaster() : MasterToSlave();
if (mOneOff) {
if (new_is_master) {
// Increase the lease validity if we're the master
if (!AcquireLease(20000)) {
eos_err("msg=\"failed to renew lease during transition\"");
continue;
}
SlaveToMaster();
} else {
MasterToSlave();
}
eos_notice("%s", "msg=\"remove booting stall rule\"");
Access::SetStallRule(old_stall, new_stall);
Access::StallInfo dummy_stall;
Access::SetStallRule(old_stall, dummy_stall);
mOneOff = false;
} else {
// There was a master-slave transition
if (mIsMaster != new_is_master) {
......@@ -280,11 +297,11 @@ QdbMaster::SlaveToMaster()
}
});
stall_thread.detach();
// Notify all the nodes about the new master identity
FsView::gFsView.BroadcastMasterId(GetMasterId());
Quota::LoadNodes();
EnableNsCaching();
WFE::MoveFromRBackToQ();
// Notify all the nodes about the new master identity
FsView::gFsView.BroadcastMasterId(GetMasterId());
mIsMaster = true;
Access::SetSlaveToMasterRules();
}
......@@ -319,6 +336,8 @@ bool
QdbMaster::ApplyMasterConfig(std::string& stdOut, std::string& stdErr,
Transition::Type transitiontype)
{
static std::mutex sequential_mutex;
std::unique_lock<std::mutex> lock(sequential_mutex);
eos::mgm::FsView::gFsView.SetConfigEngine(nullptr);
gOFS->ConfEngine->SetConfigDir(gOFS->MgmConfigDir.c_str());
......@@ -339,6 +358,8 @@ QdbMaster::ApplyMasterConfig(std::string& stdOut, std::string& stdErr,
}
}
gOFS->SetupGlobalConfig();
if (mConfigLoaded) {
eos::mgm::FsView::gFsView.SetConfigEngine(gOFS->ConfEngine);
}
......@@ -350,11 +371,14 @@ QdbMaster::ApplyMasterConfig(std::string& stdOut, std::string& stdErr,
// Try to acquire lease
//------------------------------------------------------------------------------
bool
QdbMaster::AcquireLease()
QdbMaster::AcquireLease(uint64_t validity_msec)
{
using eos::common::StringConversion;
std::string stimeout = (validity_msec ? StringConversion::stringify(
validity_msec) :
StringConversion::stringify(sLeaseTimeout.count()));
std::future<qclient::redisReplyPtr> f =
mQcl->exec("lease-acquire", sLeaseKey, mIdentity,
eos::common::StringConversion::stringify(sLeaseTimeout.count()));
mQcl->exec("lease-acquire", sLeaseKey, mIdentity, stimeout);
qclient::redisReplyPtr reply = f.get();
if (reply == nullptr) {
......
......@@ -178,9 +178,11 @@ private:
//----------------------------------------------------------------------------
//! Try to acquire lease
//!
//! @param validity_msec validity in milliseconds of the lease
//!
//! @return true if successful, otherwise false
//----------------------------------------------------------------------------
bool AcquireLease();
bool AcquireLease(uint64_t validity_msec = 0ull);
//----------------------------------------------------------------------------
//! Try to acquire lease with delay. If the mAcquireDelay timestamp is set
......@@ -222,6 +224,7 @@ private:
//----------------------------------------------------------------------------
void EnableNsCaching();
std::atomic<bool> mOneOff; ///< Flag to mark that supervisor ran once
std::string mIdentity; ///< MGM identity hostname:port
mutable std::mutex mMutexId; ///< Mutex for the master identity
std::string mMasterIdentity; ///< Current master host
......
......@@ -1294,6 +1294,11 @@ public:
//----------------------------------------------------------------------------
void WaitUntilNamespaceIsBooted(ThreadAssistant& assistant);
//----------------------------------------------------------------------------
//! Set up global config
//----------------------------------------------------------------------------
void SetupGlobalConfig();
//----------------------------------------------------------------------------
// Configuration variables
//----------------------------------------------------------------------------
......
......@@ -1371,26 +1371,8 @@ XrdMgmOfs::Configure(XrdSysError& Eroute)
// set the object manager to listener only
ObjectManager.EnableBroadCast(false);
// setup the modifications which the fs listener thread is waiting for
ObjectManager.SetDebug(false);
if (!eos::common::GlobalConfig::gConfig.AddConfigQueue(MgmConfigQueue.c_str(),
"/eos/*/mgm")) {
eos_crit("Cannot add global config queue %s\n", MgmConfigQueue.c_str());
}
if (!eos::common::GlobalConfig::gConfig.AddConfigQueue(AllConfigQueue.c_str(),
"/eos/*")) {
eos_crit("Cannot add global config queue %s\n", AllConfigQueue.c_str());
}
if (!eos::common::GlobalConfig::gConfig.AddConfigQueue(FstConfigQueue.c_str(),
"/eos/*/fst")) {
eos_crit("Cannot add global config queue %s\n", FstConfigQueue.c_str());
}
std::string out;
eos::common::GlobalConfig::gConfig.PrintBroadCastMap(out);
fprintf(stderr, "%s", out.c_str());
ObjectManager.SetDebug(true);
SetupGlobalConfig();
// Eventually autoload a configuration
if (getenv("EOS_AUTOLOAD_CONFIG")) {
......@@ -1735,14 +1717,17 @@ XrdMgmOfs::Configure(XrdSysError& Eroute)
}
zMQ->ServeFuse();
ObjectManager.SetAutoReplyQueueDerive(true);
ObjectManager.CreateSharedHash("/eos/*", "/eos/*/fst");
XrdOucString dumperfile = MgmMetaLogDir;
dumperfile += "/so.mgm.dump.";
dumperfile += ManagerId;
ObjectManager.StartDumper(dumperfile.c_str());
ObjectManager.SetAutoReplyQueueDerive(true);
}
// This sleep is needed otherwise nodes/fs do not register properly
// with the MGM. ??!!??
std::this_thread::sleep_for(std::chrono::seconds(2));
// Hook to the appropiate config file
std::string stdOut;
std::string stdErr;
......@@ -1811,8 +1796,6 @@ XrdMgmOfs::Configure(XrdSysError& Eroute)
}
}
std::this_thread::sleep_for(std::chrono::seconds(1));
if (!ObjectNotifier.Start()) {
eos_crit("error starting the shared object change notifier");
}
......@@ -2231,3 +2214,29 @@ XrdMgmOfs::SetupProcFiles()
eosView->updateFileStore(fmd.get());
}
}
//------------------------------------------------------------------------------
// Set up global config
//------------------------------------------------------------------------------
void
XrdMgmOfs::SetupGlobalConfig()
{
if (!eos::common::GlobalConfig::gConfig.AddConfigQueue(MgmConfigQueue.c_str(),
"/eos/*/mgm")) {
eos_crit("Cannot add global config queue %s\n", MgmConfigQueue.c_str());
}
if (!eos::common::GlobalConfig::gConfig.AddConfigQueue(AllConfigQueue.c_str(),
"/eos/*")) {
eos_crit("Cannot add global config queue %s\n", AllConfigQueue.c_str());
}
if (!eos::common::GlobalConfig::gConfig.AddConfigQueue(FstConfigQueue.c_str(),
"/eos/*/fst")) {
eos_crit("Cannot add global config queue %s\n", FstConfigQueue.c_str());
}
std::string out;
eos::common::GlobalConfig::gConfig.PrintBroadCastMap(out);
fprintf(stderr, "%s", out.c_str());
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment