Commit 3ddcbbfd authored by Jozsef Makai's avatar Jozsef Makai

MGM: no more queueing in namesapce for sync events

parent d883b82c
......@@ -1698,6 +1698,7 @@ WFE::Job::DoIt(bool issync)
fmd->setAttribute(RETRIEVES_ATTR_NAME, std::to_string(++retrieveCntr));
if (!onGoingRetrieve && onTape) {
fmd->setAttribute(RETRIEVES_ERROR_ATTR_NAME, "");
// Read these attributes here to optimize locking
notification->mutable_file()->mutable_owner()->set_username(GetUserName(fmd->getCUid()));
notification->mutable_file()->mutable_owner()->set_groupname(GetGroupName(fmd->getCGid()));
......@@ -2142,55 +2143,58 @@ WFE::Job::SendProtoWFRequest(Job* jobPtr, const std::string& fullPath,
void
WFE::Job::MoveToRetry(const std::string& filePath) {
int retry = 0, delay = 0;
std::string retryattr = "sys.workflow." + mActions[0].mEvent + "." +
mActions[0].mWorkflow + ".retry.max";
std::string delayattr = "sys.workflow." + mActions[0].mEvent + "." +
mActions[0].mWorkflow + ".retry.delay";
eos_static_info("%s %s", retryattr.c_str(), delayattr.c_str());
if (!IsSync()) {
int retry = 0, delay = 0;
std::string retryattr = "sys.workflow." + mActions[0].mEvent + "." +
mActions[0].mWorkflow + ".retry.max";
std::string delayattr = "sys.workflow." + mActions[0].mEvent + "." +
mActions[0].mWorkflow + ".retry.delay";
eos_static_info("%s %s", retryattr.c_str(), delayattr.c_str());
{
eos::common::Path cPath(filePath.c_str());
auto parentPath = cPath.GetParentPath();
eos::common::RWMutexReadLock lock(gOFS->eosViewRWMutex);
auto cmd = gOFS->eosView->getContainer(parentPath);
try {
retry = std::stoi(cmd->getAttribute(retryattr));
} catch (...) {
// retry 25 times by default
retry = 25;
}
{
eos::common::Path cPath(filePath.c_str());
auto parentPath = cPath.GetParentPath();
try {
delay = std::stoi(cmd->getAttribute(delayattr));
} catch (...) {
// retry after 1 hour by default and one final longer wait
delay = mRetry == retry - 1 ? 7200 : 3600;
eos::common::RWMutexReadLock lock(gOFS->eosViewRWMutex);
auto cmd = gOFS->eosView->getContainer(parentPath);
try {
retry = std::stoi(cmd->getAttribute(retryattr));
} catch (...) {
// retry 25 times by default
retry = 25;
}
try {
delay = std::stoi(cmd->getAttribute(delayattr));
} catch (...) {
// retry after 1 hour by default and one final longer wait
delay = mRetry == retry - 1 ? 7200 : 3600;
}
}
}
if (!IsSync() && (mRetry < retry)) {
time_t storetime = (time_t) mActions[0].mTime + delay;
Move("r", "e", storetime, ++mRetry);
Results("e", EAGAIN, "scheduled for retry", storetime);
} else {
eos_static_err("WF event finally failed for %s event of %s file after %d retries.",
mActions[0].mEvent.c_str(), filePath.c_str(), mRetry);
MoveWithResults(SFS_ERROR, "e");
if (mRetry < retry) {
time_t storetime = (time_t) mActions[0].mTime + delay;
Move("r", "e", storetime, ++mRetry);
Results("e", EAGAIN, "scheduled for retry", storetime);
} else {
eos_static_err("WF event finally failed for %s event of %s file after %d retries.",
mActions[0].mEvent.c_str(), filePath.c_str(), mRetry);
MoveWithResults(SFS_ERROR, "e");
}
}
}
void
WFE::Job::MoveWithResults(int rcode, std::string fromQueue) {
time_t storetime = 0;
if (rcode == 0) {
Move(fromQueue, "d", storetime);
Results("d", rcode , "moved to done", storetime);
}
else {
Move(fromQueue, "f", storetime);
Results("f", rcode , "moved to failed", storetime);
if (!IsSync()) {
time_t storetime = 0;
if (rcode == 0) {
Move(fromQueue, "d", storetime);
Results("d", rcode, "moved to done", storetime);
} else {
Move(fromQueue, "f", storetime);
Results("f", rcode, "moved to failed", storetime);
}
}
}
......
......@@ -202,7 +202,6 @@ Workflow::Create(eos::common::Mapping::VirtualIdentity& vid, const std::string&
if (job.IsSync(mEvent)) {
if (WfeEnabled()) {
job.AddAction(mAction, mEvent, t, mWorkflow, "r");
job.Save("r", t);
eos_static_info("running synchronous workflow");
return job.DoIt(true);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment