FTS-894: Reenable VO shares but with weights

parent db39d47c
/*
* Copyright (c) CERN 2013-2017
*
* Copyright (c) Members of the EMI Collaboration. 2010-2013
* See http://www.eu-emi.eu/partners for details on the copyright
* holders.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#ifndef PAIR_H
#define PAIR_H
#include <iostream>
#include <string>
#include "common/Uri.h"
struct Pair {
std::string source, destination;
Pair(const std::string &s, const std::string &d): source(s), destination(d) {
}
bool isLanTransfer() const {
return fts3::common::isLanTransfer(source, destination);
}
};
// Required so it can be used as a key on a std::map
inline bool operator < (const Pair &a, const Pair &b) {
return a.source < b.source || (a.source == b.source && a.destination < b.destination);
}
inline std::ostream& operator << (std::ostream &os, const Pair &pair) {
return (os << pair.source << " => " << pair.destination);
}
#endif // PAIR_H
......@@ -27,14 +27,13 @@
class ShareConfig
{
public:
ShareConfig(): activeTransfers(0), shareOnly(false) {};
ShareConfig(): weight(0) {};
~ShareConfig() {};
std::string source;
std::string destination;
std::string vo;
int activeTransfers;
bool shareOnly;
int weight;
};
#endif // SHARECONFIG_H_
......@@ -292,10 +292,10 @@ struct type_conversion<ShareConfig>
static void from_base(values const& v, indicator, ShareConfig& config)
{
config.source = v.get<std::string>("source");
config.destination = v.get<std::string>("destination");
config.vo = v.get<std::string>("vo");
config.activeTransfers = v.get<int>("active");
config.source = v.get<std::string>("source");
config.destination = v.get<std::string>("destination");
config.vo = v.get<std::string>("vo");
config.weight = v.get<int>("active");
}
};
......
......@@ -28,6 +28,7 @@
#include <boost/noncopyable.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <db/generic/LinkConfig.h>
#include <db/generic/Pair.h>
#include "common/Uri.h"
......@@ -35,22 +36,6 @@
namespace fts3 {
namespace optimizer {
struct Pair {
std::string source, destination;
Pair(const std::string &s, const std::string &d): source(s), destination(d) {
}
bool isLanTransfer() const {
return common::isLanTransfer(source, destination);
}
};
// Required so it can be used as a key on a std::map
inline bool operator < (const Pair &a, const Pair &b) {
return a.source < b.source || (a.source == b.source && a.destination < b.destination);
}
struct Range {
int min, max;
......@@ -168,11 +153,6 @@ public:
};
inline std::ostream& operator << (std::ostream &os, const Pair &pair) {
return (os << pair.source << " => " << pair.destination);
}
inline std::ostream& operator << (std::ostream &os, const Range &range) {
return (os << range.min << "/" << range.max);
}
......
......@@ -19,6 +19,7 @@
*/
#include "TransfersService.h"
#include "VoShares.h"
#include "config/ServerConfig.h"
#include "common/DaemonTools.h"
......@@ -211,6 +212,9 @@ void TransfersService::executeUrlcopy()
DBSingleton::instance().getDBObjectInstance()->getQueuesWithPending(queues);
// Breaking determinism. See FTS-704 for an explanation.
std::random_shuffle(queues.begin(), queues.end());
// Apply VO shares at this level. Basically, if more than one VO is used the same link,
// pick one each time according to their respective weights
queues = applyVoShares(queues);
if (queues.empty()) {
return;
......
/*
* Copyright (c) CERN 2017
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "VoShares.h"
#include "db/generic/SingleDbInstance.h"
#include <boost/random/mersenne_twister.hpp>
#include <boost/random/discrete_distribution.hpp>
#include <common/Logger.h>
using namespace db;
using namespace fts3::common;
boost::mt19937 generator;
namespace fts3 {
namespace server {
/**
* Given the pair, a list of vos for the pair, and a list of weights for VOs, pick one
* based on those weights.
* @note If a VO is not on the map, it will fallback to 'public', if it is there
* @note If the weight for a VO/public is 0, it will never be picked!
*/
static QueueId selectQueueForPair(const Pair &pair,
const std::vector<std::string> &vos, const std::map<std::string, double> &weights)
{
// Weights per position in vos vector
std::vector<double> finalWeights(vos.size());
// Get the public (catchall weight)
// If there is no config, this is the only weight!
double publicWeight = 0;
if (weights.empty()) {
publicWeight = 1;
}
else {
auto publicIter = weights.find("public");
if (publicIter != weights.end()) {
publicWeight = publicIter->second;
}
}
// Need to calculate how many "public" there are, so we can split
int publicCount = 0;
for (auto i = vos.begin(); i != vos.end(); ++i) {
if (weights.find(*i) == weights.end()) {
++publicCount;
}
}
publicWeight /= static_cast<double>(publicCount);
// Second pass, fill up the weights
int pos = 0;
for (auto i = vos.begin(); i != vos.end(); ++i, ++pos) {
auto wIter = weights.find(*i);
if (wIter == weights.end()) {
finalWeights[pos] = publicWeight;
}
else {
finalWeights[pos] = wIter->second;
}
}
// And pick one at random
boost::random::discrete_distribution<> dist(finalWeights);
int chosen = dist(generator);
return QueueId(pair.source, pair.destination, vos[chosen]);
}
std::vector<QueueId> applyVoShares(const std::vector<QueueId> queues)
{
// Vo list for each pair
std::map<Pair, std::vector<std::string>> vosPerPair;
for (auto i = queues.begin(); i != queues.end(); ++i) {
vosPerPair[Pair(i->sourceSe, i->destSe)].push_back(i->voName);
}
// One VO per pair
std::vector<QueueId> result;
for (auto j = vosPerPair.begin(); j != vosPerPair.end(); ++j) {
const Pair &p = j->first;
const std::vector<std::string> &vos = j->second;
std::vector<ShareConfig> shares = DBSingleton::instance().getDBObjectInstance()->getShareConfig(p.source, p.destination);
std::map<std::string, double> weights;
for (auto k = shares.begin(); k != shares.end(); ++k) {
weights[k->vo] = k->weight;
}
QueueId chosen = selectQueueForPair(p, vos, weights);
FTS3_COMMON_LOGGER_NEWLOG(DEBUG) << "Chosen " << chosen.voName << " for " << p << commit;
FTS3_COMMON_LOGGER_NEWLOG(DEBUG) << "Options were " << commit;
for (auto i = vos.begin(); i != vos.end(); ++i) {
FTS3_COMMON_LOGGER_NEWLOG(DEBUG) << "\t" << *i << commit;
}
result.emplace_back(chosen);
}
return result;
}
}
}
\ No newline at end of file
/*
* Copyright (c) CERN 2017
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef VOSHARES_H
#define VOSHARES_H
#include <vector>
#include "db/generic/QueueId.h"
namespace fts3 {
namespace server {
/**
* Apply VO shares if required.
* @param queues Set of queues with queued transfers
* @return A set of queues where there is only one per unique source/dest.
* Filtering is applied according to the configured relative weights.
*/
std::vector<QueueId> applyVoShares(const std::vector<QueueId> queues);
}
}
#endif // VOSHARES_H
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment