diff --git a/ReleaseNotes.md b/ReleaseNotes.md index 21d83edad22caca172189d4254ff49a32426a682..80e2925fc968e4b4d68ea337b4c6aa1b6715469a 100644 --- a/ReleaseNotes.md +++ b/ReleaseNotes.md @@ -14,6 +14,7 @@ - cta/CTA#634 - Fix crash of ctafrontend in initialisation for missing config values - cta/CTA#645 - Fix new mount timeout log message - cta/CTA#656 - Improve naming of taped's drive threads +- cta/CTA#616 - Fix CI errors during systemtests caused by SchedulerDB caching - cta/CTA#666 - Fix drive status activity field not being properly reset - cta/CTA#678 - Fix string representation for Cleanup session type - cta/CTA#682 - Generate taped's log file with correct owner and group diff --git a/continuousintegration/docker/ctafrontend/etc/cta/cta-frontend-xrootd.conf b/continuousintegration/docker/ctafrontend/etc/cta/cta-frontend-xrootd.conf index 3744620254c661956faa64a80bb7dc9af7eb9b79..454ad6e80c0efbbffa10541e94d5fd34f484f5b3 100644 --- a/continuousintegration/docker/ctafrontend/etc/cta/cta-frontend-xrootd.conf +++ b/continuousintegration/docker/ctafrontend/etc/cta/cta-frontend-xrootd.conf @@ -11,6 +11,10 @@ cta.schedulerdb.threadstacksize_mb 1 cta.schedulerdb.enable_repack_requests on cta.schedulerdb.enable_user_requests on +# CTA Scheduler DB options - Cache timeout options (decreased for tests) +cta.schedulerdb.tape_cache_max_age_secs 1 +cta.schedulerdb.retrieve_queue_cache_max_age_secs 1 + # CTA Catalogue options cta.catalogue.numberofconnections 10 diff --git a/continuousintegration/docker/ctafrontend/opt/run/bin/taped.sh b/continuousintegration/docker/ctafrontend/opt/run/bin/taped.sh index 11fc4a3950692c3a405029602c5435ad2edd3180..d535dff760b7a0ae7ba4a3e8145091fd53431fdb 100755 --- a/continuousintegration/docker/ctafrontend/opt/run/bin/taped.sh +++ b/continuousintegration/docker/ctafrontend/opt/run/bin/taped.sh @@ -42,19 +42,22 @@ echo ${DATABASEURL} > /etc/cta/cta-catalogue.conf TAPED_CONF_FILE="/etc/cta/cta-taped-${DRIVENAMES[${driveslot}]}.conf" # cta-taped 
setup - echo "taped BufferSizeBytes 262144" > "${TAPED_CONF_FILE}" - echo "taped BufferCount 200" >> "${TAPED_CONF_FILE}" - echo "taped MountCriteria 2000000, 100" >> "${TAPED_CONF_FILE}" - echo "taped WatchdogIdleSessionTimer 2" >> "${TAPED_CONF_FILE}" # Make tape servers more responsive, thus improving CI test speed - echo "ObjectStore BackendPath $OBJECTSTOREURL" >> "${TAPED_CONF_FILE}" - echo "taped UseEncryption no" >> "${TAPED_CONF_FILE}" - echo "taped DriveName ${DRIVENAMES[${driveslot}]}" >> "${TAPED_CONF_FILE}" - echo "taped DriveLogicalLibrary ${DRIVENAMES[${driveslot}]}" >> "${TAPED_CONF_FILE}" - echo "taped DriveDevice /dev/${DRIVEDEVICES[${driveslot}]}" >> "${TAPED_CONF_FILE}" - echo "taped DriveControlPath smc${driveslot}" >> "${TAPED_CONF_FILE}" - - echo "general InstanceName CI" >> "${TAPED_CONF_FILE}" - echo "general SchedulerBackendName VFS" >> "${TAPED_CONF_FILE}" +echo "taped BufferSizeBytes 262144" > "${TAPED_CONF_FILE}" +echo "taped BufferCount 200" >> "${TAPED_CONF_FILE}" +echo "taped MountCriteria 2000000, 100" >> "${TAPED_CONF_FILE}" +echo "taped WatchdogIdleSessionTimer 2" >> "${TAPED_CONF_FILE}" # Make tape servers more responsive, thus improving CI test speed +echo "ObjectStore BackendPath $OBJECTSTOREURL" >> "${TAPED_CONF_FILE}" +echo "taped UseEncryption no" >> "${TAPED_CONF_FILE}" +echo "taped DriveName ${DRIVENAMES[${driveslot}]}" >> "${TAPED_CONF_FILE}" +echo "taped DriveLogicalLibrary ${DRIVENAMES[${driveslot}]}" >> "${TAPED_CONF_FILE}" +echo "taped DriveDevice /dev/${DRIVEDEVICES[${driveslot}]}" >> "${TAPED_CONF_FILE}" +echo "taped DriveControlPath smc${driveslot}" >> "${TAPED_CONF_FILE}" +# Decrease schedulerDB cache timeout for tests +echo "taped TapeCacheMaxAgeSecs 1" >> "${TAPED_CONF_FILE}" +echo "taped RetrieveQueueCacheMaxAgeSecs 1" >> "${TAPED_CONF_FILE}" + +echo "general InstanceName CI" >> "${TAPED_CONF_FILE}" +echo "general SchedulerBackendName VFS" >> "${TAPED_CONF_FILE}" #### diff --git 
a/continuousintegration/orchestration/tests/retrieve_queue_cleanup.sh b/continuousintegration/orchestration/tests/retrieve_queue_cleanup.sh index 03b556cf8e537adce99823aabe75f2b2981aa599..799d8fe57c03aeee4f1df330683e38841e94656a 100755 --- a/continuousintegration/orchestration/tests/retrieve_queue_cleanup.sh +++ b/continuousintegration/orchestration/tests/retrieve_queue_cleanup.sh @@ -115,14 +115,14 @@ trigger_queue_cleanup() { for i in ${!tapeList[@]}; do wait_for_tape_state ${tapeList[$i]} BROKEN done - sleep 1 + sleep 1 # Wait for a bit, to take in account caching latencies for i in ${!tapeList[@]}; do admin_cta tape ch --vid ${tapeList[$i]} --state ACTIVE done for i in ${!tapeList[@]}; do wait_for_tape_state ${tapeList[$i]} ACTIVE done - sleep 1 + sleep 1 # Wait for a bit, to take in account caching latencies } wait_for_request_cancel_report() { @@ -167,9 +167,11 @@ change_tape_state() { then admin_cta tape ch --vid $VID --state REPACKING --reason "Testing" wait_for_tape_state $VID REPACKING + sleep 1 # Wait for a bit, to take in account caching latencies fi admin_cta tape ch --vid $VID --state $LAST_STATE --reason "Testing" wait_for_tape_state $VID $LAST_STATE + sleep 1 # Wait for a bit, to take in account caching latencies } @@ -381,9 +383,6 @@ test_tape_state_change_queue_preserved() { echo "Checking that the request was not modified on the queue..." - # Wait for a bit, to take in account protocol latencies - sleep 1 - QUERY_RSP=$(KRB5CCNAME=/tmp/${EOSPOWER_USER}/krb5cc_0 XrdSecPROTOCOL=krb5 xrdfs ${EOS_INSTANCE} query prepare ${REQUEST_ID} ${FILE_PATH}) PATH_EXISTS=$(echo ${QUERY_RSP} | jq ".responses[] | select(.path == \"${FILE_PATH}\").path_exists") REQUESTED=$( echo ${QUERY_RSP} | jq ".responses[] | select(.path == \"${FILE_PATH}\").requested") @@ -506,13 +505,11 @@ test_tape_state_change_queue_moved() { if test "0" == "${EXPECTED_QUEUE_START}"; then echo "Changing $TAPE_1 queue to ${TAPE_1_STATE_END}..." 
change_tape_state $TAPE_1 $TAPE_1_STATE_END - sleep 1 echo "Changing $TAPE_0 queue to ${TAPE_0_STATE_END}..." change_tape_state $TAPE_0 $TAPE_0_STATE_END else echo "Changing $TAPE_0 queue to ${TAPE_0_STATE_END}..." change_tape_state $TAPE_0 $TAPE_0_STATE_END - sleep 1 echo "Changing $TAPE_1 queue to ${TAPE_1_STATE_END}..." change_tape_state $TAPE_1 $TAPE_1_STATE_END fi diff --git a/continuousintegration/orchestration/tests/test_client.sh b/continuousintegration/orchestration/tests/test_client.sh index 74b056b05550f03248a631cd9a1d69cc2430a0c1..8e52ff79696ea674b21d01d07a7146a14515cd2c 100755 --- a/continuousintegration/orchestration/tests/test_client.sh +++ b/continuousintegration/orchestration/tests/test_client.sh @@ -219,14 +219,13 @@ if [[ $EOS_V == 5 ]]; then fi -# TODO: Remove these comments once https://gitlab.cern.ch/cta/CTA/-/issues/616 is fixed -#setup_tapes_for_multicopy_test -# -#echo -#echo "Launching retrieve_queue_cleanup.sh on client pod" -#echo " Archiving file: xrdcp as user1" -#echo " Retrieving it as poweruser1" -#kubectl -n ${NAMESPACE} exec client -- bash /root/retrieve_queue_cleanup.sh || exit 1 -#kubectl -n ${NAMESPACE} exec ctaeos -- bash /root/grep_xrdlog_mgm_for_error.sh || exit 1 +setup_tapes_for_multicopy_test + +echo +echo "Launching retrieve_queue_cleanup.sh on client pod" +echo " Archiving file: xrdcp as user1" +echo " Retrieving it as poweruser1" +kubectl -n ${NAMESPACE} exec client -- bash /root/retrieve_queue_cleanup.sh || exit 1 +kubectl -n ${NAMESPACE} exec ctaeos -- bash /root/grep_xrdlog_mgm_for_error.sh || exit 1 exit 0 diff --git a/frontend-grpc/Main.cpp b/frontend-grpc/Main.cpp index 9c75b969dd9a8009fa414aa5c5a267ec19fffbb6..e89ba2a0fe111f0d21a1ad442e10ff2299c9ddac 100644 --- a/frontend-grpc/Main.cpp +++ b/frontend-grpc/Main.cpp @@ -152,6 +152,20 @@ int main(const int argc, char *const *const argv) { auto backed = config.getConfEntString("ObjectStore", "BackendPath"); lc.log(log::INFO, "Using scheduler backend: " + backed); + 
// Check the scheduler DB cache timeout values + try { + auto tapeCacheMaxAgeSecs = config.getConfEntString("SchedulerDB", "TapeCacheMaxAgeSecs"); + log::ScopedParamContainer params(lc); + params.add("tapeCacheMaxAgeSecs", tapeCacheMaxAgeSecs); + lc.log(log::INFO, "Using custom tape cache timeout value"); + } catch(Configuration::NoEntry &ex) {} + try { + auto retrieveQueueCacheMaxAgeSecs = config.getConfEntString("SchedulerDB", "RetrieveQueueCacheMaxAgeSecs"); + log::ScopedParamContainer params(lc); + params.add("retrieveQueueCacheMaxAgeSecs", retrieveQueueCacheMaxAgeSecs); + lc.log(log::INFO, "Using custom retrieve queue cache timeout value"); + } catch(Configuration::NoEntry &ex) {} + auto sInit = std::make_unique<SchedulerDBInit_t>("Frontend", backed, logger); auto scheddb = sInit->getSchedDB(*catalogue, logger); scheddb->initConfig(std::nullopt, std::nullopt); diff --git a/frontend/common/FrontendService.cpp b/frontend/common/FrontendService.cpp index 90d319f123bb3e84efb3e798c5c8e63c4b31d0b5..c9182f59b6c9a04d64bf98370960cb9df75d4697 100644 --- a/frontend/common/FrontendService.cpp +++ b/frontend/common/FrontendService.cpp @@ -146,6 +146,12 @@ FrontendService::FrontendService(const std::string& configFilename) : m_archiveF m_scheddbInit = std::make_unique<SchedulerDBInit_t>("Frontend", db_conn.value(), *m_log); m_scheddb = m_scheddbInit->getSchedDB(*m_catalogue, *m_log); + // Set Scheduler DB cache timeouts + SchedulerDatabase::StatisticsCacheConfig statisticsCacheConfig; + statisticsCacheConfig.tapeCacheMaxAgeSecs = m_tapeCacheMaxAgeSecs; + statisticsCacheConfig.retrieveQueueCacheMaxAgeSecs = m_retrieveQueueCacheMaxAgeSecs; + m_scheddb->setStatisticsCacheConfig(statisticsCacheConfig); + /** [[OStoreDB specific]] * The osThreadStackSize and osThreadPoolSize variables * shall be removed once we decommission OStoreDB @@ -266,6 +272,32 @@ FrontendService::FrontendService(const std::string& configFilename) : m_archiveF log(log::INFO, "Configuration entry", params); } 
+ { + auto tapeCacheMaxAgeSecsConf = config.getOptionValueUInt("cta.schedulerdb.tape_cache_max_age_secs"); + if(tapeCacheMaxAgeSecsConf.has_value()) { + m_tapeCacheMaxAgeSecs = tapeCacheMaxAgeSecsConf.value(); + std::list<log::Param> params; + params.push_back(log::Param("source", configFilename)); + params.push_back(log::Param("category", "cta.schedulerdb")); + params.push_back(log::Param("key", "tape_cache_max_age_secs")); + params.push_back(log::Param("value", tapeCacheMaxAgeSecsConf.value())); + log(log::INFO, "Configuration entry", params); + } + } + + { + auto retrieveQueueCacheMaxAgeSecsConf = config.getOptionValueUInt("cta.schedulerdb.retrieve_queue_cache_max_age_secs"); + if(retrieveQueueCacheMaxAgeSecsConf.has_value()) { + m_retrieveQueueCacheMaxAgeSecs = retrieveQueueCacheMaxAgeSecsConf.value(); + std::list<log::Param> params; + params.push_back(log::Param("source", configFilename)); + params.push_back(log::Param("category", "cta.schedulerdb")); + params.push_back(log::Param("key", "retrieve_queue_cache_max_age_secs")); + params.push_back(log::Param("value", retrieveQueueCacheMaxAgeSecsConf.value())); + log(log::INFO, "Configuration entry", params); + } + } + // Get the mount policy name for verification requests // All done diff --git a/frontend/common/FrontendService.hpp b/frontend/common/FrontendService.hpp index 3529032cd4494b26922b79d4ed5909b70f5ee5eb..91884ba938b90d24a63ed71c8cbfadf6f42df351 100644 --- a/frontend/common/FrontendService.hpp +++ b/frontend/common/FrontendService.hpp @@ -115,6 +115,8 @@ private: bool m_acceptRepackRequests; //!< Flag to allow the processing of repack requests bool m_acceptUserRequests; //!< Flag to allow the processing of user requests + std::optional<uint64_t> m_tapeCacheMaxAgeSecs; //!< Option to override the tape cache timeout value in the scheduler DB + std::optional<uint64_t> m_retrieveQueueCacheMaxAgeSecs; //!< Option to override the retrieve queue timeout value in the scheduler DB std::string 
m_catalogue_conn_string; //!< The catalogue connection string (without the password) uint64_t m_archiveFileMaxSize; //!< Maximum allowed file size for archive requests std::optional<std::string> m_repackBufferURL; //!< The repack buffer URL diff --git a/objectstore/Helpers.cpp b/objectstore/Helpers.cpp index 8cc2bdc3bab452700040e96a74c057ea936a7346..e27fbb4d8742c89af68b12d1ead6bd33d399aa66 100644 --- a/objectstore/Helpers.cpp +++ b/objectstore/Helpers.cpp @@ -34,6 +34,10 @@ namespace cta::objectstore { +/** Time between cache updates */ +time_t Helpers::g_tapeCacheMaxAge = 600; // Default 10 minutes +time_t Helpers::g_retrieveQueueCacheMaxAge = 10; // Default 10 seconds + //------------------------------------------------------------------------------ // Helpers::getLockedAndFetchedQueue <ArchiveQueue> () //------------------------------------------------------------------------------ @@ -377,7 +381,7 @@ std::string Helpers::selectBestRetrieveQueue(const std::set<std::string, std::le for(auto& v : candidateVids) { // throw std::out_of_range() if cache item not found or if it is stale auto timeSinceLastUpdate = time(nullptr) - g_tapeStatuses.at(v).updateTime; - if(timeSinceLastUpdate > c_tapeCacheMaxAge) { + if(timeSinceLastUpdate >= g_tapeCacheMaxAge) { throw std::out_of_range(""); } } @@ -385,7 +389,7 @@ std::string Helpers::selectBestRetrieveQueue(const std::set<std::string, std::le // Remove stale cache entries for(auto it = g_tapeStatuses.cbegin(); it != g_tapeStatuses.cend(); ) { auto timeSinceLastUpdate = time(nullptr) - it->second.updateTime; - if(timeSinceLastUpdate > c_tapeCacheMaxAge) { + if(timeSinceLastUpdate >= g_tapeCacheMaxAge) { it = g_tapeStatuses.erase(it); } else { ++it; @@ -423,14 +427,16 @@ std::string Helpers::selectBestRetrieveQueue(const std::set<std::string, std::le } else { // We have a cache hit, check it's not stale. 
time_t timeSinceLastUpdate = time(nullptr) - g_retrieveQueueStatistics.at(v).updateTime; - if (timeSinceLastUpdate > c_retrieveQueueCacheMaxAge){ - logUpdateCacheIfNeeded(false,g_retrieveQueueStatistics.at(v),"timeSinceLastUpdate ("+std::to_string(timeSinceLastUpdate)+")> c_retrieveQueueCacheMaxAge (" - +std::to_string(c_retrieveQueueCacheMaxAge)+"), cache needs to be updated"); + if (timeSinceLastUpdate >= g_retrieveQueueCacheMaxAge) { + if (g_retrieveQueueCacheMaxAge) { + logUpdateCacheIfNeeded(false,g_retrieveQueueStatistics.at(v), "timeSinceLastUpdate (" + std::to_string(timeSinceLastUpdate) + ")> g_retrieveQueueCacheMaxAge (" + + std::to_string(g_retrieveQueueCacheMaxAge) + "), cache needs to be updated"); + } throw std::out_of_range(""); } - logUpdateCacheIfNeeded(false,g_retrieveQueueStatistics.at(v),"Cache is not updated, timeSinceLastUpdate ("+std::to_string(timeSinceLastUpdate)+ - ") <= c_retrieveQueueCacheMaxAge ("+std::to_string(c_retrieveQueueCacheMaxAge)+")"); + logUpdateCacheIfNeeded(false,g_retrieveQueueStatistics.at(v), "Cache is not updated, timeSinceLastUpdate (" + std::to_string(timeSinceLastUpdate) + + ") <= g_retrieveQueueCacheMaxAge (" + std::to_string(g_retrieveQueueCacheMaxAge) + ")"); // We're lucky: cache hit (and not stale) if ((g_retrieveQueueStatistics.at(v).tapeStatus.state == common::dataStructures::Tape::ACTIVE && !isRepack) || (g_retrieveQueueStatistics.at(v).tapeStatus.state == common::dataStructures::Tape::REPACKING && isRepack)) { @@ -551,13 +557,13 @@ void Helpers::updateRetrieveQueueStatisticsCache(const std::string& vid, uint64_ } } -void Helpers::flushRetrieveQueueStatisticsCache(){ +void Helpers::flushStatisticsCache(){ threading::MutexLocker ml(g_retrieveQueueStatisticsMutex); g_retrieveQueueStatistics.clear(); g_tapeStatuses.clear(); } -void Helpers::flushRetrieveQueueStatisticsCacheForVid(const std::string & vid){ +void Helpers::flushStatisticsCacheForVid(const std::string & vid){ threading::MutexLocker 
ml(g_retrieveQueueStatisticsMutex); g_retrieveQueueStatistics.erase(vid); g_tapeStatuses.erase(vid); @@ -619,6 +625,20 @@ std::list<SchedulerDatabase::RetrieveQueueStatistics> Helpers::getRetrieveQueueS return ret; } +//------------------------------------------------------------------------------ +// Helpers::setTapeCacheMaxAgeSecs() +//------------------------------------------------------------------------------ +void Helpers::setTapeCacheMaxAgeSecs(int cacheMaxAgeSecs) { + g_tapeCacheMaxAge = cacheMaxAgeSecs; +} + +//------------------------------------------------------------------------------ +// Helpers::setRetrieveQueueCacheMaxAgeSecs() +//------------------------------------------------------------------------------ +void Helpers::setRetrieveQueueCacheMaxAgeSecs(int cacheMaxAgeSecs) { + g_retrieveQueueCacheMaxAge = cacheMaxAgeSecs; +} + //------------------------------------------------------------------------------ // Helpers::registerRepackRequestToIndex() //------------------------------------------------------------------------------ diff --git a/objectstore/Helpers.hpp b/objectstore/Helpers.hpp index 1497b3a59a7c928b01ff8de6ea97d58368310b38..d82904aa890ef2dbf91855dc7b1d0cf5358eab30 100644 --- a/objectstore/Helpers.hpp +++ b/objectstore/Helpers.hpp @@ -110,13 +110,21 @@ class Helpers { static void updateRetrieveQueueStatisticsCache(const std::string & vid, uint64_t files, uint64_t bytes, uint64_t priority); /** - * Allows to flush the RetrieveQueueStatisticsCache - * TO BE USED BY UNIT TESTS ! + * Allows to flush the statistics caches + * Required by the unit tests */ - static void flushRetrieveQueueStatisticsCache(); - static void flushRetrieveQueueStatisticsCacheForVid(const std::string & vid); + static void flushStatisticsCache(); + static void flushStatisticsCacheForVid(const std::string & vid); - private: + /** + * Helper function to set the time between cache updates. + * Set to zero to disable entirely the caching capabilities. 
+ * Useful for reducing the value during tests. + */ + static void setTapeCacheMaxAgeSecs(int cacheMaxAgeSecs); + static void setRetrieveQueueCacheMaxAgeSecs(int cacheMaxAgeSecs); + +private: /** A struct holding together tape statistics and an update time */ struct TapeStatusWithTime { common::dataStructures::Tape tapeStatus; @@ -139,8 +147,8 @@ class Helpers { /** The stats for the queues */ static std::map<std::string, RetrieveQueueStatisticsWithTime> g_retrieveQueueStatistics; /** Time between cache updates */ - static const time_t c_tapeCacheMaxAge = 600; - static const time_t c_retrieveQueueCacheMaxAge = 10; + static time_t g_tapeCacheMaxAge; + static time_t g_retrieveQueueCacheMaxAge; static void logUpdateCacheIfNeeded(const bool entryCreation, const RetrieveQueueStatisticsWithTime& tapeStatistic, const std::string& message = ""); diff --git a/objectstore/ObjectStoreFixture.cpp b/objectstore/ObjectStoreFixture.cpp index 7a5cbf46e31f4fc957c5dcd63a00f70d8e966125..97d2239217bf536bd2f16ce3871008c17be4da41 100644 --- a/objectstore/ObjectStoreFixture.cpp +++ b/objectstore/ObjectStoreFixture.cpp @@ -23,7 +23,7 @@ namespace unitTests { void ObjectStore::SetUp() { // We need to cleanup the queue statistics cache before every test - cta::objectstore::Helpers::flushRetrieveQueueStatisticsCache(); + cta::objectstore::Helpers::flushStatisticsCache(); } } diff --git a/objectstore/QueueCleanupRunner.cpp b/objectstore/QueueCleanupRunner.cpp index ca0314b0a331ee5129f4e3870d6be79b1f9fc22d..b3af7494a2c91f54d946f975bda282a3151ef537 100644 --- a/objectstore/QueueCleanupRunner.cpp +++ b/objectstore/QueueCleanupRunner.cpp @@ -199,15 +199,15 @@ void QueueCleanupRunner::runOnePass(log::LogContext &logContext) { switch (tapeDataRefreshed.state) { case common::dataStructures::Tape::REPACKING_PENDING: m_catalogue.Tape()->modifyTapeState(admin, queueVid, common::dataStructures::Tape::REPACKING, common::dataStructures::Tape::REPACKING_PENDING, prevReason.value_or("QueueCleanupRunner: 
changed tape state to REPACKING")); - m_db.clearRetrieveQueueStatisticsCache(queueVid); + m_db.clearStatisticsCache(queueVid); break; case common::dataStructures::Tape::BROKEN_PENDING: m_catalogue.Tape()->modifyTapeState(admin, queueVid, common::dataStructures::Tape::BROKEN, common::dataStructures::Tape::BROKEN_PENDING, prevReason.value_or("QueueCleanupRunner: changed tape state to BROKEN")); - m_db.clearRetrieveQueueStatisticsCache(queueVid); + m_db.clearStatisticsCache(queueVid); break; case common::dataStructures::Tape::EXPORTED_PENDING: m_catalogue.Tape()->modifyTapeState(admin, queueVid, common::dataStructures::Tape::EXPORTED, common::dataStructures::Tape::EXPORTED_PENDING, prevReason.value_or("QueueCleanupRunner: changed tape state to EXPORTED")); - m_db.clearRetrieveQueueStatisticsCache(queueVid); + m_db.clearStatisticsCache(queueVid); break; default: log::ScopedParamContainer paramsWarnMsg(logContext); diff --git a/objectstore/QueueCleanupRunnerConcurrentTest.cpp b/objectstore/QueueCleanupRunnerConcurrentTest.cpp index 7e8e80290835022cfcd208decde9e22236d9955c..d3004cb8bed60c7be4496c1bd6fdb0273f5baf1f 100644 --- a/objectstore/QueueCleanupRunnerConcurrentTest.cpp +++ b/objectstore/QueueCleanupRunnerConcurrentTest.cpp @@ -152,7 +152,7 @@ public: } virtual void TearDown() { - cta::objectstore::Helpers::flushRetrieveQueueStatisticsCache(); + cta::objectstore::Helpers::flushStatisticsCache(); m_scheduler.reset(); m_db.reset(); m_catalogue.reset(); diff --git a/objectstore/QueueCleanupRunnerTest.cpp b/objectstore/QueueCleanupRunnerTest.cpp index bce276abf5524d1ea8373ea457fe311c7135f865..5d63a34959dfd1c5323a624c2031b51869b2299d 100644 --- a/objectstore/QueueCleanupRunnerTest.cpp +++ b/objectstore/QueueCleanupRunnerTest.cpp @@ -145,7 +145,7 @@ public: } virtual void TearDown() { - cta::objectstore::Helpers::flushRetrieveQueueStatisticsCache(); + cta::objectstore::Helpers::flushStatisticsCache(); m_scheduler.reset(); m_db.reset(); m_catalogue.reset(); diff --git 
a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index 61a43666199edbf5b73b5d290d98fc6c93227fff..d24533549d930066c214ea5a9e76625395bed7cd 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -1423,10 +1423,22 @@ std::list<SchedulerDatabase::RetrieveQueueStatistics> OStoreDB::getRetrieveQueue } //------------------------------------------------------------------------------ -// OStoreDB::clearRetrieveQueueStatisticsCache() +// OStoreDB::clearStatisticsCache() //------------------------------------------------------------------------------ -void OStoreDB::clearRetrieveQueueStatisticsCache(const std::string& vid) { - return Helpers::flushRetrieveQueueStatisticsCacheForVid(vid); +void OStoreDB::clearStatisticsCache(const std::string& vid) { + return Helpers::flushStatisticsCacheForVid(vid); +} + +//------------------------------------------------------------------------------ +// OStoreDB::setStatisticsCacheConfig() +//------------------------------------------------------------------------------ +void OStoreDB::setStatisticsCacheConfig(const StatisticsCacheConfig & conf) { + if (conf.retrieveQueueCacheMaxAgeSecs.has_value()) { + Helpers::setRetrieveQueueCacheMaxAgeSecs(conf.retrieveQueueCacheMaxAgeSecs.value()); + } + if (conf.tapeCacheMaxAgeSecs.has_value()) { + Helpers::setTapeCacheMaxAgeSecs(conf.tapeCacheMaxAgeSecs.value()); + } } //------------------------------------------------------------------------------ diff --git a/scheduler/OStoreDB/OStoreDB.hpp b/scheduler/OStoreDB/OStoreDB.hpp index 52531a0a4f40508ce161c7dda036552f6d123125..392ccfc4ac4cbc7c2724d6106fed333006524e41 100644 --- a/scheduler/OStoreDB/OStoreDB.hpp +++ b/scheduler/OStoreDB/OStoreDB.hpp @@ -433,7 +433,9 @@ class OStoreDB: public SchedulerDatabase { std::list<RetrieveQueueStatistics> getRetrieveQueueStatistics( const cta::common::dataStructures::RetrieveFileQueueCriteria& criteria, const std::set<std::string>& vidsToConsider) override; - void 
clearRetrieveQueueStatisticsCache(const std::string& vid) override; + void clearStatisticsCache(const std::string& vid) override; + + void setStatisticsCacheConfig(const StatisticsCacheConfig & conf) override; CTA_GENERATE_EXCEPTION_CLASS(RetrieveRequestHasNoCopies); CTA_GENERATE_EXCEPTION_CLASS(TapeCopyNumberOutOfRange); diff --git a/scheduler/Scheduler.cpp b/scheduler/Scheduler.cpp index 5bdedae8c473b66363f4a05a690a9d0bb5d74348..19b4c16a9aea090c8d657afc69b824bc0ad23095 100644 --- a/scheduler/Scheduler.cpp +++ b/scheduler/Scheduler.cpp @@ -2141,7 +2141,7 @@ void Scheduler::triggerTapeStateChange(const common::dataStructures::SecurityIde throw cta::exception::UserError("Unknown procedure to change tape state to " + Tape::stateToString(new_state)); } - m_db.clearRetrieveQueueStatisticsCache(vid); + m_db.clearStatisticsCache(vid); } //------------------------------------------------------------------------------ diff --git a/scheduler/SchedulerDatabase.hpp b/scheduler/SchedulerDatabase.hpp index 8ae1976d80246023466e6272f5c33a82166dbe3e..44cd52d28eb2737b2656a0fcdda5ad18bf87ba0e 100644 --- a/scheduler/SchedulerDatabase.hpp +++ b/scheduler/SchedulerDatabase.hpp @@ -340,22 +340,32 @@ class SchedulerDatabase { const std::set<std::string> & vidsToConsider) = 0; /** - * Clear the retrieve queue statistics cache. - * @param vid the queue vid + * Clear the queue/tape statistics cache. + * @param vid the queue/tape vid */ - virtual void clearRetrieveQueueStatisticsCache(const std::string & vid) = 0; + virtual void clearStatisticsCache(const std::string & vid) = 0; - /** - * Queues the specified request. As the object store has access to the catalogue, - * the best queue (most likely to go, and not disabled can be chosen directly there). - * - * @param rqst The request. - * @param criteria The criteria retrieved from the CTA catalogue to be used to - * decide how to quue the request. - * @param diskSystemName optional disk system name if the destination matches a declared one. 
- * @param logContext context allowing logging db operation - * @return the selected vid (mostly for logging) - */ + struct StatisticsCacheConfig { + std::optional<time_t> tapeCacheMaxAgeSecs; + std::optional<time_t> retrieveQueueCacheMaxAgeSecs; + }; + + /** + * Configure the statistics cache + */ + virtual void setStatisticsCacheConfig(const StatisticsCacheConfig & conf) = 0; + + /** + * Queues the specified request. As the object store has access to the catalogue, + * the best queue (most likely to go, and not disabled can be chosen directly there). + * + * @param rqst The request. + * @param criteria The criteria retrieved from the CTA catalogue to be used to + * decide how to queue the request. + * @param diskSystemName optional disk system name if the destination matches a declared one. + * @param logContext context allowing logging db operation + * @return the selected vid (mostly for logging) + */ struct RetrieveRequestInfo { std::string selectedVid; std::string requestId; diff --git a/scheduler/SchedulerDatabaseFactory.hpp b/scheduler/SchedulerDatabaseFactory.hpp index c2b455a8c0ff8a733ae27b36b835a86501b7a145..a27455bab2fd2b2cc2b87df7eae8534c003eda6f 100644 --- a/scheduler/SchedulerDatabaseFactory.hpp +++ b/scheduler/SchedulerDatabaseFactory.hpp @@ -214,8 +214,12 @@ public: return m_SchedDB->getRetrieveQueueStatistics(criteria, vidsToConsider); } - void clearRetrieveQueueStatisticsCache(const std::string & vid) override { - return m_SchedDB->clearRetrieveQueueStatisticsCache(vid); + void clearStatisticsCache(const std::string & vid) override { + return m_SchedDB->clearStatisticsCache(vid); + } + + void setStatisticsCacheConfig(const StatisticsCacheConfig & conf) override { + return m_SchedDB->setStatisticsCacheConfig(conf); } SchedulerDatabase::RetrieveRequestInfo queueRetrieve(common::dataStructures::RetrieveRequest& rqst, diff --git a/scheduler/SchedulerTest.cpp b/scheduler/SchedulerTest.cpp index 
a9d9365c300d2965783c2882dc81f1bbd17450d3..bd313994e0ad39820f2a209065040d3abb56d99a 100644 --- a/scheduler/SchedulerTest.cpp +++ b/scheduler/SchedulerTest.cpp @@ -163,7 +163,7 @@ public: // We know the cast will not fail, so we can safely do it (otherwise we could leak memory) m_db.reset(dynamic_cast<cta::objectstore::OStoreDBWrapperInterface*>(osdb.release())); m_scheduler = std::make_unique<Scheduler>(*m_catalogue, *m_db, s_minFilesToWarrantAMount, s_minBytesToWarrantAMount); - objectstore::Helpers::flushRetrieveQueueStatisticsCache(); + objectstore::Helpers::flushStatisticsCache(); } virtual void TearDown() { diff --git a/scheduler/rdbms/Helpers.cpp b/scheduler/rdbms/Helpers.cpp index 60f8ab4c185ce7ddbfb024574302729eec6c897e..bd0acb58fe78a0de79d475bcb64c7afffc0f6c91 100644 --- a/scheduler/rdbms/Helpers.cpp +++ b/scheduler/rdbms/Helpers.cpp @@ -22,6 +22,10 @@ namespace cta::schedulerdb { +/** Time between cache updates */ +time_t Helpers::g_tapeCacheMaxAge = 600; // Default 10 minutes +time_t Helpers::g_retrieveQueueCacheMaxAge = 10; // Default 10 seconds + //------------------------------------------------------------------------------ // Helpers::g_tapeStatuses //------------------------------------------------------------------------------ @@ -59,7 +63,7 @@ std::string Helpers::selectBestVid4Retrieve for(auto& v : candidateVids) { // throw std::out_of_range() if cache item not found or if it is stale auto timeSinceLastUpdate = time(nullptr) - g_tapeStatuses.at(v).updateTime; - if(timeSinceLastUpdate > c_tapeCacheMaxAge) { + if(timeSinceLastUpdate >= g_tapeCacheMaxAge) { throw std::out_of_range(""); } } @@ -67,7 +71,7 @@ std::string Helpers::selectBestVid4Retrieve // Remove stale cache entries for(auto it = g_tapeStatuses.cbegin(); it != g_tapeStatuses.cend(); ) { auto timeSinceLastUpdate = time(nullptr) - it->second.updateTime; - if(timeSinceLastUpdate > c_tapeCacheMaxAge) { + if(timeSinceLastUpdate >= g_tapeCacheMaxAge) { it = g_tapeStatuses.erase(it); } else 
{ ++it; @@ -112,16 +116,18 @@ std::string Helpers::selectBestVid4Retrieve } else { // We have a cache hit, check it's not stale. time_t timeSinceLastUpdate = time(nullptr) - g_retrieveQueueStatistics.at(v).updateTime; - if (timeSinceLastUpdate > c_retrieveQueueCacheMaxAge) { - logUpdateCacheIfNeeded(false,g_retrieveQueueStatistics.at(v), - "timeSinceLastUpdate ("+std::to_string(timeSinceLastUpdate)+")> c_retrieveQueueCacheMaxAge (" - + std::to_string(c_retrieveQueueCacheMaxAge)+"), cache needs to be updated"); + if (timeSinceLastUpdate >= g_retrieveQueueCacheMaxAge) { + if (g_retrieveQueueCacheMaxAge) { + logUpdateCacheIfNeeded(false,g_retrieveQueueStatistics.at(v), + "timeSinceLastUpdate (" + std::to_string(timeSinceLastUpdate) + ")> g_retrieveQueueCacheMaxAge (" + + std::to_string(g_retrieveQueueCacheMaxAge) + "), cache needs to be updated"); + } throw std::out_of_range(""); } logUpdateCacheIfNeeded(false,g_retrieveQueueStatistics.at(v), - "Cache is not updated, timeSinceLastUpdate (" + std::to_string(timeSinceLastUpdate) + - ") <= c_retrieveQueueCacheMaxAge (" + std::to_string(c_retrieveQueueCacheMaxAge) + ")"); + "Cache is not updated, timeSinceLastUpdate (" + std::to_string(timeSinceLastUpdate) + + ") <= g_retrieveQueueCacheMaxAge (" + std::to_string(g_retrieveQueueCacheMaxAge) + ")"); // We're lucky: cache hit (and not stale) if ((g_retrieveQueueStatistics.at(v).tapeStatus.state == common::dataStructures::Tape::ACTIVE && !isRepack) || @@ -287,6 +293,20 @@ std::list<SchedulerDatabase::RetrieveQueueStatistics> Helpers::getRetrieveQueueS return ret; } +//------------------------------------------------------------------------------ +// Helpers::setTapeCacheMaxAgeSecs() +//------------------------------------------------------------------------------ +void Helpers::setTapeCacheMaxAgeSecs(int cacheMaxAgeSecs) { + g_tapeCacheMaxAge = cacheMaxAgeSecs; +} + +//------------------------------------------------------------------------------ +// 
Helpers::setRetrieveQueueCacheMaxAgeSecs() +//------------------------------------------------------------------------------ +void Helpers::setRetrieveQueueCacheMaxAgeSecs(int cacheMaxAgeSecs) { + g_retrieveQueueCacheMaxAge = cacheMaxAgeSecs; +} + //------------------------------------------------------------------------------ // Helpers::updateRetrieveQueueStatisticsCache() //------------------------------------------------------------------------------ @@ -320,7 +340,7 @@ void Helpers::updateRetrieveQueueStatisticsCache(const std::string& vid, uint64_ } } -void Helpers::flushRetrieveQueueStatisticsCacheForVid(const std::string & vid){ +void Helpers::flushStatisticsCacheForVid(const std::string & vid){ threading::MutexLocker ml(g_retrieveQueueStatisticsMutex); g_retrieveQueueStatistics.erase(vid); g_tapeStatuses.erase(vid); diff --git a/scheduler/rdbms/Helpers.hpp b/scheduler/rdbms/Helpers.hpp index 1ba526a1dfb73145c71c6c9a882ee18b2a3b7535..2aee792010d44a21556e3e5e1691d3c8d0f8f4f4 100644 --- a/scheduler/rdbms/Helpers.hpp +++ b/scheduler/rdbms/Helpers.hpp @@ -58,7 +58,10 @@ class Helpers { uint64_t bytes, uint64_t priority); - static void flushRetrieveQueueStatisticsCacheForVid(const std::string & vid); + static void flushStatisticsCacheForVid(const std::string & vid); + + static void setTapeCacheMaxAgeSecs(int cacheMaxAgeSecs); + static void setRetrieveQueueCacheMaxAgeSecs(int cacheMaxAgeSecs); /** A struct holding together RetrieveQueueStatistics, tape status and an update time. 
*/ struct RetrieveQueueStatisticsWithTime { @@ -89,8 +92,8 @@ private: static std::map<std::string, RetrieveQueueStatisticsWithTime, std::less<>> g_retrieveQueueStatistics; /** Time between cache updates */ - static const time_t c_tapeCacheMaxAge = 600; - static const time_t c_retrieveQueueCacheMaxAge = 10; + static time_t g_tapeCacheMaxAge; + static time_t g_retrieveQueueCacheMaxAge; static void logUpdateCacheIfNeeded( const bool entryCreation, diff --git a/scheduler/rdbms/RelationalDB.cpp b/scheduler/rdbms/RelationalDB.cpp index 396f1431c0431f366c2925cfb1b0b7a3aac09770..2eb8aecdb4b5830d409694fe8de8bb3192aeb5db 100644 --- a/scheduler/rdbms/RelationalDB.cpp +++ b/scheduler/rdbms/RelationalDB.cpp @@ -229,9 +229,9 @@ std::list<SchedulerDatabase::RetrieveQueueStatistics> RelationalDB::getRetrieveQ throw cta::exception::Exception("Not implemented"); } -void RelationalDB::clearRetrieveQueueStatisticsCache(const std::string & vid) +void RelationalDB::clearStatisticsCache(const std::string & vid) { - schedulerdb::Helpers::flushRetrieveQueueStatisticsCacheForVid(vid); + schedulerdb::Helpers::flushStatisticsCacheForVid(vid); } SchedulerDatabase::RetrieveRequestInfo RelationalDB::queueRetrieve(cta::common::dataStructures::RetrieveRequest& rqst, diff --git a/scheduler/rdbms/RelationalDB.hpp b/scheduler/rdbms/RelationalDB.hpp index 72558af49fde07f36bb57d86775ed5bd65f9ca97..09ca6fd5052ac0b6791d5243cd616b08c362c3be 100644 --- a/scheduler/rdbms/RelationalDB.hpp +++ b/scheduler/rdbms/RelationalDB.hpp @@ -119,7 +119,9 @@ class RelationalDB: public SchedulerDatabase { const cta::common::dataStructures::RetrieveFileQueueCriteria &criteria, const std::optional<std::string> diskSystemName, log::LogContext &logContext) override; - void clearRetrieveQueueStatisticsCache(const std::string & vid) override; + void clearStatisticsCache(const std::string & vid) override; + + void setStatisticsCacheConfig(const StatisticsCacheConfig & conf) override; void cancelRetrieve(const std::string& 
instanceName, const cta::common::dataStructures::CancelRetrieveRequest& rqst, log::LogContext& lc) override; diff --git a/tapeserver/daemon/DriveHandler.cpp b/tapeserver/daemon/DriveHandler.cpp index 4aac6d84bde663fa66cde5551c7350439d946566..4e3c79445e055bb40c416105d9f6b3feebce594e 100644 --- a/tapeserver/daemon/DriveHandler.cpp +++ b/tapeserver/daemon/DriveHandler.cpp @@ -947,6 +947,13 @@ std::shared_ptr<cta::IScheduler> DriveHandler::createScheduler(const std::string "Reporting fatal error."); throw; } + + // Set Scheduler DB cache timeouts + SchedulerDatabase::StatisticsCacheConfig statisticsCacheConfig; + statisticsCacheConfig.tapeCacheMaxAgeSecs = m_tapedConfig.tapeCacheMaxAgeSecs.value(); + statisticsCacheConfig.retrieveQueueCacheMaxAgeSecs = m_tapedConfig.retrieveQueueCacheMaxAgeSecs.value(); + m_sched_db->setStatisticsCacheConfig(statisticsCacheConfig); + m_lc.log(log::DEBUG, "In DriveHandler::createScheduler(): will create scheduler."); return std::make_shared<Scheduler>(*m_catalogue, *m_sched_db, minFilesToWarrantAMount, minBytesToWarrantAMount); } diff --git a/tapeserver/daemon/MaintenanceHandler.cpp b/tapeserver/daemon/MaintenanceHandler.cpp index 13ed275fa1407593fff9285bffe4013c3787c972..2cd17396b7430bb6c666711da14775a51dca0b9a 100644 --- a/tapeserver/daemon/MaintenanceHandler.cpp +++ b/tapeserver/daemon/MaintenanceHandler.cpp @@ -277,6 +277,11 @@ void MaintenanceHandler::exceptionThrowingRunChild(){ catalogueLogin, nbConns, nbArchiveFileListingConns); catalogue = catalogueFactory->create(); sched_db = sched_db_init.getSchedDB(*catalogue, m_processManager.logContext().logger()); + // Set Scheduler DB cache timeouts + SchedulerDatabase::StatisticsCacheConfig statisticsCacheConfig; + statisticsCacheConfig.tapeCacheMaxAgeSecs = m_tapedConfig.tapeCacheMaxAgeSecs.value(); + statisticsCacheConfig.retrieveQueueCacheMaxAgeSecs = m_tapedConfig.retrieveQueueCacheMaxAgeSecs.value(); + sched_db->setStatisticsCacheConfig(statisticsCacheConfig); // TODO: we have 
hardcoded the mount policy parameters here temporarily we will remove them once we know where to put them scheduler = std::make_unique<cta::Scheduler>(*catalogue, *sched_db, 5, 2*1000*1000); // Before launching the transfer session, we validate that the scheduler is reachable. diff --git a/tapeserver/daemon/common/TapedConfiguration.cpp b/tapeserver/daemon/common/TapedConfiguration.cpp index f28aa08d86fcf9b04d3b7e7de1e2d22ad8267bf4..32df7819c230937dc075884dfa6c3ea4286f859c 100644 --- a/tapeserver/daemon/common/TapedConfiguration.cpp +++ b/tapeserver/daemon/common/TapedConfiguration.cpp @@ -179,6 +179,10 @@ TapedConfiguration TapedConfiguration::createFromConfigPath( ret.instanceName.setFromConfigurationFile(cf, driveTapedConfigPath); ret.schedulerBackendName.setFromConfigurationFile(cf, driveTapedConfigPath); + // Caching options + ret.tapeCacheMaxAgeSecs.setFromConfigurationFile(cf, driveTapedConfigPath); + ret.retrieveQueueCacheMaxAgeSecs.setFromConfigurationFile(cf, driveTapedConfigPath); + // If we get here, the configuration file is good enough to be logged. 
ret.daemonUserName.log(log); ret.daemonGroupName.log(log); @@ -226,6 +230,9 @@ TapedConfiguration TapedConfiguration::createFromConfigPath( ret.instanceName.log(log); ret.schedulerBackendName.log(log); + ret.tapeCacheMaxAgeSecs.log(log); + ret.retrieveQueueCacheMaxAgeSecs.log(log); + return ret; } diff --git a/tapeserver/daemon/common/TapedConfiguration.hpp b/tapeserver/daemon/common/TapedConfiguration.hpp index 1d33f5d789f3bdb437c9e58b0a0a60060f908af4..931e658470f381ee767b1837d41fe7884d22f4c5 100644 --- a/tapeserver/daemon/common/TapedConfiguration.hpp +++ b/tapeserver/daemon/common/TapedConfiguration.hpp @@ -223,6 +223,17 @@ struct TapedConfiguration { "taped", "RmcRequestAttempts", 10, "Compile time default" }; + //---------------------------------------------------------------------------- + // Caching max age + //---------------------------------------------------------------------------- + cta::SourcedParameter<uint32_t> tapeCacheMaxAgeSecs { + "taped", "TapeCacheMaxAgeSecs", 600, "Compile time default" + }; + + cta::SourcedParameter<uint32_t> retrieveQueueCacheMaxAgeSecs { + "taped", "RetrieveQueueCacheMaxAgeSecs", 10, "Compile time default" + }; + //---------------------------------------------------------------------------- // Drive Options //---------------------------------------------------------------------------- diff --git a/tapeserver/daemon/cta-taped.conf.example b/tapeserver/daemon/cta-taped.conf.example index eaecf010bf04a3844e19e3da9d218508c2139ba4..9ac155f6de3097d2c2fefd06017f40ac57fc890d 100644 --- a/tapeserver/daemon/cta-taped.conf.example +++ b/tapeserver/daemon/cta-taped.conf.example @@ -113,6 +113,15 @@ ObjectStore BackendPath rados://cta@tapecta:cta # exceeded. Defaults to 500 GB and 10000 files. # taped MountCriteria 500000000000,10000 +# +# TAPED CACHE TIMEOUT OPTIONS +# +# Defaults to 600 seconds if not set. +# taped TapeCacheMaxAgeSecs 600 +# +# Defaults to 10 seconds if not set. 
+# taped RetrieveQueueCacheMaxAgeSecs 10 + # # DISK FILE ACCESS OPTIONS # diff --git a/xroot_plugins/cta-frontend-xrootd.conf.example b/xroot_plugins/cta-frontend-xrootd.conf.example index 34edd6d1a9d7ebd9b30b5a99e83fadc9fef37684..f3be2c455ecc156e32eef1f82d937a624071709d 100644 --- a/xroot_plugins/cta-frontend-xrootd.conf.example +++ b/xroot_plugins/cta-frontend-xrootd.conf.example @@ -5,13 +5,17 @@ # CTA ObjectStore options #cta.objectstore.backendpath /tmp/jobStoreXXXXXXX -# Block requests for user or repack +# CTA Scheduler DB - thread options +cta.schedulerdb.numberofthreads 500 +cta.schedulerdb.threadstacksize_mb 1 + +# CTA Scheduler DB - enable requests for user or repack cta.schedulerdb.enable_repack_requests on cta.schedulerdb.enable_user_requests on -# CTA Scheduler DB options -cta.schedulerdb.numberofthreads 500 -cta.schedulerdb.threadstacksize_mb 1 +# CTA Scheduler DB - cache timeout options +# cta.schedulerdb.tape_cache_max_age_secs 600 +# cta.schedulerdb.retrieve_queue_cache_max_age_secs 10 # CTA Catalogue options cta.catalogue.numberofconnections 10