diff --git a/src/db/generic/GenericDbIfce.h b/src/db/generic/GenericDbIfce.h index 6eb9ac7e01e9a0c50d2ec6b5bad4930d445530fd..0df3eff4f1147424442f3eecb1fd407f6c8de178 100644 --- a/src/db/generic/GenericDbIfce.h +++ b/src/db/generic/GenericDbIfce.h @@ -113,13 +113,14 @@ public: /// @param filesize Actual filesize reported by the storage /// @param duration How long (in seconds) took to transfer the file /// @param retry If the error is considered recoverable by fts_url_copy + /// @param fileMetadata The new file metadata in case it needs to be updated /// @return (true, newState) if an updated was done into the DB, (false, oldState) otherwise /// (i.e. trying to set ACTIVE an already ACTIVE transfer) /// @note If jobId is empty, or if fileId is 0, then processId will be used to decide /// which transfers to update virtual boost::tuple updateTransferStatus(const std::string& jobId, uint64_t fileId, double throughput, const std::string& transferState, const std::string& errorReason, - int processId, double filesize, double duration, bool retry) = 0; + int processId, double filesize, double duration, bool retry, std::string fileMetadata = "") = 0; /// Update the status of a job /// @param jobId The job ID diff --git a/src/db/generic/TransferFile.h b/src/db/generic/TransferFile.h index c5e3312f682a6b616bdc4669c48b02ee3f50d753..a71c391b6a3c578c15755793fbac6bb1c5001687 100644 --- a/src/db/generic/TransferFile.h +++ b/src/db/generic/TransferFile.h @@ -111,6 +111,8 @@ public: time_t jobFinished; std::string voName; std::string overwriteFlag; + std::string dstFileReport; + int archiveTimeout; std::string userDn; std::string credId; std::string checksumMode; diff --git a/src/db/mysql/MySqlAPI.cpp b/src/db/mysql/MySqlAPI.cpp index a4e119f997b037606bbb0cb0b10de30edc0c8897..aa97e0560ca28c5cf17a7477f28a28f8cdb8e0dc 100644 --- a/src/db/mysql/MySqlAPI.cpp +++ b/src/db/mysql/MySqlAPI.cpp @@ -659,8 +659,8 @@ void MySqlAPI::getReadyTransfers(const std::vector& queues, { soci::rowset rs = (sql.prepare << " SELECT f.file_state, f.source_surl, f.dest_surl, f.job_id, j.vo_name, " - " f.file_id, j.overwrite_flag, j.user_dn, j.cred_id, " - " f.checksum, j.checksum_method, j.source_space_token, " + " f.file_id, j.overwrite_flag, j.archive_timeout, j.dst_file_report, " + " j.user_dn, j.cred_id, f.checksum, j.checksum_method, j.source_space_token, " " j.space_token, j.copy_pin_lifetime, j.bring_online, " " f.user_filesize, f.file_metadata, j.job_metadata, f.file_index, f.bringonline_token, " " f.source_se, f.dest_se, f.selection_strategy, j.internal_job_params, j.job_type " @@ -723,8 +723,8 @@ void MySqlAPI::getReadyTransfers(const std::vector& queues, if (it_act->second == 0) continue; std::string select = " SELECT f.file_state, f.source_surl, f.dest_surl, f.job_id, j.vo_name, " - " f.file_id, j.overwrite_flag, j.user_dn, j.cred_id," - " f.checksum, j.checksum_method, j.source_space_token, " + " f.file_id, j.overwrite_flag, j.archive_timeout, j.dst_file_report, " + " j.user_dn, j.cred_id, f.checksum, j.checksum_method, j.source_space_token, " " j.space_token, j.copy_pin_lifetime, j.bring_online, " " f.user_filesize, f.file_metadata, j.job_metadata, f.file_index, f.bringonline_token, " " f.source_se, f.dest_se, f.selection_strategy, j.internal_job_params, j.job_type " @@ -1137,8 +1137,8 @@ void MySqlAPI::getReadySessionReuseTransfers(const std::vector& queues, sql.prepare << " SELECT SQL_NO_CACHE " " f.file_state, f.source_surl, f.dest_surl, f.job_id, j.vo_name, " - " f.file_id, j.overwrite_flag, j.user_dn, j.cred_id, " - " f.checksum, j.checksum_method, j.source_space_token, " + " f.file_id, j.overwrite_flag, j.archive_timeout, j.dst_file_report, " + " j.user_dn, j.cred_id, f.checksum, j.checksum_method, j.source_space_token, " " j.space_token, j.copy_pin_lifetime, j.bring_online, " " f.user_filesize, f.file_metadata, j.job_metadata, f.file_index, " " f.bringonline_token, f.source_se, f.dest_se, f.selection_strategy, " @@ -1179,18 +1179,19 @@ void MySqlAPI::getReadySessionReuseTransfers(const std::vector& queues, boost::tuple MySqlAPI::updateTransferStatus(const std::string& jobId, uint64_t fileId, double throughput, const std::string& transferState, const std::string& errorReason, - int processId, double filesize, double duration, bool retry) + int processId, double filesize, double duration, bool retry, std::string fileMetadata) { soci::session sql(*connectionPool); return updateFileTransferStatusInternal(sql, throughput, jobId, fileId, - transferState, errorReason, processId, filesize, duration, retry); + transferState, errorReason, processId, filesize, duration, retry, fileMetadata); } boost::tuple MySqlAPI::updateFileTransferStatusInternal(soci::session& sql, double throughput, std::string jobId, uint64_t fileId, std::string newFileState, std::string transferMessage, - int processId, double filesize, double duration, bool retry) + int processId, double filesize, double duration, bool retry, + std::string fileMetadata) { std::string storedState; soci::indicator destSurlUuidInd; @@ -1309,6 +1310,11 @@ boost::tuple MySqlAPI::updateFileTransferStatusInternal(soci newFileState = "ARCHIVING"; } + if (!fileMetadata.empty()) { + query << ", file_metadata = :file_metadata"; + stmt.exchange(soci::use(fileMetadata, "file_metadata")); + } + query << " , pid = :pid, filesize = :filesize, tx_duration = :duration, throughput = :throughput, current_failures = :current_failures " "WHERE file_id = :fileId AND file_state = :oldState"; stmt.exchange(soci::use(processId, "pid")); diff --git a/src/db/mysql/MySqlAPI.h b/src/db/mysql/MySqlAPI.h index 567889900dd72e58665714998a89b998a3c6be5b..0dfcf31bf67cb8e2f7150ba693d15e8a5d6d93f6 100644 --- a/src/db/mysql/MySqlAPI.h +++ b/src/db/mysql/MySqlAPI.h @@ -73,13 +73,14 @@ public: /// @param filesize Actual filesize reported by the storage /// @param duration How long (in seconds) took to transfer the file /// @param retry If the error is considered recoverable by fts_url_copy + /// @param fileMetadata The new file metadata in case it needs to be updated /// @return true if an updated was done into the DB, false otherwise /// (i.e. trying to set ACTIVE an already ACTIVE transfer) /// @note If jobId is empty, or if fileId is 0, then processId will be used to decide /// which transfers to update virtual boost::tuple updateTransferStatus(const std::string& jobId, uint64_t fileId, double throughput, const std::string& transferState, const std::string& errorReason, - int processId, double filesize, double duration, bool retry); + int processId, double filesize, double duration, bool retry, std::string fileMetadata = ""); /// Update the status of a job /// @param jobId The job ID @@ -344,7 +345,8 @@ private: boost::tuple updateFileTransferStatusInternal(soci::session& sql, double throughput, std::string jobId, uint64_t fileId, - std::string newFileState, std::string transferMessage, int processId, double filesize, double duration, bool retry); + std::string newFileState, std::string transferMessage, int processId, double filesize, double duration, bool retry, + std::string fileMetadata = ""); bool updateJobTransferStatusInternal(soci::session& sql, std::string jobId, const std::string state); diff --git a/src/db/mysql/sociConversions.h b/src/db/mysql/sociConversions.h index b247513587158985e8c7bcff09f902c41abc5e9e..98a8b6ea011d6d99fcfc5d7badbcd659401f85f1 100644 --- a/src/db/mysql/sociConversions.h +++ b/src/db/mysql/sociConversions.h @@ -144,6 +144,8 @@ struct type_conversion file.voName = v.get("vo_name"); file.fileId = v.get("file_id"); file.overwriteFlag = v.get("overwrite_flag",""); + file.dstFileReport = v.get("dst_file_report",""); + file.archiveTimeout = v.get("archive_timeout",-1); file.userDn = v.get("user_dn"); file.credId = v.get("cred_id"); file.checksum = v.get("checksum",""); diff --git a/src/msg-bus/events/Message.proto b/src/msg-bus/events/Message.proto index 32cc2b3b0218a159ff43fe389605ea988d5f0df5..0bc34d32833280f5c63865c057813cb0e18f3bff 100644 --- a/src/msg-bus/events/Message.proto +++ b/src/msg-bus/events/Message.proto @@ -21,4 +21,6 @@ message Message { optional double throughput = 15; optional int32 errcode = 16; + + optional string file_metadata = 17; } diff --git a/src/server/services/transfers/MessageProcessingService.cpp b/src/server/services/transfers/MessageProcessingService.cpp index 91b35cac2d44f713741f67b0ed64d79a6f8616c1..af591642e468da9fd3a0d6d7914d818eddb527b2 100644 --- a/src/server/services/transfers/MessageProcessingService.cpp +++ b/src/server/services/transfers/MessageProcessingService.cpp @@ -261,8 +261,8 @@ void MessageProcessingService::performOtherMessageDbChange(const fts3::events::M boost::tuple updated = db::DBSingleton::instance() .getDBObjectInstance()->updateTransferStatus( msg.job_id(), msg.file_id(), msg.throughput(), msg.transfer_status(), - msg.transfer_message(), msg.process_id(), msg.filesize(), msg.time_in_secs(), msg.retry() - ); + msg.transfer_message(), msg.process_id(), msg.filesize(), msg.time_in_secs(), msg.retry(), + msg.file_metadata()); db::DBSingleton::instance().getDBObjectInstance()->updateJobStatus( msg.job_id(), msg.transfer_status()); diff --git a/src/server/services/transfers/UrlCopyCmd.cpp b/src/server/services/transfers/UrlCopyCmd.cpp index 7f09bd01bb7f65179b392a5cf3e1e0c3f4b9421d..5927682a9ae8ef80f8519b3426bea4e2ae74cf06 100644 --- a/src/server/services/transfers/UrlCopyCmd.cpp +++ b/src/server/services/transfers/UrlCopyCmd.cpp @@ -40,8 +40,7 @@ UrlCopyCmd::UrlCopyCmd() : IPv6Explicit(false) std::string UrlCopyCmd::prepareMetadataString(const std::string &text) { - std::string copy(text); - copy = boost::replace_all_copy(copy, " ", "?"); + std::string copy = boost::replace_all_copy(text, " ", "?"); copy = boost::replace_all_copy(copy, "\"", "\\\""); return copy; } @@ -203,6 +202,9 @@ void UrlCopyCmd::setFromTransfer(const TransferFile &transfer, setOption("checksum-mode", transfer.checksumMode); setOption("job-id", transfer.jobId); setFlag("overwrite", !transfer.overwriteFlag.empty()); + if (transfer.archiveTimeout > 0) { + setFlag("dst-file-report", !transfer.dstFileReport.empty()); + } setOption("dest-token-desc", transfer.destinationSpaceToken); setOption("source-token-desc", transfer.sourceSpaceToken); diff --git a/src/url-copy/DestFile.h b/src/url-copy/DestFile.h new file mode 100644 index 0000000000000000000000000000000000000000..92abf9410f4f2382bc6d34bb5bb8926070c8c499 --- /dev/null +++ b/src/url-copy/DestFile.h @@ -0,0 +1,53 @@ +#include +#include +#include +#include "heuristics.h" + + +class DestFile { + +public: + uint64_t fileSize; + std::string checksumType; + std::string checksumValue; + bool fileOnDisk; + bool fileOnTape; + + DestFile() : fileSize(0), fileOnDisk(false), fileOnTape(false) {} + + json::Object toJSON() + { + json::Object output; + + output["file_size"] = json::Number(fileSize); + output["checksum_type"] = json::String(checksumType); + output["checksum_value"] = json::String(checksumValue); + output["file_on_tape"] = json::Boolean(fileOnDisk); + output["file_on_disk"] = json::Boolean(fileOnTape); + + return output; + } + + static std::string appendDestFileToFileMetadata(std::string file_metadata, json::Object dst_file) + { + json::UnknownElement metadata; + + if (!file_metadata.empty()) { + try { + std::string new_file_metadata = replaceMetadataString(file_metadata); + std::istringstream valueStream(new_file_metadata); + json::Reader::Read(metadata, valueStream); + } + catch (...) { + metadata["file_metada"] = json::String(file_metadata); + } + } + + metadata["dst_file"] = json::Object(dst_file); + + std::ostringstream stream; + json::Writer::Write(metadata, stream); + + return stream.str(); + } +}; \ No newline at end of file diff --git a/src/url-copy/Gfal2.h b/src/url-copy/Gfal2.h index a96276383a5216e84e4dce204fab5fba9114015e..7659f92d113bd486940a4248af48c5bf6271e6be 100644 --- a/src/url-copy/Gfal2.h +++ b/src/url-copy/Gfal2.h @@ -376,6 +376,26 @@ public: throw Gfal2Exception(error); } } + + /// Get the checksum of a file + std::string getChecksum(const std::string &url, const std::string &type) { + char buffer[512]; + GError *error = NULL; + if (gfal2_checksum(context, url.c_str(), type.c_str(), 0, 0, buffer, sizeof(buffer), &error)) { + throw Gfal2Exception(error); + } + return buffer; + } + + // Get the extended attribute of a resource + std::string getXattr(const std::string &url, const std::string &name) { + char buffer[1024]; + GError *error = NULL; + if (gfal2_getxattr(context, url.c_str(), name.c_str(), buffer, sizeof(buffer), &error) < 0) { + throw Gfal2Exception(error); + } + return buffer; + } }; diff --git a/src/url-copy/LegacyReporter.cpp b/src/url-copy/LegacyReporter.cpp index ccf54c27bc88e9c502f5fe5e20e42c058cf9848a..37462ac301ffb08c77aca7085268366020756bc1 100644 --- a/src/url-copy/LegacyReporter.cpp +++ b/src/url-copy/LegacyReporter.cpp @@ -15,35 +15,14 @@ */ #include "LegacyReporter.h" -#include #include "common/Logger.h" #include "monitoring/msg-ifce.h" +#include "heuristics.h" namespace events = fts3::events; using fts3::common::commit; -static std::string mapErrnoToString(int err) -{ - char buf[256] = {0}; - char const *str = strerror_r(err, buf, sizeof(buf)); - if (str) { - std::string rep(str); - std::replace(rep.begin(), rep.end(), ' ', '_'); - return boost::to_upper_copy(rep); - } - return "GENERAL ERROR"; -} - - -static std::string replaceMetadataString(std::string text) -{ - text = boost::replace_all_copy(text, "?"," "); - text = boost::replace_all_copy(text, "\\\"","\""); - return text; -} - - LegacyReporter::LegacyReporter(const UrlCopyOpts &opts): producer(opts.msgDir), opts(opts), zmqContext(1), zmqPingSocket(zmqContext, ZMQ_PUB) { @@ -169,6 +148,9 @@ void LegacyReporter::sendTransferCompleted(const Transfer &transfer, Gfal2Transf } else { status.set_transfer_status("FAILED"); + if ((transfer.error->code() == EEXIST) && (opts.dst_file_report) && (!opts.overwrite)) { + status.set_file_metadata(replaceMetadataString(transfer.fileMetadata)); + } } status.set_transfer_message(fullErrMsg.str()); status.set_retry(transfer.error->isRecoverable()); diff --git a/src/url-copy/UrlCopyError.h b/src/url-copy/UrlCopyError.h index 8e794a43d88276dd07c55c20c8a25ef9ccf5e669..a6e408762608f123bc3d961aeb87d7b4451fde1f 100644 --- a/src/url-copy/UrlCopyError.h +++ b/src/url-copy/UrlCopyError.h @@ -39,6 +39,7 @@ class UrlCopyError: public fts3::common::BaseException { + private: std::string scope_; std::string phase_; diff --git a/src/url-copy/UrlCopyOpts.cpp b/src/url-copy/UrlCopyOpts.cpp index c80c6ac697e1487d96934d9cea8cc7c6941d402c..266666d276938871f3e02e564534c2e27810c5e7 100644 --- a/src/url-copy/UrlCopyOpts.cpp +++ b/src/url-copy/UrlCopyOpts.cpp @@ -42,6 +42,7 @@ const option UrlCopyOpts::long_options[] = {"checksum", required_argument, 0, 300}, {"checksum-mode", required_argument, 0, 301}, {"strict-copy", no_argument, 0, 302}, + {"dst-file-report", no_argument, 0, 303}, {"token-bringonline", required_argument, 0, 400}, {"dest-token-desc", required_argument, 0, 401}, @@ -173,6 +174,7 @@ UrlCopyOpts::UrlCopyOpts(): timeout(0), enableUdt(false), enableIpv6(boost::indeterminate), addSecPerMb(0), noStreaming(false), enableMonitoring(false), active(0), retry(0), retryMax(0), logDir("/var/log/fts3"), msgDir("/var/lib/fts3"), + dst_file_report(false), debugLevel(0), logToStderr(false) { } @@ -263,6 +265,9 @@ void UrlCopyOpts::parse(int argc, char * const argv[]) case 302: strictCopy = true; break; + case 303: + dst_file_report = true; + break; case 400: referenceTransfer.tokenBringOnline = optarg; diff --git a/src/url-copy/UrlCopyOpts.h b/src/url-copy/UrlCopyOpts.h index 0d16a9ccd44430886ab12b7b7726b5d715912696..3ec9a55ed8a0bb0622ffc4dc07977407583e1819 100644 --- a/src/url-copy/UrlCopyOpts.h +++ b/src/url-copy/UrlCopyOpts.h @@ -46,6 +46,7 @@ public: bool isMultipleReplicaJob; bool strictCopy; + bool dst_file_report; std::string voName; std::string userDn; diff --git a/src/url-copy/UrlCopyProcess.cpp b/src/url-copy/UrlCopyProcess.cpp index b0c443e150e8aefbc64e9556c8069ff1b2cd809f..4927741263142a77e8bbc2220dbb290a13e25bad 100644 --- a/src/url-copy/UrlCopyProcess.cpp +++ b/src/url-copy/UrlCopyProcess.cpp @@ -29,6 +29,7 @@ #include "AutoInterruptThread.h" #include "UrlCopyProcess.h" #include "version.h" +#include "DestFile.h" using fts3::common::commit; @@ -72,6 +73,35 @@ static void setupGlobalGfal2Config(const UrlCopyOpts &opts, Gfal2 &gfal2) } } +static DestFile createDestFileReport(const Transfer &transfer, Gfal2 &gfal2, Gfal2TransferParams ¶ms) +{ + const std::string checksumType = transfer.checksumAlgorithm.empty() ? "ADLER32" : + transfer.checksumAlgorithm; + const std::string checksum = gfal2.getChecksum(transfer.destination, + transfer.checksumAlgorithm); + const uint64_t destFileSize = gfal2.stat(params, transfer.destination, false).st_size; + DestFile destFile; + destFile.fileSize = destFileSize; + destFile.checksumType = checksumType; + destFile.checksumValue = checksum; + const std::string userStatus = gfal2.getXattr(transfer.destination, GFAL_XATTR_STATUS); + if (userStatus == GFAL_XATTR_STATUS_ONLINE) { + destFile.fileOnDisk = true; + destFile.fileOnTape = false; + } else if (userStatus == GFAL_XATTR_STATUS_NEARLINE) { + destFile.fileOnDisk = false; + destFile.fileOnTape = true; + }else if (userStatus == GFAL_XATTR_STATUS_NEARLINE_ONLINE) { + destFile.fileOnDisk = true; + destFile.fileOnTape = true; + } else if (userStatus == GFAL_XATTR_STATUS_LOST) { + destFile.fileOnDisk = false; + destFile.fileOnTape = false; + } else { + throw std::runtime_error("Failed to determine if destination file is on disk and/or tape"); + } + return destFile; +} UrlCopyProcess::UrlCopyProcess(const UrlCopyOpts &opts, Reporter &reporter): opts(opts), reporter(reporter), canceled(false), timeoutExpired(false) @@ -431,6 +461,7 @@ void UrlCopyProcess::runTransfer(Transfer &transfer, Gfal2TransferParams ¶ms FTS3_COMMON_LOGGER_NEWLOG(INFO) << "BDII:" << opts.infosys << commit; FTS3_COMMON_LOGGER_NEWLOG(INFO) << "Source token issuer: " << transfer.sourceTokenIssuer << commit; FTS3_COMMON_LOGGER_NEWLOG(INFO) << "Destination token issuer: " << transfer.destTokenIssuer << commit; + FTS3_COMMON_LOGGER_NEWLOG(INFO) << "Report on the destination tape file: " << opts.dst_file_report << commit; if (opts.strictCopy) { FTS3_COMMON_LOGGER_NEWLOG(INFO) << "Copy only transfer!" << commit; @@ -453,8 +484,19 @@ void UrlCopyProcess::runTransfer(Transfer &transfer, Gfal2TransferParams ¶ms try { FTS3_COMMON_LOGGER_NEWLOG(INFO) << "Checking existence of destination file" << commit; gfal2.stat(params, transfer.destination, false); + if (opts.dst_file_report) { + try { + FTS3_COMMON_LOGGER_NEWLOG(INFO) << "Checking integrity of destination tape file: " << + transfer.destination << commit; + auto destFile = createDestFileReport(transfer, gfal2, params); + transfer.fileMetadata = DestFile::appendDestFileToFileMetadata(transfer.fileMetadata, destFile.toJSON()); + } catch (const std::exception &ex) { + FTS3_COMMON_LOGGER_NEWLOG(ERR) << "Failed to check integrity of destination tape file: " + << transfer.destination << ": " << ex.what() << commit; + } + } throw UrlCopyError(DESTINATION, TRANSFER_PREPARATION, EEXIST, - "Destination file exists and overwrite is not enabled"); + "Destination file exists and overwrite is not enabled");; } catch (const Gfal2Exception &ex) { if (ex.code() != ENOENT) { diff --git a/src/url-copy/heuristics.cpp b/src/url-copy/heuristics.cpp index 6e5fd47c001a8928fdc24daebf73df8b27f7f6c4..2b9ecd378c0845ab430389e6fdd61a0421939b33 100644 --- a/src/url-copy/heuristics.cpp +++ b/src/url-copy/heuristics.cpp @@ -21,6 +21,7 @@ #include #include "heuristics.h" #include "common/Logger.h" +#include using namespace fts3::common; @@ -148,3 +149,24 @@ unsigned adjustTimeoutBasedOnSize(off_t sizeInBytes, const unsigned addSecPerMb) // Final timeout adjusted considering transfer timeout return 600 + ceil(timeoutPerMBLocal * (static_cast(sizeInBytes) / MB)); } + + +std::string mapErrnoToString(int err) +{ + char buf[256] = {0}; + char const *str = strerror_r(err, buf, sizeof(buf)); + if (str) { + std::string rep(str); + std::replace(rep.begin(), rep.end(), ' ', '_'); + return boost::to_upper_copy(rep); + } + return "GENERAL ERROR"; +} + + +std::string replaceMetadataString(const std::string &text) +{ + std::string copy = boost::replace_all_copy(text, "?"," "); + copy = boost::replace_all_copy(copy, "\\\"","\""); + return copy; +} diff --git a/src/url-copy/heuristics.h b/src/url-copy/heuristics.h index 60837d2eb2b16834a4b5e56fd35a6cb584a4dc75..17013aca0e069f31d68b3026bce6f2b031a3ea0f 100644 --- a/src/url-copy/heuristics.h +++ b/src/url-copy/heuristics.h @@ -34,4 +34,8 @@ bool retryTransfer(int errorNo, const std::string &category, const std::string & */ unsigned adjustTimeoutBasedOnSize(off_t sizeInBytes, unsigned addSecPerMb); +std::string mapErrnoToString(int err); + +std::string replaceMetadataString(const std::string &text); + #endif // HEURISTICS_H