FTS-894: Adapt to new schema

Note that this disables VO shares
parent 79e1693b
......@@ -31,9 +31,6 @@
#include "JobStatus.h"
#include "FileTransferStatus.h"
#include "SeConfig.h"
#include "SeGroup.h"
#include "SeProtocolConfig.h"
#include "QueueId.h"
#include "LinkConfig.h"
#include "ShareConfig.h"
......@@ -47,7 +44,6 @@
#include "Job.h"
#include "MinFileStatus.h"
#include "StagingOperation.h"
#include "StorageElement.h"
#include "TransferFile.h"
#include "UserCredential.h"
#include "UserCredentialCache.h"
......@@ -209,39 +205,18 @@ public:
/// @note This method is used only for reuse and multihop jobs
virtual void forkFailed(const std::string& jobId) = 0;
/// Return true if the group 'groupName' exists
virtual bool checkGroupExists(const std::string & groupName) = 0;
/// @return the group to which storage belong
/// @note It will be the empty string if there is no group
virtual std::string getGroupForSe(const std::string storage) = 0;
/// Get the link configuration for the link defined by the source and destination given
virtual std::unique_ptr<LinkConfig> getLinkConfig(const std::string &source, const std::string &destination) = 0;
/// Register a new VO share configuration
virtual void addShareConfig(const ShareConfig& cfg) = 0;
//virtual void addShareConfig(const ShareConfig& cfg) = 0;
/// Get the VO share configuration for the given link and VO
virtual std::unique_ptr<ShareConfig> getShareConfig(const std::string &source, const std::string &destination,
const std::string &vo) = 0;
//virtual std::unique_ptr<ShareConfig> getShareConfig(const std::string &source, const std::string &destination,
// const std::string &vo) = 0;
/// Get the list of VO share configurations for the given link
virtual std::vector<ShareConfig> getShareConfig(const std::string &source, const std::string &destination) = 0;
/// Register in the DB that the given file ID has been scheduled for a share configuration
virtual void addFileShareConfig(int fileId, const std::string &source, const std::string &destination,
const std::string &vo) = 0;
/// Returns how many active transfers there is for the given link and VO
virtual int countActiveTransfers(const std::string &source, const std::string &destination,
const std::string &vo) = 0;
/// Returns how many outbound transfers there is from the given storage and VO
virtual int countActiveOutboundTransfersUsingDefaultCfg(const std::string &se, const std::string &vo) = 0;
/// Returns how many inbound transfers there is towards the given storage for the given VO
virtual int countActiveInboundTransfersUsingDefaultCfg(const std::string &se, const std::string &vo) = 0;
//virtual std::vector<ShareConfig> getShareConfig(const std::string &source, const std::string &destination) = 0;
/// Returns the total value of all the shares for the given link and set of VO
virtual int sumUpVoShares(const std::string &source, const std::string &destination,
......@@ -313,8 +288,7 @@ public:
virtual bool isProtocolIPv6(const std::string &sourceSe, const std::string &destSe) = 0;
/// Returns how many streams must be used for the given link
virtual int getStreamsOptimization(const std::string &voName,
const std::string &sourceSe, const std::string &destSe)= 0;
virtual int getStreamsOptimization(const std::string &sourceSe, const std::string &destSe)= 0;
/// Returns the globally configured transfer timeout
virtual int getGlobalTimeout(const std::string &voName) = 0;
......@@ -322,9 +296,6 @@ public:
/// Returns how many seconds must be added to the timeout per MB to be transferred
virtual int getSecPerMb(const std::string &voName) = 0;
/// Returns the optimizer level for the TCP buffersize
virtual int getBufferOptimization() = 0;
/// Puts into the vector queue the Queues for which there are pending transfers
virtual void getQueuesWithPending(std::vector<QueueId>& queues) = 0;
......
......@@ -23,23 +23,26 @@
#include <string>
enum OptimizerMode {
kOptimizerDisabled = 0,
kOptimizerConservative = 1,
kOptimizerNormal = 2,
kOptimizerAggressive = 3
};
class LinkConfig
{
public:
LinkConfig() : numberOfStreams(2), tcpBufferSize(0), transferTimeout(3600) {};
LinkConfig(): minActive(0), maxActive(0), optimizerMode(kOptimizerDisabled), tcpBufferSize(0), numberOfStreams(0) {}
~LinkConfig() {};
std::string source;
std::string destination;
std::string state;
std::string symbolicName;
int numberOfStreams;
int minActive;
int maxActive;
OptimizerMode optimizerMode;
int tcpBufferSize;
int transferTimeout;
std::string autoTuning;
int numberOfStreams;
};
#endif // LINKCONFIG_H_
/*
* Copyright (c) CERN 2013-2015
*
* Copyright (c) Members of the EMI Collaboration. 2010-2013
* See http://www.eu-emi.eu/partners for details on the copyright
* holders.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#ifndef SECONFIG_H_
#define SECONFIG_H
#include <iostream>
class SeConfig
{
public:
SeConfig(): active(0) {}
~SeConfig() {}
std::string source;
std::string destination;
std::string vo;
int active;
std::string symbolicName;
std::string state;
};
#endif // SECONFIG_H_
/*
* Copyright (c) CERN 2013-2015
*
* Copyright (c) Members of the EMI Collaboration. 2010-2013
* See http://www.eu-emi.eu/partners for details on the copyright
* holders.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#ifndef SEGROUP_H_
#define SEGROUP_H_
#include <string>
class SeGroup
{
public:
SeGroup(): active(0) {}
~SeGroup() {}
std::string symbolicName;
std::string groupName;
std::string member;
int active;
std::string vo;
};
#endif // SEGROUP_H_
/*
* Copyright (c) CERN 2013-2015
*
* Copyright (c) Members of the EMI Collaboration. 2010-2013
* See http://www.eu-emi.eu/partners for details on the copyright
* holders.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#ifndef SEPROTOCOLCONFIG_H_
#define SEPROTOCOLCONFIG_H_
#include <iostream>
#include <string>
class SeProtocolConfig
{
public:
SeProtocolConfig(): numberOfStreams(0), tcpBufferSize(0), transferTimeout(0)
{
}
std::string symbolicName;
int numberOfStreams;
int tcpBufferSize;
int transferTimeout;
};
#endif // SEPROTOCOLCONFIG_H_
/*
* Copyright (c) CERN 2013-2015
*
* Copyright (c) Members of the EMI Collaboration. 2010-2013
* See http://www.eu-emi.eu/partners for details on the copyright
* holders.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#ifndef STORAGELEMENT_H_
#define STORAGELEMENT_H_
#include <string>
class StorageElement
{
public:
std::string endpoint;
std::string seType;
std::string site;
std::string name;
std::string state;
std::string version;
std::string host;
std::string seTransferType;
std::string seTransferProtocol;
std::string seControlProtocol;
std::string gocdb_id;
};
#endif // STORAGELEMENT_H_
......@@ -43,6 +43,7 @@ public:
int buffersize;
bool strictCopy;
boost::tribool ipv6;
boost::tribool udt;
// If true, gfal2/davix must generate the S3 signature with the bucket on the path
bool s3Alternate;
......
This diff is collapsed.
This diff is collapsed.
......@@ -26,6 +26,7 @@
#include "msg-bus/consumer.h"
#include "msg-bus/producer.h"
OptimizerMode getOptimizerModeInner(soci::session &sql, const std::string &source, const std::string &dest);
class MySqlAPI : public GenericDbIfce
{
......@@ -167,13 +168,6 @@ public:
/// @note This method is used only for reuse and multihop jobs
virtual void forkFailed(const std::string& jobId);
/// Return true if the group 'groupName' exists
virtual bool checkGroupExists(const std::string & groupName);
/// @return the group to which storage belong
/// @note It will be the empty string if there is no group
virtual std::string getGroupForSe(const std::string storage);
/// Get the link configuration for the link defined by the source and destination given
virtual std::unique_ptr<LinkConfig> getLinkConfig(const std::string &source, const std::string &destination);
......@@ -187,20 +181,6 @@ public:
/// Get the list of VO share configurations for the given link
virtual std::vector<ShareConfig> getShareConfig(const std::string &source, const std::string &destination);
/// Register in the DB that the given file ID has been scheduled for a share configuration
virtual void addFileShareConfig(int fileId, const std::string &source, const std::string &destination,
const std::string &vo);
/// Returns how many active transfers there is for the given link and VO
virtual int countActiveTransfers(const std::string &source, const std::string &destination,
const std::string &vo);
/// Returns how many outbound transfers there is from the given storage and VO
virtual int countActiveOutboundTransfersUsingDefaultCfg(const std::string &se, const std::string &vo);
/// Returns how many inbound transfers there is towards the given storage for the given VO
virtual int countActiveInboundTransfersUsingDefaultCfg(const std::string &se, const std::string &vo);
/// Returns the total value of all the shares for the given link and set of VO
virtual int sumUpVoShares(const std::string &source, const std::string &destination,
const std::set<std::string> &vos);
......@@ -259,14 +239,13 @@ public:
virtual bool getDrain();
/// Returns if for the given link, UDT has been enabled
virtual bool isProtocolUDT(const std::string &sourceSe, const std::string &destSe);
virtual bool isProtocolUDT(const std::string &source, const std::string &dest);
/// Returns if for the given link, IPv6 has been enabled
virtual bool isProtocolIPv6(const std::string &sourceSe, const std::string &destSe);
virtual bool isProtocolIPv6(const std::string &source, const std::string &dest);
/// Returns how many streams must be used for the given link
virtual int getStreamsOptimization(const std::string &voName,
const std::string &sourceSe, const std::string &destSe);
virtual int getStreamsOptimization(const std::string &sourceSe, const std::string &destSe);
/// Returns the globally configured transfer timeout
virtual int getGlobalTimeout(const std::string &voName);
......@@ -274,9 +253,6 @@ public:
/// Returns how many seconds must be added to the timeout per MB to be transferred
virtual int getSecPerMb(const std::string &voName);
/// Returns the optimizer level for the TCP buffersize
virtual int getBufferOptimization();
/// Puts into the vector queue the Queues for which there are pending transfers
virtual void getQueuesWithPending(std::vector<QueueId>& queues);
......
......@@ -35,7 +35,7 @@ static void setNewOptimizerValue(soci::session &sql,
{
sql.begin();
sql <<
"INSERT INTO t_optimize_active (source_se, dest_se, active, ema, datetime) "
"INSERT INTO t_optimizer (source_se, dest_se, active, ema, datetime) "
"VALUES (:source, :dest, :active, :ema, UTC_TIMESTAMP()) "
"ON DUPLICATE KEY UPDATE "
" active = :active, ema = :ema, datetime = UTC_TIMESTAMP()",
......@@ -138,9 +138,8 @@ public:
}
int getOptimizerMode(void) {
extern int getOptimizerMode(soci::session &sql);
return getOptimizerMode(sql);
OptimizerMode getOptimizerMode(const std::string &source, const std::string &dest) {
return getOptimizerModeInner(sql, source, dest);
}
bool isRetryEnabled(void) {
......@@ -151,40 +150,6 @@ public:
return isRetryNull != soci::i_null && retryValue;
}
int getGlobalStorageLimit(void) {
int maxPerSe = 0;
soci::indicator isSeNull;
sql << "SELECT max_per_se "
"FROM t_server_config "
"WHERE max_per_se > 0 AND"
" vo_name IS NULL OR vo_name = '*'",
soci::into(maxPerSe, isSeNull);
if (maxPerSe < 0 || isSeNull != soci::i_ok) {
maxPerSe = 0;
}
return maxPerSe;
}
int getGlobalLinkLimit(void) {
int linkLimit = 0;
soci::indicator isLinkNull;
sql << "SELECT max_per_link "
"FROM t_server_config "
"WHERE max_per_link > 0 AND"
" vo_name IS NULL OR vo_name = '*'",
soci::into(linkLimit, isLinkNull);
if (linkLimit < 0 || isLinkNull != soci::i_ok) {
linkLimit = 0;
}
return linkLimit;
}
void getPairLimits(const Pair &pair, Range *range, Limits *limits) {
soci::indicator nullIndicator;
......@@ -192,35 +157,36 @@ public:
limits->throughputSource = 0;
limits->throughputDestination = 0;
sql << "SELECT active, throughput FROM t_optimize WHERE source_se = :source_se AND active IS NOT NULL",
// Storage limits
sql <<
"SELECT outbound_max_throughput, outbound_max_active FROM ("
" SELECT outbound_max_throughput, outbound_max_active FROM t_se WHERE storage = :source UNION "
" SELECT outbound_max_throughput, outbound_max_active FROM t_se WHERE storage = '*' "
") AS se LIMIT 1",
soci::use(pair.source),
soci::into(limits->source), soci::into(limits->throughputSource, nullIndicator);
sql << "SELECT active, throughput FROM t_optimize WHERE dest_se = :dest_se AND active IS NOT NULL",
soci::use(pair.destination),
soci::into(limits->destination), soci::into(limits->throughputDestination, nullIndicator);
soci::into(limits->source, nullIndicator), soci::into(limits->throughputSource, nullIndicator);
unsigned storedActive = 0;
std::string activeFixedFlagStr;
soci::indicator isNullFixedFlag, isNullMin, isNullMax, isNullActive;
sql <<
"SELECT inbound_max_throughput, inbound_max_active FROM ("
" SELECT inbound_max_throughput, inbound_max_active FROM t_se WHERE storage = :dest UNION "
" SELECT inbound_max_throughput, inbound_max_active FROM t_se WHERE storage = '*' "
") AS se LIMIT 1",
soci::use(pair.destination),
soci::into(limits->destination, nullIndicator), soci::into(limits->throughputDestination, nullIndicator);
// Link working range
soci::indicator isNullMin, isNullMax;
sql <<
"SELECT min_active, max_active FROM ("
" SELECT min_active, max_active FROM t_link_config WHERE source_se = :source AND dest_se = :dest UNION "
" SELECT min_active, max_active FROM t_link_config WHERE source_se = :source AND dest_se = '*' UNION "
" SELECT min_active, max_active FROM t_link_config WHERE source_se = '*' AND dest_se = :dest UNION "
" SELECT min_active, max_active FROM t_link_config WHERE source_se = '*' AND dest_se = '*' "
") AS lc LIMIT 1",
soci::use(pair.source, "source"), soci::use(pair.destination, "dest"),
soci::into(range->min, isNullMin), soci::into(range->max, isNullMax);
sql << "SELECT fixed, active, min_active, max_active FROM t_optimize_active "
"WHERE source_se = :source AND dest_se = :dest",
soci::use(pair.source), soci::use(pair.destination),
soci::into(activeFixedFlagStr, isNullFixedFlag),
soci::into(storedActive, isNullActive),
soci::into(range->min, isNullMin),
soci::into(range->max, isNullMax);
if (isNullFixedFlag != soci::i_null && activeFixedFlagStr == "on") {
if (isNullMin == soci::i_null) {
range->min = storedActive;
}
if (isNullMax == soci::i_null) {
range->max = storedActive;
}
}
else {
if (isNullMin == soci::i_null || isNullMax == soci::i_null) {
range->min = range->max = 0;
}
}
......@@ -229,8 +195,8 @@ public:
soci::indicator isCurrentNull;
int currentActive = 0;
sql << "SELECT active FROM t_optimize_active "
"WHERE source_se = :source AND dest_se = :dest_se LIMIT 1",
sql << "SELECT active FROM t_optimizer "
"WHERE source_se = :source AND dest_se = :dest_se",
soci::use(pair.source),soci::use(pair.destination),
soci::into(currentActive, isCurrentNull);
......@@ -442,11 +408,9 @@ public:
void storeOptimizerStreams(const Pair &pair, int streams) {
sql.begin();
sql << "INSERT INTO t_optimize_streams (source_se, dest_se, nostreams, datetime) "
" VALUES(:source, :dest, :nostreams, UTC_TIMESTAMP()) "
" ON DUPLICATE KEY"
" UPDATE "
" nostreams = :nostreams, datetime = UTC_TIMESTAMP()",
sql << "UPDATE t_optimizer "
"SET nostreams = :nostreams, datetime = UTC_TIMESTAMP() "
"WHERE source_se = :source AND dest_se = :dest",
soci::use(pair.source, "source"), soci::use(pair.destination, "dest"),
soci::use(streams, "nostreams");
......
......@@ -25,9 +25,6 @@
#include "db/generic/FileTransferStatus.h"
#include "db/generic/JobStatus.h"
#include "db/generic/LinkConfig.h"
#include "db/generic/StorageElement.h"
#include "db/generic/SeGroup.h"
#include "db/generic/SeProtocolConfig.h"
#include "db/generic/ShareConfig.h"
#include "db/generic/CloudStorageAuth.h"
#include <soci.h>
......@@ -178,19 +175,6 @@ struct type_conversion<TransferFile>
}
};
template <>
struct type_conversion<SeProtocolConfig>
{
typedef values base_type;
static void from_base(values const& v, indicator, SeProtocolConfig& protoConfig)
{
protoConfig.tcpBufferSize = v.get<int>("tcp_buffer_size", 0);
protoConfig.numberOfStreams = v.get<int>("nostreams", 0);
protoConfig.transferTimeout = v.get<int>("URLCOPY_TX_TO", 0);
}
};
template <>
struct type_conversion<JobStatus>
{
......@@ -301,43 +285,6 @@ struct type_conversion<FileTransferStatus>
}
};
template <>
struct type_conversion<StorageElement>
{
typedef values base_type;
static void from_base(values const& v, indicator, StorageElement& se)
{
se.endpoint = v.get<std::string>("endpoint");
se.seType = v.get<std::string>("se_type");
se.site = v.get<std::string>("site");
se.name = v.get<std::string>("name");
se.state = v.get<std::string>("state");
se.version = v.get<std::string>("version");
se.host = v.get<std::string>("host");
se.seTransferType = v.get<std::string>("se_transfer_type");
se.seTransferProtocol = v.get<std::string>("se_transfer_protocol");
se.seControlProtocol = v.get<std::string>("se_control_protocol");
se.gocdb_id = v.get<std::string>("gocdb_id");
}
};
template <>
struct type_conversion<SeConfig>
{
typedef values base_type;
static void from_base(values const& v, indicator, SeConfig& config)
{
config.source = v.get<std::string>("source");
config.destination = v.get<std::string>("dest");
config.vo = v.get<std::string>("vo");
config.symbolicName = v.get<std::string>("symbolicName");
config.state = v.get<std::string>("state");
}
};
template <>
struct type_conversion<ShareConfig>
{
......@@ -352,20 +299,6 @@ struct type_conversion<ShareConfig>
}
};
template <>
struct type_conversion<SeGroup>
{
typedef values base_type;
static void from_base(values const& v, indicator, SeGroup& grp)
{
grp.active = v.get<int>("active", -1);
grp.groupName = v.get<std::string>("groupName");
grp.member = v.get<std::string>("member");
grp.symbolicName = v.get<std::string>("symbolicName");
}
};
template <>
struct type_conversion<LinkConfig>
{
......@@ -375,12 +308,11 @@ struct type_conversion<LinkConfig>
{
lnk.source = v.get<std::string>("source");
lnk.destination = v.get<std::string>("destination");
lnk.state = v.get<std::string>("state");
lnk.symbolicName = v.get<std::string>("symbolicName");
lnk.numberOfStreams = v.get<int>("nostreams");
lnk.tcpBufferSize = v.get<int>("tcp_buffer_size");
lnk.transferTimeout = v.get<int>("urlcopy_tx_to");
lnk.autoTuning = v.get<std::string>("auto_tuning");
lnk.minActive = v.get<int>("min_active", 0);
lnk.maxActive = v.get<int>("max_active", 0);
lnk.optimizerMode = v.get<OptimizerMode>("optimizer_mode", kOptimizerDisabled);
lnk.tcpBufferSize = v.get<int>("tcp_buffer_size", 0);
lnk.numberOfStreams = v.get<int>("nostreams", 0);
}
};
......@@ -416,4 +348,42 @@ struct type_conversion<CloudStorageAuth>
}
};
template<>
struct type_conversion<boost::logic::tribool>
{
typedef int base_type;
static void from_base(int v, indicator ind, boost::logic::tribool& tribool)
{
if (ind == soci::i_null) {
tribool = boost::logic::indeterminate;
}
else {
tribool = (v != 0);
}
}
};
template<>
struct type_conversion<OptimizerMode>
{
typedef int base_type;
static void from_base(int v, indicator ind, OptimizerMode& mode)
{
if (ind == soci::i_null) {
mode = kOptimizerDisabled;
}
else if (v > kOptimizerAggressive) {
mode = kOptimizerAggressive;
}
else if (v < 0) {
mode = kOptimizerDisabled;
}
else {
mode = static_cast<OptimizerMode>(v);
}
}
};
}
......@@ -31,10 +31,7 @@ namespace optimizer {
Optimizer::Optimizer(OptimizerDataSource *ds):
dataSource(ds), optimizerSteadyInterval(boost::posix_time::seconds(60)), maxNumberOfStreams(10),
globalMaxPerLink(DEFAULT_MAX_ACTIVE_PER_LINK),
globalMaxPerStorage(DEFAULT_MAX_ACTIVE_ENDPOINT_LINK),
optimizerMode(kConservative)
dataSource(ds), optimizerSteadyInterval(boost::posix_time::seconds(60)), maxNumberOfStreams(10)
{
}
......@@ -59,27 +56,6 @@ void Optimizer::setMaxNumberOfStreams(int newValue)
void Optimizer::run(void)
{
FTS3_COMMON_LOGGER_NEWLOG(DEBUG) << "Optimizer run" << commit;
// Query global config once
globalMaxPerStorage = dataSource->getGlobalStorageLimit();
if (globalMaxPerStorage <= 0) {
globalMaxPerStorage = DEFAULT_MAX_ACTIVE_ENDPOINT_LINK;
}
globalMaxPerLink = dataSource->getGlobalLinkLimit();
if (globalMaxPerLink <= 0) {
globalMaxPerLink = DEFAULT_MAX_ACTIVE_PER_LINK;
}
int rawOptMode = dataSource->getOptimizerMode();
if (rawOptMode < kConservative) {
rawOptMode = static_cast<int>(kConservative);
}
if (rawOptMode > kAggressive) {
rawOptMode = static_cast<int>(kAggressive);
}
optimizerMode = static_cast<OptimizerMode>(rawOptMode);
try {
std::list<Pair> pairs = dataSource->getActivePairs();
......@@ -98,9 +74,10 @@ void Optimizer::run(void)
void Optimizer::runOptimizerForPair(const Pair &pair)
{