From f8030aeacb45902ad0de78b7732bcfa8cd18c789 Mon Sep 17 00:00:00 2001 From: Steven Murray Date: Tue, 8 Jun 2021 14:09:36 +0200 Subject: [PATCH 01/20] FTS-1702 Added dst_file_report to UrlCopyOpts --- src/url-copy/UrlCopyOpts.cpp | 7 +++++++ src/url-copy/UrlCopyOpts.h | 2 ++ 2 files changed, 9 insertions(+) diff --git a/src/url-copy/UrlCopyOpts.cpp b/src/url-copy/UrlCopyOpts.cpp index c80c6ac69..38e178204 100644 --- a/src/url-copy/UrlCopyOpts.cpp +++ b/src/url-copy/UrlCopyOpts.cpp @@ -82,6 +82,8 @@ const option UrlCopyOpts::long_options[] = {"logDir", required_argument, 0, 900}, {"msgDir", required_argument, 0, 901}, + {"dst_file_report", no_argument, 0, 1000}, + {"help", no_argument, 0, 0}, {"debug", required_argument, 0, 1}, {"stderr", no_argument, 0, 2}, @@ -173,6 +175,7 @@ UrlCopyOpts::UrlCopyOpts(): timeout(0), enableUdt(false), enableIpv6(boost::indeterminate), addSecPerMb(0), noStreaming(false), enableMonitoring(false), active(0), retry(0), retryMax(0), logDir("/var/log/fts3"), msgDir("/var/lib/fts3"), + dst_file_report(false), debugLevel(0), logToStderr(false) { } @@ -367,6 +370,10 @@ void UrlCopyOpts::parse(int argc, char * const argv[]) msgDir = boost::lexical_cast(optarg); break; + case 1000: + dst_file_report = true; + break; + default: usage(argv[0]); } diff --git a/src/url-copy/UrlCopyOpts.h b/src/url-copy/UrlCopyOpts.h index 0d16a9ccd..842d34ee0 100644 --- a/src/url-copy/UrlCopyOpts.h +++ b/src/url-copy/UrlCopyOpts.h @@ -79,6 +79,8 @@ public: std::string logDir; std::string msgDir; + bool dst_file_report; + unsigned debugLevel; bool logToStderr; -- GitLab From f39a24f98d34a450f1f9a112ab7594b358eaa9bd Mon Sep 17 00:00:00 2001 From: Steven Murray Date: Wed, 9 Jun 2021 10:08:20 +0200 Subject: [PATCH 02/20] FTS-1702 Added DestFile.proto to msg-bus/events --- src/msg-bus/events/DestFile.proto | 9 +++++++++ src/msg-bus/events/Message.proto | 4 ++++ 2 files changed, 13 insertions(+) create mode 100644 src/msg-bus/events/DestFile.proto diff --git a/src/msg-bus/events/DestFile.proto b/src/msg-bus/events/DestFile.proto new file mode 100644 index 000000000..697df659c --- /dev/null +++ b/src/msg-bus/events/DestFile.proto @@ -0,0 +1,9 @@ +package fts3.events; + +message DestFile { + optional uint64 file_size = 1; + optional string checksum_type = 2; + optional string checksum_value = 3; + optional bool file_on_disk = 4; + optional bool file_on_tape = 5; +} diff --git a/src/msg-bus/events/Message.proto b/src/msg-bus/events/Message.proto index 32cc2b3b0..cc404bdfe 100644 --- a/src/msg-bus/events/Message.proto +++ b/src/msg-bus/events/Message.proto @@ -1,5 +1,7 @@ package fts3.events; +import "DestFile.proto"; + message Message { required string job_id = 1; required uint64 file_id = 2; @@ -21,4 +23,6 @@ message Message { optional double throughput = 15; optional int32 errcode = 16; + + optional DestFile dest_file = 17; } -- GitLab From e4399c8c4354bc3fb8b2733cadc5f23fb942d70f Mon Sep 17 00:00:00 2001 From: Steven Murray Date: Tue, 15 Jun 2021 09:39:31 +0200 Subject: [PATCH 03/20] FTS-1702 Added Gfal2::getChecksum() --- src/url-copy/Gfal2.h | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/url-copy/Gfal2.h b/src/url-copy/Gfal2.h index a96276383..8dffdf8bd 100644 --- a/src/url-copy/Gfal2.h +++ b/src/url-copy/Gfal2.h @@ -376,6 +376,16 @@ public: throw Gfal2Exception(error); } } + + /// Get the checksum of a file + std::string getChecksum(const std::string &url, const std::string &type) { + char buffer[512]; + GError *error = NULL; + if (gfal2_checksum(context, url.c_str(), type.c_str(), 0, 0, buffer, sizeof(buffer), &error)) { + throw Gfal2Exception(error); + } + return buffer; + } }; -- GitLab From 1c7cfee8c393bf8f268b67786e3c2d8598cb574a Mon Sep 17 00:00:00 2001 From: Steven Murray Date: Tue, 15 Jun 2021 09:57:53 +0200 Subject: [PATCH 04/20] FTS-1702 Added UrlCopyError::DestFile --- src/url-copy/UrlCopyError.h | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/src/url-copy/UrlCopyError.h b/src/url-copy/UrlCopyError.h index 8e794a43d..7f533e616 100644 --- a/src/url-copy/UrlCopyError.h +++ b/src/url-copy/UrlCopyError.h @@ -25,6 +25,8 @@ #include "Gfal2.h" #include "heuristics.h" +#include + // ERROR SCOPE #define TRANSFER "TRANSFER" #define DESTINATION "DESTINATION" @@ -39,11 +41,22 @@ class UrlCopyError: public fts3::common::BaseException { +public: + struct DestFile { + uint64_t fileSize; + std::string checksumType; + std::string checksumValue; + bool fileOnDisk; + bool fileOnTape; + DestFile(): fileSize(0), fileOnDisk(false), fileOnTape(false) {} + }; + private: std::string scope_; std::string phase_; int code_; std::string msg_; + boost::optional destFile_; public: UrlCopyError(const std::string &scope, const std::string &phase, int code, const std::string &msg): @@ -76,6 +89,14 @@ public: bool isRecoverable(void) const throw() { return retryTransfer(code(), scope(), what()); } + + void setDestFile(const DestFile &destFile) { + destFile_ = destFile; + } + + const boost::optional &destFile() const throw() { + return destFile_; + } }; #endif // URLCOPY_ERROR_H -- GitLab From 67f17ba9247a571e572c100069cb113182890a30 Mon Sep 17 00:00:00 2001 From: Steven Murray Date: Wed, 16 Jun 2021 15:44:58 +0200 Subject: [PATCH 05/20] FTS-1702 Added Gfal2::getXattr() --- src/url-copy/Gfal2.h | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/url-copy/Gfal2.h b/src/url-copy/Gfal2.h index 8dffdf8bd..e1fe63f72 100644 --- a/src/url-copy/Gfal2.h +++ b/src/url-copy/Gfal2.h @@ -386,6 +386,16 @@ public: } return buffer; } + + // Get the extended attribute of a resource + std::string getXattr(const std::string &url, const std::string &name) { + char buffer[1024]; + GError *error = NULL; + if (gfal2_getxattr(context, url.c_str(), name.c_str(), buffer, sizeof(buffer), &error)) { + throw Gfal2Exception(error); + } + return buffer; + } }; -- GitLab From 1f2530da310f59f212442818d31d9cc993200dd7 Mon Sep 17 00:00:00 2001 From: Steven Murray Date: Thu, 17 Jun 2021 14:43:22 +0200 Subject: [PATCH 06/20] FTS-1702 Added UrlCopyCmd::setDstFileReport() --- src/server/services/transfers/UrlCopyCmd.cpp | 6 ++++++ src/server/services/transfers/UrlCopyCmd.h | 2 ++ 2 files changed, 8 insertions(+) diff --git a/src/server/services/transfers/UrlCopyCmd.cpp b/src/server/services/transfers/UrlCopyCmd.cpp index 7f09bd01b..7fa951f73 100644 --- a/src/server/services/transfers/UrlCopyCmd.cpp +++ b/src/server/services/transfers/UrlCopyCmd.cpp @@ -312,6 +312,12 @@ void UrlCopyCmd::setDisableStreaming(bool disable_streaming) } +void UrlCopyCmd::setDstFileReport(const bool dst_file_report) +{ + setFlag("dst_file_report", dst_file_report); +} + + int UrlCopyCmd::getBuffersize() { auto buffersize = options["tcp-buffersize"]; diff --git a/src/server/services/transfers/UrlCopyCmd.h b/src/server/services/transfers/UrlCopyCmd.h index 3a34673b6..47cafb40d 100644 --- a/src/server/services/transfers/UrlCopyCmd.h +++ b/src/server/services/transfers/UrlCopyCmd.h @@ -80,6 +80,8 @@ public: void setDisableDelegation(bool); void setDisableStreaming(bool); + void setDstFileReport(bool); + // Observers int getBuffersize(); int getNoStreams(); -- GitLab From ef7b4fce8f1d09278a81b813745961d16ca4089a Mon Sep 17 00:00:00 2001 From: Steven Murray Date: Tue, 15 Jun 2021 16:32:52 +0200 Subject: [PATCH 07/20] FTS-1702 First attempt at executing GFAL2 stat (for file size), checksum and getAattr (for disk and tape residency) --- src/url-copy/UrlCopyProcess.cpp | 41 +++++++++++++++++++++++++++++++-- 1 file changed, 39 insertions(+), 2 deletions(-) diff --git a/src/url-copy/UrlCopyProcess.cpp b/src/url-copy/UrlCopyProcess.cpp index b0c443e15..b967f9097 100644 --- a/src/url-copy/UrlCopyProcess.cpp +++ b/src/url-copy/UrlCopyProcess.cpp @@ -431,6 +431,7 @@ void UrlCopyProcess::runTransfer(Transfer &transfer, Gfal2TransferParams ¶ms FTS3_COMMON_LOGGER_NEWLOG(INFO) << "BDII:" << opts.infosys << commit; FTS3_COMMON_LOGGER_NEWLOG(INFO) << "Source token issuer: " << transfer.sourceTokenIssuer << commit; FTS3_COMMON_LOGGER_NEWLOG(INFO) << "Destination token issuer: " << transfer.destTokenIssuer << commit; + FTS3_COMMON_LOGGER_NEWLOG(INFO) << "Report on the destination tape file: " << opts.dst_file_report << commit; if (opts.strictCopy) { FTS3_COMMON_LOGGER_NEWLOG(INFO) << "Copy only transfer!" << commit; @@ -452,9 +453,45 @@ void UrlCopyProcess::runTransfer(Transfer &transfer, Gfal2TransferParams ¶ms if (!opts.overwrite) { try { FTS3_COMMON_LOGGER_NEWLOG(INFO) << "Checking existence of destination file" << commit; - gfal2.stat(params, transfer.destination, false); - throw UrlCopyError(DESTINATION, TRANSFER_PREPARATION, EEXIST, + const auto destFileSize = gfal2.stat(params, transfer.destination, false).st_size; + UrlCopyError urlCopyError(DESTINATION, TRANSFER_PREPARATION, EEXIST, "Destination file exists and overwrite is not enabled"); + if (opts.dst_file_report) { + try { + FTS3_COMMON_LOGGER_NEWLOG(INFO) << "Checking integrity of destination tape file: " << + transfer.destination << commit; + const std::string checksumType = transfer.checksumAlgorithm.empty() ? "ADLER32" : + transfer.checksumAlgorithm; + const std::string checksum = gfal2.getChecksum(transfer.destination, + transfer.checksumAlgorithm); + UrlCopyError::DestFile destFile; + destFile.fileSize = destFileSize; + destFile.checksumType = checksumType; + destFile.checksumValue = checksum; + const std::string userStatus = gfal2.getXattr(transfer.destination, GFAL_XATTR_STATUS); + if (userStatus == GFAL_XATTR_STATUS_ONLINE) { + destFile.fileOnDisk = true; + destFile.fileOnTape = false; + } else if (userStatus == GFAL_XATTR_STATUS_NEARLINE) { + destFile.fileOnDisk = false; + destFile.fileOnTape = true; + } else if (userStatus == GFAL_XATTR_STATUS_NEARLINE_ONLINE) { + destFile.fileOnDisk = true; + destFile.fileOnTape = true; + } else if (userStatus == GFAL_XATTR_STATUS_LOST) { + destFile.fileOnDisk = false; + destFile.fileOnTape = false; + } else { + throw std::runtime_error("Failed to determine if destination file is on disk and/or tape"); + } + urlCopyError.setDestFile(destFile); + } catch (const std::exception &ex) { + FTS3_COMMON_LOGGER_NEWLOG(ERR) << "Failed to check integrity of destination tape file: " + << transfer.destination << ": " << ex.what() << commit; + } + } + + throw urlCopyError; } catch (const Gfal2Exception &ex) { if (ex.code() != ENOENT) { -- GitLab From 989aa5fe0261f9455050701bbc96513db8dd64ba Mon Sep 17 00:00:00 2001 From: Joao Lopes Date: Fri, 27 Aug 2021 11:54:58 +0200 Subject: [PATCH 08/20] FTS-1702 gfal2_getxattr error identification fix --- src/url-copy/Gfal2.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/url-copy/Gfal2.h b/src/url-copy/Gfal2.h index e1fe63f72..7659f92d1 100644 --- a/src/url-copy/Gfal2.h +++ b/src/url-copy/Gfal2.h @@ -391,7 +391,7 @@ public: std::string getXattr(const std::string &url, const std::string &name) { char buffer[1024]; GError *error = NULL; - if (gfal2_getxattr(context, url.c_str(), name.c_str(), buffer, sizeof(buffer), &error)) { + if (gfal2_getxattr(context, url.c_str(), name.c_str(), buffer, sizeof(buffer), &error) < 0) { throw Gfal2Exception(error); } return buffer; -- GitLab From ee88912d513042bdccf0f50916de7b4ae1ce2523 Mon Sep 17 00:00:00 2001 From: Joao Lopes Date: Fri, 27 Aug 2021 13:55:33 +0200 Subject: [PATCH 09/20] FTS-1702 fix dst-file-report option name in fts_url_copy --- src/url-copy/UrlCopyOpts.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/url-copy/UrlCopyOpts.cpp b/src/url-copy/UrlCopyOpts.cpp index 38e178204..415829481 100644 --- a/src/url-copy/UrlCopyOpts.cpp +++ b/src/url-copy/UrlCopyOpts.cpp @@ -82,7 +82,7 @@ const option UrlCopyOpts::long_options[] = {"logDir", required_argument, 0, 900}, {"msgDir", required_argument, 0, 901}, - {"dst_file_report", no_argument, 0, 1000}, + {"dst-file-report", no_argument, 0, 1000}, {"help", no_argument, 0, 0}, {"debug", required_argument, 0, 1}, -- GitLab From 0ae7369e8f1eef52bafd90de15c384d71ae0ae95 Mon Sep 17 00:00:00 2001 From: Joao Lopes Date: Fri, 27 Aug 2021 16:13:13 +0200 Subject: [PATCH 10/20] FTS-1702 Set flag dst-file-report inside UrlCopyCmd::setFromTransfer() --- src/server/services/transfers/UrlCopyCmd.cpp | 7 +------ src/server/services/transfers/UrlCopyCmd.h | 2 -- 2 files changed, 1 insertion(+), 8 deletions(-) diff --git a/src/server/services/transfers/UrlCopyCmd.cpp b/src/server/services/transfers/UrlCopyCmd.cpp index 7fa951f73..98e0dfb97 100644 --- a/src/server/services/transfers/UrlCopyCmd.cpp +++ b/src/server/services/transfers/UrlCopyCmd.cpp @@ -203,6 +203,7 @@ void UrlCopyCmd::setFromTransfer(const TransferFile &transfer, setOption("checksum-mode", transfer.checksumMode); setOption("job-id", transfer.jobId); setFlag("overwrite", !transfer.overwriteFlag.empty()); + setFlag("dst-file-report", !transfer.dstFileReport.empty()); setOption("dest-token-desc", transfer.destinationSpaceToken); setOption("source-token-desc", transfer.sourceSpaceToken); @@ -312,12 +313,6 @@ void UrlCopyCmd::setDisableStreaming(bool disable_streaming) } -void UrlCopyCmd::setDstFileReport(const bool dst_file_report) -{ - setFlag("dst_file_report", dst_file_report); -} - - int UrlCopyCmd::getBuffersize() { auto buffersize = options["tcp-buffersize"]; diff --git a/src/server/services/transfers/UrlCopyCmd.h b/src/server/services/transfers/UrlCopyCmd.h index 47cafb40d..3a34673b6 100644 --- a/src/server/services/transfers/UrlCopyCmd.h +++ b/src/server/services/transfers/UrlCopyCmd.h @@ -80,8 +80,6 @@ public: void setDisableDelegation(bool); void setDisableStreaming(bool); - void setDstFileReport(bool); - // Observers int getBuffersize(); int getNoStreams(); -- GitLab From 753b30a8ef47373105a282726400e2e2bb448713 Mon Sep 17 00:00:00 2001 From: Joao Lopes Date: Fri, 27 Aug 2021 16:27:42 +0200 Subject: [PATCH 11/20] FTS-1702 Add dstFileReport field to TransferFile.h class and map it to dst_file_report column in the database --- src/db/generic/TransferFile.h | 1 + src/db/mysql/sociConversions.h | 1 + 2 files changed, 2 insertions(+) diff --git a/src/db/generic/TransferFile.h b/src/db/generic/TransferFile.h index c5e3312f6..cccb3c215 100644 --- a/src/db/generic/TransferFile.h +++ b/src/db/generic/TransferFile.h @@ -111,6 +111,7 @@ public: time_t jobFinished; std::string voName; std::string overwriteFlag; + std::string dstFileReport; std::string userDn; std::string credId; std::string checksumMode; diff --git a/src/db/mysql/sociConversions.h b/src/db/mysql/sociConversions.h index b24751358..f66bd4083 100644 --- a/src/db/mysql/sociConversions.h +++ b/src/db/mysql/sociConversions.h @@ -144,6 +144,7 @@ struct type_conversion file.voName = v.get("vo_name"); file.fileId = v.get("file_id"); file.overwriteFlag = v.get("overwrite_flag",""); + file.dstFileReport = v.get("dst_file_report",""); file.userDn = v.get("user_dn"); file.credId = v.get("cred_id"); file.checksum = v.get("checksum",""); -- GitLab From cd10b4ec23110e3156895854a9cd378549a1c4eb Mon Sep 17 00:00:00 2001 From: Joao Lopes Date: Fri, 27 Aug 2021 16:36:54 +0200 Subject: [PATCH 12/20] FTS-1702 Get value of dst_file_report flag from the database in MySqlAPI::getReadyTransfers() and MySqlAPI::getReadySessionReuseTransfers() --- src/db/mysql/MySqlAPI.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/db/mysql/MySqlAPI.cpp b/src/db/mysql/MySqlAPI.cpp index a4e119f99..aa55db438 100644 --- a/src/db/mysql/MySqlAPI.cpp +++ b/src/db/mysql/MySqlAPI.cpp @@ -659,7 +659,7 @@ void MySqlAPI::getReadyTransfers(const std::vector& queues, { soci::rowset rs = (sql.prepare << " SELECT f.file_state, f.source_surl, f.dest_surl, f.job_id, j.vo_name, " - " f.file_id, j.overwrite_flag, j.user_dn, j.cred_id, " + " f.file_id, j.overwrite_flag, j.dst_file_report, j.user_dn, j.cred_id, " " f.checksum, j.checksum_method, j.source_space_token, " " j.space_token, j.copy_pin_lifetime, j.bring_online, " " f.user_filesize, f.file_metadata, j.job_metadata, f.file_index, f.bringonline_token, " @@ -723,7 +723,7 @@ void MySqlAPI::getReadyTransfers(const std::vector& queues, if (it_act->second == 0) continue; std::string select = " SELECT f.file_state, f.source_surl, f.dest_surl, f.job_id, j.vo_name, " - " f.file_id, j.overwrite_flag, j.user_dn, j.cred_id," + " f.file_id, j.overwrite_flag, j.dst_file_report, j.user_dn, j.cred_id," " f.checksum, j.checksum_method, j.source_space_token, " " j.space_token, j.copy_pin_lifetime, j.bring_online, " " f.user_filesize, f.file_metadata, j.job_metadata, f.file_index, f.bringonline_token, " @@ -1137,7 +1137,7 @@ void MySqlAPI::getReadySessionReuseTransfers(const std::vector& queues, sql.prepare << " SELECT SQL_NO_CACHE " " f.file_state, f.source_surl, f.dest_surl, f.job_id, j.vo_name, " - " f.file_id, j.overwrite_flag, j.user_dn, j.cred_id, " + " f.file_id, j.overwrite_flag, j.dst_file_report, j.user_dn, j.cred_id, " " f.checksum, j.checksum_method, j.source_space_token, " " j.space_token, j.copy_pin_lifetime, j.bring_online, " " f.user_filesize, f.file_metadata, j.job_metadata, f.file_index, " -- GitLab From b5bf849df3f76ffc097a4caa5bbbee34a1f1942c Mon Sep 17 00:00:00 2001 From: Joao Lopes Date: Wed, 1 Sep 2021 15:55:49 +0200 Subject: [PATCH 13/20] FTS-1702 Send destination file report from fts_url_copy to fts server --- src/url-copy/LegacyReporter.cpp | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/url-copy/LegacyReporter.cpp b/src/url-copy/LegacyReporter.cpp index ccf54c27b..c020e34b7 100644 --- a/src/url-copy/LegacyReporter.cpp +++ b/src/url-copy/LegacyReporter.cpp @@ -169,6 +169,13 @@ void LegacyReporter::sendTransferCompleted(const Transfer &transfer, Gfal2Transf } else { status.set_transfer_status("FAILED"); + if (const boost::optional destFile = transfer.error->destFile()){ + status.mutable_dest_file()->set_file_size(destFile->fileSize); + status.mutable_dest_file()->set_checksum_type(destFile->checksumType); + status.mutable_dest_file()->set_checksum_value(destFile->checksumValue); + status.mutable_dest_file()->set_file_on_disk(destFile->fileOnDisk); + status.mutable_dest_file()->set_file_on_tape(destFile->fileOnTape); + } } status.set_transfer_message(fullErrMsg.str()); status.set_retry(transfer.error->isRecoverable()); -- GitLab From 6ceb072d70180bee82793fbad997933f27ff8741 Mon Sep 17 00:00:00 2001 From: Joao Lopes Date: Mon, 6 Sep 2021 13:44:38 +0200 Subject: [PATCH 14/20] FTS-1702 Move DestFile struct to a separate class --- src/url-copy/DestFile.h | 53 +++++++++++++++++++++++++++++++++ src/url-copy/UrlCopyError.h | 18 ----------- src/url-copy/UrlCopyProcess.cpp | 5 ++-- 3 files changed, 56 insertions(+), 20 deletions(-) create mode 100644 src/url-copy/DestFile.h diff --git a/src/url-copy/DestFile.h b/src/url-copy/DestFile.h new file mode 100644 index 000000000..92abf9410 --- /dev/null +++ b/src/url-copy/DestFile.h @@ -0,0 +1,53 @@ +#include +#include +#include +#include "heuristics.h" + + +class DestFile { + +public: + uint64_t fileSize; + std::string checksumType; + std::string checksumValue; + bool fileOnDisk; + bool fileOnTape; + + DestFile() : fileSize(0), fileOnDisk(false), fileOnTape(false) {} + + json::Object toJSON() + { + json::Object output; + + output["file_size"] = json::Number(fileSize); + output["checksum_type"] = json::String(checksumType); + output["checksum_value"] = json::String(checksumValue); + output["file_on_tape"] = json::Boolean(fileOnDisk); + output["file_on_disk"] = json::Boolean(fileOnTape); + + return output; + } + + static std::string appendDestFileToFileMetadata(std::string file_metadata, json::Object dst_file) + { + json::UnknownElement metadata; + + if (!file_metadata.empty()) { + try { + std::string new_file_metadata = replaceMetadataString(file_metadata); + std::istringstream valueStream(new_file_metadata); + json::Reader::Read(metadata, valueStream); + } + catch (...) { + metadata["file_metada"] = json::String(file_metadata); + } + } + + metadata["dst_file"] = json::Object(dst_file); + + std::ostringstream stream; + json::Writer::Write(metadata, stream); + + return stream.str(); + } +}; \ No newline at end of file diff --git a/src/url-copy/UrlCopyError.h b/src/url-copy/UrlCopyError.h index 7f533e616..d9b87faed 100644 --- a/src/url-copy/UrlCopyError.h +++ b/src/url-copy/UrlCopyError.h @@ -41,22 +41,12 @@ class UrlCopyError: public fts3::common::BaseException { -public: - struct DestFile { - uint64_t fileSize; - std::string checksumType; - std::string checksumValue; - bool fileOnDisk; - bool fileOnTape; - DestFile(): fileSize(0), fileOnDisk(false), fileOnTape(false) {} - }; private: std::string scope_; std::string phase_; int code_; std::string msg_; - boost::optional destFile_; public: UrlCopyError(const std::string &scope, const std::string &phase, int code, const std::string &msg): @@ -89,14 +79,6 @@ public: bool isRecoverable(void) const throw() { return retryTransfer(code(), scope(), what()); } - - void setDestFile(const DestFile &destFile) { - destFile_ = destFile; - } - - const boost::optional &destFile() const throw() { - return destFile_; - } }; #endif // URLCOPY_ERROR_H diff --git a/src/url-copy/UrlCopyProcess.cpp b/src/url-copy/UrlCopyProcess.cpp index b967f9097..eaef89aae 100644 --- a/src/url-copy/UrlCopyProcess.cpp +++ b/src/url-copy/UrlCopyProcess.cpp @@ -29,6 +29,7 @@ #include "AutoInterruptThread.h" #include "UrlCopyProcess.h" #include "version.h" +#include "DestFile.h" using fts3::common::commit; @@ -464,7 +465,7 @@ void UrlCopyProcess::runTransfer(Transfer &transfer, Gfal2TransferParams ¶ms transfer.checksumAlgorithm; const std::string checksum = gfal2.getChecksum(transfer.destination, transfer.checksumAlgorithm); - UrlCopyError::DestFile destFile; + DestFile destFile; destFile.fileSize = destFileSize; destFile.checksumType = checksumType; destFile.checksumValue = checksum; @@ -484,7 +485,7 @@ void UrlCopyProcess::runTransfer(Transfer &transfer, Gfal2TransferParams ¶ms } else { throw std::runtime_error("Failed to determine if destination file is on disk and/or tape"); } - urlCopyError.setDestFile(destFile); + transfer.fileMetadata = DestFile::appendDestFileToFileMetadata(transfer.fileMetadata, destFile.toJSON()); } catch (const std::exception &ex) { FTS3_COMMON_LOGGER_NEWLOG(ERR) << "Failed to check integrity of destination tape file: " << transfer.destination << ": " << ex.what() << commit; -- GitLab From 68e7da79771e6667cd416181b3c28c42bfd594fd Mon Sep 17 00:00:00 2001 From: Joao Lopes Date: Mon, 6 Sep 2021 13:57:18 +0200 Subject: [PATCH 15/20] FTS-1702 Move static methods from LegacyReporter.cpp to heuristics to make them accessible in other classes --- src/url-copy/LegacyReporter.cpp | 22 +--------------------- src/url-copy/heuristics.cpp | 22 ++++++++++++++++++++++ src/url-copy/heuristics.h | 4 ++++ 3 files changed, 27 insertions(+), 21 deletions(-) diff --git a/src/url-copy/LegacyReporter.cpp b/src/url-copy/LegacyReporter.cpp index c020e34b7..279565064 100644 --- a/src/url-copy/LegacyReporter.cpp +++ b/src/url-copy/LegacyReporter.cpp @@ -18,32 +18,12 @@ #include #include "common/Logger.h" #include "monitoring/msg-ifce.h" +#include "heuristics.h" namespace events = fts3::events; using fts3::common::commit; -static std::string mapErrnoToString(int err) -{ - char buf[256] = {0}; - char const *str = strerror_r(err, buf, sizeof(buf)); - if (str) { - std::string rep(str); - std::replace(rep.begin(), rep.end(), ' ', '_'); - return boost::to_upper_copy(rep); - } - return "GENERAL ERROR"; -} - - -static std::string replaceMetadataString(std::string text) -{ - text = boost::replace_all_copy(text, "?"," "); - text = boost::replace_all_copy(text, "\\\"","\""); - return text; -} - - LegacyReporter::LegacyReporter(const UrlCopyOpts &opts): producer(opts.msgDir), opts(opts), zmqContext(1), zmqPingSocket(zmqContext, ZMQ_PUB) { diff --git a/src/url-copy/heuristics.cpp b/src/url-copy/heuristics.cpp index 6e5fd47c0..db02d5c2a 100644 --- a/src/url-copy/heuristics.cpp +++ b/src/url-copy/heuristics.cpp @@ -21,6 +21,7 @@ #include #include "heuristics.h" #include "common/Logger.h" +#include using namespace fts3::common; @@ -148,3 +149,24 @@ unsigned adjustTimeoutBasedOnSize(off_t sizeInBytes, const unsigned addSecPerMb) // Final timeout adjusted considering transfer timeout return 600 + ceil(timeoutPerMBLocal * (static_cast(sizeInBytes) / MB)); } + + +std::string mapErrnoToString(int err) +{ + char buf[256] = {0}; + char const *str = strerror_r(err, buf, sizeof(buf)); + if (str) { + std::string rep(str); + std::replace(rep.begin(), rep.end(), ' ', '_'); + return boost::to_upper_copy(rep); + } + return "GENERAL ERROR"; +} + + +std::string replaceMetadataString(std::string text) +{ + text = boost::replace_all_copy(text, "?"," "); + text = boost::replace_all_copy(text, "\\\"","\""); + return text; +} diff --git a/src/url-copy/heuristics.h b/src/url-copy/heuristics.h index 60837d2eb..177672fab 100644 --- a/src/url-copy/heuristics.h +++ b/src/url-copy/heuristics.h @@ -34,4 +34,8 @@ bool retryTransfer(int errorNo, const std::string &category, const std::string & */ unsigned adjustTimeoutBasedOnSize(off_t sizeInBytes, unsigned addSecPerMb); +std::string mapErrnoToString(int err); + +std::string replaceMetadataString(std::string text); + #endif // HEURISTICS_H -- GitLab From 32a2985813f57db14eabcdd61cb57ae0e041e858 Mon Sep 17 00:00:00 2001 From: Joao Lopes Date: Mon, 6 Sep 2021 14:02:18 +0200 Subject: [PATCH 16/20] FTS-1702 Send the new file metadata in a protobuf message from the from url-copy to the fts server --- src/msg-bus/events/Message.proto | 2 +- src/url-copy/LegacyReporter.cpp | 8 ++------ 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/src/msg-bus/events/Message.proto b/src/msg-bus/events/Message.proto index cc404bdfe..62164942f 100644 --- a/src/msg-bus/events/Message.proto +++ b/src/msg-bus/events/Message.proto @@ -24,5 +24,5 @@ message Message { optional int32 errcode = 16; - optional DestFile dest_file = 17; + optional string file_metadata = 18; } diff --git a/src/url-copy/LegacyReporter.cpp b/src/url-copy/LegacyReporter.cpp index 279565064..a6f0c63fc 100644 --- a/src/url-copy/LegacyReporter.cpp +++ b/src/url-copy/LegacyReporter.cpp @@ -149,12 +149,8 @@ void LegacyReporter::sendTransferCompleted(const Transfer &transfer, Gfal2Transf } else { status.set_transfer_status("FAILED"); - if (const boost::optional destFile = transfer.error->destFile()){ - status.mutable_dest_file()->set_file_size(destFile->fileSize); - status.mutable_dest_file()->set_checksum_type(destFile->checksumType); - status.mutable_dest_file()->set_checksum_value(destFile->checksumValue); - status.mutable_dest_file()->set_file_on_disk(destFile->fileOnDisk); - status.mutable_dest_file()->set_file_on_tape(destFile->fileOnTape); + if (opts.dst_file_report && (!opts.overwrite)){ + status.set_file_metadata(replaceMetadataString(transfer.fileMetadata)); } } status.set_transfer_message(fullErrMsg.str()); -- GitLab From 04c860072f48eb56da6e5fa5dc89b47b366d7f32 Mon Sep 17 00:00:00 2001 From: Joao Lopes Date: Mon, 6 Sep 2021 14:11:15 +0200 Subject: [PATCH 17/20] FTS-1702 Uptade file_metadata value in the DB in case a protobuf message with file_metadata field has arrived from the url-copy process --- src/db/generic/GenericDbIfce.h | 3 ++- src/db/mysql/MySqlAPI.cpp | 12 +++++++++--- src/db/mysql/MySqlAPI.h | 6 ++++-- .../services/transfers/MessageProcessingService.cpp | 4 ++-- 4 files changed, 17 insertions(+), 8 deletions(-) diff --git a/src/db/generic/GenericDbIfce.h b/src/db/generic/GenericDbIfce.h index 6eb9ac7e0..baa79fbe0 100644 --- a/src/db/generic/GenericDbIfce.h +++ b/src/db/generic/GenericDbIfce.h @@ -113,13 +113,14 @@ public: /// @param filesize Actual filesize reported by the storage /// @param duration How long (in seconds) took to transfer the file /// @param retry If the error is considered recoverable by fts_url_copy + /// @param FileMetadata The new file metadata in case it needs to be updated /// @return (true, newState) if an updated was done into the DB, (false, oldState) otherwise /// (i.e. trying to set ACTIVE an already ACTIVE transfer) /// @note If jobId is empty, or if fileId is 0, then processId will be used to decide /// which transfers to update virtual boost::tuple updateTransferStatus(const std::string& jobId, uint64_t fileId, double throughput, const std::string& transferState, const std::string& errorReason, - int processId, double filesize, double duration, bool retry) = 0; + int processId, double filesize, double duration, bool retry, std::string FileMetadata = "") = 0; /// Update the status of a job /// @param jobId The job ID diff --git a/src/db/mysql/MySqlAPI.cpp b/src/db/mysql/MySqlAPI.cpp index aa55db438..bae2a7c9c 100644 --- a/src/db/mysql/MySqlAPI.cpp +++ b/src/db/mysql/MySqlAPI.cpp @@ -1179,18 +1179,19 @@ void MySqlAPI::getReadySessionReuseTransfers(const std::vector& queues, boost::tuple MySqlAPI::updateTransferStatus(const std::string& jobId, uint64_t fileId, double throughput, const std::string& transferState, const std::string& errorReason, - int processId, double filesize, double duration, bool retry) + int processId, double filesize, double duration, bool retry, std::string FileMetadata) { soci::session sql(*connectionPool); return updateFileTransferStatusInternal(sql, throughput, jobId, fileId, - transferState, errorReason, processId, filesize, duration, retry); + transferState, errorReason, processId, filesize, duration, retry, FileMetadata); } boost::tuple MySqlAPI::updateFileTransferStatusInternal(soci::session& sql, double throughput, std::string jobId, uint64_t fileId, std::string newFileState, std::string transferMessage, - int processId, double filesize, double duration, bool retry) + int processId, double filesize, double duration, bool retry, + std::string FileMetadata) { std::string storedState; soci::indicator destSurlUuidInd; @@ -1309,6 +1310,11 @@ boost::tuple MySqlAPI::updateFileTransferStatusInternal(soci newFileState = "ARCHIVING"; } + if (!FileMetadata.empty()) { + query << ", file_metadata = :file_metadata"; + stmt.exchange(soci::use(FileMetadata, "file_metadata")); + } + query << " , pid = :pid, filesize = :filesize, tx_duration = :duration, throughput = :throughput, current_failures = :current_failures " "WHERE file_id = :fileId AND file_state = :oldState"; stmt.exchange(soci::use(processId, "pid")); diff --git a/src/db/mysql/MySqlAPI.h b/src/db/mysql/MySqlAPI.h index 567889900..642f1a1ff 100644 --- a/src/db/mysql/MySqlAPI.h +++ b/src/db/mysql/MySqlAPI.h @@ -73,13 +73,14 @@ public: /// @param filesize Actual filesize reported by the storage /// @param duration How long (in seconds) took to transfer the file /// @param retry If the error is considered recoverable by fts_url_copy + /// @param FileMetadata The new file metadata in case it needs to be updated /// @return true if an updated was done into the DB, false otherwise /// (i.e. trying to set ACTIVE an already ACTIVE transfer) /// @note If jobId is empty, or if fileId is 0, then processId will be used to decide /// which transfers to update virtual boost::tuple updateTransferStatus(const std::string& jobId, uint64_t fileId, double throughput, const std::string& transferState, const std::string& errorReason, - int processId, double filesize, double duration, bool retry); + int processId, double filesize, double duration, bool retry, std::string FileMetadata = ""); /// Update the status of a job /// @param jobId The job ID @@ -344,7 +345,8 @@ private: boost::tuple updateFileTransferStatusInternal(soci::session& sql, double throughput, std::string jobId, uint64_t fileId, - std::string newFileState, std::string transferMessage, int processId, double filesize, double duration, bool retry); + std::string newFileState, std::string transferMessage, int processId, double filesize, double duration, bool retry, + std::string FileMetadata = ""); bool updateJobTransferStatusInternal(soci::session& sql, std::string jobId, const std::string state); diff --git a/src/server/services/transfers/MessageProcessingService.cpp b/src/server/services/transfers/MessageProcessingService.cpp index 91b35cac2..af591642e 100644 --- a/src/server/services/transfers/MessageProcessingService.cpp +++ b/src/server/services/transfers/MessageProcessingService.cpp @@ -261,8 +261,8 @@ void MessageProcessingService::performOtherMessageDbChange(const fts3::events::M boost::tuple updated = db::DBSingleton::instance() .getDBObjectInstance()->updateTransferStatus( msg.job_id(), msg.file_id(), msg.throughput(), msg.transfer_status(), - msg.transfer_message(), msg.process_id(), msg.filesize(), msg.time_in_secs(), msg.retry() - ); + msg.transfer_message(), msg.process_id(), msg.filesize(), msg.time_in_secs(), msg.retry(), + msg.file_metadata()); db::DBSingleton::instance().getDBObjectInstance()->updateJobStatus( msg.job_id(), msg.transfer_status()); -- GitLab From 3141c7e4332ade1ce8b4f2b29a310036a72757c7 Mon Sep 17 00:00:00 2001 From: Joao Lopes Date: Mon, 6 Sep 2021 18:54:01 +0200 Subject: [PATCH 18/20] FTS-1702 Only archiving transfers can set dst-file-report flag in the url-copy process --- src/db/generic/TransferFile.h | 1 + src/db/mysql/MySqlAPI.cpp | 8 ++++---- src/db/mysql/sociConversions.h | 1 + src/server/services/transfers/UrlCopyCmd.cpp | 4 +++- 4 files changed, 9 insertions(+), 5 deletions(-) diff --git a/src/db/generic/TransferFile.h b/src/db/generic/TransferFile.h index cccb3c215..a71c391b6 100644 --- a/src/db/generic/TransferFile.h +++ b/src/db/generic/TransferFile.h @@ -112,6 +112,7 @@ public: std::string voName; std::string overwriteFlag; std::string dstFileReport; + int archiveTimeout; std::string userDn; std::string credId; std::string checksumMode; diff --git a/src/db/mysql/MySqlAPI.cpp b/src/db/mysql/MySqlAPI.cpp index bae2a7c9c..4b8dd9a01 100644 --- a/src/db/mysql/MySqlAPI.cpp +++ b/src/db/mysql/MySqlAPI.cpp @@ -659,7 +659,7 @@ void MySqlAPI::getReadyTransfers(const std::vector& queues, { soci::rowset rs = (sql.prepare << " SELECT f.file_state, f.source_surl, f.dest_surl, f.job_id, j.vo_name, " - " f.file_id, j.overwrite_flag, j.dst_file_report, j.user_dn, j.cred_id, " + " f.file_id, j.overwrite_flag, j.archive_timeout, j.dst_file_report, j.user_dn, j.cred_id, " " f.checksum, j.checksum_method, j.source_space_token, " " j.space_token, j.copy_pin_lifetime, j.bring_online, " " f.user_filesize, f.file_metadata, j.job_metadata, f.file_index, f.bringonline_token, " @@ -723,7 +723,7 @@ void MySqlAPI::getReadyTransfers(const std::vector& queues, if (it_act->second == 0) continue; std::string select = " SELECT f.file_state, f.source_surl, f.dest_surl, f.job_id, j.vo_name, " - " f.file_id, j.overwrite_flag, j.dst_file_report, j.user_dn, j.cred_id," + " f.file_id, j.overwrite_flag, j.archive_timeout, j.dst_file_report, j.user_dn, j.cred_id," " f.checksum, j.checksum_method, j.source_space_token, " " j.space_token, j.copy_pin_lifetime, j.bring_online, " " f.user_filesize, f.file_metadata, j.job_metadata, f.file_index, f.bringonline_token, " @@ -1137,8 +1137,8 @@ void MySqlAPI::getReadySessionReuseTransfers(const std::vector& queues, sql.prepare << " SELECT SQL_NO_CACHE " " f.file_state, f.source_surl, f.dest_surl, f.job_id, j.vo_name, " - " f.file_id, j.overwrite_flag, j.dst_file_report, j.user_dn, j.cred_id, " - " f.checksum, j.checksum_method, j.source_space_token, " + " f.file_id, j.overwrite_flag, j.archive_timeout, j.dst_file_report, j.user_dn, " + " j.cred_id, f.checksum, j.checksum_method, j.source_space_token, " " j.space_token, j.copy_pin_lifetime, j.bring_online, " " f.user_filesize, f.file_metadata, j.job_metadata, f.file_index, " " f.bringonline_token, f.source_se, f.dest_se, f.selection_strategy, " diff --git a/src/db/mysql/sociConversions.h b/src/db/mysql/sociConversions.h index f66bd4083..98a8b6ea0 100644 --- a/src/db/mysql/sociConversions.h +++ b/src/db/mysql/sociConversions.h @@ -145,6 +145,7 @@ struct type_conversion file.fileId = v.get("file_id"); file.overwriteFlag = v.get("overwrite_flag",""); file.dstFileReport = v.get("dst_file_report",""); + file.archiveTimeout = v.get("archive_timeout",-1); file.userDn = v.get("user_dn"); file.credId = v.get("cred_id"); file.checksum = v.get("checksum",""); diff --git a/src/server/services/transfers/UrlCopyCmd.cpp b/src/server/services/transfers/UrlCopyCmd.cpp index 98e0dfb97..019dc1edd 100644 --- a/src/server/services/transfers/UrlCopyCmd.cpp +++ b/src/server/services/transfers/UrlCopyCmd.cpp @@ -203,7 +203,9 @@ void UrlCopyCmd::setFromTransfer(const TransferFile &transfer, setOption("checksum-mode", transfer.checksumMode); setOption("job-id", transfer.jobId); setFlag("overwrite", !transfer.overwriteFlag.empty()); - setFlag("dst-file-report", !transfer.dstFileReport.empty()); + if (transfer.archiveTimeout > 0){ + setFlag("dst-file-report", !transfer.dstFileReport.empty()); + } setOption("dest-token-desc", transfer.destinationSpaceToken); setOption("source-token-desc", transfer.sourceSpaceToken); -- GitLab From 15c464bed9e2880d288ea4b3c004a012c5bbc292 Mon Sep 17 00:00:00 2001 From: Joao Lopes Date: Tue, 7 Sep 2021 10:52:01 +0200 Subject: [PATCH 19/20] FTS-1702 Make sure that url-copy only updates file metadata when destination file exists --- src/url-copy/LegacyReporter.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/url-copy/LegacyReporter.cpp b/src/url-copy/LegacyReporter.cpp index a6f0c63fc..ec98ceb0e 100644 --- a/src/url-copy/LegacyReporter.cpp +++ b/src/url-copy/LegacyReporter.cpp @@ -149,7 +149,7 @@ void LegacyReporter::sendTransferCompleted(const Transfer &transfer, Gfal2Transf } else { status.set_transfer_status("FAILED"); - if (opts.dst_file_report && (!opts.overwrite)){ + if (transfer.error->code() == EEXIST && opts.dst_file_report && (!opts.overwrite)){ status.set_file_metadata(replaceMetadataString(transfer.fileMetadata)); } } -- GitLab From c812271d02cf6d4d543b815c990f246b5c25ba12 Mon Sep 17 00:00:00 2001 From: Joao Lopes Date: Wed, 8 Sep 2021 15:17:13 +0200 Subject: [PATCH 20/20] FTS-1702 : Changes after merge request review: - Add new static method to UrlCopyProcess.cpp that generates destination file report - Change prepareMetadataString() and replaceMetadataString() to avoid an extra string copy - Fix mysql query string and name of fileMetadata parameter - Change grouping of dst-file-report option - Fix in protobuff message Message.proto to not include DestFile.proto - And some minor fixes --- src/db/generic/GenericDbIfce.h | 4 +- src/db/mysql/MySqlAPI.cpp | 22 +++---- src/db/mysql/MySqlAPI.h | 6 +- src/msg-bus/events/DestFile.proto | 9 --- src/msg-bus/events/Message.proto | 4 +- src/server/services/transfers/UrlCopyCmd.cpp | 5 +- src/url-copy/LegacyReporter.cpp | 3 +- src/url-copy/UrlCopyError.h | 2 - src/url-copy/UrlCopyOpts.cpp | 10 ++-- src/url-copy/UrlCopyOpts.h | 3 +- src/url-copy/UrlCopyProcess.cpp | 62 +++++++++++--------- src/url-copy/heuristics.cpp | 8 +-- src/url-copy/heuristics.h | 2 +- 13 files changed, 63 insertions(+), 77 deletions(-) delete mode 100644 src/msg-bus/events/DestFile.proto diff --git a/src/db/generic/GenericDbIfce.h b/src/db/generic/GenericDbIfce.h index baa79fbe0..0df3eff4f 100644 --- a/src/db/generic/GenericDbIfce.h +++ b/src/db/generic/GenericDbIfce.h @@ -113,14 +113,14 @@ public: /// @param filesize Actual filesize reported by the storage /// @param duration How long (in seconds) took to transfer the file /// @param retry If the error is considered recoverable by fts_url_copy - /// @param FileMetadata The new file metadata in case it needs to be updated + /// @param fileMetadata The new file metadata in case it needs to be updated /// @return (true, newState) if an updated was done into the DB, (false, oldState) otherwise /// (i.e. trying to set ACTIVE an already ACTIVE transfer) /// @note If jobId is empty, or if fileId is 0, then processId will be used to decide /// which transfers to update virtual boost::tuple updateTransferStatus(const std::string& jobId, uint64_t fileId, double throughput, const std::string& transferState, const std::string& errorReason, - int processId, double filesize, double duration, bool retry, std::string FileMetadata = "") = 0; + int processId, double filesize, double duration, bool retry, std::string fileMetadata = "") = 0; /// Update the status of a job /// @param jobId The job ID diff --git a/src/db/mysql/MySqlAPI.cpp b/src/db/mysql/MySqlAPI.cpp index 4b8dd9a01..aa97e0560 100644 --- a/src/db/mysql/MySqlAPI.cpp +++ b/src/db/mysql/MySqlAPI.cpp @@ -659,8 +659,8 @@ void MySqlAPI::getReadyTransfers(const std::vector& queues, { soci::rowset rs = (sql.prepare << " SELECT f.file_state, f.source_surl, f.dest_surl, f.job_id, j.vo_name, " - " f.file_id, j.overwrite_flag, j.archive_timeout, j.dst_file_report, j.user_dn, j.cred_id, " - " f.checksum, j.checksum_method, j.source_space_token, " + " f.file_id, j.overwrite_flag, j.archive_timeout, j.dst_file_report, " + " j.user_dn, j.cred_id, f.checksum, j.checksum_method, j.source_space_token, " " j.space_token, j.copy_pin_lifetime, j.bring_online, " " f.user_filesize, f.file_metadata, j.job_metadata, f.file_index, f.bringonline_token, " " f.source_se, f.dest_se, f.selection_strategy, j.internal_job_params, j.job_type " @@ -723,8 +723,8 @@ void MySqlAPI::getReadyTransfers(const std::vector& queues, if (it_act->second == 0) continue; std::string select = " SELECT f.file_state, f.source_surl, f.dest_surl, f.job_id, j.vo_name, " - " f.file_id, j.overwrite_flag, j.archive_timeout, j.dst_file_report, j.user_dn, j.cred_id," - " f.checksum, j.checksum_method, j.source_space_token, " + " f.file_id, j.overwrite_flag, j.archive_timeout, j.dst_file_report, " + " j.user_dn, j.cred_id, f.checksum, j.checksum_method, j.source_space_token, " " j.space_token, j.copy_pin_lifetime, j.bring_online, " " f.user_filesize, f.file_metadata, j.job_metadata, f.file_index, f.bringonline_token, " " f.source_se, f.dest_se, f.selection_strategy, j.internal_job_params, j.job_type " @@ -1137,8 +1137,8 @@ void MySqlAPI::getReadySessionReuseTransfers(const std::vector& queues, sql.prepare << " SELECT SQL_NO_CACHE " " f.file_state, f.source_surl, f.dest_surl, f.job_id, j.vo_name, " - " f.file_id, j.overwrite_flag, j.archive_timeout, j.dst_file_report, j.user_dn, " - " j.cred_id, f.checksum, j.checksum_method, j.source_space_token, " + " f.file_id, j.overwrite_flag, j.archive_timeout, j.dst_file_report, " + " j.user_dn, j.cred_id, f.checksum, j.checksum_method, j.source_space_token, " " j.space_token, j.copy_pin_lifetime, j.bring_online, " " f.user_filesize, f.file_metadata, j.job_metadata, f.file_index, " " f.bringonline_token, f.source_se, f.dest_se, f.selection_strategy, " @@ -1179,11 +1179,11 @@ void MySqlAPI::getReadySessionReuseTransfers(const std::vector& queues, boost::tuple MySqlAPI::updateTransferStatus(const std::string& jobId, uint64_t fileId, double throughput, const std::string& transferState, const std::string& errorReason, - int processId, double filesize, double duration, bool retry, std::string FileMetadata) + int processId, double filesize, double duration, bool retry, std::string fileMetadata) { soci::session sql(*connectionPool); return updateFileTransferStatusInternal(sql, throughput, jobId, fileId, - transferState, errorReason, processId, filesize, duration, retry, FileMetadata); + transferState, errorReason, processId, filesize, duration, retry, fileMetadata); } @@ -1191,7 +1191,7 @@ boost::tuple MySqlAPI::updateFileTransferStatusInternal(soci std::string jobId, uint64_t fileId, std::string newFileState, std::string transferMessage, int processId, double filesize, double duration, bool retry, - std::string FileMetadata) + std::string fileMetadata) { std::string storedState; soci::indicator destSurlUuidInd; @@ -1310,9 +1310,9 @@ boost::tuple MySqlAPI::updateFileTransferStatusInternal(soci newFileState = "ARCHIVING"; } - if (!FileMetadata.empty()) { + if (!fileMetadata.empty()) { query << ", file_metadata = :file_metadata"; - stmt.exchange(soci::use(FileMetadata, "file_metadata")); + stmt.exchange(soci::use(fileMetadata, "file_metadata")); } query << " , pid = :pid, filesize = :filesize, tx_duration = :duration, throughput = :throughput, current_failures = :current_failures " diff --git a/src/db/mysql/MySqlAPI.h b/src/db/mysql/MySqlAPI.h index 642f1a1ff..0dfcf31bf 100644 --- a/src/db/mysql/MySqlAPI.h +++ b/src/db/mysql/MySqlAPI.h @@ -73,14 +73,14 @@ public: /// @param filesize Actual filesize reported by the storage /// @param duration How long (in seconds) took to transfer the file /// @param retry If the error is considered recoverable by fts_url_copy - /// @param FileMetadata The new file metadata in case it needs to be updated + /// @param fileMetadata The new file metadata in case it needs to be updated /// @return true if an updated was done into the DB, false otherwise /// (i.e. trying to set ACTIVE an already ACTIVE transfer) /// @note If jobId is empty, or if fileId is 0, then processId will be used to decide /// which transfers to update virtual boost::tuple updateTransferStatus(const std::string& jobId, uint64_t fileId, double throughput, const std::string& transferState, const std::string& errorReason, - int processId, double filesize, double duration, bool retry, std::string FileMetadata = ""); + int processId, double filesize, double duration, bool retry, std::string fileMetadata = ""); /// Update the status of a job /// @param jobId The job ID @@ -346,7 +346,7 @@ private: boost::tuple updateFileTransferStatusInternal(soci::session& sql, double throughput, std::string jobId, uint64_t fileId, std::string newFileState, std::string transferMessage, int processId, double filesize, double duration, bool retry, - std::string FileMetadata = ""); + std::string fileMetadata = ""); bool updateJobTransferStatusInternal(soci::session& sql, std::string jobId, const std::string state); diff --git a/src/msg-bus/events/DestFile.proto b/src/msg-bus/events/DestFile.proto deleted file mode 100644 index 697df659c..000000000 --- a/src/msg-bus/events/DestFile.proto +++ /dev/null @@ -1,9 +0,0 @@ -package fts3.events; - -message DestFile { - optional uint64 file_size = 1; - optional string checksum_type = 2; - optional string checksum_value = 3; - optional bool file_on_disk = 4; - optional bool file_on_tape = 5; -} diff --git a/src/msg-bus/events/Message.proto b/src/msg-bus/events/Message.proto index 62164942f..0bc34d328 100644 --- a/src/msg-bus/events/Message.proto +++ b/src/msg-bus/events/Message.proto @@ -1,7 +1,5 @@ package fts3.events; -import "DestFile.proto"; - message Message { required string job_id = 1; required uint64 file_id = 2; @@ -24,5 +22,5 @@ message Message { optional int32 errcode = 16; - optional string file_metadata = 18; + optional string file_metadata = 17; } diff --git a/src/server/services/transfers/UrlCopyCmd.cpp b/src/server/services/transfers/UrlCopyCmd.cpp index 019dc1edd..5927682a9 100644 --- a/src/server/services/transfers/UrlCopyCmd.cpp +++ b/src/server/services/transfers/UrlCopyCmd.cpp @@ -40,8 +40,7 @@ UrlCopyCmd::UrlCopyCmd() : IPv6Explicit(false) std::string UrlCopyCmd::prepareMetadataString(const std::string &text) { - std::string copy(text); - copy = boost::replace_all_copy(copy, " ", "?"); + std::string copy = boost::replace_all_copy(text, " ", "?"); copy = boost::replace_all_copy(copy, "\"", "\\\""); return copy; } @@ -203,7 +202,7 @@ void UrlCopyCmd::setFromTransfer(const TransferFile &transfer, setOption("checksum-mode", transfer.checksumMode); setOption("job-id", transfer.jobId); setFlag("overwrite", !transfer.overwriteFlag.empty()); - if (transfer.archiveTimeout > 0){ + if (transfer.archiveTimeout > 0) { setFlag("dst-file-report", !transfer.dstFileReport.empty()); } setOption("dest-token-desc", transfer.destinationSpaceToken); diff --git a/src/url-copy/LegacyReporter.cpp b/src/url-copy/LegacyReporter.cpp index ec98ceb0e..37462ac30 100644 --- a/src/url-copy/LegacyReporter.cpp +++ b/src/url-copy/LegacyReporter.cpp @@ -15,7 +15,6 @@ */ #include "LegacyReporter.h" -#include #include "common/Logger.h" #include "monitoring/msg-ifce.h" #include "heuristics.h" @@ -149,7 +148,7 @@ void LegacyReporter::sendTransferCompleted(const Transfer &transfer, Gfal2Transf } else { status.set_transfer_status("FAILED"); - if (transfer.error->code() == EEXIST && opts.dst_file_report && (!opts.overwrite)){ + if ((transfer.error->code() == EEXIST) && (opts.dst_file_report) && (!opts.overwrite)) { status.set_file_metadata(replaceMetadataString(transfer.fileMetadata)); } } diff --git a/src/url-copy/UrlCopyError.h b/src/url-copy/UrlCopyError.h index d9b87faed..a6e408762 100644 --- a/src/url-copy/UrlCopyError.h +++ b/src/url-copy/UrlCopyError.h @@ -25,8 +25,6 @@ #include "Gfal2.h" #include "heuristics.h" -#include - // ERROR SCOPE #define TRANSFER "TRANSFER" #define DESTINATION "DESTINATION" diff --git a/src/url-copy/UrlCopyOpts.cpp b/src/url-copy/UrlCopyOpts.cpp index 415829481..266666d27 100644 --- a/src/url-copy/UrlCopyOpts.cpp +++ b/src/url-copy/UrlCopyOpts.cpp @@ -42,6 +42,7 @@ const option UrlCopyOpts::long_options[] = {"checksum", required_argument, 0, 300}, {"checksum-mode", required_argument, 0, 301}, {"strict-copy", no_argument, 0, 302}, + {"dst-file-report", no_argument, 0, 303}, {"token-bringonline", required_argument, 0, 400}, {"dest-token-desc", required_argument, 0, 401}, @@ -82,8 +83,6 @@ const option UrlCopyOpts::long_options[] = {"logDir", required_argument, 0, 900}, {"msgDir", required_argument, 0, 901}, - {"dst-file-report", no_argument, 0, 1000}, - {"help", no_argument, 0, 0}, {"debug", required_argument, 0, 1}, {"stderr", no_argument, 0, 2}, @@ -266,6 +265,9 @@ void UrlCopyOpts::parse(int argc, char * const argv[]) case 302: strictCopy = true; break; + case 303: + dst_file_report = true; + break; case 400: referenceTransfer.tokenBringOnline = optarg; @@ -370,10 +372,6 @@ void UrlCopyOpts::parse(int argc, char * const argv[]) msgDir = boost::lexical_cast(optarg); break; - case 1000: - dst_file_report = true; - break; - default: usage(argv[0]); } diff --git a/src/url-copy/UrlCopyOpts.h b/src/url-copy/UrlCopyOpts.h index 842d34ee0..3ec9a55ed 100644 --- a/src/url-copy/UrlCopyOpts.h +++ b/src/url-copy/UrlCopyOpts.h @@ -46,6 +46,7 @@ public: bool isMultipleReplicaJob; bool strictCopy; + bool dst_file_report; std::string voName; std::string userDn; @@ -79,8 +80,6 @@ public: std::string logDir; std::string msgDir; - bool dst_file_report; - unsigned debugLevel; bool logToStderr; diff --git a/src/url-copy/UrlCopyProcess.cpp b/src/url-copy/UrlCopyProcess.cpp index eaef89aae..492774126 100644 --- a/src/url-copy/UrlCopyProcess.cpp +++ b/src/url-copy/UrlCopyProcess.cpp @@ -73,6 +73,35 @@ static void setupGlobalGfal2Config(const UrlCopyOpts &opts, Gfal2 &gfal2) } } +static DestFile createDestFileReport(const Transfer &transfer, Gfal2 &gfal2, Gfal2TransferParams ¶ms) +{ + const std::string checksumType = transfer.checksumAlgorithm.empty() ? "ADLER32" : + transfer.checksumAlgorithm; + const std::string checksum = gfal2.getChecksum(transfer.destination, + transfer.checksumAlgorithm); + const uint64_t destFileSize = gfal2.stat(params, transfer.destination, false).st_size; + DestFile destFile; + destFile.fileSize = destFileSize; + destFile.checksumType = checksumType; + destFile.checksumValue = checksum; + const std::string userStatus = gfal2.getXattr(transfer.destination, GFAL_XATTR_STATUS); + if (userStatus == GFAL_XATTR_STATUS_ONLINE) { + destFile.fileOnDisk = true; + destFile.fileOnTape = false; + } else if (userStatus == GFAL_XATTR_STATUS_NEARLINE) { + destFile.fileOnDisk = false; + destFile.fileOnTape = true; + }else if (userStatus == GFAL_XATTR_STATUS_NEARLINE_ONLINE) { + destFile.fileOnDisk = true; + destFile.fileOnTape = true; + } else if (userStatus == GFAL_XATTR_STATUS_LOST) { + destFile.fileOnDisk = false; + destFile.fileOnTape = false; + } else { + throw std::runtime_error("Failed to determine if destination file is on disk and/or tape"); + } + return destFile; +} UrlCopyProcess::UrlCopyProcess(const UrlCopyOpts &opts, Reporter &reporter): opts(opts), reporter(reporter), canceled(false), timeoutExpired(false) @@ -454,45 +483,20 @@ void UrlCopyProcess::runTransfer(Transfer &transfer, Gfal2TransferParams ¶ms if (!opts.overwrite) { try { FTS3_COMMON_LOGGER_NEWLOG(INFO) << "Checking existence of destination file" << commit; - const auto destFileSize = gfal2.stat(params, transfer.destination, false).st_size; - UrlCopyError urlCopyError(DESTINATION, TRANSFER_PREPARATION, EEXIST, - "Destination file exists and overwrite is not enabled"); + gfal2.stat(params, transfer.destination, false); if (opts.dst_file_report) { try { FTS3_COMMON_LOGGER_NEWLOG(INFO) << "Checking integrity of destination tape file: " << transfer.destination << commit; - const std::string checksumType = transfer.checksumAlgorithm.empty() ? "ADLER32" : - transfer.checksumAlgorithm; - const std::string checksum = gfal2.getChecksum(transfer.destination, - transfer.checksumAlgorithm); - DestFile destFile; - destFile.fileSize = destFileSize; - destFile.checksumType = checksumType; - destFile.checksumValue = checksum; - const std::string userStatus = gfal2.getXattr(transfer.destination, GFAL_XATTR_STATUS); - if (userStatus == GFAL_XATTR_STATUS_ONLINE) { - destFile.fileOnDisk = true; - destFile.fileOnTape = false; - } else if (userStatus == GFAL_XATTR_STATUS_NEARLINE) { - destFile.fileOnDisk = false; - destFile.fileOnTape = true; - } else if (userStatus == GFAL_XATTR_STATUS_NEARLINE_ONLINE) { - destFile.fileOnDisk = true; - destFile.fileOnTape = true; - } else if (userStatus == GFAL_XATTR_STATUS_LOST) { - destFile.fileOnDisk = false; - destFile.fileOnTape = false; - } else { - throw std::runtime_error("Failed to determine if destination file is on disk and/or tape"); - } + auto destFile = createDestFileReport(transfer, gfal2, params); transfer.fileMetadata = DestFile::appendDestFileToFileMetadata(transfer.fileMetadata, destFile.toJSON()); } catch (const std::exception &ex) { FTS3_COMMON_LOGGER_NEWLOG(ERR) << "Failed to check integrity of destination tape file: " << transfer.destination << ": " << ex.what() << commit; } } - - throw urlCopyError; + throw UrlCopyError(DESTINATION, TRANSFER_PREPARATION, EEXIST, + "Destination file exists and overwrite is not enabled");; } catch (const Gfal2Exception &ex) { if (ex.code() != ENOENT) { diff --git a/src/url-copy/heuristics.cpp b/src/url-copy/heuristics.cpp index db02d5c2a..2b9ecd378 100644 --- a/src/url-copy/heuristics.cpp +++ b/src/url-copy/heuristics.cpp @@ -164,9 +164,9 @@ std::string mapErrnoToString(int err) } -std::string replaceMetadataString(std::string text) +std::string replaceMetadataString(const std::string &text) { - text = boost::replace_all_copy(text, "?"," "); - text = boost::replace_all_copy(text, "\\\"","\""); - return text; + std::string copy = boost::replace_all_copy(text, "?"," "); + copy = boost::replace_all_copy(copy, "\\\"","\""); + return copy; } diff --git a/src/url-copy/heuristics.h b/src/url-copy/heuristics.h index 177672fab..17013aca0 100644 --- a/src/url-copy/heuristics.h +++ b/src/url-copy/heuristics.h @@ -36,6 +36,6 @@ unsigned adjustTimeoutBasedOnSize(off_t sizeInBytes, unsigned addSecPerMb); std::string mapErrnoToString(int err); -std::string replaceMetadataString(std::string text); +std::string replaceMetadataString(const std::string &text); #endif // HEURISTICS_H -- GitLab