Commit 73bc0b60 authored by Jozsef Makai's avatar Jozsef Makai

MGM,FST: implemented FST-CTA closew communication, redirection URL left

parent b14eb4cf
......@@ -50,7 +50,9 @@ include_directories(
${CMAKE_SOURCE_DIR}/namespace/ns_quarkdb/
${FOLLY_INCLUDE_DIRS}
${CMAKE_SOURCE_DIR}/namespace/ns_quarkdb/qclient/include
${CMAKE_BINARY_DIR}/namespace/ns_quarkdb) # for the generated protobuf
${CMAKE_BINARY_DIR}/namespace/ns_quarkdb
${CMAKE_SOURCE_DIR}/common/xrootd-ssi-protobuf-interface/include
${CMAKE_BINARY_DIR}/common/eos_cta_pb) # for the generated protobuf
#-------------------------------------------------------------------------------
# Add tests if not on MacOSX
......
......@@ -47,8 +47,8 @@ public:
XrdOucString FstHostPort; // <host>:<port>
XrdOucString Manager; // <host>:<port>
XrdOucString KernelVersion; // kernel version of the host
XrdOucString FstProtoWFEndpoint; // proto wf endpoint (typically CTA frontend)
XrdOucString FstProtoWFResource; // proto wf resource (typically CTA frontend)
std::string ProtoWFEndpoint; // proto wf endpoint (typically CTA frontend)
std::string ProtoWFResource; // proto wf resource (typically CTA frontend)
int PublishInterval; // Interval after which filesystem information should be published
XrdOucString StartDate; // Time when daemon was started
XrdOucString KeyTabAdler; // adler string of the keytab file
......
......@@ -36,6 +36,10 @@
#include "common/Statfs.hh"
#include "common/SyncAll.hh"
#include "common/StackTrace.hh"
#include "common/xrootd-ssi-protobuf-interface/eos_cta/include/CtaFrontendApi.hpp"
#include "common/eos_cta_pb/EosCtaAlertHandler.hh"
#include "common/Constants.hh"
#include "common/StringConversion.hh"
#include "XrdNet/XrdNetOpts.hh"
#include "XrdOfs/XrdOfs.hh"
#include "XrdOfs/XrdOfsTrace.hh"
......@@ -457,13 +461,13 @@ XrdFstOfs::Configure(XrdSysError& Eroute, XrdOucEnv* envP)
if (!strcmp("protowfendpoint", var)) {
if ((val = Config.GetWord())) {
eos::fst::Config::gConfig.FstProtoWFEndpoint = val;
eos::fst::Config::gConfig.ProtoWFEndpoint = val;
}
}
if (!strcmp("protowfresource", var)) {
if ((val = Config.GetWord())) {
eos::fst::Config::gConfig.FstProtoWFResource = val;
eos::fst::Config::gConfig.ProtoWFResource = val;
}
}
......@@ -1566,8 +1570,118 @@ XrdFstOfs::WaitForOngoingIO(std::chrono::seconds timeout)
int
XrdFstOfs::CallSynchronousClosew(const Fmd& fmd, const string& ownerName,
const string& ownerGroupName, const string& instanceName) {
return 0;
const string& ownerGroupName, const string& instanceName,
const string& fullPath, const std::map<std::string, std::string>& xattrs) {
cta::xrd::Request request;
auto notification = request.mutable_notification();
notification->mutable_file()->mutable_owner()->set_username(ownerName);
notification->mutable_file()->mutable_owner()->set_groupname(ownerGroupName);
notification->mutable_file()->set_size(fmd.size());
notification->mutable_file()->mutable_cks()->set_type(
eos::common::LayoutId::GetChecksumString(fmd.lid()));
notification->mutable_file()->mutable_cks()->set_value(fmd.checksum());
notification->mutable_wf()->set_event(cta::eos::Workflow::CLOSEW);
notification->mutable_wf()->mutable_instance()->set_name(instanceName);
notification->mutable_file()->set_lpath(fullPath);
notification->mutable_file()->set_fid(fmd.fid());
std::string managerName;
{
XrdSysMutexHelper lock(Config::gConfig.Mutex);
managerName = Config::gConfig.Manager.c_str();
}
auto fxidString = eos::common::StringConversion::FastUnsignedToAsciiHex(fmd.fid());
std::ostringstream srcStream;
srcStream << "root://" << managerName << "/" << fullPath << "?eos.lfn=fxid:"
<< fxidString;
notification->mutable_wf()->mutable_instance()->set_url(srcStream.str());
std::ostringstream reportStream;
reportStream << "eosQuery://" << managerName
<< "//eos/wfe/passwd?mgm.pcmd=event&mgm.fid=" << fxidString
<< "&mgm.logid=cta&mgm.event=archived&mgm.workflow=default&mgm.path=/eos/wfe/passwd&mgm.ruid=0&mgm.rgid=0";
notification->mutable_transport()->set_report_url(reportStream.str());
std::ostringstream errorReportStream;
errorReportStream << "eosQuery://" << managerName
<< "//eos/wfe/passwd?mgm.pcmd=event&mgm.fid=" << fxidString
<< "&mgm.logid=cta&mgm.event=" << ARCHIVE_FAILED_WORKFLOW_NAME << "&mgm.workflow=default&mgm.path=/eos/wfe/passwd&mgm.ruid=0&mgm.rgid=0&mgm.errmsg=";
notification->mutable_transport()->set_error_report_url(errorReportStream.str());
for (const auto& attrPair : xattrs)
{
google::protobuf::MapPair<std::string, std::string> attr(attrPair.first,
attrPair.second);
notification->mutable_file()->mutable_xattr()->insert(attr);
}
// Communication with service
std::string endPoint;
std::string resource;
{
XrdSysMutexHelper lock(Config::gConfig.Mutex);
endPoint = Config::gConfig.ProtoWFEndpoint;
resource = Config::gConfig.ProtoWFResource;
}
if (endPoint.empty() || resource.empty()) {
eos_static_err(
"You are running proto wf jobs without specifying fstofs.protowfendpoint or fstofs.protowfresource in the FST config file."
);
return ENOTCONN;
}
XrdSsiPb::Config config;
if(getenv("XRDDEBUG")) {
config.set("log", "all");
} else {
config.set("log", "info");
}
config.set("request_timeout", "120");
// Instantiate service object only once, static is also thread-safe
static XrdSsiPbServiceType service(endPoint, resource, config);
cta::xrd::Response response;
try {
auto sentAt = std::chrono::steady_clock::now();
auto future = service.Send(request, response);
future.get();
auto receivedAt = std::chrono::steady_clock::now();
auto timeSpent = std::chrono::duration_cast<std::chrono::milliseconds>(receivedAt - sentAt);
eos_static_info("SSI Protobuf time for sync::closew = %ld", timeSpent.count());
} catch (std::runtime_error& error) {
eos_static_err("Could not send request to outside service. Reason: %s",
error.what());
return ENOTCONN;
}
static std::map<decltype(cta::xrd::Response::RSP_ERR_CTA), const char*> errorEnumMap;
errorEnumMap[cta::xrd::Response::RSP_ERR_CTA] = "RSP_ERR_CTA";
errorEnumMap[cta::xrd::Response::RSP_ERR_USER] = "RSP_ERR_USER";
errorEnumMap[cta::xrd::Response::RSP_ERR_PROTOBUF] = "RSP_ERR_PROTOBUF";
errorEnumMap[cta::xrd::Response::RSP_INVALID] = "RSP_INVALID";
switch (response.type()) {
case cta::xrd::Response::RSP_SUCCESS: return SFS_OK;
case cta::xrd::Response::RSP_ERR_CTA:
case cta::xrd::Response::RSP_ERR_USER:
case cta::xrd::Response::RSP_ERR_PROTOBUF:
case cta::xrd::Response::RSP_INVALID:
eos_static_err("%s for file %s. Reason: %s", errorEnumMap[response.type()], fullPath.c_str(), response.message_txt().c_str());
return EPROTO;
default:
eos_static_err("Response:\n%s", response.DebugString().c_str());
return EPROTO;
}
}
EOSFSTNAMESPACE_END
......@@ -307,7 +307,8 @@ public:
bool retry = true);
int CallSynchronousClosew(const Fmd& fmd, const string& ownerName,
const string& ownerGroupName, const string& instanceName);
const string& ownerGroupName, const string& instanceName,
const string& fullPath, const std::map<std::string, std::string>& xattrs);
//----------------------------------------------------------------------------
//! Function dealing with plugin calls
......
......@@ -26,7 +26,7 @@
#include "common/LayoutId.hh"
#include "common/ShellCmd.hh"
#include "common/StringTokenizer.hh"
#include "mgm/Constants.hh"
#include "common/Constants.hh"
#include "mgm/Quota.hh"
#include "common/eos_cta_pb/EosCtaAlertHandler.hh"
#include "mgm/WFE.hh"
......@@ -2181,7 +2181,7 @@ WFE::Job::SendProtoWFRequest(Job* jobPtr, const std::string& fullPath,
auto receivedAt = std::chrono::steady_clock::now();
auto timeSpent = std::chrono::duration_cast<std::chrono::milliseconds>(receivedAt - sentAt);
eos_static_info("SSI Protobuf time for %s = %ld", jobPtr->mActions[0].mEvent.c_str(),timeSpent.count());
eos_static_info("SSI Protobuf time for %s = %ld", jobPtr->mActions[0].mEvent.c_str(), timeSpent.count());
} catch (std::runtime_error& error) {
eos_static_err("Could not send request to outside service. Reason: %s",
error.what());
......@@ -2190,7 +2190,7 @@ WFE::Job::SendProtoWFRequest(Job* jobPtr, const std::string& fullPath,
return ENOTCONN;
}
std::map<decltype(cta::xrd::Response::RSP_ERR_CTA), const char*> errorEnumMap;
static std::map<decltype(cta::xrd::Response::RSP_ERR_CTA), const char*> errorEnumMap;
errorEnumMap[cta::xrd::Response::RSP_ERR_CTA] = "RSP_ERR_CTA";
errorEnumMap[cta::xrd::Response::RSP_ERR_USER] = "RSP_ERR_USER";
errorEnumMap[cta::xrd::Response::RSP_ERR_PROTOBUF] = "RSP_ERR_PROTOBUF";
......@@ -2288,6 +2288,30 @@ WFE::Job::MoveWithResults(int rcode, std::string fromQueue) {
}
}
std::string
WFE::GetGroupName(gid_t gid) {
int errc = 0;
auto group_name = Mapping::GidToGroupName(gid, errc);
if (errc) {
group_name = "nobody";
}
return group_name;
}
std::string
WFE::GetUserName(uid_t uid) {
int errc = 0;
auto user_name = Mapping::UidToUserName(uid, errc);
if (errc) {
user_name = "nobody";
}
return user_name;
}
/*----------------------------------------------------------------------------*/
void
WFE::PublishActiveJobs()
......
......@@ -305,6 +305,10 @@ public:
return mActiveJobs.load();
}
static std::string GetUserName(uid_t uid);
static std::string GetGroupName(gid_t gid);
/// the scheduler class is providing a destructor-less object,
/// so we have to create once a singleton of this and keep/share it
static XrdSysMutex gSchedulerMutex;
......
......@@ -28,7 +28,7 @@
#include "mgm/XrdMgmOfs.hh"
#include "mgm/WFE.hh"
#include "mgm/FsView.hh"
#include "mgm/Constants.hh"
#include "common/Constants.hh"
/*----------------------------------------------------------------------------*/
EOSMGMNAMESPACE_BEGIN
......@@ -121,6 +121,10 @@ Workflow::getCGICloseW(std::string workflow)
if (mAttr && (*mAttr).count(syncKey)) {
cgi = "&mgm.event=sync::closew&mgm.workflow=";
cgi += workflow;
cgi += "&mgm.instance=";
cgi += gOFS->MgmOfsInstanceName.c_str();
cgi += "&mgm.owner=";
cgi += "&mgm.ownergroup=";
} else if (mAttr && (*mAttr).count(key)) {
cgi = "&mgm.event=closew&mgm.workflow=";
cgi += workflow;
......
......@@ -21,7 +21,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.*
************************************************************************/
#include "Constants.hh"
#include "common/Constants.hh"
#include "common/Mapping.hh"
#include "common/FileId.hh"
#include "common/LayoutId.hh"
......
......@@ -49,7 +49,7 @@
#include "XrdOss/XrdOss.hh"
#include "XrdSec/XrdSecInterface.hh"
#include "XrdSfs/XrdSfsAio.hh"
#include "Constants.hh"
#include "common/Constants.hh"
#ifdef __APPLE__
#define ECOMM 70
......
......@@ -26,7 +26,7 @@
#include "StagerRmCmd.hh"
#include "mgm/XrdMgmOfs.hh"
#include "mgm/Acl.hh"
#include "mgm/Constants.hh"
#include "common/Constants.hh"
#include "namespace/interface/IView.hh"
EOSMGMNAMESPACE_BEGIN
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment