Commit 974df5ce authored by Elvin Sindrilaru's avatar Elvin Sindrilaru

COMMON/UNIT_TESTS: Add XrdConnPool class for managing XRootD physical connections

This class can now be used throughout the code especially on the MGM side when
doing drain operations.

The similar functionality in fst/io/xrd/XrdIo.hh can be replaced with this class.

Added unit tests for XrdConnPool and also for the helper class XrdConnIdHelper
which ensures automatic assignment and relase of the connection id.
parent 305c3f9e
......@@ -76,6 +76,7 @@ set(EOSCOMMON_SRCS
StringTokenizer.cc
CommentLog.cc
XrdErrorMap.cc
XrdConnPool.cc XrdConnPool.hh
JeMallocHandler.cc
plugin_manager/Plugin.hh
plugin_manager/PluginManager.cc
......
//------------------------------------------------------------------------------
// File: XrdConnPool.cc
// Author: Elvin-Alin Sindrilaru - CERN
//------------------------------------------------------------------------------
/************************************************************************
* EOS - the CERN Disk Storage System *
* Copyright (C) 2019 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see <http://www.gnu.org/licenses/>.*
************************************************************************/
#include "common/XrdConnPool.hh"
#include <limits>
#include <sstream>
EOSCOMMONNAMESPACE_BEGIN
//------------------------------------------------------------------------------
// Constructor
//------------------------------------------------------------------------------
XrdConnPool::XrdConnPool(bool is_enabled, uint32_t max_size):
mIsEnabled(is_enabled), mMaxSize(max_size)
{
if (getenv("EOS_XRD_USE_CONNECTION_POOL")) {
mIsEnabled = true;
if (getenv("EOS_XRD_CONNECTION_POOL_SIZE")) {
max_size = strtoul(getenv("EOS_XRD_CONNECTION_POOL_SIZE"), 0, 10);
}
if (max_size < 1) {
eos_warning("%s", "msg=\"wrong EOS_XRD_CONNECTION_POOL_SIZE, forcing "
"max size to 1\"");
max_size = 1;
}
if (max_size > 1024) {
eos_warning("%s", "msg=\"too big EOS_XRD_CONNECTION_POOL_SIZE, forcing "
"max size to 1024\"");
max_size = 1024;
}
mMaxSize = max_size;
}
}
//------------------------------------------------------------------------------
// Assign new connection from the pool to the given URL. What this actually
// means is updating the username used in the URL when connecting to the
// XRootD server.
//------------------------------------------------------------------------------
uint32_t
XrdConnPool::AssignConnection(XrdCl::URL& url)
{
uint32_t conn_id {0ull};
if (!mIsEnabled) {
return conn_id;
}
bool found {false};
uint32_t best_conn_id {1};
uint32_t best_conn_val {std::numeric_limits<uint32_t>::max()};
std::string target_host = url.GetHostName();
std::unique_lock<std::mutex> scope_lock(mPoolMutex);
// Map of connection id and score for current host
auto& map_id_score = mConnPool[target_host];
for (auto& elem : map_id_score) {
if (elem.second < best_conn_val) {
best_conn_id = elem.first;
best_conn_val = elem.second;
}
if (elem.second == 0) {
++elem.second;
conn_id = elem.first;
found = true;
break;
}
}
if (!found) {
if (map_id_score.size() >= mMaxSize) {
// Share the least busy connection
++map_id_score[best_conn_id];
conn_id = best_conn_id;
eos_warning("msg=\"connection pool limit reached - using %u/%u connections\"",
map_id_score.size(), mMaxSize);
} else {
conn_id = map_id_score.size() + 1;
map_id_score[conn_id] = 1;
}
}
if (conn_id) {
url.SetUserName(std::to_string(conn_id));
}
return conn_id;
}
//------------------------------------------------------------------------------
// Release a connection and update the status of the pool
//------------------------------------------------------------------------------
void
XrdConnPool::ReleaseConnection(const XrdCl::URL& url)
{
if (!mIsEnabled) {
return;
}
uint32_t conn_id {0ull};
try {
conn_id = std::stoul(url.GetUserName());
} catch (const std::exception& e) {
// ignore
}
if (conn_id) {
std::unique_lock<std::mutex> scope_lock(mPoolMutex);
auto it = mConnPool.find(url.GetHostName());
if (it != mConnPool.end()) {
auto& map_id_score = it->second;
if (map_id_score[conn_id] >= 1) {
--map_id_score[conn_id];
}
}
}
}
//------------------------------------------------------------------------------
// Dump the status of the connection pool to the given string
//------------------------------------------------------------------------------
void
XrdConnPool::Dump(std::string& out) const
{
std::ostringstream oss;
oss << "[connection-pool-dump]" << std::endl;
for (auto it = mConnPool.begin(); it != mConnPool.end(); ++it) {
for (auto fit = it->second.begin(); fit != it->second.end(); ++fit) {
oss << "[connection-pool] host=" << it->first << " id="
<< fit->first << " usage=" << fit->second << std::endl;
}
}
out = oss.str();
}
EOSCOMMONNAMESPACE_END
//------------------------------------------------------------------------------
// File: XrdConnPool.hh
// Author: Elvin-Alin Sindrilaru - CERN
//------------------------------------------------------------------------------
/************************************************************************
* EOS - the CERN Disk Storage System *
* Copyright (C) 2019 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see <http://www.gnu.org/licenses/>.*
************************************************************************/
#pragma once
#include "common/Namespace.hh"
#include "common/Logging.hh"
#include "XrdCl/XrdClURL.hh"
#include <mutex>
EOSCOMMONNAMESPACE_BEGIN
//------------------------------------------------------------------------------
//! Class XrdConnPool help in creating a pool of xrootd connections that can
//! be reused and allocated the least congested connection to a new request.
//------------------------------------------------------------------------------
class XrdConnPool: public eos::common::LogId
{
public:
//----------------------------------------------------------------------------
//! Constructor
//!
//! @param is_enabled if true connection pool is enabled
//! @param max_size default max_size
//----------------------------------------------------------------------------
XrdConnPool(bool is_enabled = false, uint32_t max_size = 1024);
//----------------------------------------------------------------------------
//! Destructor
//----------------------------------------------------------------------------
~XrdConnPool() = default;
//----------------------------------------------------------------------------
//! Assign new connection from the pool to the given URL. What this actually
//! means is updating the username used in the URL when connecting to the
//! XRootD server.
//!
//! @param url given url
//!
//! @return 0 if no connection id assigned, otherwise the value of the id
//----------------------------------------------------------------------------
uint32_t AssignConnection(XrdCl::URL& url);
//----------------------------------------------------------------------------
//! Release a connection and update the status of the pool
//!
//! @param url given url
//----------------------------------------------------------------------------
void ReleaseConnection(const XrdCl::URL& url);
//----------------------------------------------------------------------------
//! Dump the status of the connection pool to the given string
//!
//! @param out string containing the result
//----------------------------------------------------------------------------
void Dump(std::string& out) const;
private:
bool mIsEnabled; ///< Mark if connection pool is enabled
uint32_t mMaxSize; ///< Maximum size of the connection pool
std::map<std::string, std::map<uint32_t, uint32_t>> mConnPool;
std::mutex mPoolMutex; ///< Mutex protecting access to the pool
};
//------------------------------------------------------------------------------
//! Class XrdConnIdHelper RAAI helper to automatically assign and release
//! connection ids to the pool.
//! @note Needs to have the same lifetime as the XrdCl::File object that uses
//! the url.
//------------------------------------------------------------------------------
class XrdConnIdHelper final
{
public:
//----------------------------------------------------------------------------
//! Constructor
//----------------------------------------------------------------------------
XrdConnIdHelper(XrdConnPool& pool, XrdCl::URL& url):
mPool(pool)
{
mConnId = mPool.AssignConnection(url);
mUrl = url;
}
//----------------------------------------------------------------------------
//! Destructor
//----------------------------------------------------------------------------
~XrdConnIdHelper()
{
if (mConnId) {
mPool.ReleaseConnection(mUrl);
}
}
//----------------------------------------------------------------------------
//! Check if new connection allocated to URL
//----------------------------------------------------------------------------
bool HasNewConnection() const
{
return (mConnId != 0ull);
}
//----------------------------------------------------------------------------
//! Get allocated connection id
//----------------------------------------------------------------------------
uint32_t GetId() const
{
return mConnId;
}
private:
uint32_t mConnId; ///< Allocated connection id, 0 if none allocated
XrdConnPool& mPool; ///< Reference to connection pool
XrdCl::URL mUrl; ///< URL corresponding to the connection id
};
EOSCOMMONNAMESPACE_END
......@@ -302,6 +302,7 @@ QdbMaster::SlaveToMaster()
WFE::MoveFromRBackToQ();
// Notify all the nodes about the new master identity
FsView::gFsView.BroadcastMasterId(GetMasterId());
gOFS->mDrainEngine.Start();
mIsMaster = true;
Access::SetSlaveToMasterRules();
}
......@@ -327,6 +328,7 @@ QdbMaster::MasterToSlave()
DisableNsCaching();
Access::SetMasterToSlaveRules(new_master_id);
gOFS->mDrainEngine.Stop();
}
//------------------------------------------------------------------------------
......
......@@ -72,7 +72,8 @@ set(COMMON_UT_SRCS
common/RWMutexTest.cc
common/StringConversionTests.cc
common/LoggingTests.cc
common/LoggingTestsUtils.cc)
common/LoggingTestsUtils.cc
common/XrdConnPoolTests.cc)
set(FST_UT_SRCS
#fst/XrdFstOssFileTest.cc
......
//------------------------------------------------------------------------------
// File: XrdConnPoolTests.cc
// Author: Elvin-Alin Sindrilaru - CERN
//------------------------------------------------------------------------------
/************************************************************************
* EOS - the CERN Disk Storage System *
* Copyright (C) 2019 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see <http://www.gnu.org/licenses/>.*
************************************************************************/
#include "gtest/gtest.h"
#include "Namespace.hh"
#include "common/XrdConnPool.hh"
#include <list>
EOSCOMMONTESTING_BEGIN
TEST(XrdConnPool, DefaultDisabled)
{
XrdCl::URL url;
std::string surl = "root://eospps.cern.ch:1094//path/test.dat";
url.FromString(surl);
eos::common::XrdConnPool pool;
ASSERT_EQ(pool.AssignConnection(url), 0);
ASSERT_EQ(url.GetURL(), surl);
}
TEST(XrdConnPool, EvenDistribuion)
{
XrdCl::URL url("root://eospps.cern.ch:1094/path/test.dat");
uint32_t max_size = 10;
eos::common::XrdConnPool pool(true, max_size);
std::ostringstream oss;
// Add one user per connection id
for (uint32_t i = 0; i < max_size; ++i) {
ASSERT_EQ(pool.AssignConnection(url), i + 1);
oss << "root://" << i + 1 << "@eospps.cern.ch:1094/path/test.dat";
ASSERT_EQ(url.GetURL(), oss.str());
oss.str("");
}
// Add two more users per connection id
for (int j = 0; j < 2; ++j) {
for (uint32_t i = 0; i < max_size; ++i) {
ASSERT_EQ(pool.AssignConnection(url), i + 1);
oss << "root://" << i + 1 << "@eospps.cern.ch:1094/path/test.dat";
ASSERT_EQ(url.GetURL(), oss.str());
oss.str("");
}
}
// Free one connection id 5 at a time and allocate a new one
oss.str("root://5@eospps.cern.ch:1094/path/test.dat");
for (int i = 0; i < 3; ++i) {
pool.ReleaseConnection(oss.str());
ASSERT_EQ(pool.AssignConnection(url), 5);
}
// Free all connection ids 5 in one go
for (int i = 0; i < 3; ++i) {
pool.ReleaseConnection(oss.str());
}
ASSERT_EQ(pool.AssignConnection(url), 5);
ASSERT_EQ(pool.AssignConnection(url), 5);
ASSERT_EQ(pool.AssignConnection(url), 5);
// Now the pool should allocate from id 1 since they all have 3 clients
// per id
ASSERT_EQ(pool.AssignConnection(url), 1);
}
TEST(XrdConnPool, ConnIdHelper)
{
XrdCl::URL url("root://eospps.cern.ch:1094/path/test.dat");
uint32_t max_size = 10;
eos::common::XrdConnPool pool(true, max_size);
std::ostringstream oss;
// Each gets the same id since it's released at the end of each loop
for (uint32_t i = 0; i < max_size; ++i) {
XrdConnIdHelper id_helper(pool, url);
ASSERT_EQ(id_helper.GetId(), 1);
}
// Allocate them dynamically
std::list<XrdConnIdHelper*> lst;
for (uint32_t i = 0; i < max_size; ++i) {
auto elem = new XrdConnIdHelper(pool, url);
lst.push_back(elem);
ASSERT_EQ(elem->GetId(), i + 1);
}
// std::string out;
// pool.Dump(out);
// std::cout << out << std::endl;
// Release the last two ids and try new assignments
for (int i = 0; i < 2; ++i) {
auto elem = lst.back();
delete elem;
lst.pop_back();
}
for (int i = 0; i < 2; ++i) {
auto elem = new XrdConnIdHelper(pool, url);
lst.push_back(elem);
ASSERT_EQ(elem->GetId(), max_size - 1 + i);
}
for (auto& elem : lst) {
delete elem;
}
lst.clear();
// Add one connection it should get the id 1
auto elem = new XrdConnIdHelper(pool, url);
lst.push_back(elem);
ASSERT_EQ(elem->GetId(), 1);
delete elem;
lst.clear();
}
EOSCOMMONTESTING_END
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment