Commit dab289c6 authored by Mihai Patrascoiu's avatar Mihai Patrascoiu
Browse files

FTS-1687: Implement mechanism to enable/disable streaming per VO.

Fail streaming transfers if global server configuration does not allow streaming for the given VO
parent 9746c393
......@@ -270,6 +270,9 @@ public:
/// Returns how many seconds must be added to the timeout per MB to be transferred
virtual int getSecPerMb(const std::string &voName) = 0;
/// Returns the globally configured disable streaming flag
virtual bool getDisableStreamingFlag(const std::string &voName) = 0;
/// Puts into the vector queue the Queues for which there are pending transfers
virtual void getQueuesWithPending(std::vector<QueueId>& queues) = 0;
......
......@@ -473,9 +473,8 @@ bool MySqlAPI::getDisableDelegationFlag(const std::string &sourceSe, const std::
try
{
bool disable_delegation = false;
std::string sdisable_delegation;
soci::indicator ind;
std::string disable_delegation;
soci::indicator ind = soci::i_ok;
sql <<
"SELECT no_delegation FROM ("
......@@ -485,15 +484,13 @@ bool MySqlAPI::getDisableDelegationFlag(const std::string &sourceSe, const std::
" SELECT no_delegation FROM t_link_config WHERE source_se = '*' AND dest_se = '*' AND no_delegation IS NOT NULL"
") AS dlg LIMIT 1",
soci::use(sourceSe, "source"), soci::use(destSe, "dest"),
soci::into(sdisable_delegation, ind);
soci::into(disable_delegation, ind);
if (ind == soci::i_null) {
disable_delegation = false;
} else if (sdisable_delegation == "on") {
disable_delegation = true;
return false;
} else {
return disable_delegation == "on";
}
return disable_delegation;
}
catch (std::exception& e)
{
......@@ -562,6 +559,40 @@ int MySqlAPI::getSecPerMb(const std::string &voName)
}
bool MySqlAPI::getDisableStreamingFlag(const std::string& voName)
{
soci::session sql(*connectionPool);
try
{
std::string disable_streaming;
soci::indicator ind = soci::i_ok;
sql <<
"SELECT no_streaming FROM t_server_config "
"WHERE vo_name IN (:vo, '*') OR vo_name IS NULL "
"ORDER BY vo_name DESC LIMIT 1",
soci::use(voName), soci::into(disable_streaming, ind);
if (ind == soci::i_null) {
return false;
} else {
return disable_streaming == "on";
}
}
catch (std::exception& e)
{
sql.rollback();
throw UserError(std::string(__func__) + ": Caught exception " + e.what());
}
catch (...)
{
sql.rollback();
throw UserError(std::string(__func__) + ": Caught exception");
}
}
bool MySqlAPI::getCloudStorageCredentials(const std::string& user_dn,
const std::string& vo, const std::string& cloud_name, CloudStorageAuth& auth)
{
......
......@@ -222,6 +222,9 @@ public:
/// Returns how many seconds must be added to the timeout per MB to be transferred
virtual int getSecPerMb(const std::string &voName);
/// Returns the globally configured disable streaming flag
virtual bool getDisableStreamingFlag(const std::string &voName);
/// Puts into the vector queue the Queues for which there are pending transfers
virtual void getQueuesWithPending(std::vector<QueueId>& queues);
......
......@@ -149,6 +149,9 @@ void FileTransferExecutor::run(boost::any & ctx)
// Disable delegation (according to link config)
cmdBuilder.setDisableDelegation(db->getDisableDelegationFlag(tf.sourceSe, tf.destSe));
// Disable streaming via local transfers (according to global config)
cmdBuilder.setDisableStreaming(db->getDisableStreamingFlag(tf.voName));
// Enable monitoring
cmdBuilder.setMonitoring(monitoringMsg, msgDir);
......
......@@ -166,11 +166,13 @@ void UrlCopyCmd::setOAuthFile(const std::string &path)
setOption("oauth", path);
}
void UrlCopyCmd::setAuthMethod(const std::string &method)
{
setOption("authMethod", method);
}
void UrlCopyCmd::setFromTransfer(const TransferFile &transfer,
bool is_multiple, bool publishUserDn, const std::string &msgDir)
{
......@@ -291,12 +293,19 @@ void UrlCopyCmd::setMaxNumberOfRetries(int retry_max)
setOption("retry_max", retry_max);
}
void UrlCopyCmd::setDisableDelegation(bool disable_delegation)
{
setFlag("no-delegation", disable_delegation);
}
void UrlCopyCmd::setDisableStreaming(bool disable_streaming)
{
setFlag("no-streaming", disable_streaming);
}
int UrlCopyCmd::getBuffersize()
{
auto buffersize = options["tcp-buffersize"];
......
......@@ -77,6 +77,7 @@ public:
void setNumberOfRetries(int);
void setMaxNumberOfRetries(int);
void setDisableDelegation(bool);
void setDisableStreaming(bool);
// Observers
int getBuffersize();
......
......@@ -222,6 +222,14 @@ public:
}
}
void setStreamingFlag(bool value)
{
GError *error = NULL;
if (gfalt_set_local_transfer_perm(params, value, &error) < 0) {
throw Gfal2Exception(error);
}
}
void addEventCallback(gfalt_event_func callback, void *udata)
{
GError *error = NULL;
......
......@@ -73,6 +73,7 @@ const option UrlCopyOpts::long_options[] =
{"sec-per-mb", required_argument, 0, 808},
{"ipv4", no_argument, 0, 809},
{"no-delegation", no_argument, 0, 810},
{"no-streaming", no_argument, 0, 811},
{"retry", required_argument, 0, 820},
{"retry_max-max", required_argument, 0, 821},
......@@ -343,6 +344,9 @@ void UrlCopyOpts::parse(int argc, char * const argv[])
case 810:
noDelegation = true;
break;
case 811:
noStreaming = true;
break;
case 820:
retry = boost::lexical_cast<int>(optarg);
......
......@@ -68,6 +68,7 @@ public:
bool enableUdt;
boost::tribool enableIpv6;
unsigned addSecPerMb;
bool noStreaming;
bool enableMonitoring; // Legacy option
unsigned active; // Legacy option
......
......@@ -321,6 +321,7 @@ static void setupTransferConfig(const UrlCopyOpts &opts, const Transfer &transfe
params.setCreateParentDir(true);
params.setReplaceExistingFile(opts.overwrite);
params.setDelegationFlag(!opts.noDelegation);
params.setStreamingFlag(!opts.noStreaming);
FTS3_COMMON_LOGGER_NEWLOG(DEBUG) << "Source protocol: " << transfer.source.protocol << commit;
FTS3_COMMON_LOGGER_NEWLOG(DEBUG) << "Destination protocol: " << transfer.destination.protocol << commit;
......@@ -411,6 +412,7 @@ void UrlCopyProcess::runTransfer(Transfer &transfer, Gfal2TransferParams &params
FTS3_COMMON_LOGGER_NEWLOG(INFO) << "Dest url: " << transfer.destination << commit;
FTS3_COMMON_LOGGER_NEWLOG(INFO) << "Overwrite enabled: " << opts.overwrite << commit;
FTS3_COMMON_LOGGER_NEWLOG(INFO) << "Disable delegation: " << opts.noDelegation << commit;
FTS3_COMMON_LOGGER_NEWLOG(INFO) << "Disable local streaming: " << opts.noStreaming << commit;
FTS3_COMMON_LOGGER_NEWLOG(INFO) << "Dest space token: " << transfer.destTokenDescription << commit;
FTS3_COMMON_LOGGER_NEWLOG(INFO) << "Source space token: " << transfer.sourceTokenDescription << commit;
FTS3_COMMON_LOGGER_NEWLOG(INFO) << "Checksum: " << transfer.checksumValue << commit;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment