From 1c5bce5997c042eb821b3306f874c0a0798b67fa Mon Sep 17 00:00:00 2001
From: Mihai Patrascoiu <mihai.patrascoiu@cern.ch>
Date: Thu, 15 Sep 2022 13:04:35 +0300
Subject: [PATCH 1/4] FTS-1783: Provide many of the Monit derived fields in the
 TransferComplete source.

The following fields are targeted:
- src_se
- dst_se
- protocol (extracted from "dst_url" first, then fallback to "src_url")
- t_final_transfer_state_flag (possible values: 1 - "Ok", 0 - "Error", -1 - "Abort")
- file_size (prepare to drop "f_size" field)
- transfer_time (transfer.end - transfer.start, in milliseconds)
- operation_time (process.end - process.start, in milliseconds)
- throughput (filesize / transfer_time, -1 for 0-size files)
- srm_preparation_time (time spent in SRM preparation, in milliseconds)
- srm_finalization_time (time spent in SRM finalization, in milliseconds)
- srm_overhead_time (sum of SRM preparation and SRM finalization)
- srm_overhead_percentage (percentage of SRM overhead from operation time)
- timestamp_checksum_src_diff (time spent getting source checksum, in milliseconds)
- timestamp_checksum_dst_diff (time spent getting destination checksum, in milliseconds)
---
 src/monitoring/msg-ifce.cpp     | 18 ++++++++++++++++++
 src/monitoring/msg-ifce.h       | 29 ++++++++++++++++++++++++-----
 src/url-copy/LegacyReporter.cpp | 26 ++++++++++++++++++++++++++
 3 files changed, 68 insertions(+), 5 deletions(-)

diff --git a/src/monitoring/msg-ifce.cpp b/src/monitoring/msg-ifce.cpp
index 7a5dab2f0..dcd2af44d 100644
--- a/src/monitoring/msg-ifce.cpp
+++ b/src/monitoring/msg-ifce.cpp
@@ -165,6 +165,9 @@ std::string MsgIfce::SendTransferFinishMessage(Producer &producer, const Transfe
     message["dst_url"] = json::String(tr_completed.dest_url);
     message["src_hostname"] = json::String(tr_completed.source_hostname);
     message["dst_hostname"] = json::String(tr_completed.dest_hostname);
+    message["src_se"] = json::String(tr_completed.source_se);
+    message["dst_se"] = json::String(tr_completed.dest_se);
+    message["protocol"] = json::String(tr_completed.protocol);
     message["src_site_name"] = json::String(tr_completed.source_site_name);
     message["dst_site_name"] = json::String(tr_completed.dest_site_name);
     message["t_channel"] = json::String(tr_completed.t_channel);
@@ -181,12 +184,15 @@ std::string MsgIfce::SendTransferFinishMessage(Producer &producer, const Transfe
     message["t_failure_phase"] = json::String(tr_completed.failure_phase);
     message["tr_error_category"] = json::String(tr_completed.transfer_error_category);
     message["t_final_transfer_state"] = json::String(tr_completed.final_transfer_state);
+    message["t_final_transfer_state_flag"] = json::Number(tr_completed.final_transfer_state_flag);
     message["tr_bt_transfered"] = json::Number(tr_completed.total_bytes_transferred);
     message["nstreams"] = json::Number(tr_completed.number_of_streams);
     message["buf_size"] = json::Number(tr_completed.tcp_buffer_size);
     message["tcp_buf_size"] = json::Number(tr_completed.tcp_buffer_size);
     message["block_size"] = json::Number(tr_completed.block_size);
+    // Prepare to drop "f_size" field in the future
     message["f_size"] = json::Number(tr_completed.file_size);
+    message["file_size"] = json::Number(tr_completed.file_size);
     message["time_srm_prep_st"] = json::Number(tr_completed.time_spent_in_srm_preparation_start);
     message["time_srm_prep_end"] = json::Number(tr_completed.time_spent_in_srm_preparation_end);
     message["time_srm_fin_st"] = json::Number(tr_completed.time_spent_in_srm_finalization_start);
@@ -211,6 +217,18 @@ std::string MsgIfce::SendTransferFinishMessage(Producer &producer, const Transfe
         message["tr_timestamp_complete"] = json::Number(getTimestampMillisecs());
     }
 
+    message["transfer_time"] = json::Number(tr_completed.transfer_time_ms);
+    message["operation_time"] = json::Number(tr_completed.operation_time_ms);
+    message["throughput"] = json::Number(tr_completed.throughput_bps);
+
+    message["srm_preparation_time"] = json::Number(tr_completed.srm_preparation_time_ms);
+    message["srm_finalization_time"] = json::Number(tr_completed.srm_finalization_time_ms);
+    message["srm_overhead_time"] = json::Number(tr_completed.srm_overhead_time_ms);
+    message["srm_overhead_percentage"] = json::Number(tr_completed.srm_overhead_percentage);
+
+    message["timestamp_checksum_src_diff"] = json::Number(tr_completed.checksum_source_time_ms);
+    message["timestamp_checksum_dst_diff"] = json::Number(tr_completed.checksum_dest_time_ms);
+
     message["channel_type"] = json::String(tr_completed.channel_type);
     message["user_dn"] = json::String(tr_completed.user_dn);
 
diff --git a/src/monitoring/msg-ifce.h b/src/monitoring/msg-ifce.h
index dd527e264..a6dbb8443 100644
--- a/src/monitoring/msg-ifce.h
+++ b/src/monitoring/msg-ifce.h
@@ -38,17 +38,23 @@ public:
     TransferCompleted():
         file_id(0), timestamp_transfer_started(0), timestamp_transfer_completed(0), timestamp_checksum_source_started(0),
         timestamp_checksum_source_ended(0), timestamp_checksum_dest_started(0), timestamp_checksum_dest_ended(0),
-        transfer_timeout(0), checksum_timeout(0), transfer_error_code(0), total_bytes_transferred(0),
+        transfer_timeout(0), checksum_timeout(0), transfer_error_code(0), final_transfer_state_flag(-1),
+        total_bytes_transferred(0),
         number_of_streams(0), tcp_buffer_size(0),
-        block_size(0), file_size(0),
+        block_size(0), file_size(0), throughput_bps(0),
         time_spent_in_srm_preparation_start(0), time_spent_in_srm_preparation_end(0),
-        time_spent_in_srm_finalization_start(0), time_spent_in_srm_finalization_end(0), tr_timestamp_start(0),
-        tr_timestamp_complete(0),
+        time_spent_in_srm_finalization_start(0), time_spent_in_srm_finalization_end(0),
+        tr_timestamp_start(0), tr_timestamp_complete(0),
+        transfer_time_ms(0), operation_time_ms(0),
+        srm_preparation_time_ms(0), srm_finalization_time_ms(0),
+        srm_overhead_time_ms(0), srm_overhead_percentage(0),
+        checksum_source_time_ms(0), checksum_dest_time_ms(0),
         retry(0), retry_max(0),
         job_m_replica(false), job_multihop(false), is_lasthop(false),
         is_recoverable(false), ipv6(false), eviction_code(-1)
     {}
-    ~TransferCompleted() {}
+
+    ~TransferCompleted() = default;
 
     std::string transfer_id;
     std::string job_id;
@@ -61,6 +67,9 @@ public:
     std::string dest_url;
     std::string source_hostname;
     std::string dest_hostname;
+    std::string source_se;
+    std::string dest_se;
+    std::string protocol;
     std::string source_site_name;
     std::string dest_site_name;
     std::string t_channel;
@@ -78,11 +87,13 @@ public:
     std::string failure_phase; // (preparation, transfer, checksum, etc)
     std::string transfer_error_category; //permission, etc
     std::string final_transfer_state; //OK/Error/Abort
+    int         final_transfer_state_flag; // 1/0/-1
     off_t       total_bytes_transferred; // (this will include the info retrieved from the performance markers)
     int         number_of_streams;
     unsigned    tcp_buffer_size;
     unsigned    block_size;
     off_t       file_size;
+    double      throughput_bps;
     uint64_t    time_spent_in_srm_preparation_start;
     uint64_t    time_spent_in_srm_preparation_end;
     uint64_t    time_spent_in_srm_finalization_start;
@@ -91,6 +102,14 @@ public:
     std::string srm_space_token_dest;
     uint64_t    tr_timestamp_start;
     uint64_t    tr_timestamp_complete;
+    int64_t     transfer_time_ms;
+    int64_t     operation_time_ms;
+    int64_t     srm_preparation_time_ms;
+    int64_t     srm_finalization_time_ms;
+    uint64_t    srm_overhead_time_ms;
+    double      srm_overhead_percentage;
+    int64_t     checksum_source_time_ms;
+    int64_t     checksum_dest_time_ms;
     std::string channel_type;
     std::string user_dn;
     std::string file_metadata;
diff --git a/src/url-copy/LegacyReporter.cpp b/src/url-copy/LegacyReporter.cpp
index ed7a9d531..d77012a0f 100644
--- a/src/url-copy/LegacyReporter.cpp
+++ b/src/url-copy/LegacyReporter.cpp
@@ -188,11 +188,19 @@ void LegacyReporter::sendTransferCompleted(const Transfer &transfer, Gfal2Transf
         completed.destination_srm_version = "2.2.0";
     }
 
+    std::string protocol = transfer.destination.protocol;
+    if (protocol.empty()) {
+        protocol = transfer.source.protocol;
+    }
+
     completed.vo = opts.voName;
     completed.source_url = transfer.source.fullUri;
     completed.dest_url = transfer.destination.fullUri;
     completed.source_hostname = transfer.source.host;
     completed.dest_hostname = transfer.destination.host;
+    completed.source_se = transfer.source.getSeName();
+    completed.dest_se = transfer.destination.getSeName();
+    completed.protocol = protocol;
     completed.t_channel = transfer.getChannel();
     completed.channel_type = "urlcopy";
     completed.user_dn = replaceMetadataString(opts.userDn);
@@ -249,6 +257,12 @@ void LegacyReporter::sendTransferCompleted(const Transfer &transfer, Gfal2Transf
         }
     }
 
+    if (completed.final_transfer_state == "Ok") {
+        completed.final_transfer_state_flag = 1;
+    } else if (completed.final_transfer_state == "Error") {
+        completed.final_transfer_state_flag = 0;
+    }
+
     completed.total_bytes_transferred = transfer.transferredBytes;
     completed.number_of_streams = params.getNumberOfStreams();
     completed.tcp_buffer_size = params.getTcpBuffersize();
@@ -268,6 +282,18 @@ void LegacyReporter::sendTransferCompleted(const Transfer &transfer, Gfal2Transf
     completed.tr_timestamp_start = transfer.stats.process.start;
     completed.tr_timestamp_complete = transfer.stats.process.end;
 
+    completed.transfer_time_ms = transfer.stats.transfer.end - transfer.stats.transfer.start;
+    completed.operation_time_ms = transfer.stats.process.end - transfer.stats.process.start;
+    completed.throughput_bps = (completed.file_size > 0) ? ((double) completed.file_size / (completed.transfer_time_ms / 1000.0)) : -1;
+
+    completed.srm_preparation_time_ms = transfer.stats.srmPreparation.end - transfer.stats.srmPreparation.start;
+    completed.srm_finalization_time_ms = transfer.stats.srmFinalization.end - transfer.stats.srmFinalization.start;
+    completed.srm_overhead_time_ms = completed.srm_preparation_time_ms + completed.srm_finalization_time_ms;
+    completed.srm_overhead_percentage = ((double) completed.srm_overhead_time_ms * 100 / completed.operation_time_ms);
+
+    completed.checksum_source_time_ms = transfer.stats.sourceChecksum.end - transfer.stats.sourceChecksum.start;
+    completed.checksum_dest_time_ms = transfer.stats.destChecksum.end - transfer.stats.destChecksum.start;
+
     completed.ipv6 = transfer.stats.ipv6Used;
     completed.eviction_code = transfer.stats.evictionRetc;
     completed.final_destination = transfer.stats.finalDestination;
-- 
GitLab


From fcf8d0a9050941d0e19080167d3ceb79003a8120 Mon Sep 17 00:00:00 2001
From: Mihai Patrascoiu <mihai.patrascoiu@cern.ch>
Date: Thu, 15 Sep 2022 13:08:56 +0300
Subject: [PATCH 2/4] Rename confusing variable from "completed" to "started"
 in LegacyReport::sendTransferStart(..) method

---
 src/url-copy/LegacyReporter.cpp | 44 ++++++++++++++++-----------------
 1 file changed, 22 insertions(+), 22 deletions(-)

diff --git a/src/url-copy/LegacyReporter.cpp b/src/url-copy/LegacyReporter.cpp
index d77012a0f..7c45c5487 100644
--- a/src/url-copy/LegacyReporter.cpp
+++ b/src/url-copy/LegacyReporter.cpp
@@ -58,37 +58,37 @@ void LegacyReporter::sendTransferStart(const Transfer &transfer, Gfal2TransferPa
 
     producer.runProducerStatus(status);
 
-    // Fill transfer start
-    TransferCompleted completed;
+    // Fill transfer started
+    TransferCompleted started;
 
-    completed.transfer_id = transfer.getTransferId();
-    completed.job_id = transfer.jobId;
-    completed.file_id = transfer.fileId;
-    completed.endpoint = opts.alias;
+    started.transfer_id = transfer.getTransferId();
+    started.job_id = transfer.jobId;
+    started.file_id = transfer.fileId;
+    started.endpoint = opts.alias;
 
     if (transfer.source.protocol == "srm") {
-        completed.source_srm_version = "2.2.0";
+        started.source_srm_version = "2.2.0";
     }
     if (transfer.destination.protocol == "srm") {
-        completed.destination_srm_version = "2.2.0";
+        started.destination_srm_version = "2.2.0";
     }
 
-    completed.vo = opts.voName;
-    completed.source_url = transfer.source.fullUri;
-    completed.dest_url = transfer.destination.fullUri;
-    completed.source_hostname = transfer.source.host;
-    completed.dest_hostname = transfer.destination.host;
-    completed.t_channel = transfer.getChannel();
-    completed.channel_type = "urlcopy";
-    completed.user_dn = replaceMetadataString(opts.userDn);
-    completed.file_metadata = replaceMetadataString(transfer.fileMetadata);
-    completed.job_metadata = replaceMetadataString(opts.jobMetadata);
-    completed.srm_space_token_source = transfer.sourceTokenDescription;
-    completed.srm_space_token_dest = transfer.destTokenDescription;
-    completed.tr_timestamp_start = millisecondsSinceEpoch();
+    started.vo = opts.voName;
+    started.source_url = transfer.source.fullUri;
+    started.dest_url = transfer.destination.fullUri;
+    started.source_hostname = transfer.source.host;
+    started.dest_hostname = transfer.destination.host;
+    started.t_channel = transfer.getChannel();
+    started.channel_type = "urlcopy";
+    started.user_dn = replaceMetadataString(opts.userDn);
+    started.file_metadata = replaceMetadataString(transfer.fileMetadata);
+    started.job_metadata = replaceMetadataString(opts.jobMetadata);
+    started.srm_space_token_source = transfer.sourceTokenDescription;
+    started.srm_space_token_dest = transfer.destTokenDescription;
+    started.tr_timestamp_start = millisecondsSinceEpoch();
 
     if (opts.enableMonitoring) {
-        std::string msgReturnValue = MsgIfce::getInstance()->SendTransferStartMessage(producer, completed);
+        std::string msgReturnValue = MsgIfce::getInstance()->SendTransferStartMessage(producer, started);
         FTS3_COMMON_LOGGER_NEWLOG(DEBUG) << "Transfer start message content: " << msgReturnValue << commit;
     }
 }
-- 
GitLab


From e1659bb649020f43e27728d54f04169e486af111 Mon Sep 17 00:00:00 2001
From: Mihai Patrascoiu <mihai.patrascoiu@cern.ch>
Date: Thu, 15 Sep 2022 13:51:50 +0300
Subject: [PATCH 3/4] FTS-1783: Additional protection against division by zero
 for "throughput_bps" and "srm_overhead_percentage" fields

---
 src/url-copy/LegacyReporter.cpp | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/src/url-copy/LegacyReporter.cpp b/src/url-copy/LegacyReporter.cpp
index 7c45c5487..d09d9e14a 100644
--- a/src/url-copy/LegacyReporter.cpp
+++ b/src/url-copy/LegacyReporter.cpp
@@ -284,12 +284,13 @@ void LegacyReporter::sendTransferCompleted(const Transfer &transfer, Gfal2Transf
 
     completed.transfer_time_ms = transfer.stats.transfer.end - transfer.stats.transfer.start;
     completed.operation_time_ms = transfer.stats.process.end - transfer.stats.process.start;
-    completed.throughput_bps = (completed.file_size > 0) ? ((double) completed.file_size / (completed.transfer_time_ms / 1000.0)) : -1;
+    completed.throughput_bps = (completed.file_size > 0 && completed.transfer_time_ms > 0) ?
+            ((double) completed.file_size / (completed.transfer_time_ms / 1000.0)) : -1;
 
     completed.srm_preparation_time_ms = transfer.stats.srmPreparation.end - transfer.stats.srmPreparation.start;
     completed.srm_finalization_time_ms = transfer.stats.srmFinalization.end - transfer.stats.srmFinalization.start;
     completed.srm_overhead_time_ms = completed.srm_preparation_time_ms + completed.srm_finalization_time_ms;
-    completed.srm_overhead_percentage = ((double) completed.srm_overhead_time_ms * 100 / completed.operation_time_ms);
+    completed.srm_overhead_percentage = (completed.operation_time_ms > 0) ? ((double) completed.srm_overhead_time_ms * 100 / completed.operation_time_ms) : -1;
 
     completed.checksum_source_time_ms = transfer.stats.sourceChecksum.end - transfer.stats.sourceChecksum.start;
     completed.checksum_dest_time_ms = transfer.stats.destChecksum.end - transfer.stats.destChecksum.start;
-- 
GitLab


From 9b24a4055cc38d62ea3fb7b4a176c5bf913caa19 Mon Sep 17 00:00:00 2001
From: Mihai Patrascoiu <mihai.patrascoiu@cern.ch>
Date: Thu, 15 Sep 2022 13:57:16 +0300
Subject: [PATCH 4/4] FTS-1783: Allow "throughput_bps" to be zero when dealing
 with 0-size files

---
 src/url-copy/LegacyReporter.cpp | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/src/url-copy/LegacyReporter.cpp b/src/url-copy/LegacyReporter.cpp
index d09d9e14a..5f1bc1689 100644
--- a/src/url-copy/LegacyReporter.cpp
+++ b/src/url-copy/LegacyReporter.cpp
@@ -284,8 +284,7 @@ void LegacyReporter::sendTransferCompleted(const Transfer &transfer, Gfal2Transf
 
     completed.transfer_time_ms = transfer.stats.transfer.end - transfer.stats.transfer.start;
     completed.operation_time_ms = transfer.stats.process.end - transfer.stats.process.start;
-    completed.throughput_bps = (completed.file_size > 0 && completed.transfer_time_ms > 0) ?
-            ((double) completed.file_size / (completed.transfer_time_ms / 1000.0)) : -1;
+    completed.throughput_bps = (completed.transfer_time_ms > 0) ? ((double) completed.file_size / (completed.transfer_time_ms / 1000.0)) : -1;
 
     completed.srm_preparation_time_ms = transfer.stats.srmPreparation.end - transfer.stats.srmPreparation.start;
     completed.srm_finalization_time_ms = transfer.stats.srmFinalization.end - transfer.stats.srmFinalization.start;
-- 
GitLab