Commit f14a5f2e by Elvin Sindrilaru

MGM: Fix concurrent write access to configuration data structures

parent 0b20b900
......@@ -464,7 +464,6 @@ void
FileSystem::CreateConfig(std::string& key, std::string& val)
{
key = val = "";
fs_snapshot_t fs;
XrdMqRWMutexReadLock lock(mSom->HashMutex);
key = mQueuePath;
val = mHash->SerializeWithFilter("stat.");
......
......@@ -69,6 +69,7 @@ FileCfgEngineChangelog::AddEntry(const char* info)
return false;
}
eos::common::RWMutexWriteLock wr_lock(mMutex);
mMap.set(key, value, action);
mConfigChanges += info;
mConfigChanges += "\n";
......@@ -127,13 +128,6 @@ FileConfigEngine::FileConfigEngine(const char* config_dir)
}
//------------------------------------------------------------------------------
// Destructor
//------------------------------------------------------------------------------
FileConfigEngine::~FileConfigEngine()
{
}
//------------------------------------------------------------------------------
// Set configuration directory
//------------------------------------------------------------------------------
void
......@@ -148,11 +142,10 @@ FileConfigEngine::SetConfigDir(const char* config_dir)
// Get configuration changes
//------------------------------------------------------------------------------
void
FileConfigEngine::Diffs(XrdOucString& diffs)
FileConfigEngine::Diffs(std::string& diffs) const
{
diffs = mChangelog->GetChanges();
while (diffs.replace("&", " ")) {}
std::replace(diffs.begin(), diffs.end(), '&', ' ');
}
//------------------------------------------------------------------------------
......
......@@ -103,7 +103,7 @@ public:
//----------------------------------------------------------------------------
//! Destructor
//----------------------------------------------------------------------------
virtual ~FileConfigEngine();
virtual ~FileConfigEngine() = default;
//----------------------------------------------------------------------------
//! Load a given configuratino file
......@@ -141,7 +141,7 @@ public:
//!
//! @param diffs string holding the configuration changes
//----------------------------------------------------------------------------
void Diffs(XrdOucString& diffs);
void Diffs(std::string& diffs) const;
//----------------------------------------------------------------------------
//! Do an autosave
......
......@@ -48,9 +48,10 @@ XrdOucHash<XrdOucString> IConfigEngine::sConfigDefinitions;
//------------------------------------------------------------------------------
// Get latest changes
//------------------------------------------------------------------------------
XrdOucString
std::string
ICfgEngineChangelog::GetChanges() const
{
eos::common::RWMutexReadLock rd_lock(mMutex);
return mConfigChanges;
}
......@@ -60,7 +61,8 @@ ICfgEngineChangelog::GetChanges() const
bool
ICfgEngineChangelog::HasChanges() const
{
return (mConfigChanges.length() != 0);
eos::common::RWMutexReadLock rd_lock(mMutex);
return (!mConfigChanges.empty());
}
//------------------------------------------------------------------------------
......@@ -69,7 +71,8 @@ ICfgEngineChangelog::HasChanges() const
void
ICfgEngineChangelog::ClearChanges()
{
mConfigChanges = "";
eos::common::RWMutexWriteLock wr_lock(mMutex);
mConfigChanges.clear();
}
//------------------------------------------------------------------------------
......
......@@ -52,12 +52,15 @@ public:
//----------------------------------------------------------------------------
//! Constructor
//----------------------------------------------------------------------------
ICfgEngineChangelog() {};
ICfgEngineChangelog()
{
mMutex.SetBlocking(true);
}
//----------------------------------------------------------------------------
//! Destructor
//----------------------------------------------------------------------------
virtual ~ICfgEngineChangelog() {};
virtual ~ICfgEngineChangelog() = default;
//----------------------------------------------------------------------------
//! Add entry
......@@ -83,7 +86,7 @@ public:
//!
//! @return string representing the changes, can also be an empty string
//----------------------------------------------------------------------------
XrdOucString GetChanges() const;
std::string GetChanges() const;
//----------------------------------------------------------------------------
//! Check if there are any changes
......@@ -111,7 +114,8 @@ protected:
bool ParseTextEntry(const char* entry, std::string& key, std::string& value,
std::string& comment);
XrdOucString mConfigChanges; ///< Latest configuration changes
mutable eos::common::RWMutex mMutex; ///< Mutex protecting the config changes
std::string mConfigChanges; ///< Latest configuration changes
};
......@@ -220,7 +224,7 @@ public:
//!
//! @param diffs string holding the configuration changes
//----------------------------------------------------------------------------
virtual void Diffs(XrdOucString& diffs) = 0;
virtual void Diffs(std::string& diffs) const = 0;
//----------------------------------------------------------------------------
//! Do an autosave
......
......@@ -148,7 +148,7 @@ public:
//!
//! @param diffs string holding the configuration changes
//----------------------------------------------------------------------------
void Diffs(XrdOucString& diffs)
void Diffs(std::string& diffs) const
{
diffs = mChangelog->GetChanges();
}
......
......@@ -147,7 +147,9 @@ ProcCommand::Config()
if (mSubCmd == "diff") {
eos_notice("config diff");
gOFS->ConfEngine->Diffs(stdOut);
std::string diff;
gOFS->ConfEngine->Diffs(diff);
stdOut = diff.c_str();
}
if (mSubCmd == "changelog") {
......
......@@ -336,23 +336,20 @@ XrdMqSharedHash::GetUInt(const char* key)
std::string
XrdMqSharedHash::SerializeWithFilter(const char* filter_prefix)
{
XrdOucString key;
std::string out = "";
std::string key {""};
std::ostringstream oss;
XrdMqRWMutexReadLock rd_lock(*mStoreMutex);
for (auto it = mStore.begin(); it != mStore.end(); it++) {
for (auto it = mStore.begin(); it != mStore.end(); ++it) {
key = it->first.c_str();
if ((!filter_prefix || !strlen(filter_prefix)) ||
(!key.beginswith(filter_prefix))) {
out += it->first.c_str();
out += "=";
out += it->second.GetValue();
out += " ";
if (((filter_prefix == nullptr) || (strlen(filter_prefix) == 0)) ||
(key.find(filter_prefix) == 0)) {
oss << key << "=" << it->second.GetValue() << " ";
}
}
return out;
return oss.str();
}
//------------------------------------------------------------------------------
......@@ -516,7 +513,7 @@ XrdMqSharedHash::BroadCastEnvString(const char* receiver)
{
XrdMqRWMutexReadLock rd_lock(*mStoreMutex);
for (auto it = mStore.begin(); it != mStore.end(); it++) {
for (auto it = mStore.begin(); it != mStore.end(); ++it) {
mTransactions.insert(it->first);
}
}
......@@ -557,7 +554,7 @@ XrdMqSharedHash::AddTransactionsToEnvString(XrdOucString& out, bool clear_after)
out += "=";
XrdMqRWMutexReadLock rd_lock(*mStoreMutex);
for (auto it = mTransactions.begin(); it != mTransactions.end(); it++) {
for (auto it = mTransactions.begin(); it != mTransactions.end(); ++it) {
if ((mStore.count(it->c_str()))) {
out += "|";
out += it->c_str();
......@@ -631,7 +628,7 @@ XrdMqSharedHash::Dump(XrdOucString& out)
char key_print[64];
XrdMqRWMutexReadLock rd_lock(*mStoreMutex);
for (auto it = mStore.begin(); it != mStore.end(); it++) {
for (auto it = mStore.begin(); it != mStore.end(); ++it) {
snprintf(key_print, sizeof(key_print) - 1, "key=%-24s", it->first.c_str());
out += key_print;
out += " ";
......@@ -700,7 +697,7 @@ XrdMqSharedHash::Clear(bool broadcast)
{
XrdMqRWMutexWriteLock wr_lock(*mStoreMutex);
for (auto it = mStore.begin(); it != mStore.end(); it++) {
for (auto it = mStore.begin(); it != mStore.end(); ++it) {
if (mIsTransaction) {
if (XrdMqSharedObjectManager::sBroadcast && broadcast) {
mDeletions.insert(it->first);
......@@ -720,16 +717,16 @@ bool
XrdMqSharedHash::SetImpl(const char* key, const char* value, bool broadcast)
{
std::string skey = key;
mStoreMutex->LockWrite();
{
XrdMqRWMutexWriteLock wr_lock(*mStoreMutex);
if (mStore.count(skey) == 0) {
mStore.insert(std::make_pair(skey, XrdMqSharedHashEntry(key, value)));
} else {
mStore[skey] = XrdMqSharedHashEntry(key, value);
if (mStore.count(skey) == 0) {
mStore.insert(std::make_pair(skey, XrdMqSharedHashEntry(key, value)));
} else {
mStore[skey] = XrdMqSharedHashEntry(key, value);
}
}
mStoreMutex->UnLockWrite();
if (XrdMqSharedObjectManager::sBroadcast && broadcast) {
bool is_transact = false;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment