From e8fe9461aa5355ee94d6e0bafc97d075b8cc20c8 Mon Sep 17 00:00:00 2001 From: Jaroslav Guenther <jaroslav.guenther@cern.ch> Date: Thu, 25 Apr 2024 22:46:49 +0200 Subject: [PATCH] Resolve "Fix archival job retrival and implement minimum set of methods to enable a file to be written to tape" --- ReleaseNotes.md | 1 + scheduler/ArchiveMount.cpp | 43 ++++--- scheduler/PostgresSchedDB/ArchiveJob.cpp | 2 + scheduler/PostgresSchedDB/ArchiveJob.hpp | 3 + scheduler/PostgresSchedDB/ArchiveMount.cpp | 118 +++++++++++++----- scheduler/PostgresSchedDB/ArchiveMount.hpp | 16 +-- scheduler/PostgresSchedDB/PostgresSchedDB.cpp | 9 +- scheduler/PostgresSchedDB/PostgresSchedDB.hpp | 19 +-- scheduler/PostgresSchedDB/RetrieveMount.cpp | 2 +- .../PostgresSchedDB/TapeMountDecisionInfo.cpp | 9 +- .../PostgresSchedDB/TapeMountDecisionInfo.hpp | 15 ++- .../PostgresSchedDB/sql/ArchiveJobQueue.cpp | 41 ++---- .../PostgresSchedDB/sql/ArchiveJobQueue.hpp | 12 +- .../PostgresSchedDB/sql/ArchiveJobSummary.hpp | 6 +- scheduler/PostgresSchedDB/sql/Mounts.hpp | 1 + .../PostgresSchedDB/sql/RetrieveJobQueue.cpp | 2 +- .../PostgresSchedDB/sql/RetrieveJobQueue.hpp | 2 +- scheduler/PostgresSchedDB/sql/Transaction.hpp | 1 - .../tapeserver/daemon/DataTransferSession.cpp | 2 +- .../daemon/MigrationTaskInjector.cpp | 8 +- 20 files changed, 182 insertions(+), 130 deletions(-) diff --git a/ReleaseNotes.md b/ReleaseNotes.md index 6ee41ce62e..d931b362f0 100644 --- a/ReleaseNotes.md +++ b/ReleaseNotes.md @@ -3,6 +3,7 @@ ## Features - cta/CTA#646 - JSON logging with correct field types - cta/CTA#350 - Change fxid to fid in command tools +- cta/CTA#641 - Archive workflow of Postgres Scheduler DB can write a file to tape ### Bug Fixes - cta/CTA#485 - Check disk file metadata on delete requests diff --git a/scheduler/ArchiveMount.cpp b/scheduler/ArchiveMount.cpp index 33a17c4f09..919736d287 100644 --- a/scheduler/ArchiveMount.cpp +++ b/scheduler/ArchiveMount.cpp @@ -192,9 +192,9 @@ void cta::ArchiveMount::reportJobsBatchTransferred(std::queue<std::unique_ptr<ct if (!job) continue; cta::log::ScopedParamContainer params(logContext); params.add("tapeVid", job->tapeFile.vid) - .add("mountType", cta::common::dataStructures::toString(job->m_mount->getMountType())) - .add("fileId", job->archiveFile.archiveFileID) - .add("type", "ReportSuccessful"); + .add("mountType", cta::common::dataStructures::toString(job->m_mount->getMountType())) + .add("fileId", job->archiveFile.archiveFileID) + .add("type", "ReportSuccessful"); logContext.log(cta::log::INFO, "In cta::ArchiveMount::reportJobsBatchTransferred(): archive job successful"); try { tapeItemsWritten.emplace(job->validateAndGetTapeFileWritten().release()); @@ -231,9 +231,9 @@ void cta::ArchiveMount::reportJobsBatchTransferred(std::queue<std::unique_ptr<ct { cta::log::ScopedParamContainer params(logContext); params.add("tapeFilesWritten", tapeItemsWritten.size()) - .add("files", files) - .add("bytes", bytes) - .add("catalogueTime", catalogueTime); + .add("files", files) + .add("bytes", bytes) + .add("catalogueTime", catalogueTime); logContext.log(cta::log::INFO, "Catalog updated for batch of jobs"); } @@ -243,20 +243,20 @@ void cta::ArchiveMount::reportJobsBatchTransferred(std::queue<std::unique_ptr<ct schedulerDbTime = t.secs(utils::Timer::resetCounter); cta::log::ScopedParamContainer params(logContext); params.add("files", files) - .add("bytes", bytes) - .add("catalogueTime", catalogueTime) - .add("schedulerDbTime", schedulerDbTime) - .add("totalTime", catalogueTime + schedulerDbTime + clientReportingTime); + .add("bytes", bytes) + .add("catalogueTime", catalogueTime) + .add("schedulerDbTime", schedulerDbTime) + .add("totalTime", catalogueTime + schedulerDbTime + clientReportingTime); logContext.log(log::INFO, "In ArchiveMount::reportJobsBatchTransferred(): recorded a batch of archive jobs in metadata"); } catch (const cta::exception::NoSuchObject& ex){ cta::log::ScopedParamContainer params(logContext); params.add("exceptionMessageValue", ex.getMessageValue()); if (job) { params.add("fileId", job->archiveFile.archiveFileID) - .add("diskInstance", job->archiveFile.diskInstance) - .add("diskFileId", job->archiveFile.diskFileId) - .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path) - .add("reportURL", failedValidationJobReportURL); + .add("diskInstance", job->archiveFile.diskInstance) + .add("diskFileId", job->archiveFile.diskFileId) + .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path) + .add("reportURL", failedValidationJobReportURL); } const std::string msg_error = "In ArchiveMount::reportJobsBatchTransferred(): job does not exist in the Scheduler DB"; logContext.log(cta::log::WARNING, msg_error); @@ -265,10 +265,10 @@ void cta::ArchiveMount::reportJobsBatchTransferred(std::queue<std::unique_ptr<ct params.add("exceptionMessageValue", e.getMessageValue()); if (job) { params.add("fileId", job->archiveFile.archiveFileID) - .add("diskInstance", job->archiveFile.diskInstance) - .add("diskFileId", job->archiveFile.diskFileId) - .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path) - .add("reportURL", failedValidationJobReportURL); + .add("diskInstance", job->archiveFile.diskInstance) + .add("diskFileId", job->archiveFile.diskFileId) + .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path) + .add("reportURL", failedValidationJobReportURL); } const std::string msg_error = "In ArchiveMount::reportJobsBatchTransferred(): got an exception"; logContext.log(cta::log::ERR, msg_error); @@ -294,9 +294,9 @@ void cta::ArchiveMount::reportJobsBatchTransferred(std::queue<std::unique_ptr<ct params.add("exceptionWhat", e.what()); if (job) { params.add("fileId", job->archiveFile.archiveFileID) - .add("diskInstance", job->archiveFile.diskInstance) - .add("diskFileId", job->archiveFile.diskFileId) - .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path); + .add("diskInstance", job->archiveFile.diskInstance) + .add("diskFileId", job->archiveFile.diskFileId) + .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path); } const std::string msg_error = "In ArchiveMount::reportJobsBatchTransferred(): got a standard exception"; logContext.log(cta::log::ERR, msg_error); @@ -313,7 +313,6 @@ void cta::ArchiveMount::reportJobsBatchTransferred(std::queue<std::unique_ptr<ct } } - //------------------------------------------------------------------------------ // complete //------------------------------------------------------------------------------ diff --git a/scheduler/PostgresSchedDB/ArchiveJob.cpp b/scheduler/PostgresSchedDB/ArchiveJob.cpp index 087517206d..5fa739f712 100644 --- a/scheduler/PostgresSchedDB/ArchiveJob.cpp +++ b/scheduler/PostgresSchedDB/ArchiveJob.cpp @@ -21,6 +21,8 @@ namespace cta::postgresscheddb { ArchiveJob::ArchiveJob() = default; +ArchiveJob::ArchiveJob(bool jobOwned, uint64_t jobID, uint64_t mountID, std::string_view tapePool) : + m_jobOwned(jobOwned), m_mountId(mountID), m_jobId(jobID), m_tapePool(tapePool) { }; void ArchiveJob::failTransfer(const std::string & failureReason, log::LogContext & lc) { diff --git a/scheduler/PostgresSchedDB/ArchiveJob.hpp b/scheduler/PostgresSchedDB/ArchiveJob.hpp index 6958f6158e..bd7bed78f1 100644 --- a/scheduler/PostgresSchedDB/ArchiveJob.hpp +++ b/scheduler/PostgresSchedDB/ArchiveJob.hpp @@ -35,6 +35,7 @@ class ArchiveJob : public SchedulerDatabase::ArchiveJob { public: ArchiveJob(); + ArchiveJob(bool jobOwned, uint64_t jobID, uint64_t mountID, std::string_view tapePool); void failTransfer(const std::string & failureReason, log::LogContext & lc) override; @@ -44,7 +45,9 @@ class ArchiveJob : public SchedulerDatabase::ArchiveJob { bool m_jobOwned = false; uint64_t m_mountId = 0; + uint64_t m_jobId = 0; std::string m_tapePool; + }; } // namespace cta::postgresscheddb diff --git a/scheduler/PostgresSchedDB/ArchiveMount.cpp b/scheduler/PostgresSchedDB/ArchiveMount.cpp index b95717db21..828604aa88 100644 --- a/scheduler/PostgresSchedDB/ArchiveMount.cpp +++ b/scheduler/PostgresSchedDB/ArchiveMount.cpp @@ -18,63 +18,86 @@ #include "scheduler/PostgresSchedDB/ArchiveMount.hpp" #include "scheduler/PostgresSchedDB/ArchiveJob.hpp" #include "common/exception/Exception.hpp" +#include "common/exception/NoSuchObject.hpp" +#include "common/utils/utils.hpp" #include "scheduler/PostgresSchedDB/sql/ArchiveJobQueue.hpp" +#include "scheduler/PostgresSchedDB/sql/Transaction.hpp" +#include "catalogue/TapeDrivesCatalogueState.hpp" + +#include <unordered_map> namespace cta::postgresscheddb { const SchedulerDatabase::ArchiveMount::MountInfo &ArchiveMount::getMountInfo() { - throw cta::exception::Exception("Not implemented"); + return mountInfo; } std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob>> ArchiveMount::getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, log::LogContext& logContext) { - + logContext.log(cta::log::DEBUG, "Entering ArchiveMount::getNextJobBatch()"); rdbms::Rset resultSet; - // make the txn connection commit and can any pending transactions - auto& nonTxnConn = m_txn.getNonTxnConn(); + // fetch a non transactional connection from the PGSCHED connection pool + auto conn = m_PostgresSchedDB.m_connPool.getConn(); // retrieve batch up to file limit if(m_queueType == common::dataStructures::JobQueueType::JobsToTransferForUser) { + logContext.log(cta::log::DEBUG, "Query JobsToTransferForUser ArchiveMount::getNextJobBatch()"); resultSet = cta::postgresscheddb::sql::ArchiveJobQueueRow::select( - nonTxnConn, ArchiveJobStatus::AJS_ToTransferForUser, mountInfo.tapePool, filesRequested); - + conn, ArchiveJobStatus::AJS_ToTransferForUser, mountInfo.tapePool, filesRequested); + logContext.log(cta::log::DEBUG, "After first selection of AJS_ToTransferForUser ArchiveMount::getNextJobBatch()"); } else { + logContext.log(cta::log::DEBUG, "Query JobsToTransferForRepack ArchiveMount::getNextJobBatch()"); resultSet = cta::postgresscheddb::sql::ArchiveJobQueueRow::select( - nonTxnConn, ArchiveJobStatus::AJS_ToTransferForRepack, mountInfo.tapePool, filesRequested); + conn, ArchiveJobStatus::AJS_ToTransferForRepack, mountInfo.tapePool, filesRequested); } - + logContext.log(cta::log::DEBUG, "Filling jobs in ArchiveMount::getNextJobBatch()"); std::list<sql::ArchiveJobQueueRow> jobs; + std::string jobIDsString; + std::list<std::string> jobIDsList; // filter retrieved batch up to size limit uint64_t totalBytes = 0; while(resultSet.next()) { + uint64_t assigned_jobid = resultSet.columnUint64("JOB_ID"); jobs.emplace_back(resultSet); + jobs.back().jobId = assigned_jobid; + jobIDsList.emplace_back(std::to_string(assigned_jobid)); + jobIDsString += std::to_string(assigned_jobid) + ","; + logContext.log(cta::log::DEBUG, "jobIDsString: " + jobIDsString); totalBytes += jobs.back().archiveFile.fileSize; if(totalBytes >= bytesRequested) break; } - - // mark the jobs in the batch as owned - m_txn.start(); - sql::ArchiveJobQueueRow::updateMountId(m_txn, jobs, mountInfo.mountId); - m_txn.commit(); + logContext.log(cta::log::DEBUG, "Ended filling jobs in ArchiveMount::getNextJobBatch() executing ArchiveJobQueueRow::updateMountID"); // Construct the return value std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob>> ret; - for (const auto &j : jobs) { - auto aj = std::make_unique<postgresscheddb::ArchiveJob>(/* j.jobId */); - aj->tapeFile.copyNb = j.copyNb; - aj->archiveFile = j.archiveFile; - aj->archiveReportURL = j.archiveReportUrl; - aj->errorReportURL = j.archiveErrorReportUrl; - aj->srcURL = j.srcUrl; - aj->tapeFile.fSeq = ++nbFilesCurrentlyOnTape; - aj->tapeFile.vid = mountInfo.vid; - aj->tapeFile.blockId = std::numeric_limits<decltype(aj->tapeFile.blockId)>::max(); -// m_jobOwned ? - aj->m_mountId = mountInfo.mountId; - aj->m_tapePool = mountInfo.tapePool; -// reportType ? - ret.emplace_back(std::move(aj)); + + if (!jobIDsString.empty()) { + jobIDsString.pop_back(); // Remove the trailing comma + // mark the jobs in the batch as owned + try { + cta::postgresscheddb::Transaction txn(m_PostgresSchedDB.m_connPool); + txn.lockGlobal(0); + sql::ArchiveJobQueueRow::updateMountID(txn, jobIDsList, mountInfo.mountId); + txn.commit(); + logContext.log(cta::log::DEBUG, "Finished to update Mount ID to: " + std::to_string(mountInfo.mountId)+ " for JOB IDs: " + jobIDsString); + for (const auto &j : jobs) { + auto aj = std::make_unique<postgresscheddb::ArchiveJob>(true, mountInfo.mountId, j.jobId, mountInfo.tapePool); + aj->tapeFile.copyNb = j.copyNb; + aj->archiveFile = j.archiveFile; + aj->archiveReportURL = j.archiveReportUrl; + aj->errorReportURL = j.archiveErrorReportUrl; + aj->srcURL = j.srcUrl; + aj->tapeFile.fSeq = ++nbFilesCurrentlyOnTape; + aj->tapeFile.vid = mountInfo.vid; + aj->tapeFile.blockId = std::numeric_limits<decltype(aj->tapeFile.blockId)>::max(); + // reportType ? + ret.emplace_back(std::move(aj)); + } + } catch (exception::Exception& ex) { + logContext.log(cta::log::DEBUG, "In sql::ArchiveJobQueueRow::updateMountID: failed to update Mount ID." + ex.getMessageValue()); + } + logContext.log(cta::log::DEBUG, "Finished updating Mount ID for the selected jobs ArchiveJobQueueRow::updateMountID" + jobIDsString); } return ret; } @@ -82,18 +105,51 @@ std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob>> ArchiveMount::getNextJ void ArchiveMount::setDriveStatus(common::dataStructures::DriveStatus status, common::dataStructures::MountType mountType, time_t completionTime, const std::optional<std::string>& reason) { - throw cta::exception::Exception("Not implemented"); + // We just report the drive status as instructed by the tape thread. + // Reset the drive state. + common::dataStructures::DriveInfo driveInfo; + driveInfo.driveName = mountInfo.drive; + driveInfo.logicalLibrary = mountInfo.logicalLibrary; + driveInfo.host = mountInfo.host; + ReportDriveStatusInputs inputs; + inputs.mountType = mountType; + inputs.mountSessionId = mountInfo.mountId; + inputs.reportTime = completionTime; + inputs.status = status; + inputs.vid = mountInfo.vid; + inputs.tapepool = mountInfo.tapePool; + inputs.vo = mountInfo.vo; + inputs.reason = reason; + // TODO: statistics! + inputs.byteTransferred = 0; + inputs.filesTransferred = 0; + log::LogContext lc(m_PostgresSchedDB.m_logger); + m_PostgresSchedDB.m_tapeDrivesState->updateDriveStatus(driveInfo, inputs, lc); } void ArchiveMount::setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) { - throw cta::exception::Exception("Not implemented"); + // We just report the tape session statistics as instructed by the tape thread. + // Reset the drive state. + common::dataStructures::DriveInfo driveInfo; + driveInfo.driveName = mountInfo.drive; + driveInfo.logicalLibrary = mountInfo.logicalLibrary; + driveInfo.host=mountInfo.host; + + ReportDriveStatsInputs inputs; + inputs.reportTime = time(nullptr); + inputs.bytesTransferred = stats.dataVolume; + inputs.filesTransferred = stats.filesCount; + + log::LogContext lc(m_PostgresSchedDB.m_logger); + m_PostgresSchedDB.m_tapeDrivesState->updateDriveStatistics(driveInfo, inputs, lc); } void ArchiveMount::setJobBatchTransferred( std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob>> & jobsBatch, log::LogContext & lc) { - throw cta::exception::Exception("Not implemented"); + lc.log(log::WARNING, + "In postgresscheddb::ArchiveMount::setJobBatchTransferred(): set ArchiveRequests passes as dummy implementation !"); } } // namespace cta::postgresscheddb diff --git a/scheduler/PostgresSchedDB/ArchiveMount.hpp b/scheduler/PostgresSchedDB/ArchiveMount.hpp index ce8e5e5393..0c135435be 100644 --- a/scheduler/PostgresSchedDB/ArchiveMount.hpp +++ b/scheduler/PostgresSchedDB/ArchiveMount.hpp @@ -17,12 +17,11 @@ #pragma once -#include "scheduler/PostgresSchedDB/PostgresSchedDB.hpp" #include "common/log/LogContext.hpp" #include "common/dataStructures/DriveState.hpp" #include "common/dataStructures/MountType.hpp" -#include "scheduler/PostgresSchedDB/sql/Transaction.hpp" #include "scheduler/PostgresSchedDB/sql/Enums.hpp" +#include "scheduler/PostgresSchedDB/PostgresSchedDB.hpp" #include <list> #include <memory> @@ -30,15 +29,18 @@ #include <cstdint> #include <time.h> + namespace cta::postgresscheddb { +class TapeMountDecisionInfo; + class ArchiveMount : public SchedulerDatabase::ArchiveMount { friend class cta::PostgresSchedDB; - + friend class TapeMountDecisionInfo; public: - ArchiveMount(const std::string& ownerId, Transaction& txn, common::dataStructures::JobQueueType queueType) : - m_ownerId(ownerId), m_txn(txn), m_queueType(queueType) { } + ArchiveMount(PostgresSchedDB &pdb, const std::string& ownerId, common::dataStructures::JobQueueType queueType) : + m_PostgresSchedDB(pdb), m_ownerId(ownerId), m_queueType(queueType) { } const MountInfo & getMountInfo() override; @@ -46,7 +48,7 @@ class ArchiveMount : public SchedulerDatabase::ArchiveMount { uint64_t bytesRequested, log::LogContext& logContext) override; void setDriveStatus(common::dataStructures::DriveStatus status, common::dataStructures::MountType mountType, - time_t completionTime, const std::optional<std::string>& reason = std::nullopt) override; + time_t completionTime, const std::optional<std::string>& reason = std::nullopt) override; void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) override; @@ -55,8 +57,8 @@ class ArchiveMount : public SchedulerDatabase::ArchiveMount { private: + cta::PostgresSchedDB& m_PostgresSchedDB; const std::string& m_ownerId; - Transaction& m_txn; common::dataStructures::JobQueueType m_queueType; }; diff --git a/scheduler/PostgresSchedDB/PostgresSchedDB.cpp b/scheduler/PostgresSchedDB/PostgresSchedDB.cpp index 5e88756a36..acc7f10cc9 100644 --- a/scheduler/PostgresSchedDB/PostgresSchedDB.cpp +++ b/scheduler/PostgresSchedDB/PostgresSchedDB.cpp @@ -27,7 +27,6 @@ #include "scheduler/PostgresSchedDB/ArchiveRequest.hpp" #include "scheduler/PostgresSchedDB/TapeMountDecisionInfo.hpp" #include "scheduler/PostgresSchedDB/Helpers.hpp" -#include "scheduler/PostgresSchedDB/RetrieveJob.hpp" #include "scheduler/PostgresSchedDB/RetrieveRequest.hpp" #include "scheduler/PostgresSchedDB/RepackRequest.hpp" @@ -180,14 +179,12 @@ std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > PostgresSchedDB::getN // Construct the return value std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob>> ret; for (const auto &j : jobs) { - auto aj = std::make_unique<postgresscheddb::ArchiveJob>(/* j.jobId */); + auto aj = std::make_unique<postgresscheddb::ArchiveJob>(true, j.mountId.value(), j.jobId, j.tapePool); aj->tapeFile.copyNb = j.copyNb; aj->archiveFile = j.archiveFile; aj->archiveReportURL = j.archiveReportUrl; aj->errorReportURL = j.archiveErrorReportUrl; aj->srcURL = j.srcUrl; - aj->m_mountId = j.mountId; - aj->m_tapePool = j.tapePool; ret.emplace_back(std::move(aj)); } logContext.log(log::DEBUG, "In PostgresSchedDB::getNextArchiveJobsToReportBatch(): After Archive Jobs filled, before return."); @@ -513,7 +510,7 @@ std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo> PostgresSchedDB::getMo utils::Timer t; // Allocate the getMountInfostructure to return. - auto privateRet = std::make_unique<postgresscheddb::TapeMountDecisionInfo>(*this, m_connPool, m_ownerId, m_tapeDrivesState.get(), m_logger); + auto privateRet = std::make_unique<postgresscheddb::TapeMountDecisionInfo>(*this, m_ownerId, m_tapeDrivesState.get(), m_logger); TapeMountDecisionInfo& tmdi = *privateRet; // Take an exclusive lock on the scheduling @@ -544,7 +541,7 @@ std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo> PostgresSchedDB::getMo utils::Timer t; // Allocate the getMountInfostructure to return - auto privateRet = std::make_unique<postgresscheddb::TapeMountDecisionInfo>(*this, m_connPool, m_ownerId, m_tapeDrivesState.get(), m_logger); + auto privateRet = std::make_unique<postgresscheddb::TapeMountDecisionInfo>(*this, m_ownerId, m_tapeDrivesState.get(), m_logger); TapeMountDecisionInfo& tmdi = *privateRet; diff --git a/scheduler/PostgresSchedDB/PostgresSchedDB.hpp b/scheduler/PostgresSchedDB/PostgresSchedDB.hpp index 5e0fe11481..3bb33d4646 100644 --- a/scheduler/PostgresSchedDB/PostgresSchedDB.hpp +++ b/scheduler/PostgresSchedDB/PostgresSchedDB.hpp @@ -25,6 +25,7 @@ #include <tuple> #include <vector> +#include "catalogue/TapeDrivesCatalogueState.hpp" #include "common/dataStructures/ArchiveFileQueueCriteriaAndFileId.hpp" #include "common/dataStructures/ArchiveJob.hpp" #include "common/dataStructures/ArchiveRequest.hpp" @@ -38,18 +39,22 @@ #include "common/dataStructures/RetrieveRequest.hpp" #include "common/dataStructures/SecurityIdentity.hpp" #include "common/log/Logger.hpp" +#include "common/utils/utils.hpp" #include "rdbms/ConnPool.hpp" #include "rdbms/Login.hpp" #include "scheduler/RetrieveJob.hpp" #include "scheduler/SchedulerDatabase.hpp" -#include "common/utils/utils.hpp" -#include "catalogue/TapeDrivesCatalogueState.hpp" + namespace cta { -namespace catalogue { -class Catalogue; -} + namespace catalogue { + class Catalogue; + } + namespace postgresscheddb { + class ArchiveMount; + class TapeMountDecisionInfo; + } class PostgresSchedDB: public SchedulerDatabase { public: @@ -60,7 +65,8 @@ class PostgresSchedDB: public SchedulerDatabase { const uint64_t nbConns); ~PostgresSchedDB() override; - + friend class cta::postgresscheddb::ArchiveMount; + friend class cta::postgresscheddb::TapeMountDecisionInfo; void waitSubthreadsComplete() override; /*============ Basic IO check: validate Postgres DB store access ===============*/ @@ -221,7 +227,6 @@ private: } ~RepackRequestPromotionStatisticsNoLock() override = default; }; - }; } // namespace cta diff --git a/scheduler/PostgresSchedDB/RetrieveMount.cpp b/scheduler/PostgresSchedDB/RetrieveMount.cpp index 9eb47dad34..4df61e797d 100644 --- a/scheduler/PostgresSchedDB/RetrieveMount.cpp +++ b/scheduler/PostgresSchedDB/RetrieveMount.cpp @@ -47,7 +47,7 @@ std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob>> RetrieveMount::getNex } // mark the jobs in the batch as owned - sql::RetrieveJobQueueRow::updateMountId(m_txn, jobs, mountInfo.mountId); + sql::RetrieveJobQueueRow::updateMountID(m_txn, jobs, mountInfo.mountId); m_txn.commit(); // Construct the return value diff --git a/scheduler/PostgresSchedDB/TapeMountDecisionInfo.cpp b/scheduler/PostgresSchedDB/TapeMountDecisionInfo.cpp index a359c9e507..4605e4a237 100644 --- a/scheduler/PostgresSchedDB/TapeMountDecisionInfo.cpp +++ b/scheduler/PostgresSchedDB/TapeMountDecisionInfo.cpp @@ -24,9 +24,9 @@ namespace cta::postgresscheddb { -TapeMountDecisionInfo::TapeMountDecisionInfo(PostgresSchedDB &pdb, rdbms::ConnPool &cp, const std::string &ownerId, TapeDrivesCatalogueState *drivesState, log::Logger &logger) : +TapeMountDecisionInfo::TapeMountDecisionInfo(PostgresSchedDB &pdb, const std::string &ownerId, TapeDrivesCatalogueState *drivesState, log::Logger &logger) : m_PostgresSchedDB(pdb), - m_txn(cp), + m_txn(pdb.m_connPool), m_ownerId(ownerId), m_logger(logger), m_tapeDrivesState(drivesState) @@ -60,7 +60,7 @@ std::unique_ptr<SchedulerDatabase::ArchiveMount> TapeMountDecisionInfo::createAr "createArchiveMount(): unexpected mount type."); } - auto privateRet = std::make_unique<postgresscheddb::ArchiveMount>(m_ownerId, m_txn, queueType); + auto privateRet = std::make_unique<postgresscheddb::ArchiveMount>(m_PostgresSchedDB, m_ownerId, queueType); auto &am = *privateRet; // Check we hold the scheduling lock @@ -71,7 +71,7 @@ std::unique_ptr<SchedulerDatabase::ArchiveMount> TapeMountDecisionInfo::createAr // Get the next Mount Id auto newMountId = cta::postgresscheddb::sql::MountsRow::getNextMountID(m_txn); - + commit(); am.nbFilesCurrentlyOnTape = tape.lastFSeq; // Fill up the mount info am.mountInfo.mountType = mount.type; @@ -110,6 +110,7 @@ std::unique_ptr<SchedulerDatabase::RetrieveMount> TapeMountDecisionInfo::createR // Get the next Mount Id auto newMountId = cta::postgresscheddb::sql::MountsRow::getNextMountID(m_txn); + commit(); // Fill up the mount info rm.mountInfo.vid = mount.vid; diff --git a/scheduler/PostgresSchedDB/TapeMountDecisionInfo.hpp b/scheduler/PostgresSchedDB/TapeMountDecisionInfo.hpp index 3b454c96c6..266252ee4e 100644 --- a/scheduler/PostgresSchedDB/TapeMountDecisionInfo.hpp +++ b/scheduler/PostgresSchedDB/TapeMountDecisionInfo.hpp @@ -20,7 +20,6 @@ #include "scheduler/PostgresSchedDB/PostgresSchedDB.hpp" #include "common/dataStructures/LabelFormat.hpp" #include "common/dataStructures/MountType.hpp" -#include "scheduler/PostgresSchedDB/sql/Transaction.hpp" #include "catalogue/TapeDrivesCatalogueState.hpp" #include <memory> @@ -29,13 +28,19 @@ #include <cstdint> #include <time.h> +#ifndef TAPEMOUNTDECISIONINFO_H +#define TAPEMOUNTDECISIONINFO_H +#endif /* TAPEMOUNTDECISIONINFO_H */ + + namespace cta::postgresscheddb { +class ArchiveMount; + class TapeMountDecisionInfo : public SchedulerDatabase::TapeMountDecisionInfo { friend class cta::PostgresSchedDB; - public: - explicit TapeMountDecisionInfo(PostgresSchedDB &pdb, rdbms::ConnPool &cp, const std::string &ownerId, TapeDrivesCatalogueState *drivesState, log::Logger &logger); + explicit TapeMountDecisionInfo(PostgresSchedDB &pdb, const std::string &ownerId, TapeDrivesCatalogueState *drivesState, log::Logger &logger); std::unique_ptr<SchedulerDatabase::ArchiveMount> createArchiveMount(const cta::SchedulerDatabase::PotentialMount& mount, const catalogue::TapeForWriting& tape, @@ -54,8 +59,8 @@ class TapeMountDecisionInfo : public SchedulerDatabase::TapeMountDecisionInfo { /** Commit decision and release scheduler global lock */ void commit(); - PostgresSchedDB& m_PostgresSchedDB; - Transaction m_txn; + cta::PostgresSchedDB& m_PostgresSchedDB; + postgresscheddb::Transaction m_txn; std::string m_ownerId; bool m_lockTaken = false; log::Logger& m_logger; diff --git a/scheduler/PostgresSchedDB/sql/ArchiveJobQueue.cpp b/scheduler/PostgresSchedDB/sql/ArchiveJobQueue.cpp index 9ce15b75b0..508a451b8e 100644 --- a/scheduler/PostgresSchedDB/sql/ArchiveJobQueue.cpp +++ b/scheduler/PostgresSchedDB/sql/ArchiveJobQueue.cpp @@ -21,42 +21,17 @@ namespace cta::postgresscheddb::sql { -void ArchiveJobQueueRow::updateMountId(Transaction &txn, const std::list<ArchiveJobQueueRow>& rowList, uint64_t mountId) { - if(rowList.empty()) return; - - try { - const char *const sqltt = "CREATE TEMPORARY TABLE TEMP_JOB_IDS (JOB_ID BIGINT) ON COMMIT DROP"; - txn.conn().executeNonQuery(sqltt); - } catch(exception::Exception &ex) { - const char *const sqltrunc = "TRUNCATE TABLE TEMP_JOB_IDS"; - txn.conn().executeNonQuery(sqltrunc); - } - - const char *const sqlcopy = - "COPY TEMP_JOB_IDS(JOB_ID) FROM STDIN --" - ":JOB_ID"; - - auto stmt = txn.conn().createStmt(sqlcopy); - auto &postgresStmt = dynamic_cast<rdbms::wrapper::PostgresStmt &>(stmt.getStmt()); - - const size_t nbrows = rowList.size(); - cta::rdbms::wrapper::PostgresColumn c1("JOB_ID", nbrows); - std::list<ArchiveJobQueueRow>::const_iterator itr; - size_t i; - for(i=0,itr=rowList.begin();i<nbrows;++i,++itr) { - c1.setFieldValue(i, std::to_string(itr->jobId)); - } - - postgresStmt.setColumn(c1); - postgresStmt.executeCopyInsert(nbrows); - - const char *const sql = +void ArchiveJobQueueRow::updateMountID(Transaction &txn, const std::list<std::string>& jobIDs, uint64_t mountId) { + if(jobIDs.empty()) return; + std::string sqlpart; + for (const auto &piece : jobIDs) sqlpart += piece; + txn.start(); + std::string sql = "UPDATE ARCHIVE_JOB_QUEUE SET " "MOUNT_ID = :MOUNT_ID " "WHERE " - " JOB_ID IN (SELECT JOB_ID FROM TEMP_JOB_IDS)"; - - stmt = txn.conn().createStmt(sql); + " JOB_ID IN (" + sqlpart + ")"; + auto stmt = txn.conn().createStmt(sql); stmt.bindUint64(":MOUNT_ID", mountId); stmt.executeQuery(); } diff --git a/scheduler/PostgresSchedDB/sql/ArchiveJobQueue.hpp b/scheduler/PostgresSchedDB/sql/ArchiveJobQueue.hpp index 045a6c7e03..af4c2263bc 100644 --- a/scheduler/PostgresSchedDB/sql/ArchiveJobQueue.hpp +++ b/scheduler/PostgresSchedDB/sql/ArchiveJobQueue.hpp @@ -31,7 +31,7 @@ namespace cta::postgresscheddb::sql { struct ArchiveJobQueueRow { uint64_t jobId = 0; - uint64_t mountId = 0; + std::optional<std::uint64_t> mountId = std::nullopt; ArchiveJobStatus status = ArchiveJobStatus::AJS_ToTransferForUser; std::string tapePool; std::string mountPolicy; @@ -84,7 +84,7 @@ struct ArchiveJobQueueRow { ArchiveJobQueueRow& operator=(const rdbms::Rset &rset) { jobId = rset.columnUint64("JOB_ID"); - mountId = rset.columnOptionalUint64("MOUNT_ID").value_or(0); + mountId = rset.columnOptionalUint64("MOUNT_ID"); status = from_string<ArchiveJobStatus>( rset.columnString("STATUS") ); tapePool = rset.columnString("TAPE_POOL"); @@ -117,6 +117,7 @@ struct ArchiveJobQueueRow { } void insert(Transaction &txn) const { + // does not set mountId or jobId const char *const sql = "INSERT INTO ARCHIVE_JOB_QUEUE(" @@ -202,11 +203,12 @@ struct ArchiveJobQueueRow { stmt.bindUint16(":MAX_TOTAL_RETRIES", maxTotalRetries); stmt.executeNonQuery(); + } void addParamsToLogContext(log::ScopedParamContainer& params) const { // does not set jobId - params.add("mountId", mountId); + params.add("mountId", mountId.has_value() ? std::to_string(mountId.value()) : "no value"); params.add("status", to_string(status)); params.add("tapePool", tapePool); params.add("mountPolicy", mountPolicy); @@ -409,10 +411,10 @@ struct ArchiveJobQueueRow { * Assign a mount ID to the specified rows * * @param txn Transaction to use for this query - * @param rowList List of table rows to claim for the new owner + * @param jobIDs String consisting of comma separated job IDs to update with the given Mount ID * @param mountId Mount ID to assign */ - static void updateMountId(Transaction &txn, const std::list<ArchiveJobQueueRow>& rowList, uint64_t mountId); + static void updateMountID(Transaction &txn, const std::list<std::string>& jobIDs, uint64_t mountId); }; } // namespace cta::postgresscheddb::sql diff --git a/scheduler/PostgresSchedDB/sql/ArchiveJobSummary.hpp b/scheduler/PostgresSchedDB/sql/ArchiveJobSummary.hpp index 82298deed1..a05695e533 100644 --- a/scheduler/PostgresSchedDB/sql/ArchiveJobSummary.hpp +++ b/scheduler/PostgresSchedDB/sql/ArchiveJobSummary.hpp @@ -23,7 +23,7 @@ namespace cta::postgresscheddb::sql { struct ArchiveJobSummaryRow { - uint64_t mountId = 0; + std::optional<std::uint64_t> mountId = std::nullopt; ArchiveJobStatus status = ArchiveJobStatus::AJS_ToTransferForUser; std::string tapePool; std::string mountPolicy; @@ -45,7 +45,7 @@ struct ArchiveJobSummaryRow { } ArchiveJobSummaryRow& operator=(const rdbms::Rset &rset) { - mountId = rset.columnOptionalUint64("MOUNT_ID").value_or(0); + mountId = rset.columnOptionalUint64("MOUNT_ID"); status = from_string<ArchiveJobStatus>( rset.columnString("STATUS") ); tapePool = rset.columnString("TAPE_POOL"); @@ -59,7 +59,7 @@ struct ArchiveJobSummaryRow { } void addParamsToLogContext(log::ScopedParamContainer& params) const { - params.add("mountId", mountId); + params.add("mountId", mountId.has_value() ? std::to_string(mountId.value()) : "no value"); params.add("status", to_string(status)); params.add("tapePool", tapePool); params.add("mountPolicy", mountPolicy); diff --git a/scheduler/PostgresSchedDB/sql/Mounts.hpp b/scheduler/PostgresSchedDB/sql/Mounts.hpp index 6389b642e6..e37a3f532e 100644 --- a/scheduler/PostgresSchedDB/sql/Mounts.hpp +++ b/scheduler/PostgresSchedDB/sql/Mounts.hpp @@ -59,6 +59,7 @@ struct MountsRow { static uint64_t getNextMountID(Transaction &txn) { try { const char *const sql = "select NEXTVAL('MOUNT_ID_SEQ') AS MOUNT_ID"; + txn.start(); auto stmt = txn.conn().createStmt(sql); auto rset = stmt.executeQuery(); if (!rset.next()) { diff --git a/scheduler/PostgresSchedDB/sql/RetrieveJobQueue.cpp b/scheduler/PostgresSchedDB/sql/RetrieveJobQueue.cpp index 0a72fac802..4578e99d1b 100644 --- a/scheduler/PostgresSchedDB/sql/RetrieveJobQueue.cpp +++ b/scheduler/PostgresSchedDB/sql/RetrieveJobQueue.cpp @@ -21,7 +21,7 @@ namespace cta::postgresscheddb::sql { -void RetrieveJobQueueRow::updateMountId(Transaction &txn, const std::list<RetrieveJobQueueRow>& rowList, uint64_t mountId) { +void RetrieveJobQueueRow::updateMountID(Transaction &txn, const std::list<RetrieveJobQueueRow>& rowList, uint64_t mountId) { if(rowList.empty()) return; try { diff --git a/scheduler/PostgresSchedDB/sql/RetrieveJobQueue.hpp b/scheduler/PostgresSchedDB/sql/RetrieveJobQueue.hpp index 1dbb80f420..1afc27078e 100644 --- a/scheduler/PostgresSchedDB/sql/RetrieveJobQueue.hpp +++ b/scheduler/PostgresSchedDB/sql/RetrieveJobQueue.hpp @@ -433,7 +433,7 @@ struct RetrieveJobQueueRow { * @param rowList List of table rows to claim for the new owner * @param mountId Mount ID to assign */ - static void updateMountId(Transaction &txn, const std::list<RetrieveJobQueueRow>& rowList, uint64_t mountId); + static void updateMountID(Transaction &txn, const std::list<RetrieveJobQueueRow>& rowList, uint64_t mountId); }; } // namespace cta::postgresscheddb::sql diff --git a/scheduler/PostgresSchedDB/sql/Transaction.hpp b/scheduler/PostgresSchedDB/sql/Transaction.hpp index d6bd50e3b0..fd4d683efb 100644 --- a/scheduler/PostgresSchedDB/sql/Transaction.hpp +++ b/scheduler/PostgresSchedDB/sql/Transaction.hpp @@ -85,7 +85,6 @@ public: private: - rdbms::Conn m_conn_non_txn; rdbms::Conn m_conn; bool m_begin = true; }; diff --git a/tapeserver/castor/tape/tapeserver/daemon/DataTransferSession.cpp b/tapeserver/castor/tape/tapeserver/daemon/DataTransferSession.cpp index c523e27593..9b8557eb29 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/DataTransferSession.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/DataTransferSession.cpp @@ -451,9 +451,9 @@ castor::tape::tapeserver::daemon::DataTransferSession::executeWrite(cta::log::Lo writeSingleThread.setTaskInjector(&taskInjector); reportPacker.setWatchdog(watchDog); cta::utils::Timer timer; - bool noFilesToMigrate = false; if (taskInjector.synchronousInjection(noFilesToMigrate)) { + logContext.log(cta::log::DEBUG, "After if (taskInjector.synchronousInjection())"); const uint64_t firstFseqFromClient = taskInjector.firstFseqToWrite(); // The last fseq written on the tape is the first file's fseq minus one diff --git a/tapeserver/castor/tape/tapeserver/daemon/MigrationTaskInjector.cpp b/tapeserver/castor/tape/tapeserver/daemon/MigrationTaskInjector.cpp index fb6d20b0ec..0ce7aafc95 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/MigrationTaskInjector.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/MigrationTaskInjector.cpp @@ -49,7 +49,7 @@ void MigrationTaskInjector::injectBulkMigrations(std::list<std::unique_ptr<cta:: LogContext::ScopedParam(m_lc, Param("fSeq", job->tapeFile.fSeq)), LogContext::ScopedParam(m_lc, Param("path", job->srcURL))}; tape::utils::suppresUnusedVariable(sp); - + m_lc.log(cta::log::DEBUG, "MigrationTaskInjector::injectBulkMigrations file size: " + std::to_string(fileSize)); const uint64_t neededBlock = howManyBlocksNeeded(fileSize, blockCapacity); // We give owner ship on the archive job to the tape write task (as last user). @@ -108,7 +108,9 @@ bool MigrationTaskInjector::synchronousInjection(bool& noFilesToMigrate) { try { //First popping of files, we multiply the number of popped files / bytes by 2 to avoid multiple mounts on Repack //(it is applied to ArchiveForUser and ArchiveForRepack batches) + m_lc.log(cta::log::DEBUG, "Before m_archiveMount.getNextJobBatch()"); jobs = m_archiveMount.getNextJobBatch(2 * m_maxFiles, 2 * m_maxBytes, m_lc); + m_lc.log(cta::log::DEBUG, "After m_archiveMount.getNextJobBatch()"); } catch (cta::exception::Exception& ex) { cta::log::ScopedParamContainer scoped(m_lc); scoped.add("transactionId", m_archiveMount.getMountTransactionId()) @@ -162,11 +164,13 @@ void MigrationTaskInjector::WorkerThread::run() { throw castor::tape::tapeserver::daemon::ErrorFlag(); } Request req = m_parent.m_queue.pop(); + m_parent.m_lc.log(cta::log::DEBUG, "MigrationTaskInjector::WorkerThread::run(): Trying to get jobs from archive Mount"); auto jobs = m_parent.m_archiveMount.getNextJobBatch(req.filesRequested, req.bytesRequested, m_parent.m_lc); uint64_t files = jobs.size(); uint64_t bytes = 0; for (auto& j : jobs) bytes += j->archiveFile.fileSize; if (jobs.empty()) { + m_parent.m_lc.log(cta::log::DEBUG, "MigrationTaskInjector::WorkerThread::run(): No jobs were found"); if (req.lastCall) { m_parent.m_lc.log(cta::log::INFO, "No more file to migrate: triggering the end of session."); m_parent.signalEndDataMovement(); @@ -178,7 +182,7 @@ void MigrationTaskInjector::WorkerThread::run() { } } else { - + m_parent.m_lc.log(cta::log::DEBUG, "MigrationTaskInjector::WorkerThread::run(): injectBulkMigrations"); // Inject the tasks m_parent.injectBulkMigrations(jobs); // Decide on continuation -- GitLab