MGM: improve locking in notification - avoid ns mutex wherever it is easily possible when broadcasting

MGM: improve locking in notification - avoid ns mutex wherever it is easily possible when broadcasting
parent 466a4283
Pipeline #508666 passed with stages
in 89 minutes and 53 seconds
......@@ -954,6 +954,7 @@ FuseServer::Caps::BroadcastReleaseFromExternal(uint64_t id)
eos::common::RWMutexReadLock lLock(*this);
eos_static_info("id=%lx ",
id);
std::vector<shared_cap> bccaps;
if (mInodeCaps.count(id)) {
for (auto it = mInodeCaps[id].begin();
......@@ -968,14 +969,20 @@ FuseServer::Caps::BroadcastReleaseFromExternal(uint64_t id)
}
if (cap->id()) {
gOFS->zMQ->gFuseServer.Client().ReleaseCAP((uint64_t) cap->id(),
cap->clientuuid(),
cap->clientid());
errno = 0 ; // seems that ZMQ function might set errno
bccaps.push_back(cap);
}
}
}
lLock.Release();
for (auto it : bccaps) {
gOFS->zMQ->gFuseServer.Client().ReleaseCAP((uint64_t) it->id(),
it->clientuuid(),
it->clientid());
errno = 0 ; // seems that ZMQ function might set errno
}
EXEC_TIMING_END("Eosxd::int::BcReleaseExt");
return 0;
}
......@@ -992,6 +999,7 @@ FuseServer::Caps::BroadcastRelease(const eos::fusex::md& md)
refcap->clientid().c_str(),
refcap->clientuuid().c_str(),
refcap->authid().c_str());
std::vector<shared_cap> bccaps;
if (mInodeCaps.count(refcap->id())) {
for (auto it = mInodeCaps[refcap->id()].begin();
......@@ -1016,13 +1024,20 @@ FuseServer::Caps::BroadcastRelease(const eos::fusex::md& md)
}
if (cap->id()) {
gOFS->zMQ->gFuseServer.Client().ReleaseCAP((uint64_t) cap->id(),
cap->clientuuid(),
cap->clientid());
bccaps.push_back(cap);
}
}
}
lLock.Release();
for (auto it : bccaps) {
gOFS->zMQ->gFuseServer.Client().ReleaseCAP((uint64_t) it->id(),
it->clientuuid(),
it->clientid());
errno = 0 ;
}
EXEC_TIMING_END("Eosxd::int::BcRelease");
return 0;
}
......@@ -1040,6 +1055,7 @@ FuseServer::Caps::BroadcastDeletionFromExternal(uint64_t id,
eos_static_info("id=%lx name=%s",
id,
name.c_str());
std::vector<shared_cap> bccaps;
if (mInodeCaps.count(id)) {
for (auto it = mInodeCaps[id].begin();
......@@ -1054,15 +1070,21 @@ FuseServer::Caps::BroadcastDeletionFromExternal(uint64_t id,
}
if (cap->id()) {
gOFS->zMQ->gFuseServer.Client().DeleteEntry((uint64_t) cap->id(),
cap->clientuuid(),
cap->clientid(),
name);
errno = 0 ; // seems that ZMQ function might set errno
bccaps.push_back(cap);
}
}
}
lLock.Release();
for (auto it : bccaps) {
gOFS->zMQ->gFuseServer.Client().DeleteEntry((uint64_t) it->id(),
it->clientuuid(),
it->clientid(),
name);
errno = 0 ; // seems that ZMQ function might set errno
}
EXEC_TIMING_END("Eosxd::int::BcDeletionExt");
return 0;
}
......@@ -1078,6 +1100,7 @@ FuseServer::Caps::BroadcastDeletion(uint64_t id, const eos::fusex::md& md,
eos_static_info("id=%lx name=%s",
id,
name.c_str());
std::vector<shared_cap> bccaps;
if (mInodeCaps.count(refcap->id())) {
for (auto it = mInodeCaps[refcap->id()].begin();
......@@ -1102,14 +1125,21 @@ FuseServer::Caps::BroadcastDeletion(uint64_t id, const eos::fusex::md& md,
}
if (cap->id()) {
gOFS->zMQ->gFuseServer.Client().DeleteEntry((uint64_t) cap->id(),
cap->clientuuid(),
cap->clientid(),
name);
bccaps.push_back(cap);
}
}
}
lLock.Release();
for (auto it : bccaps) {
gOFS->zMQ->gFuseServer.Client().DeleteEntry((uint64_t) it->id(),
it->clientuuid(),
it->clientid(),
name);
errno = 0;
}
EXEC_TIMING_END("Eosxd::int::BcDeletion");
return 0;
}
......@@ -1141,6 +1171,7 @@ FuseServer::Caps::BroadcastMD(const eos::fusex::md& md,
refcap->clientuuid().c_str(),
refcap->authid().c_str());
std::set<std::string> clients_sent;
std::vector<shared_cap> bccaps;
if (mInodeCaps.count(refcap->id())) {
for (auto it = mInodeCaps[refcap->id()].begin();
......@@ -1165,13 +1196,7 @@ FuseServer::Caps::BroadcastMD(const eos::fusex::md& md,
}
if (cap->id() && !clients_sent.count(cap->clientuuid())) {
gOFS->zMQ->gFuseServer.Client().SendMD(md,
cap->clientuuid(),
cap->clientid(),
md_ino,
md_pino,
clock,
p_mtime);
bccaps.push_back(cap);
// make sure we send the update only once to each client, even if this
// one has many caps
clients_sent.insert(cap->clientuuid());
......@@ -1179,6 +1204,19 @@ FuseServer::Caps::BroadcastMD(const eos::fusex::md& md,
}
}
lLock.Release();
for (auto it : bccaps) {
gOFS->zMQ->gFuseServer.Client().SendMD(md,
it->clientuuid(),
it->clientid(),
md_ino,
md_pino,
clock,
p_mtime);
errno = 0;
}
EXEC_TIMING_END("Eosxd::int::BcMD");
return 0;
}
......
......@@ -49,6 +49,7 @@ public:
~FuseServer();
void start();
void shutdown();
std::string dump_message(const google::protobuf::Message& message);
......
......@@ -68,7 +68,6 @@ XrdMgmOfs::_attr_ls(const char* path, XrdOucErrInfo& error,
gOFS->MgmStats.Add("AttrLs", vid.uid, vid.gid, 1);
eos::common::RWMutexReadLock ns_rd_lock;
errno = 0;
eos::Prefetcher::prefetchContainerMDAndWait(gOFS->eosView, path);
if (take_lock) {
......@@ -194,7 +193,6 @@ XrdMgmOfs::_attr_set(const char* path, XrdOucErrInfo& error,
std::shared_ptr<eos::IContainerMD> dh;
eos::common::RWMutexWriteLock ns_wr_lock;
eos::Prefetcher::prefetchContainerMDAndWait(gOFS->eosView, path);
if (take_lock) {
......@@ -232,11 +230,19 @@ XrdMgmOfs::_attr_set(const char* path, XrdOucErrInfo& error,
}
dh->setAttribute(key, val.c_str());
if (Key != "sys.tmp.etag") {
dh->setCTimeNow();
dh->setCTimeNow();
}
eosView->updateContainerStore(dh.get());
gOFS->FuseXCastContainer(dh->getIdentifier());
eos::ContainerIdentifier d_id = dh->getIdentifier();
if (take_lock) {
ns_wr_lock.Release();
}
gOFS->FuseXCastContainer(d_id);
errno = 0;
}
} catch (eos::MDException& e) {
......@@ -261,11 +267,19 @@ XrdMgmOfs::_attr_set(const char* path, XrdOucErrInfo& error,
XrdOucString val;
eos::common::SymKey::DeBase64(val64, val);
fmd->setAttribute(key, val.c_str());
if (Key != "sys.tmp.etag") {
fmd->setCTimeNow();
}
if (Key != "sys.tmp.etag") {
fmd->setCTimeNow();
}
eosView->updateFileStore(fmd.get());
gOFS->FuseXCastFile(fmd->getIdentifier());
eos::FileIdentifier f_id = fmd->getIdentifier();
if (take_lock) {
ns_wr_lock.Release();
}
gOFS->FuseXCastFile(f_id);
errno = 0;
}
} catch (eos::MDException& e) {
......@@ -342,8 +356,8 @@ XrdMgmOfs::_attr_get(const char* path, XrdOucErrInfo& error,
}
eos::Prefetcher::prefetchContainerMDAndWait(gOFS->eosView, path);
eos::common::RWMutexReadLock viewReadLock;
if (take_lock) {
viewReadLock.Grab(gOFS->eosViewRWMutex);
}
......@@ -388,7 +402,6 @@ XrdMgmOfs::_attr_get(const char* path, XrdOucErrInfo& error,
}
viewReadLock.Release();
// we always decode attributes here, even if they are stored as base64:
XrdOucString val64 = value;
eos::common::SymKey::DeBase64(val64, value);
......@@ -615,7 +628,9 @@ XrdMgmOfs::_attr_rem(const char* path, XrdOucErrInfo& error,
if (dh->hasAttribute(key)) {
dh->removeAttribute(key);
eosView->updateContainerStore(dh.get());
gOFS->FuseXCastContainer(dh->getIdentifier());
eos::ContainerIdentifier d_id = dh->getIdentifier();
lock.Release();
gOFS->FuseXCastContainer(d_id);
} else {
errno = ENODATA;
}
......@@ -644,7 +659,9 @@ XrdMgmOfs::_attr_rem(const char* path, XrdOucErrInfo& error,
if (fmd->hasAttribute(key)) {
fmd->removeAttribute(key);
eosView->updateFileStore(fmd.get());
gOFS->FuseXCastFile(fmd->getIdentifier());
eos::FileIdentifier f_id = fmd->getIdentifier();
lock.Release();
gOFS->FuseXCastFile(f_id);
errno = 0;
} else {
errno = ENODATA;
......
......@@ -164,7 +164,9 @@ XrdMgmOfs::_chmod(const char* path,
}
eosView->updateContainerStore(pcmd.get());
gOFS->FuseXCastContainer(pcmd->getIdentifier());
eos::ContainerIdentifier pcmd_id = pcmd->getIdentifier();
eos::ContainerIdentifier cmd_id;
eos::FileIdentifier f_id;
if (cmd) {
Mode &= mask;
......@@ -172,7 +174,7 @@ XrdMgmOfs::_chmod(const char* path,
cmd->setCTimeNow();
// store the in-memory modification time for this directory
eosView->updateContainerStore(cmd.get());
gOFS->FuseXCastContainer(cmd->getIdentifier());
cmd_id = cmd->getIdentifier();
}
if (fmd) {
......@@ -180,7 +182,18 @@ XrdMgmOfs::_chmod(const char* path,
Mode &= (S_IRWXU | S_IRWXG | S_IRWXO);
fmd->setFlags(Mode);
eosView->updateFileStore(fmd.get());
gOFS->FuseXCastFile(fmd->getIdentifier());
f_id = fmd->getIdentifier();
}
lock.Release();
gOFS->FuseXCastContainer(pcmd_id);
if (cmd) {
gOFS->FuseXCastContainer(cmd_id);
}
if (fmd) {
gOFS->FuseXCastFile(f_id);
}
errno = 0;
......
......@@ -230,7 +230,9 @@ XrdMgmOfs::_symlink(const char* source_name,
dir->setMTimeNow();
dir->notifyMTimeChange(gOFS->eosDirectoryService);
eosView->updateContainerStore(dir.get());
gOFS->FuseXCastContainer(dir->getIdentifier());
eos::ContainerIdentifier dir_id = dir->getIdentifier();
lock.Release();
gOFS->FuseXCastContainer(dir_id);
} catch (eos::MDException& e) {
eos_debug("msg=\"exception\" ec=%d emsg=\"%s\"\n",
e.getErrno(), e.getMessage().str().c_str());
......
......@@ -68,7 +68,9 @@ XrdMgmOfs::merge(const char* src, const char* dst, XrdOucErrInfo& error,
src_fmd->setMTime(mtime);
src_fmd->setFlags(dst_fmd->getFlags());
eosView->updateFileStore(src_fmd.get());
gOFS->FuseXCastFile(src_fmd->getIdentifier());
eos::FileIdentifier f_id = src_fmd->getIdentifier();
viewLock.Release();
gOFS->FuseXCastFile(f_id);
} catch (eos::MDException& e) {
errno = e.getErrno();
eos_debug("caught exception %d %s\n", e.getErrno(),
......
......@@ -354,10 +354,13 @@ XrdMgmOfs::_mkdir(const char* path,
// commit
eosView->updateContainerStore(newdir.get());
eosView->updateContainerStore(dir.get());
gOFS->FuseXCastContainer(newdir->getIdentifier());
gOFS->FuseXCastContainer(dir->getIdentifier());
dir->notifyMTimeChange(gOFS->eosDirectoryService);
newdir->notifyMTimeChange(gOFS->eosDirectoryService);
eos::ContainerIdentifier nd_id = newdir->getIdentifier();
eos::ContainerIdentifier d_id = dir->getIdentifier();
lock.Release();
gOFS->FuseXCastContainer(nd_id);
gOFS->FuseXCastContainer(d_id);
} catch (eos::MDException& e) {
errno = e.getErrno();
eos_debug("msg=\"exception\" ec=%d emsg=\"%s\"\n",
......@@ -414,11 +417,14 @@ XrdMgmOfs::_mkdir(const char* path,
// Commit to backend
eosView->updateContainerStore(newdir.get());
eosView->updateContainerStore(dir.get());
gOFS->FuseXCastContainer(newdir->getIdentifier());
gOFS->FuseXCastContainer(dir->getIdentifier());
// Notify after attribute inheritance
newdir->notifyMTimeChange(gOFS->eosDirectoryService);
dir->notifyMTimeChange(gOFS->eosDirectoryService);
eos::ContainerIdentifier nd_id = newdir->getIdentifier();
eos::ContainerIdentifier d_id = dir->getIdentifier();
lock.Release();
gOFS->FuseXCastContainer(nd_id);
gOFS->FuseXCastContainer(d_id);
} catch (eos::MDException& e) {
errno = e.getErrno();
eos_debug("msg=\"exception\" ec=%d emsg=\"%s\"",
......
......@@ -210,16 +210,24 @@ XrdMgmOfs::_remdir(const char* path,
if (!simulate) {
try {
eos::ContainerIdentifier dhpar_id;
std::string dh_name;
// update the in-memory modification time of the parent directory
if (dhpar) {
dhpar->setMTimeNow();
dhpar->notifyMTimeChange(gOFS->eosDirectoryService);
eosView->updateContainerStore(dhpar.get());
gOFS->FuseXCastContainer(dhpar->getIdentifier());
gOFS->FuseXCastDeletion(dhpar->getIdentifier(), dh->getName());
dhpar_id = dhpar->getIdentifier();
dh_name = dh->getName();
}
eosView->removeContainer(path);
if (dhpar) {
gOFS->FuseXCastContainer(dhpar_id);
gOFS->FuseXCastDeletion(dhpar_id, dh_name);
}
} catch (eos::MDException& e) {
errno = e.getErrno();
eos_debug("msg=\"exception\" ec=%d emsg=\"%s\"\n",
......
......@@ -125,8 +125,10 @@
if (container) {
container->setMTimeNow();
gOFS->eosView->updateContainerStore(container.get());
gOFS->FuseXCastContainer(container->getIdentifier());
container->notifyMTimeChange(gOFS->eosDirectoryService);
eos::ContainerIdentifier container_id = container->getIdentifier();
lock.Release();
gOFS->FuseXCastContainer(container_id);
}
}
} catch (...) {
......
......@@ -33,7 +33,7 @@
MAYREDIRECT;
gOFS->MgmStats.Add("Eosxd::ext::0-HANDLE", vid.uid, vid.gid, 1);
EXEC_TIMING_BEGIN("Eosxd::ext::0-HANDLE");
// receive a protocol buffer and apply to the namespace
std::string id = std::string("Fusex::sync:") + vid.tident.c_str();
......@@ -67,7 +67,7 @@
std::string response = "Fusex:";
response += b64response;
error.setErrInfo(response.length(), response.c_str());
EXEC_TIMING_END("Eosxd::ext::0-HANDLE");
return SFS_DATA;
}
......@@ -1989,6 +1989,8 @@ XrdMgmOfs::Configure(XrdSysError& Eroute)
eos_static_info(ss.str().c_str());
// add all stat entries with 0
InitStats();
// start the fuse server
gOFS->zMQ->gFuseServer.start();
// set IO accounting file
XrdOucString ioaccounting = MgmMetaLogDir;
ioaccounting += "/iostat.";
......
......@@ -866,8 +866,11 @@ XrdMgmOfsFile::open(const char* inpath,
cmd->setMTimeNow();
cmd->notifyMTimeChange(gOFS->eosDirectoryService);
gOFS->eosView->updateContainerStore(cmd.get());
gOFS->FuseXCastContainer(cmd->getIdentifier());
gOFS->FuseXCastContainer(cmd->getParentIdentifier());
eos::ContainerIdentifier cmd_id = cmd->getIdentifier();
eos::ContainerIdentifier pcmd_id = cmd->getParentIdentifier();
lock.Release();
gOFS->FuseXCastContainer(cmd_id);
gOFS->FuseXCastContainer(pcmd_id);
} catch (eos::MDException& e) {
fmd.reset();
errno = e.getErrno();
......@@ -1096,15 +1099,15 @@ XrdMgmOfsFile::open(const char* inpath,
}
try {
eos::FileIdentifier fmd_id = fmd->getIdentifier();
gOFS->eosView->updateFileStore(fmd.get());
gOFS->FuseXCastFile(fmd->getIdentifier());
std::shared_ptr<eos::IContainerMD> cmd =
gOFS->eosDirectoryService->getContainerMD(cid);
cmd->setMTimeNow();
cmd->notifyMTimeChange(gOFS->eosDirectoryService);
gOFS->eosView->updateContainerStore(cmd.get());
gOFS->FuseXCastContainer(cmd->getIdentifier());
gOFS->FuseXCastContainer(cmd->getParentIdentifier());
eos::ContainerIdentifier cmd_id = cmd->getIdentifier();
eos::ContainerIdentifier pcmd_id = cmd->getParentIdentifier();
if (isCreation || (!fmd->getNumLocation())) {
eos::IQuotaNode* ns_quota = gOFS->eosView->getQuotaNode(cmd.get());
......@@ -1113,6 +1116,11 @@ XrdMgmOfsFile::open(const char* inpath,
ns_quota->addFile(fmd.get());
}
}
lock.Release();
gOFS->FuseXCastFile(fmd_id);
gOFS->FuseXCastContainer(cmd_id);
gOFS->FuseXCastContainer(pcmd_id);
} catch (eos::MDException& e) {
errno = e.getErrno();
std::string errmsg = e.getMessage().str();
......@@ -1780,7 +1788,8 @@ XrdMgmOfsFile::open(const char* inpath,
if (std::find(unavailfs.begin(), unavailfs.end(),
selectedfs[k]) == unavailfs.end()) {
// take the highest fsid with the same geotag if possible
if ((vid.geolocation.empty() || (fsgeotag.find(vid.geolocation) != std::string::npos)) &&
if ((vid.geolocation.empty() ||
(fsgeotag.find(vid.geolocation) != std::string::npos)) &&
(selectedfs[k] > fsid)) {
fsIndex = k;
fsid = selectedfs[k];
......@@ -1798,20 +1807,22 @@ XrdMgmOfsFile::open(const char* inpath,
fsid = selectedfs[k];
}
}
// EOS-2787
// reshuffle the selectedfs to set if available the highest with matching geotag in front
// reshuffle the selectedfs to set if available the highest with matching geotag in front
if (fsid) {
std::vector<unsigned int> newselectedfs;
newselectedfs.push_back(fsid);
for (const auto& i : selectedfs) {
if (i != newselectedfs.front()) {
newselectedfs.push_back(i);
}
}
}
selectedfs.swap(newselectedfs);
fsIndex = 0;
}
}
}
} else {
if (!fmd->getSize()) {
......@@ -2454,12 +2465,13 @@ XrdMgmOfsFile::open(const char* inpath,
} else {
if (!isRW) {
eos::IFileMD::ctime_t mtime;
try {
fmd->getMTime(mtime);
redirectionhost += "&mgm.mtime=";
std::string smtime;
smtime += std::to_string(mtime.tv_sec);
redirectionhost += smtime.c_str();
fmd->getMTime(mtime);
redirectionhost += "&mgm.mtime=";
std::string smtime;
smtime += std::to_string(mtime.tv_sec);
redirectionhost += smtime.c_str();
} catch (eos::MDException& ex) {
}
}
......@@ -2570,7 +2582,9 @@ XrdMgmOfsFile::open(const char* inpath,
// only update within the resolution of the access tracking
fmd->setCTimeNow();
gOFS->eosView->updateFileStore(fmd.get());
gOFS->FuseXCastFile(fmd->getIdentifier());
eos::FileIdentifier fmd_id = fmd->getIdentifier();
lock.Release();
gOFS->FuseXCastFile(fmd_id);
}
errno = 0;
......
......@@ -40,6 +40,7 @@ int
ProcCommand::FuseX()
{
gOFS->MgmStats.Add("Eosxd::ext::0-STREAM", pVid->uid, pVid->gid, 1);
EXEC_TIMING_BEGIN("Eosxd::ext::0-STREAM");
// -------------------------------------------------------------------------------------------------------
// This function returns meta data by inode or if provided first translates a path into an inode.
// The client can provide the meta-data clock. If it is equivalent to the stored clock, this function
......@@ -151,16 +152,14 @@ ProcCommand::FuseX()
}
if (errno) {
if (errno != ENOENT)
{
eos_err("msg=\"exception\" ec=%d emsg=\"%s\"",
errno, emsg.c_str());
}
else
{
eos_debug("msg=\"exception\" ec=%d emsg=\"%s\"",
errno, emsg.c_str());
if (errno != ENOENT) {
eos_err("msg=\"exception\" ec=%d emsg=\"%s\"",
errno, emsg.c_str());
} else {
eos_debug("msg=\"exception\" ec=%d emsg=\"%s\"",
errno, emsg.c_str());
}
return gOFS->Emsg("FuseX", *mError, errno, "get-if-clock",
emsg.c_str());
}
......@@ -208,7 +207,9 @@ ProcCommand::FuseX()
} else {
}
eos_debug("c1=%llu c2=%llu", md_clock, clock);
if (EOS_LOGS_DEBUG) {
eos_debug("c1=%llu c2=%llu", md_clock, clock);
}
if ((sop == "GET") || (sop == "LS")) {
if (md_clock == clock) {
......@@ -228,7 +229,9 @@ ProcCommand::FuseX()
return gOFS->Emsg("FuseX", *mError, rc, "handle request", "");
}
eos_debug("c1=%llu c2=%llu", md_clock, clock);
if (EOS_LOGS_DEBUG) {
eos_debug("c1=%llu c2=%llu", md_clock, clock);
}
if (sop == "GETCAP") {
// check clock synchronization
......@@ -236,18 +239,25 @@ ProcCommand::FuseX()
time_t now = time(NULL);
if ((uint64_t)now > clock + 2) {
eos_err("client-clock %lu %s server-clock %lu", clock, sclock.c_str() , now);
eos_err("client-clock %lu %s server-clock %lu", clock, sclock.c_str(), now);
return gOFS->Emsg("FuseX", *mError, EL2NSYNC, "get-cap-clock-out-of-sync",
inpath);
}
}
mResultStream = result;
eos_debug("returning resultstream len=%u %s", mResultStream.size(),
mResultStream.c_str());
if (EOS_LOGS_DEBUG)
eos_debug("returning resultstream len=%u %s", mResultStream.size(),
mResultStream.c_str());
mLen = mResultStream.size();
eos_debug("result-dump=%s",
eos::common::StringConversion::string_to_hex(result).c_str());
if (EOS_LOGS_DEBUG)
eos_debug("result-dump=%s",
eos::common::StringConversion::string_to_hex(result).c_str());
EXEC_TIMING_END("Eosxd::ext::0-STREAM");
return SFS_OK;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment