Commit 80350f64 by Andrea Manzi

Merge branch 'master' of https://gitlab.cern.ch:8443/dss/eos

parents 4e3d887a 513f9389
Pipeline #388670 failed with stages
in 60 minutes 4 seconds
......@@ -14,3 +14,6 @@
path = common/backward-cpp
url = https://github.com/bombela/backward-cpp.git
branch = master
[submodule "common/xrootd-ssi-protobuf-interface"]
path = common/xrootd-ssi-protobuf-interface
url = https://:@gitlab.cern.ch:8443/eos/xrootd-ssi-protobuf-interface.git
......@@ -35,6 +35,12 @@ include_directories(
${OPENSSL_INCLUDE_DIRS})
#-------------------------------------------------------------------------------
# CTA integration related operations
#-------------------------------------------------------------------------------
set(EOS_CTA_PB_DIR ${CMAKE_CURRENT_SOURCE_DIR}/xrootd-ssi-protobuf-interface/eos_cta/protobuf/)
add_subdirectory(eos_cta_pb)
#-------------------------------------------------------------------------------
# Generate protocol buffer files
#-------------------------------------------------------------------------------
PROTOBUF_GENERATE_CPP(DBMAPTEST_SRCS DBMAPTEST_HDRS dbmaptest/test.proto)
......
......@@ -24,15 +24,18 @@
#pragma once
#include "mgm/Namespace.hh"
#include "mgm/XrdMgmOfs.hh"
#include "namespace/interface/IView.hh"
#include "common/Namespace.hh"
EOSMGMNAMESPACE_BEGIN
EOSCOMMONNAMESPACE_BEGIN
static constexpr decltype(gOFS->eosFileService->getFileMD(0)->getLocation(0)) TAPE_FS_ID = 65535;
static constexpr auto TAPE_FS_ID = 65535u;
static constexpr auto RETRIEVES_ATTR_NAME = "sys.retrieves";
static constexpr auto RETRIEVES_ERROR_ATTR_NAME = "sys.retrieves.error";
static constexpr auto RETRIEVES_ERROR_ATTR_NAME = "sys.retrieve.error";
static constexpr auto ARCHIVE_ERROR_ATTR_NAME = "sys.archive.error";
static constexpr auto RETRIEVE_WRITTEN_WORKFLOW_NAME = "retrieve_written";
static constexpr auto RETRIEVE_FAILED_WORKFLOW_NAME = "retrieve_failed";
static constexpr auto ARCHIVE_FAILED_WORKFLOW_NAME = "archive_failed";
static constexpr auto WF_CUSTOM_ATTRIBUTES_TO_FST_EQUALS = "=";
static constexpr auto WF_CUSTOM_ATTRIBUTES_TO_FST_SEPARATOR = ";;;";
EOSMGMNAMESPACE_END
\ No newline at end of file
EOSCOMMONNAMESPACE_END
\ No newline at end of file
......@@ -472,7 +472,7 @@ bool
SymKey::ProtobufBase64Encode(const google::protobuf::Message* msg,
std::string& output)
{
size_t sz = msg->ByteSize();
auto sz = msg->ByteSize();
std::string buffer(sz , '\0');
google::protobuf::io::ArrayOutputStream aos((void*)buffer.data(), sz);
......@@ -480,11 +480,7 @@ SymKey::ProtobufBase64Encode(const google::protobuf::Message* msg,
return false;
}
if (!eos::common::SymKey::Base64Encode(buffer.data(), buffer.size(), output)) {
return false;
}
return true;
return eos::common::SymKey::Base64Encode(buffer.data(), buffer.size(), output);
}
//------------------------------------------------------------------------------
......
......@@ -39,7 +39,6 @@ set_source_files_properties(
PROPERTIES GENERATED 1)
include_directories(${PROTOBUF3_INCLUDE_DIRS})
add_library (XrdSsiPbEosCta ${EOS_CTA_PB_SRCS})
set_target_properties(XrdSsiPbEosCta PROPERTIES LINKER_LANGUAGE CXX)
set_target_properties(XrdSsiPbEosCta PROPERTIES POSITION_INDEPENDENT_CODE TRUE)
target_link_libraries(XrdSsiPbEosCta ${PROTOBUF3_LIBRARIES})
add_library (XrdSsiPbEosCta-Objects OBJECT
${EOS_CTA_PB_SRCS})
set_target_properties(XrdSsiPbEosCta-Objects PROPERTIES POSITION_INDEPENDENT_CODE TRUE)
......@@ -25,7 +25,7 @@
#pragma once
#include "common/Logging.hh"
#include "mgm/cta_interface/eos_cta/include/CtaFrontendApi.hpp"
#include "common/xrootd-ssi-protobuf-interface/eos_cta/include/CtaFrontendApi.hpp"
// Define XRootD SSI Alert message callback
......
Subproject commit 3ce8079aec20aa77b3a4773e6caa02b36bb0a749
......@@ -16,13 +16,14 @@ The workflow engine allows to create chained workflows e.g. one workflow can tri
Event Description
=================== ==================================================================================================
sync::create event is triggered at the MGM when a file is being created (synchronous event)
sync::openw event is triggered at the MGM when a 'file open for write' (synchronous event)
open event is triggered at the MGM when a 'file open'
- if the return of an open call is ENONET a workflow defined stall time is returned
sync::prepare event is triggered at the MGM when a 'prepare' is issued (synchronous event)
sync::abort_prepare event is triggered at the MGM when xrdfs prepare -f issued (synchronous event)
retrieve_failed event is triggered at the MGM when a retrieval of a file has failed
retrieve_failed event is triggered with an error message at the MGM when the retrieval of a file has failed
archive_failed event is triggered with an error message at the MGM when the archival of a file has failed
closer event is triggered via the MGM when a read-open file is closed on an FST.
sync::closew event is triggered via the FST when a write-open file is closed (it has priority over the asynchronous one)
closew event is triggered via the MGM when a write-open file is closed on an FST
sync::delete event is triggered at the MGM when a file has been deleted (synchronous event)
============== ==================================================================================================
......
......@@ -50,7 +50,9 @@ include_directories(
${CMAKE_SOURCE_DIR}/namespace/ns_quarkdb/
${FOLLY_INCLUDE_DIRS}
${CMAKE_SOURCE_DIR}/namespace/ns_quarkdb/qclient/include
${CMAKE_BINARY_DIR}/namespace/ns_quarkdb) # for the generated protobuf
${CMAKE_BINARY_DIR}/namespace/ns_quarkdb
${CMAKE_SOURCE_DIR}/common/xrootd-ssi-protobuf-interface/include
${CMAKE_BINARY_DIR}/common/eos_cta_pb) # for the generated protobuf
#-------------------------------------------------------------------------------
# Add tests if not on MacOSX
......@@ -347,10 +349,12 @@ target_compile_definitions(XrdEosFst-Objects PUBLIC
-DDAEMONUID=${DAEMONUID} -DDAEMONGID=${DAEMONGID})
add_library(XrdEosFst MODULE
$<TARGET_OBJECTS:XrdEosFst-Objects>)
$<TARGET_OBJECTS:XrdEosFst-Objects>
$<TARGET_OBJECTS:XrdSsiPbEosCta-Objects>)
add_library(XrdEosFst-Shared SHARED
$<TARGET_OBJECTS:XrdEosFst-Objects>)
$<TARGET_OBJECTS:XrdEosFst-Objects>
$<TARGET_OBJECTS:XrdSsiPbEosCta-Objects>)
target_compile_definitions(XrdEosFst PUBLIC
-DDAEMONUID=${DAEMONUID} -DDAEMONGID=${DAEMONGID})
......@@ -359,6 +363,7 @@ target_compile_definitions(XrdEosFst-Shared PUBLIC
-DDAEMONUID=${DAEMONUID} -DDAEMONGID=${DAEMONGID})
target_link_libraries(XrdEosFst PRIVATE
eosCommon
eosCommonServer
EosFstIo-Static
eosCapability-Static
......@@ -372,10 +377,12 @@ target_link_libraries(XrdEosFst PRIVATE
${NCURSES_LIBRARIES}
${XROOTD_CL_LIBRARY}
${XOORTD_UTILS_LIBRARY}
${XROOTD_SSI_LIBRARY}
${OPENSSL_CRYPTO_LIBRARY}
${CMAKE_THREAD_LIBS_INIT})
target_link_libraries(XrdEosFst-Shared PUBLIC
eosCommon
eosCommonServer
EosFstIo-Static
eosCapability-Static
......@@ -390,6 +397,7 @@ target_link_libraries(XrdEosFst-Shared PUBLIC
${XROOTD_CL_LIBRARY}
${XOORTD_UTILS_LIBRARY}
${XROOTD_SERVER_LIBRARY}
${XROOTD_SSI_LIBRARY}
${OPENSSL_CRYPTO_LIBRARY}
${CMAKE_THREAD_LIBS_INIT})
......
......@@ -47,6 +47,8 @@ public:
XrdOucString FstHostPort; // <host>:<port>
XrdOucString Manager; // <host>:<port>
XrdOucString KernelVersion; // kernel version of the host
std::string ProtoWFEndpoint; // proto wf endpoint (typically CTA frontend)
std::string ProtoWFResource; // proto wf resource (typically CTA frontend)
int PublishInterval; // Interval after which filesystem information should be published
XrdOucString StartDate; // Time when daemon was started
XrdOucString KeyTabAdler; // adler string of the keytab file
......
......@@ -36,6 +36,10 @@
#include "common/Statfs.hh"
#include "common/SyncAll.hh"
#include "common/StackTrace.hh"
#include "common/xrootd-ssi-protobuf-interface/eos_cta/include/CtaFrontendApi.hpp"
#include "common/eos_cta_pb/EosCtaAlertHandler.hh"
#include "common/Constants.hh"
#include "common/StringConversion.hh"
#include "XrdNet/XrdNetOpts.hh"
#include "XrdOfs/XrdOfs.hh"
#include "XrdOfs/XrdOfsTrace.hh"
......@@ -339,6 +343,8 @@ XrdFstOfs::Configure(XrdSysError& Eroute, XrdOucEnv* envP)
int cfgFD;
int NoGo = 0;
eos::common::StringConversion::InitLookupTables();
if (XrdOfs::Configure(Eroute, envP)) {
Eroute.Emsg("Config", "default OFS configuration failed");
return SFS_ERROR;
......@@ -455,6 +461,18 @@ XrdFstOfs::Configure(XrdSysError& Eroute, XrdOucEnv* envP)
}
}
if (!strcmp("protowfendpoint", var)) {
if ((val = Config.GetWord())) {
eos::fst::Config::gConfig.ProtoWFEndpoint = val;
}
}
if (!strcmp("protowfresource", var)) {
if ((val = Config.GetWord())) {
eos::fst::Config::gConfig.ProtoWFResource = val;
}
}
if (!strcmp("qdbcluster", var)) {
std::string qdb_cluster;
......@@ -784,7 +802,8 @@ XrdFstOfs::stat(const char* path,
int
XrdFstOfs::CallManager(XrdOucErrInfo* error, const char* path,
const char* manager, XrdOucString& capOpaqueFile,
XrdOucString* return_result, unsigned short timeout)
XrdOucString* return_result, unsigned short timeout,
bool linkPerThread, bool retry)
{
EPNAME("CallManager");
int rc = SFS_OK;
......@@ -793,6 +812,14 @@ XrdFstOfs::CallManager(XrdOucErrInfo* error, const char* path,
XrdCl::Buffer* response = 0;
XrdCl::XRootDStatus status;
XrdOucString address = "root://";
if (linkPerThread) {
std::ostringstream tidStr;
tidStr << std::this_thread::get_id();
address += tidStr.str().c_str();
address += "@";
}
XrdOucString lManager;
size_t tried = 0;
......@@ -861,13 +888,26 @@ again:
rc = -EADV;
}
if (msg.find("[EAGAIN]") != STR_NPOS) {
rc = -EAGAIN;
}
if (msg.find("[ENOTCONN]") != STR_NPOS) {
rc = -ENOTCONN;
}
if (msg.find("[EPROTO]") != STR_NPOS) {
rc = -EPROTO;
}
if (rc != SFS_ERROR) {
gOFS.Emsg(epname, *error, -rc, msg.c_str(), path);
return gOFS.Emsg(epname, *error, -rc, msg.c_str(), path);
} else {
eos_static_err("msg=\"query error\" status=%d code=%d", status.status,
status.code);
if ((status.code >= 100) && (status.code <= 300) && (!timeout)) {
if (retry && (status.code >= 100) && (status.code <= 300) && (!timeout)) {
// implement automatic retry - network errors will be cured at some point
delete fs;
XrdSysTimer sleeper;
......@@ -889,7 +929,7 @@ again:
goto again;
}
gOFS.Emsg(epname, *error, ECOMM, msg.c_str(), path);
return gOFS.Emsg(epname, *error, ECOMM, msg.c_str(), path);
}
}
......@@ -898,10 +938,7 @@ again:
}
delete fs;
if (response) {
delete response;
}
delete response;
return rc;
}
......@@ -1533,4 +1570,125 @@ XrdFstOfs::WaitForOngoingIO(std::chrono::seconds timeout)
return all_done;
}
int
XrdFstOfs::CallSynchronousClosew(const Fmd& fmd, const string& ownerName,
const string& ownerGroupName, const string& requestorName,
const string& requestorGroupName, const string& instanceName,
const string& fullPath, const std::map<std::string, std::string>& xattrs) {
using namespace eos::common;
cta::xrd::Request request;
auto notification = request.mutable_notification();
notification->mutable_cli()->mutable_user()->set_username(requestorName);
notification->mutable_cli()->mutable_user()->set_groupname(requestorGroupName);
notification->mutable_file()->mutable_owner()->set_username(ownerName);
notification->mutable_file()->mutable_owner()->set_groupname(ownerGroupName);
notification->mutable_file()->set_size(fmd.size());
notification->mutable_file()->mutable_cks()->set_type(
eos::common::LayoutId::GetChecksumString(fmd.lid()));
notification->mutable_file()->mutable_cks()->set_value(fmd.checksum());
notification->mutable_wf()->set_event(cta::eos::Workflow::CLOSEW);
notification->mutable_wf()->mutable_instance()->set_name(instanceName);
notification->mutable_file()->set_lpath(fullPath);
notification->mutable_file()->set_fid(fmd.fid());
std::string managerName;
{
XrdSysMutexHelper lock(Config::gConfig.Mutex);
managerName = Config::gConfig.Manager.c_str();
}
auto fxidString = eos::common::StringConversion::FastUnsignedToAsciiHex(fmd.fid());
std::ostringstream srcStream;
srcStream << "root://" << managerName << "/" << fullPath << "?eos.lfn=fxid:"
<< fxidString;
notification->mutable_wf()->mutable_instance()->set_url(srcStream.str());
std::ostringstream reportStream;
reportStream << "eosQuery://" << managerName
<< "//eos/wfe/passwd?mgm.pcmd=event&mgm.fid=" << fxidString
<< "&mgm.logid=cta&mgm.event=archived&mgm.workflow=default&mgm.path=/eos/wfe/passwd&mgm.ruid=0&mgm.rgid=0";
notification->mutable_transport()->set_report_url(reportStream.str());
std::ostringstream errorReportStream;
errorReportStream << "eosQuery://" << managerName
<< "//eos/wfe/passwd?mgm.pcmd=event&mgm.fid=" << fxidString
<< "&mgm.logid=cta&mgm.event=" << ARCHIVE_FAILED_WORKFLOW_NAME << "&mgm.workflow=default&mgm.path=/eos/wfe/passwd&mgm.ruid=0&mgm.rgid=0&mgm.errmsg=";
notification->mutable_transport()->set_error_report_url(errorReportStream.str());
for (const auto& attrPair : xattrs)
{
google::protobuf::MapPair<std::string, std::string> attr(attrPair.first,
attrPair.second);
notification->mutable_file()->mutable_xattr()->insert(attr);
}
// Communication with service
std::string endPoint;
std::string resource;
{
XrdSysMutexHelper lock(Config::gConfig.Mutex);
endPoint = Config::gConfig.ProtoWFEndpoint;
resource = Config::gConfig.ProtoWFResource;
}
if (endPoint.empty() || resource.empty()) {
eos_static_err(
"You are running proto wf jobs without specifying fstofs.protowfendpoint or fstofs.protowfresource in the FST config file."
);
return ENOTCONN;
}
XrdSsiPb::Config config;
if(getenv("XRDDEBUG")) {
config.set("log", "all");
} else {
config.set("log", "info");
}
config.set("request_timeout", "120");
// Instantiate service object only once, static is also thread-safe
static XrdSsiPbServiceType service(endPoint, resource, config);
cta::xrd::Response response;
try {
auto sentAt = std::chrono::steady_clock::now();
auto future = service.Send(request, response);
future.get();
auto receivedAt = std::chrono::steady_clock::now();
auto timeSpent = std::chrono::duration_cast<std::chrono::milliseconds>(receivedAt - sentAt);
eos_static_info("SSI Protobuf time for sync::closew=%ld", timeSpent.count());
} catch (std::runtime_error& error) {
eos_static_err("Could not send request to outside service. Reason: %s",
error.what());
return ENOTCONN;
}
static std::map<decltype(cta::xrd::Response::RSP_ERR_CTA), const char*> errorEnumMap;
errorEnumMap[cta::xrd::Response::RSP_ERR_CTA] = "RSP_ERR_CTA";
errorEnumMap[cta::xrd::Response::RSP_ERR_USER] = "RSP_ERR_USER";
errorEnumMap[cta::xrd::Response::RSP_ERR_PROTOBUF] = "RSP_ERR_PROTOBUF";
errorEnumMap[cta::xrd::Response::RSP_INVALID] = "RSP_INVALID";
switch (response.type()) {
case cta::xrd::Response::RSP_SUCCESS: return SFS_OK;
case cta::xrd::Response::RSP_ERR_CTA:
case cta::xrd::Response::RSP_ERR_USER:
case cta::xrd::Response::RSP_ERR_PROTOBUF:
case cta::xrd::Response::RSP_INVALID:
eos_static_err("%s for file %s. Reason: %s", errorEnumMap[response.type()], fullPath.c_str(), response.message_txt().c_str());
return EPROTO;
default:
eos_static_err("Response:\n%s", response.DebugString().c_str());
return EPROTO;
}
}
EOSFSTNAMESPACE_END
......@@ -302,7 +302,14 @@ public:
const char* manager,
XrdOucString& capOpaqueFile,
XrdOucString* return_result = 0,
unsigned short timeout = 0);
unsigned short timeout = 0,
bool linkPerThread = false,
bool retry = true);
int CallSynchronousClosew(const Fmd& fmd, const string& ownerName,
const string& ownerGroupName, const string& requestorName,
const string& requestorGroupName, const string& instanceName,
const string& fullPath, const std::map<std::string, std::string>& xattrs);
//----------------------------------------------------------------------------
//! Function dealing with plugin calls
......@@ -322,7 +329,7 @@ public:
//----------------------------------------------------------------------------
//! Allows to switch on error simulation in the OFS stack
//!
//! @param tag type of simulation eroor
//! @param tag type of simulation error
//----------------------------------------------------------------------------
void SetSimulationError(const char* tag);
......
......@@ -22,6 +22,7 @@
************************************************************************/
#define __STDC_FORMAT_MACROS
#include <cinttypes>
#include "common/Constants.hh"
#include "common/Path.hh"
#include "common/http/OwnCloud.hh"
#include "common/StringTokenizer.hh"
......@@ -57,7 +58,7 @@ XrdFstOfsFile::XrdFstOfsFile(const char* user, int MonID) :
hasReadError(false), isRW(false), mIsTpcDst(false), mIsDevNull(false),
isCreation(false), isReplication(false), mIsInjection(false),
mRainReconstruct(false), deleteOnClose(false), repairOnClose(false),
commitReconstruction(false), mEventOnClose(false), mEventWorkflow(""),
commitReconstruction(false), mEventOnClose(false), mEventWorkflow(""), mSyncEventOnClose(false),
mIsOCchunk(false), writeErrorFlag(false), mTpcFlag(kTpcNone),
fMd(nullptr), mCheckSum(nullptr), layOut(nullptr), maxOffsetWritten(0),
openSize(0), closeSize(0),
......@@ -185,8 +186,10 @@ XrdFstOfsFile::open(const char* path, XrdSfsFileOpenMode open_mode,
eos::common::StringConversion::MaskTag(maskOpaque, "cap.sym");
eos::common::StringConversion::MaskTag(maskOpaque, "cap.msg");
eos::common::StringConversion::MaskTag(maskOpaque, "authz");
eos_info("path=%s info=%s isRW=%d open_mode=%x", mNsPath.c_str(),
maskOpaque.c_str(), isRW, open_mode);
// Process and filter open opaque information
std::string out_opaque;
std::string in_opaque = (opaque ? opaque : "");
......@@ -1894,7 +1897,7 @@ XrdFstOfsFile::close()
}
}
if (!rc && mEventOnClose && layOut->IsEntryServer()) {
if (!rc && (mEventOnClose || mSyncEventOnClose) && layOut->IsEntryServer()) {
//trigger an MGM event if asked from the entry point
XrdOucString capOpaqueFile = "";
XrdOucString eventType = "";
......@@ -1903,14 +1906,36 @@ XrdFstOfsFile::close()
capOpaqueFile += mCapOpaque->Env(envlen);
capOpaqueFile += "&mgm.pcmd=event";
// Set default workflow if nothing is specified
if (mEventWorkflow.length() == 0) {
mEventWorkflow = "default";
}
if (isRW) {
capOpaqueFile += "&mgm.event=closew";
eventType = "closew";
eventType = mSyncEventOnClose ? "sync::closew" : "closew";
} else {
capOpaqueFile += "&mgm.event=closer";
eventType = "closer";
}
if (mSyncEventOnClose) {
std::string decodedAttributes;
eos::common::SymKey::Base64Decode(mEventAttributes.c_str(), decodedAttributes);
std::map<std::string, std::string> attributes;
eos::common::StringConversion::GetKeyValueMap(decodedAttributes.c_str(), attributes,
eos::common::WF_CUSTOM_ATTRIBUTES_TO_FST_EQUALS,
eos::common::WF_CUSTOM_ATTRIBUTES_TO_FST_SEPARATOR, nullptr);
rc = gOFS.CallSynchronousClosew(fMd->mProtoFmd, mEventOwner, mEventOwnerGroup, mEventRequestor, mEventRequestorGroup,
mEventInstance, mCapOpaque->Get("mgm.path"), attributes);
if (rc == SFS_OK) {
return rc;
}
}
capOpaqueFile += "&mgm.event=";
capOpaqueFile += eventType;
// The log ID to the commit
capOpaqueFile += "&mgm.logid=";
capOpaqueFile += logId;
......@@ -1929,7 +1954,7 @@ XrdFstOfsFile::close()
eos_info("msg=\"notify\" event=\"%s\" workflow=\"%s\"", eventType.c_str(),
mEventWorkflow.c_str());
rc = gOFS.CallManager(&error, mCapOpaque->Get("mgm.path"),
mCapOpaque->Get("mgm.manager"), capOpaqueFile);
mCapOpaque->Get("mgm.manager"), capOpaqueFile, nullptr, 30, mSyncEventOnClose, false);
}
eos_info("Return code rc=%i.", rc);
......@@ -2489,10 +2514,10 @@ XrdFstOfsFile::DoTpcTransfer()
src_cgi += gOFS.TpcMap[mIsTpcDst][mTpcKey.c_str()].org;
}
XrdIo tpcIO(src_url.c_str());
XrdIo tpcIO(src_url);
eos_info("sync-url=%s sync-cgi=%s", src_url.c_str(), src_cgi.c_str());
if (tpcIO.fileOpen(0, 0, src_cgi.c_str())) {
if (tpcIO.fileOpen(0, 0, src_cgi)) {
eos_err("msg=\"TPC open failed for url=%s cgi=%s\"", src_url.c_str(),
src_cgi.c_str());
XrdSysMutexHelper scope_lock(mTpcJobMutex);
......@@ -2876,12 +2901,32 @@ XrdFstOfsFile::ProcessOpenOpaque(const std::string& in_opaque,
if ((val = env.Get("mgm.event"))) {
std::string event = val;
if (event == "close") {
if (event == "closew") {
mEventOnClose = true;
} else if (event == "sync::closew") {
mSyncEventOnClose = true;
}
val = env.Get("mgm.workflow");
mEventWorkflow = (val ? val : "");
val = env.Get("mgm.instance");
mEventInstance = val ? val : "";
val = env.Get("mgm.owner");
mEventOwner = val ? val : "";
val = env.Get("mgm.ownergroup");
mEventOwnerGroup = val ? val : "";
val = env.Get("mgm.requestor");
mEventRequestor = val ? val : "";
val = env.Get("mgm.requestorgroup");
mEventRequestorGroup = val ? val : "";
val = env.Get("mgm.attributes");
mEventAttributes = val ? val : "";
}
if ((val = env.Get("eos.injection"))) {
......
......@@ -398,6 +398,13 @@ protected:
bool mEventOnClose; ///< Indicator to send a specified event to MGM on close
//! Indicates the workflow to be triggered by an event
XrdOucString mEventWorkflow;
bool mSyncEventOnClose; //! indicator to send a specified event to the mgm on close
std::string mEventInstance;
std::string mEventOwner;
std::string mEventOwnerGroup;
std::string mEventRequestor;
std::string mEventRequestorGroup;
std::string mEventAttributes;
enum {
kOfsIoError = 1, //! generic IO error
......
......@@ -38,14 +38,8 @@ include_directories(
${CMAKE_BINARY_DIR}/auth_plugin/
${CMAKE_SOURCE_DIR}/namespace/ns_quarkdb/qclient/include
${CMAKE_SOURCE_DIR}/namespace/ns_quarkdb/qclient/src
${CMAKE_SOURCE_DIR}/mgm/cta_interface/include
${CMAKE_BINARY_DIR}/mgm/eos_cta_pb)
#-------------------------------------------------------------------------------
# CTA integration related operations
#-------------------------------------------------------------------------------
set(EOS_CTA_PB_DIR ${CMAKE_CURRENT_SOURCE_DIR}/cta_interface/eos_cta/protobuf/)
add_subdirectory(eos_cta_pb)
${CMAKE_SOURCE_DIR}/common/xrootd-ssi-protobuf-interface/include
${CMAKE_BINARY_DIR}/common/eos_cta_pb)
#-------------------------------------------------------------------------------
# Generate protocol buffer files
......@@ -192,7 +186,8 @@ set_target_properties(
add_library(XrdEosMgm MODULE
$<TARGET_OBJECTS:EosAuthProto-Objects>
$<TARGET_OBJECTS:XrdEosMgm-Objects>)
$<TARGET_OBJECTS:XrdEosMgm-Objects>
$<TARGET_OBJECTS:XrdSsiPbEosCta-Objects>)
target_compile_definitions(
XrdEosMgm PUBLIC
......@@ -214,7 +209,6 @@ target_link_libraries(
# libEosNsCommon and don't link with the whole EosNsQuarkdb libary
EosNsQuarkdb-Shared
eosCapability-Static
XrdSsiPbEosCta
${FOLLY_LIBRARIES}
${Z_LIBRARY}
${ZMQ_LIBRARIES}
......@@ -266,7 +260,8 @@ target_link_libraries(
if(Linux)
add_library(XrdEosMgm-Shared SHARED
$<TARGET_OBJECTS:EosAuthProto-Objects>
$<TARGET_OBJECTS:XrdEosMgm-Objects>)
$<TARGET_OBJECTS:XrdEosMgm-Objects>
$<TARGET_OBJECTS:XrdSsiPbEosCta-Objects>)
target_compile_definitions(
XrdEosMgm-Shared PUBLIC
......@@ -280,7 +275,6 @@ if(Linux)
EosNsQuarkdb-Shared
eosCommonServer
eosCapability-Static
XrdSsiPbEosCta
${FOLLY_LIBRARIES}
${Z_LIBRARY}
${ZMQ_LIBRARIES}
......
......@@ -29,7 +29,7 @@
#include "common/Timing.hh"
#include "common/FileId.hh"
#include "common/ThreadPool.hh"
#include "mgm/cta_interface/eos_cta/include/CtaFrontendApi.hpp"
#include "common/xrootd-ssi-protobuf-interface/eos_cta/include/CtaFrontendApi.hpp"
#include "XrdOuc/XrdOucString.hh"
#include "XrdOuc/XrdOucErrInfo.hh"
#include "Xrd/XrdJob.hh"
......@@ -137,9 +137,9 @@ public:
}
Action(std::string a, std::string e, time_t when, std::string savedOnDay, std::string workflow,
std::string queue) : Action (a, e, when, workflow, queue)
std::string queue) : Action (std::move(a), std::move(e), when, std::move(workflow), std::move(queue))
{
mSavedOnDay = savedOnDay;
mSavedOnDay = std::move(savedOnDay);
}
std::string mAction;
......@@ -159,11 +159,12 @@ public:
}
Job(eos::common::FileId::fileid_t fid,
eos::common::Mapping::VirtualIdentity& vid)
eos::common::Mapping::VirtualIdentity& vid, const std::string& errorMessage = "")
{
mFid = fid;
mRetry = 0;
eos::common::Mapping::Copy(vid, mVid);
mErrorMesssage = errorMessage;
}
~Job() override = default;
......@@ -174,6 +175,7 @@ public:
mFid = other.mFid;
mDescription = other.mDescription;
mRetry = other.mRetry;
mErrorMesssage = other.mErrorMesssage;
}
// ---------------------------------------------------------------------------
// Job execution function
......@@ -251,6 +253,7 @@ public:
std::string mDescription;
eos::common::Mapping::VirtualIdentity mVid;
std::string mWorkflowPath;
std::string mErrorMesssage;
int mRetry;///! number of retries
private:
......@@ -302,6 +305,14 @@ public:
return mActiveJobs.load();
}
static std::string GetUserName(uid_t uid);
static std::string GetGroupName(gid_t gid);
static IContainerMD::XAttrMap CollectAttributes(const std::string& fullPath);
static void MoveFromRBackToQ();
/// the scheduler class is providing a destructor-less object,
/// so we have to create once a singleton of this and keep/share it
static XrdSysMutex gSchedulerMutex;
......
......@@ -28,7 +28,7 @@
#include "mgm/XrdMgmOfs.hh"
#include "mgm/WFE.hh"
#include "mgm/FsView.hh"
#include "mgm/Constants.hh"
#include "common/Constants.hh"
/*----------------------------------------------------------------------------*/
EOSMGMNAMESPACE_BEGIN
......@@ -36,11 +36,9 @@ EOSMGMNAMESPACE_BEGIN
/*----------------------------------------------------------------------------*/
int
Workflow::Trigger(std::string event, std::string workflow,
eos::common::Mapping::VirtualIdentity& vid)
Workflow::Trigger(const std::string& event, std::string workflow,
eos::common::Mapping::VirtualIdentity& vid, const std::string& errorMessage)
{
eos_static_info("event=\"%s\" workflow=\"%s\"", event.c_str(),
workflow.c_str());