GitLab unavailability on July 18, 22, 23 due to hypervisor security updates: http://cern.ch/go/BP7D

...
 
Commits (25)
......@@ -810,15 +810,10 @@ bool FileSystem::applyBatch(const FileSystemUpdateBatch &batch) {
//------------------------------------------------------------------------------
// Apply the given core parameters
//------------------------------------------------------------------------------
bool FileSystem::applyCoreParams(const FileSystemCoreParams &params, const std::string &configstatus) {
bool FileSystem::applyCoreParams(const FileSystemCoreParams &params) {
FileSystemUpdateBatch batch;
batch.setId(params.getId());
batch.setStringDurable("uuid", params.getUuid());
if(!configstatus.empty()) {
batch.setStringDurable("configstatus", configstatus);
}
batch.setStringDurable("schedgroup", params.getGroupLocator().getGroup());
batch.setStringDurable("configstatus", GetConfigStatusAsString(params.getConfigStatus()));
return applyBatch(batch);
......
......@@ -630,7 +630,7 @@ public:
//----------------------------------------------------------------------------
//! Apply the given core parameters
//----------------------------------------------------------------------------
bool applyCoreParams(const FileSystemCoreParams &params, const std::string &configstatus);
bool applyCoreParams(const FileSystemCoreParams &params);
//----------------------------------------------------------------------------
//! Set a single local long long
......@@ -677,15 +677,7 @@ public:
bool
SetId(fsid_t fsid)
{
XrdMqSharedHash* hash = nullptr;
RWMutexReadLock lock(mSom->HashMutex);
if ((hash = mSom->GetObject(mQueuePath.c_str(), "hash"))) {
hash->Set("id", (long long) fsid);
return true;
} else {
return false;
}
return SetString("id", std::to_string(fsid).c_str());
}
//----------------------------------------------------------------------------
......
......@@ -41,7 +41,7 @@ This chapter discusses several components of EOS and how they are configured.
configuration/route
configuration/s3
configuration/scheduler
configuratoin/tracker
configuration/tracker
configuration/transfer
configuration/tty
configuration/wfe
......
......@@ -6,7 +6,7 @@
Replication Tracker
===================
The Replication Tracker follows the workflow of file creations. For each created file a virtual entry is created in the ``proc/creation`` directory. Entries are removed once a layout is completely committed. The purpose of this tracker is to find inconsistent files after creation and to remove atomic upload relicts automatically after two days.
The Replication Tracker follows the workflow of file creations. For each created file a virtual entry is created in the ``proc/tracker`` directory. Entries are removed once a layout is completely committed. The purpose of this tracker is to find inconsistent files after creation and to remove atomic upload relicts automatically after two days.
Configuration
-------------
......@@ -33,7 +33,7 @@ The current status of the Tracker can be seen via:
# Space Variables
# ....................................................................................
...
trakcer := off
tracker := off
...
......@@ -61,7 +61,7 @@ The displayed reasons are:
* REPLOW - the replica number is too low
* ATOMIC - the file is an atomic upload
* KEEPIT - the file is still in flight
* ENONET - the tracking entry has no corresponding namespace entry with the given file-id
* ENOENT - the tracking entry has no corresponding namespace entry with the given file-id
* REP_OK - the tracking entry is healthy and can be removed - FUSE files appear here when no replica has been committed yet
......
......@@ -29,5 +29,5 @@ EOS releases are named after gemstones. The actively developed version is called
================================= =================== =================== =================================
:doc:`releases/amber` 0.2.47 1st EOS Generation
:doc:`releases/beryl` 0.3.267-aquamarine 2nd EOS Generation :doc:`releases/beryl-release`
:doc:`releases/citrine` 4.5.1 3rd EOS Generation :doc:`releases/citrine-release`
:doc:`releases/citrine` 4.5.2 3rd EOS Generation :doc:`releases/citrine-release`
================================= =================== =================== =================================
......@@ -15,6 +15,18 @@ Introduction
------------
This release is based on XRootD V4 and IPV6 enabled.
``v4.5.2 Citrine``
===================
2019-06-27
Bug
---
* if eosxd is compiled without ROCKSDB support, it should not touch mdcachedir e.g. it has to stay empty - fixes EOS-3558
* require eos-rocksdb on SLC6 and EL7 to have support for swapping inodes
``v4.5.1 Citrine``
===================
......
......@@ -223,6 +223,8 @@ BuildRequires: centos-release-scl
%if 0%{distribution} == 6 || 0%{distribution} == 7
BuildRequires: %{devtoolset}
BuildRequires: %{devtoolset}-binutils-devel
# We want swap-support on eosxd - requires rocksdb KV
BuildRequires: eos-rocksdb = 5.7.3-1%{dist}
%else
BuildRequires: binutils-devel
%endif
......
......@@ -43,12 +43,8 @@ Storage::GetBalanceSlotVariables(unsigned long long& nparalleltx,
*/
/*----------------------------------------------------------------------------*/
{
eos::common::RWMutexReadLock rd_lock(gOFS.ObjectManager.HashMutex);
XrdMqSharedHash* confighash = gOFS.ObjectManager.GetHash(
nodeconfigqueue.c_str());
std::string manager = confighash ? confighash->Get("manager") : "unknown";
nparalleltx = confighash ? confighash->GetLongLong("stat.balance.ntx") : 0;
ratetx = confighash ? confighash->GetLongLong("stat.balance.rate") : 0;
getFSTConfigValue("stat.balance.ntx", nparalleltx);
getFSTConfigValue("stat.balance.rate", ratetx);
if (nparalleltx == 0) {
nparalleltx = 0;
......@@ -58,8 +54,7 @@ Storage::GetBalanceSlotVariables(unsigned long long& nparalleltx,
ratetx = 25;
}
eos_static_debug("manager=%s nparalleltransfers=%llu transferrate=%llu",
manager.c_str(), nparalleltx, ratetx);
eos_static_debug("nparalleltransfers=%llu transferrate=%llu", nparalleltx, ratetx);
}
/*----------------------------------------------------------------------------*/
......
......@@ -28,6 +28,132 @@
EOSFSTNAMESPACE_BEGIN
//------------------------------------------------------------------------------
// Get configuration value from global FST config
//------------------------------------------------------------------------------
//! Look up a single key in the shared configuration hash of this FST node.
//!
//! @param key    configuration key to read
//! @param value  out-parameter receiving the stored value; left untouched
//!               when the lookup fails
//!
//! @return true if the node config hash was found and the value was read,
//!         false if the hash is not (yet) available
bool
Storage::getFSTConfigValue(const std::string &key, std::string &value) const {
  // Hold the hash mutex for the whole lookup so the hash object cannot be
  // deleted underneath us between GetObject() and Get().
  eos::common::RWMutexReadLock lock(gOFS.ObjectManager.HashMutex);
  XrdMqSharedHash* hash = gOFS.ObjectManager.GetObject(
    Config::gConfig.getFstNodeConfigQueue("getConfigValue", false).c_str(),
    "hash");

  if(!hash) {
    return false;
  }

  // NOTE(review): Get() presumably returns an empty string for a missing
  // key, so "hash present but key absent" still returns true here — confirm
  // callers are fine with that.
  value = hash->Get(key.c_str());
  return true;
}
//------------------------------------------------------------------------------
//! Numeric convenience overload: read a configuration key and parse it as an
//! unsigned long long.
//!
//! @param key    configuration key to read
//! @param value  out-parameter receiving the parsed number; left untouched
//!               when the lookup fails
//!
//! @return true if the key could be read from the node config hash,
//!         false otherwise
//------------------------------------------------------------------------------
bool
Storage::getFSTConfigValue(const std::string &key, unsigned long long &value) {
  std::string strVal;

  if(!getFSTConfigValue(key, strVal)) {
    return false;
  }

  // Use strtoull instead of atoi: atoi parses into a (signed) int, which
  // silently truncates any configured value above INT_MAX before the
  // assignment widens it to unsigned long long.
  value = strtoull(strVal.c_str(), nullptr, 10);
  return true;
}
//------------------------------------------------------------------------------
// Process incoming configuration change
//------------------------------------------------------------------------------
//! Apply a single changed configuration key of the FST node to the relevant
//! subsystem: symmetric key store, MGM manager endpoint, publish interval,
//! log level, transfer-gateway multiplexer (on/off, rate, slots) or error
//! simulation. Unknown keys, or keys whose value cannot currently be read
//! from the shared config hash, are silently ignored.
//!
//! @param key  name of the configuration key that changed
//------------------------------------------------------------------------------
void
Storage::processIncomingFstConfigurationChange(const std::string &key) {
  std::string tmpValue;

  // Pass the std::string directly (no key.c_str() round-trip, which would
  // construct a needless temporary for the const std::string& parameter).
  if(!getFSTConfigValue(key, tmpValue)) {
    return;
  }

  if (key == "symkey") {
    eos_static_info("symkey=%s", tmpValue.c_str());
    eos::common::gSymKeyStore.SetKey64(tmpValue.c_str(), 0);
    return;
  }

  if (key == "manager") {
    eos_static_info("manager=%s", tmpValue.c_str());
    XrdSysMutexHelper lock(Config::gConfig.Mutex);
    Config::gConfig.Manager = tmpValue.c_str();
    return;
  }

  if (key == "publish.interval") {
    eos_static_info("publish.interval=%s", tmpValue.c_str());
    XrdSysMutexHelper lock(Config::gConfig.Mutex);
    Config::gConfig.PublishInterval = atoi(tmpValue.c_str());
    return;
  }

  if (key == "debug.level") {
    std::string debuglevel = tmpValue;
    eos::common::Logging& g_logging = eos::common::Logging::GetInstance();
    int debugval = g_logging.GetPriorityByString(debuglevel.c_str());

    if (debugval < 0) {
      eos_static_err("debug level %s is not known!", debuglevel.c_str());
    } else {
      // we set the shared hash debug for the lowest 'debug' level
      if (debuglevel == "debug") {
        gOFS.ObjectManager.SetDebug(true);
      } else {
        gOFS.ObjectManager.SetDebug(false);
      }

      g_logging.SetLogPriority(debugval);
    }

    return;
  }

  // creation/deletion of gateway transfer queue
  if (key == "txgw") {
    std::string gw = tmpValue;
    eos_static_info("txgw=%s", gw.c_str());

    if (gw == "off") {
      // just stop the multiplexer
      mGwMultiplexer.Stop();
      eos_static_info("Stopping transfer multiplexer");
    }

    if (gw == "on") {
      mGwMultiplexer.Run();
      eos_static_info("Starting transfer multiplexer");
    }

    return;
  }

  if (key == "gw.rate") {
    // modify the rate settings of the gw multiplexer
    std::string rate = tmpValue;
    eos_static_info("cmd=set gw.rate=%s", rate.c_str());
    mGwMultiplexer.SetBandwidth(atoi(rate.c_str()));
    return;
  }

  if (key == "gw.ntx") {
    // modify the parallel transfer settings of the gw multiplexer
    std::string ntx = tmpValue;
    eos_static_info("cmd=set gw.ntx=%s", ntx.c_str());
    mGwMultiplexer.SetSlots(atoi(ntx.c_str()));
    return;
  }

  if (key == "error.simulation") {
    // (removed an unused local copy of tmpValue that shadowed nothing and
    // was never read)
    eos_static_info("cmd=set error.simulation=%s", tmpValue.c_str());
    gOFS.SetSimulationError(tmpValue.c_str());
    return;
  }
}
//------------------------------------------------------------------------------
// Communicator
//------------------------------------------------------------------------------
......@@ -198,144 +324,12 @@ Storage::Communicator(ThreadAssistant& assistant)
queue.erase(dpos);
}
if (queue == Config::gConfig.getFstNodeConfigQueue("communicator", false)) {
if (key == "symkey") {
gOFS.ObjectManager.HashMutex.LockRead();
// we received a new symkey
XrdMqSharedHash* hash = gOFS.ObjectManager.GetObject(queue.c_str(), "hash");
if (hash) {
std::string symkey = hash->Get("symkey");
eos_static_info("symkey=%s", symkey.c_str());
eos::common::gSymKeyStore.SetKey64(symkey.c_str(), 0);
}
gOFS.ObjectManager.HashMutex.UnLockRead();
}
std::string tmpValue;
if (key == "manager") {
gOFS.ObjectManager.HashMutex.LockRead();
// we received a manager
XrdMqSharedHash* hash = gOFS.ObjectManager.GetObject(queue.c_str(), "hash");
if (hash) {
std::string manager = hash->Get("manager");
eos_static_info("manager=%s", manager.c_str());
XrdSysMutexHelper lock(Config::gConfig.Mutex);
Config::gConfig.Manager = manager.c_str();
}
gOFS.ObjectManager.HashMutex.UnLockRead();
}
if (key == "publish.interval") {
gOFS.ObjectManager.HashMutex.LockRead();
// we received a manager
XrdMqSharedHash* hash = gOFS.ObjectManager.GetObject(queue.c_str(), "hash");
if (hash) {
std::string publishinterval = hash->Get("publish.interval");
eos_static_info("publish.interval=%s", publishinterval.c_str());
XrdSysMutexHelper lock(Config::gConfig.Mutex);
Config::gConfig.PublishInterval = atoi(publishinterval.c_str());
}
gOFS.ObjectManager.HashMutex.UnLockRead();
}
if (key == "debug.level") {
gOFS.ObjectManager.HashMutex.LockRead();
// we received a manager
XrdMqSharedHash* hash = gOFS.ObjectManager.GetObject(queue.c_str(), "hash");
if (hash) {
std::string debuglevel = hash->Get("debug.level");
eos::common::Logging& g_logging = eos::common::Logging::GetInstance();
int debugval = g_logging.GetPriorityByString(debuglevel.c_str());
if (debugval < 0) {
eos_static_err("debug level %s is not known!", debuglevel.c_str());
} else {
// we set the shared hash debug for the lowest 'debug' level
if (debuglevel == "debug") {
gOFS.ObjectManager.SetDebug(true);
} else {
gOFS.ObjectManager.SetDebug(false);
}
g_logging.SetLogPriority(debugval);
}
}
gOFS.ObjectManager.HashMutex.UnLockRead();
}
// creation/deletion of gateway transfer queue
if (key == "txgw") {
gOFS.ObjectManager.HashMutex.LockRead();
XrdMqSharedHash* hash = gOFS.ObjectManager.GetObject(queue.c_str(), "hash");
if (hash) {
std::string gw = hash->Get("txgw");
eos_static_info("txgw=%s", gw.c_str());
gOFS.ObjectManager.HashMutex.UnLockRead();
if (gw == "off") {
// just stop the multiplexer
mGwMultiplexer.Stop();
eos_static_info("Stopping transfer multiplexer on %s", queue.c_str());
}
if (gw == "on") {
mGwMultiplexer.Run();
eos_static_info("Starting transfer multiplexer on %s", queue.c_str());
}
} else {
gOFS.ObjectManager.HashMutex.UnLockRead();
eos_static_warning("Cannot get hash(queue)");
}
}
if (key == "gw.rate") {
// modify the rate settings of the gw multiplexer
gOFS.ObjectManager.HashMutex.LockRead();
XrdMqSharedHash* hash = gOFS.ObjectManager.GetObject(queue.c_str(), "hash");
if (hash) {
std::string rate = hash->Get("gw.rate");
eos_static_info("cmd=set gw.rate=%s", rate.c_str());
mGwMultiplexer.SetBandwidth(atoi(rate.c_str()));
}
gOFS.ObjectManager.HashMutex.UnLockRead();
}
if (key == "gw.ntx") {
// modify the parallel transfer settings of the gw multiplexer
gOFS.ObjectManager.HashMutex.LockRead();
XrdMqSharedHash* hash = gOFS.ObjectManager.GetObject(queue.c_str(), "hash");
if (hash) {
std::string ntx = hash->Get("gw.ntx");
eos_static_info("cmd=set gw.ntx=%s", ntx.c_str());
mGwMultiplexer.SetSlots(atoi(ntx.c_str()));
}
gOFS.ObjectManager.HashMutex.UnLockRead();
}
if (queue == Config::gConfig.getFstNodeConfigQueue("communicator", false)) {
processIncomingFstConfigurationChange(key.c_str());
if (key == "error.simulation") {
gOFS.ObjectManager.HashMutex.LockRead();
XrdMqSharedHash* hash = gOFS.ObjectManager.GetObject(queue.c_str(), "hash");
if (hash) {
std::string value = hash->Get("error.simulation");
eos_static_info("cmd=set error.simulation=%s", value.c_str());
gOFS.SetSimulationError(value.c_str());
}
gOFS.ObjectManager.HashMutex.UnLockRead();
}
} else {
mFsMutex.LockRead();
......
......@@ -37,12 +37,11 @@ Storage::GetDrainSlotVariables(unsigned long long& nparalleltx,
unsigned long long& ratetx,
std::string nodeconfigqueue)
{
gOFS.ObjectManager.HashMutex.LockRead();
XrdMqSharedHash* confighash = gOFS.ObjectManager.GetHash(
nodeconfigqueue.c_str());
std::string manager = confighash ? confighash->Get("manager") : "unknown";
nparalleltx = confighash ? confighash->GetLongLong("stat.drain.ntx") : 0;
ratetx = confighash ? confighash->GetLongLong("stat.drain.rate") : 0;
nparalleltx = 0;
ratetx = 0;
getFSTConfigValue("stat.drain.ntx", nparalleltx);
getFSTConfigValue("stat.drain.rate", ratetx);
if (nparalleltx == 0) {
nparalleltx = 0;
......@@ -52,11 +51,9 @@ Storage::GetDrainSlotVariables(unsigned long long& nparalleltx,
ratetx = 25;
}
eos_static_debug("manager=%s nparalleltransfers=%llu transferrate=%llu",
manager.c_str(),
eos_static_debug("nparalleltransfers=%llu transferrate=%llu",
nparalleltx,
ratetx);
gOFS.ObjectManager.HashMutex.UnLockRead();
}
//------------------------------------------------------------------------------
......
......@@ -85,14 +85,6 @@ Storage::Remover()
// Ask to schedule deletions regularly (default is every 5 minutes)
if ((now - lastAskedForDeletions) > deletionInterval) {
// get some global variables
gOFS.ObjectManager.HashMutex.LockRead();
XrdMqSharedHash* confighash = gOFS.ObjectManager.GetHash(
nodeconfigqueue.c_str());
std::string manager = confighash ? confighash->Get("manager") : "unknown";
eos_static_debug("manager=%s", manager.c_str());
gOFS.ObjectManager.HashMutex.UnLockRead();
// ---------------------------------------
lastAskedForDeletions = now;
eos_static_debug("asking for new deletions");
XrdOucString managerQuery = "/?";
......
......@@ -1008,10 +1008,8 @@ Storage::CheckLabel(std::string path,
bool
Storage::IsNodeActive() const
{
std::string cfg_node = Config::gConfig.getFstNodeConfigQueue().c_str();
eos::common::RWMutexReadLock rd_lock(gOFS.ObjectManager.HashMutex);
XrdMqSharedHash* hash = gOFS.ObjectManager.GetHash(cfg_node.c_str());
std::string status = hash->Get("stat.active");
std::string status;
getFSTConfigValue("stat.active", status);
if (status == "online") {
return true;
......
......@@ -196,7 +196,6 @@ private:
//----------------------------------------------------------------------------
static void* StartVarPartitionMonitor(void* pp);
static void* StartDaemonSupervisor(void* pp);
static void* StartFsCommunicator(void* pp);
static void* StartFsScrub(void* pp);
static void* StartFsTrim(void* pp);
static void* StartFsRemover(void* pp);
......@@ -233,6 +232,11 @@ private:
void Communicator(ThreadAssistant& assistant);
void QdbCommunicator(QdbContactDetails contactDetails,
ThreadAssistant& assistant);
bool getFSTConfigValue(const std::string &key, std::string &value) const;
bool getFSTConfigValue(const std::string &key, unsigned long long &value);
void processIncomingFstConfigurationChange(const std::string &key);
void Scrub();
void Trim();
void Remover();
......
......@@ -971,8 +971,11 @@ EosFuse::run(int argc, char* argv[], void* userdata)
char spid[16];
snprintf(spid, sizeof(spid), "%d", getpid());
config.mqidentity += spid;
config.mdcachedir += "/";
config.mdcachedir += suuid;
if (config.mdcachedir.length()) {
config.mdcachedir += "/";
config.mdcachedir += suuid;
}
}
if (config.options.fdlimit > 0) {
......
......@@ -32,7 +32,7 @@
#-------------------------------------------------------------------------------
Summary: gRPC, A high performance, open-source universal RPC framework
Name: grpc
Version: 1.15.0
Version: 1.19.0
Release: 1%{?dist}
License: BSD
URL: http://www.grpc.io/
......
......@@ -1521,6 +1521,9 @@ Server::OpSetDirectory(const std::string& id,
cmd->setCTime(ctime);
cmd->setMTime(mtime);
// propagate mtime changes
cmd->notifyMTimeChange(gOFS->eosDirectoryService);
for (auto it = md.attr().begin(); it != md.attr().end(); ++it) {
if ((it->first.substr(0, 3) != "sys") ||
(it->first == "sys.eos.btime")) {
......
......@@ -27,6 +27,7 @@
#include "mgm/XrdMgmOfs.hh"
#include "mgm/TableFormatter/TableFormatterBase.hh"
#include "common/FileSystem.hh"
#include "common/IntervalStopwatch.hh"
#include <iostream>
#include <fstream>
#include <sstream>
......@@ -50,7 +51,6 @@ const size_t GeoTreeEngine::gGeoBufferSize = sizeof(FastPlacementTree) +
FastPlacementTree::sGetMaxDataMemSize();
thread_local void* GeoTreeEngine::tlGeoBuffer = NULL;
pthread_key_t GeoTreeEngine::gPthreadKey;
thread_local const FsGroup* GeoTreeEngine::tlCurrentGroup = NULL;
const int GeoTreeEngine::sfgId = 1;
const int GeoTreeEngine::sfgHost = 1 << 1;
......@@ -1220,14 +1220,12 @@ GeoTreeEngine::placeNewReplicasOneGroup(FsGroup* group,
const std::string& clientGeoTag,
const size_t& nCollocatedReplicas,
vector<FileSystem::fsid_t>* excludeFs,
vector<string>* excludeGeoTags,
vector<string>* forceGeoTags)
vector<string>* excludeGeoTags)
{
assert(nNewReplicas);
assert(newReplicas);
std::vector<SchedTME*> entries;
// find the entry in the map
tlCurrentGroup = group;
SchedTME* entry;
{
RWMutexReadLock lock(this->pTreeMapMutex);
......@@ -1244,7 +1242,7 @@ GeoTreeEngine::placeNewReplicasOneGroup(FsGroup* group,
entry->doubleBufferMutex.LockRead();
// locate the existing replicas and the excluded fs in the tree
vector<SchedTreeBase::tFastTreeIdx> newReplicasIdx(nNewReplicas),
*existingReplicasIdx = NULL, *excludeFsIdx = NULL, *forceBrIdx = NULL;
*existingReplicasIdx = NULL, *excludeFsIdx = NULL;
newReplicasIdx.resize(0);
if (existingReplicas) {
......@@ -1324,17 +1322,6 @@ GeoTreeEngine::placeNewReplicasOneGroup(FsGroup* group,
}
}
if (forceGeoTags) {
forceBrIdx = new vector<SchedTreeBase::tFastTreeIdx>(forceGeoTags->size());
for (auto it = forceGeoTags->begin(); it != forceGeoTags->end(); ++it) {
SchedTreeBase::tFastTreeIdx idx;
idx = entry->foregroundFastStruct->tag2NodeIdx->getClosestFastTreeNode(
it->c_str());
forceBrIdx->push_back(idx);
}
}
SchedTreeBase::tFastTreeIdx startFromNode = 0;
if (!startFromGeoTag.empty()) {
......@@ -1356,7 +1343,7 @@ GeoTreeEngine::placeNewReplicasOneGroup(FsGroup* group,
success = placeNewReplicas(entry, nNewReplicas, &newReplicasIdx,
entry->foregroundFastStruct->placementTree,
existingReplicasIdx, bookingSize, startFromNode,
nCollocatedReplicas, excludeFsIdx, forceBrIdx,
nCollocatedReplicas, excludeFsIdx,
pSkipSaturatedPlct);
break;
......@@ -1364,7 +1351,7 @@ GeoTreeEngine::placeNewReplicasOneGroup(FsGroup* group,
success = placeNewReplicas(entry, nNewReplicas, &newReplicasIdx,
entry->foregroundFastStruct->drnPlacementTree,
existingReplicasIdx, bookingSize, startFromNode,
nCollocatedReplicas, excludeFsIdx, forceBrIdx,
nCollocatedReplicas, excludeFsIdx,
pSkipSaturatedDrnPlct);
break;
......@@ -1372,7 +1359,7 @@ GeoTreeEngine::placeNewReplicasOneGroup(FsGroup* group,
success = placeNewReplicas(entry, nNewReplicas, &newReplicasIdx,
entry->foregroundFastStruct->blcPlacementTree,
existingReplicasIdx, bookingSize, startFromNode,
nCollocatedReplicas, excludeFsIdx, forceBrIdx,
nCollocatedReplicas, excludeFsIdx,
pSkipSaturatedBlcPlct);
break;
......@@ -1492,10 +1479,6 @@ cleanup:
delete excludeFsIdx;
}
if (forceBrIdx) {
delete forceBrIdx;
}
return success;
}
......@@ -2214,32 +2197,18 @@ cleanup:
return returnCode;
}
bool GeoTreeEngine::StartUpdater()
void GeoTreeEngine::StartUpdater()
{
if (XrdSysThread::Run(&pUpdaterTid, GeoTreeEngine::startFsChangeListener,
static_cast<void*>(this),
XRDSYSTHREAD_HOLD, "GeoTreeEngine Updater")) {
return false;
}
return true;
updaterThread.reset(&GeoTreeEngine::listenFsChange, this);
}
bool GeoTreeEngine::StopUpdater()
void GeoTreeEngine::StopUpdater()
{
XrdSysThread::Cancel(pUpdaterTid);
XrdSysThread::Join(pUpdaterTid, 0);
updaterThread.join();
gUpdaterStarted = false;
return true;
}
void* GeoTreeEngine::startFsChangeListener(void* pp)
{
((GeoTreeEngine*)pp)->listenFsChange();
return 0;
}
void GeoTreeEngine::listenFsChange()
void GeoTreeEngine::listenFsChange(ThreadAssistant &assistant)
{
gUpdaterStarted = true;
gOFS->ObjectNotifier.BindCurrentThread("geotreeengine");
......@@ -2250,13 +2219,7 @@ void GeoTreeEngine::listenFsChange()
eos_info("GeoTreeEngine updater is starting...");
}
struct timeval curtime, prevtime;
gettimeofday(&prevtime, NULL);
curtime = prevtime;
do {
while(!assistant.terminationRequested()) {
while (sem_wait(&gUpdaterPauseSem)) {
if (EINTR != errno) {
throw "sem_wait() failed";
......@@ -2264,7 +2227,7 @@ void GeoTreeEngine::listenFsChange()
}
gOFS->ObjectNotifier.tlSubscriber->mSubjSem.Wait(1);
XrdSysThread::SetCancelOff();
// to be sure that we won't try to access a removed fs
pAddRmFsMutex.LockWrite();
// we always take a lock to take something from the queue and then release it
......@@ -2351,14 +2314,10 @@ void GeoTreeEngine::listenFsChange()
gOFS->ObjectNotifier.tlSubscriber->mSubjMtx.UnLock();
pAddRmFsMutex.UnLockWrite();
// do the processing
prevtime = curtime;
gettimeofday(&curtime, NULL);
eos_debug("Updating Fast Structures at %ds. %dns. Previous update was at "
"prev: %ds. %dns. Time elapsed since the last update is: %dms.",
(int)curtime.tv_sec, (int)curtime.tv_usec, (int)prevtime.tv_sec,
(int)prevtime.tv_usec, (int)curtime.tv_sec * 1000 + ((int)curtime.tv_usec) /
1000 - (int)prevtime.tv_sec * 1000 - ((int)prevtime.tv_usec) / 1000);
// Do the processing
common::IntervalStopwatch stopwatch((std::chrono::milliseconds(pTimeFrameDurationMs)));
{
// Do it before tree info to leave some time to the other threads
checkPendingDeletionsFs();
......@@ -2370,20 +2329,14 @@ void GeoTreeEngine::listenFsChange()
gNotificationsBufferFs.clear();
gNotificationsBufferProxy.clear();
}
XrdSysThread::SetCancelOn();
size_t elapsedMs = (curtime.tv_sec - prevtime.tv_sec) * 1000 +
(curtime.tv_usec - prevtime.tv_usec) / 1000;
pFrameCount++;
if (sem_post(&gUpdaterPauseSem)) {
throw "sem_post() failed";
}
if ((int)elapsedMs < pTimeFrameDurationMs) {
std::this_thread::sleep_for
(std::chrono::milliseconds(pTimeFrameDurationMs - (int)elapsedMs));
}
} while (1);
std::this_thread::sleep_for(stopwatch.timeRemainingInCycle());
}
}
bool GeoTreeEngine::updateTreeInfo(SchedTME* entry,
......
......@@ -1217,7 +1217,7 @@ protected:
// => background updating
//
/// thread ID of the dumper thread
pthread_t pUpdaterTid;
AssistedThread updaterThread;
/// maps a notification subject to changes that happened in the current time frame
static std::map<std::string, int>
gNotificationsBufferFs; /**< Shared object change notification for filesystems */
......@@ -1241,9 +1241,7 @@ protected:
void updateAtomicPenalties();
/// Trees update management
void listenFsChange();
static void* startFsChangeListener(void* pp);
void listenFsChange(ThreadAssistant &assistant);
/// Clean
void checkPendingDeletionsFs()
......@@ -1434,7 +1432,6 @@ protected:
const SchedTreeBase::tFastTreeIdx& startFromNode = 0,
const size_t& nFinalCollocatedReplicas = 0,
std::vector<SchedTreeBase::tFastTreeIdx>* excludedNodes = NULL,
std::vector<SchedTreeBase::tFastTreeIdx>* forceNodes = NULL,
bool skipSaturated = false)
{
// a read lock is supposed to be acquired on the fast structures
......@@ -1460,16 +1457,6 @@ protected:
T* tree = (T*)tlGeoBuffer;
if (forceNodes) {
///// ===== NOT IMPLEMENTED
assert(false);
// make all the nodes
for (SchedTreeBase::tFastTreeIdx k = 0; k < tree->getMaxNodeCount(); k++) {
tree->pNodes[k].fsData.mStatus &= ~SchedTreeBase::Available;
}
}
// place the existing replicas
size_t nAdjustCollocatedReplicas = nFinalCollocatedReplicas;
......@@ -1870,8 +1857,7 @@ public:
pAccessProxygroup("accessproxygroup"),
pCircSize(30), pFrameCount(0),
pPenaltySched(pCircSize),
pLatencySched(pCircSize),
pUpdaterTid(0)
pLatencySched(pCircSize)
{
// by default, disable all the placement operations for non geotagged fs
addDisabledBranch("*", "plct", "nogeotag", NULL, false);
......@@ -2042,8 +2028,7 @@ public:
const std::string& clientGeoTag = "",
const size_t& nCollocatedReplicas = 0,
std::vector<eos::common::FileSystem::fsid_t>* excludeFs = NULL,
std::vector<std::string>* excludeGeoTags = NULL,
std::vector<std::string>* forceGeoTags = NULL);
std::vector<std::string>* excludeGeoTags = NULL);
// this function to access replica spread across multiple scheduling group is a BACKCOMPATIBILITY artifact
// the new scheduler doesn't try to place files across multiple scheduling groups.
......@@ -2053,8 +2038,7 @@ public:
// SchedType type=regularRO,
// const std::string &accesserGeotag="",
// std::vector<eos::common::FileSystem::fsid_t> *excludeFs=NULL,
// std::vector<std::string> *excludeGeoTags=NULL,
// std::vector<std::string> *forceGeoTags=NULL);
// std::vector<std::string> *excludeGeoTags=NULL);
// ---------------------------------------------------------------------------
//! Access replicas across one or several scheduling group.
......@@ -2111,7 +2095,7 @@ public:
// @return
// true if success false else
// ---------------------------------------------------------------------------
bool StartUpdater();
void StartUpdater();
// ---------------------------------------------------------------------------
//! Pause the updating of the GeoTreeEngine but keep accumulating
......@@ -2181,7 +2165,7 @@ public:
// @return
// true if success false else
// ---------------------------------------------------------------------------
bool StopUpdater();
void StopUpdater();
// ---------------------------------------------------------------------------
//! Get the fs informations in the GeotreeEngine
......
......@@ -198,7 +198,6 @@ Scheduler::FilePlacement(PlacementArguments* args)
args->vid->geolocation,
ncollocatedfs,
NULL,
NULL,
NULL);
eos::common::Logging& g_logging = eos::common::Logging::GetInstance();
......
......@@ -1269,6 +1269,22 @@ public:
// ---------------------------------------------------------------------------
void FsConfigListener(ThreadAssistant& assistant) noexcept;
//----------------------------------------------------------------------------
// Get key from MGM config queue
//----------------------------------------------------------------------------
bool getMGMConfigValue(const std::string &key, std::string &value);
//----------------------------------------------------------------------------
// Process incoming MGM configuration change
//----------------------------------------------------------------------------
void processIncomingMgmConfigurationChange(const std::string &key);
//----------------------------------------------------------------------------
// Process geotag change on the specified filesystem
//----------------------------------------------------------------------------
void processGeotagChange(eos::common::FileSystem::fsid_t fsid,
const std::string &newgeotag);
//------------------------------------------------------------------------------
//! Add backup job to the queue to be picked up by the archive/backup submitter
//! thread.
......
This diff is collapsed.
......@@ -1236,18 +1236,11 @@ XrdMgmOfs::Configure(XrdSysError& Eroute)
g_logging.SetLogPriority(LOG_INFO);
g_logging.SetUnit(unit.c_str());
std::string filter =
"Process,AddQuota,Update,UpdateHint,UpdateQuotaStatus,SetConfigValue,"
"Deletion,GetQuota,PrintOut,RegisterNode,SharedHash,listenFsChange,"
"placeNewReplicas,placeNewReplicasOneGroup,accessReplicas,"
"accessReplicasOneGroup,accessHeadReplicaMultipleGroup,updateTreeInfo,"
"updateAtomicPenalties,updateFastStructures,work";
"Process,AddQuota,Update,UpdateHint,"
"Deletion,PrintOut,SharedHash,work";
g_logging.SetFilter(filter.c_str());
Eroute.Say("=====> setting message filter: Process,AddQuota,Update,UpdateHint,"
"UpdateQuotaStatus,SetConfigValue,Deletion,GetQuota,PrintOut,"
"RegisterNode,SharedHash,listenFsChange,placeNewReplicas,"
"placeNewReplicasOneGroup,accessReplicas,accessReplicasOneGroup,"
"accessHeadReplicaMultipleGroup,updateTreeInfo,"
"updateAtomicPenalties,updateFastStructures,work");
"Deletion,PrintOut,SharedHash,work");
// Setup configuration directory and start the config engine
if (!SetupConfigDir()) {
......
......@@ -545,8 +545,7 @@ DrainTransferJob::SelectDstFs(const FileDrainInfo& fdrain,
"",// client geo tag
ncollocatedfs,
&dst_exclude_fsids,
&fsid_geotags, // excludeGeoTags
NULL);
&fsid_geotags); // excludeGeoTags
if (!res || new_repl.empty()) {
eos_err("msg=\"fid=%08llx could not place new replica\"", mFileId);
......