Commit 94e1b551 authored by Elvin Sindrilaru

MGM: Make sure there are no drains running when changing the status of a file system

parent b479efe0
......@@ -100,21 +100,17 @@ FileSystem::SetConfigStatus(eos::common::FileSystem::fsstatus_t new_status)
// Only master drains
if (ShouldBroadCast()) {
if (drain_tx) {
std::string out_msg;
if (drain_tx > 0) {
if (!gOFS->mDrainEngine.StartFsDrain(this, 0, out_msg)) {
eos_static_err("%s", out_msg.c_str());
return false;
}
} else {
if (!gOFS->mDrainEngine.StopFsDrain(this, out_msg)) {
eos_static_err("%s", out_msg.c_str());
}
std::string out_msg;
if (drain_tx > 0) {
if (!gOFS->mDrainEngine.StartFsDrain(this, 0, out_msg)) {
eos_static_err("%s", out_msg.c_str());
return false;
}
} else {
SetDrainStatus(eos::common::FileSystem::kNoDrain, false);
if (!gOFS->mDrainEngine.StopFsDrain(this, out_msg)) {
eos_static_err("%s", out_msg.c_str());
}
}
}
} else {
......
......@@ -188,7 +188,7 @@ DrainFs::HandleRunningJobs()
}
}
if (mJobsRunning.size() > mMaxJobs.load()) {
if (mJobsRunning.size() > mMaxJobs) {
std::this_thread::sleep_for(seconds(1));
}
}
......@@ -321,7 +321,7 @@ DrainFs::PrepareFs()
}
mStatus = eos::common::FileSystem::kDrainPrepare;
fs->SetDrainStatus(mStatus, false);
fs->SetDrainStatus(mStatus);
fs->SetLongLong("stat.drain.failed", 0, false);
mDrainPeriod = seconds(fs->GetLongLong("drainperiod"));
eos::common::FileSystem::fs_snapshot_t drain_snapshot;
......@@ -534,7 +534,6 @@ DrainFs::ResetCounters()
fs->SetLongLong("stat.drainprogress", 0, false);
fs->SetLongLong("stat.drainretry", 0, false);
fs->SetDrainStatus(eos::common::FileSystem::kNoDrain);
FsView::gFsView.StoreFsConfig(fs);
}
}
......
......@@ -127,11 +127,6 @@ public:
std::future_status::ready));
}
//----------------------------------------------------------------------------
//! Stop draining
//----------------------------------------------------------------------------
void Stop();
private:
//----------------------------------------------------------------------------
//! Reset drain counters and status
......@@ -191,6 +186,12 @@ private:
//---------------------------------------------------------------------------
void SuccessfulDrain();
//----------------------------------------------------------------------------
//! Stop draining - must be called by the same thread supervising the
//! draining.
//----------------------------------------------------------------------------
void Stop();
constexpr static std::chrono::seconds sRefreshTimeout {60};
constexpr static std::chrono::seconds sStallTimeout {600};
eos::IFsView* mNsFsView; ///< File system view
......
......@@ -54,7 +54,7 @@ DrainTransferJob::DoIt()
{
using eos::common::LayoutId;
gOFS->MgmStats.Add("DrainCentralStarted", 0, 0, 1);
eos_debug("running drain job fsid_src=%i, fsid_dst=%i, fid=%llu",
eos_debug("msg=\"running drain job fsid_src=%i, fsid_dst=%i, fid=%llu\"",
mFsIdSource, mFsIdTarget, mFileId);
mStatus = Status::Running;
FileDrainInfo fdrain;
......@@ -88,65 +88,68 @@ DrainTransferJob::DoIt()
return;
}
// Prepare the TPC copy job
std::string log_id = LogId::GenerateLogId();
XrdCl::URL url_src = BuildTpcSrc(fdrain, log_id);
XrdCl::URL url_dst = BuildTpcDst(fdrain, log_id);
while (true) {
// Prepare the TPC copy job
std::string log_id = LogId::GenerateLogId();
XrdCl::URL url_src = BuildTpcSrc(fdrain, log_id);
XrdCl::URL url_dst = BuildTpcDst(fdrain, log_id);
if (!url_src.IsValid() || !url_dst.IsValid()) {
gOFS->MgmStats.Add("DrainCentralFailed", 0, 0, 1);
ReportError("msg=\"srd/dst url not valid\"");
return;
}
// When no more sources are available the url_src is empty
if (!url_src.IsValid() || !url_dst.IsValid()) {
gOFS->MgmStats.Add("DrainCentralFailed", 0, 0, 1);
ReportError("msg=\"no more replicas available\"");
return;
}
// If enabled use xrootd connection pool to avoid bottlenecks on the
// same physical connection
eos::common::XrdConnIdHelper src_id_helper(gOFS->mXrdConnPool, url_src);
eos::common::XrdConnIdHelper dst_id_helper(gOFS->mXrdConnPool, url_dst);
// Populate the properties map of the transfer
XrdCl::PropertyList properties;
properties.Set("force", true);
properties.Set("posc", false);
properties.Set("coerce", false);
properties.Set("source", url_src);
properties.Set("target", url_dst);
properties.Set("sourceLimit", (uint16_t) 1);
properties.Set("chunkSize", (uint32_t)(4 * 1024 * 1024));
properties.Set("parallelChunks", (uint8_t) 1);
properties.Set("tpcTimeout", 900);
// Non-empty files run with TPC only
if (fdrain.mProto.size()) {
properties.Set("thirdParty", "only");
}
// If enabled use xrootd connection pool to avoid bottlenecks on the
// same physical connection
eos::common::XrdConnIdHelper src_id_helper(gOFS->mXrdConnPool, url_src);
eos::common::XrdConnIdHelper dst_id_helper(gOFS->mXrdConnPool, url_dst);
// Populate the properties map of the transfer
XrdCl::PropertyList properties;
properties.Set("force", true);
properties.Set("posc", false);
properties.Set("coerce", false);
properties.Set("source", url_src);
properties.Set("target", url_dst);
properties.Set("sourceLimit", (uint16_t) 1);
properties.Set("chunkSize", (uint32_t)(4 * 1024 * 1024));
properties.Set("parallelChunks", (uint8_t) 1);
// properties.Set("tpcTimeout", 900);
// Non-empty files run with TPC only
if (fdrain.mProto.size()) {
properties.Set("thirdParty", "only");
}
// Create the process job
XrdCl::PropertyList result;
XrdCl::CopyProcess cpy;
cpy.AddJob(properties, &result);
XrdCl::XRootDStatus prepare_st = cpy.Prepare();
eos_info("[tpc]: id=%s url=%s => id=%s url=%s logid=%s prepare_msg=%s",
url_src.GetHostId().c_str(), url_src.GetLocation().c_str(),
url_dst.GetHostId().c_str(), url_dst.GetLocation().c_str(),
log_id.c_str(), prepare_st.ToStr().c_str());
if (prepare_st.IsOK()) {
XrdCl::XRootDStatus tpc_st = cpy.Run(0);
if (!tpc_st.IsOK()) {
eos_err("%s", SSTR("src=" << url_src.GetLocation().c_str() <<
" dst=" << url_dst.GetLocation().c_str() <<
" logid=" << log_id <<
" tpc_err=" << tpc_st.ToStr()).c_str());
// Create the process job
XrdCl::PropertyList result;
XrdCl::CopyProcess cpy;
cpy.AddJob(properties, &result);
XrdCl::XRootDStatus prepare_st = cpy.Prepare();
eos_info("[tpc]: id=%s url=%s => id=%s url=%s logid=%s prepare_msg=%s",
url_src.GetHostId().c_str(), url_src.GetLocation().c_str(),
url_dst.GetHostId().c_str(), url_dst.GetLocation().c_str(),
log_id.c_str(), prepare_st.ToStr().c_str());
if (prepare_st.IsOK()) {
XrdCl::XRootDStatus tpc_st = cpy.Run(0);
if (!tpc_st.IsOK()) {
eos_err("%s", SSTR("src=" << url_src.GetLocation().c_str() <<
" dst=" << url_dst.GetLocation().c_str() <<
" logid=" << log_id <<
" tpc_err=" << tpc_st.ToStr()).c_str());
} else {
gOFS->MgmStats.Add("DrainCentralSuccessful", 0, 0, 1);
eos_info("msg=\"drain successful\" logid=%s", log_id.c_str());
mStatus = Status::OK;
return;
}
} else {
gOFS->MgmStats.Add("DrainCentralSuccessful", 0, 0, 1);
eos_info("msg=\"drain successful\" logid=%s", log_id.c_str());
mStatus = Status::OK;
return;
eos_err("%s", SSTR("msg=\"prepare drain failed\" logid="
<< log_id.c_str()).c_str());
}
} else {
eos_err("%s", SSTR("msg=\"prepare drain failed\" logid="
<< log_id.c_str()).c_str());
}
gOFS->MgmStats.Add("DrainCentralFailed", 0, 0, 1);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment