diff --git a/src/monitoring/msg-ifce.cpp b/src/monitoring/msg-ifce.cpp index 7a5dab2f04a537b09c7858c1e9551c7764eb8bd8..dcd2af44d74a2d4d08e3c7a0b379861f874cbc07 100644 --- a/src/monitoring/msg-ifce.cpp +++ b/src/monitoring/msg-ifce.cpp @@ -165,6 +165,9 @@ std::string MsgIfce::SendTransferFinishMessage(Producer &producer, const Transfe message["dst_url"] = json::String(tr_completed.dest_url); message["src_hostname"] = json::String(tr_completed.source_hostname); message["dst_hostname"] = json::String(tr_completed.dest_hostname); + message["src_se"] = json::String(tr_completed.source_se); + message["dst_se"] = json::String(tr_completed.dest_se); + message["protocol"] = json::String(tr_completed.protocol); message["src_site_name"] = json::String(tr_completed.source_site_name); message["dst_site_name"] = json::String(tr_completed.dest_site_name); message["t_channel"] = json::String(tr_completed.t_channel); @@ -181,12 +184,15 @@ std::string MsgIfce::SendTransferFinishMessage(Producer &producer, const Transfe message["t_failure_phase"] = json::String(tr_completed.failure_phase); message["tr_error_category"] = json::String(tr_completed.transfer_error_category); message["t_final_transfer_state"] = json::String(tr_completed.final_transfer_state); + message["t_final_transfer_state_flag"] = json::Number(tr_completed.final_transfer_state_flag); message["tr_bt_transfered"] = json::Number(tr_completed.total_bytes_transferred); message["nstreams"] = json::Number(tr_completed.number_of_streams); message["buf_size"] = json::Number(tr_completed.tcp_buffer_size); message["tcp_buf_size"] = json::Number(tr_completed.tcp_buffer_size); message["block_size"] = json::Number(tr_completed.block_size); + // Prepare to drop "f_size" field in the future message["f_size"] = json::Number(tr_completed.file_size); + message["file_size"] = json::Number(tr_completed.file_size); message["time_srm_prep_st"] = json::Number(tr_completed.time_spent_in_srm_preparation_start); message["time_srm_prep_end"] = json::Number(tr_completed.time_spent_in_srm_preparation_end); message["time_srm_fin_st"] = json::Number(tr_completed.time_spent_in_srm_finalization_start); @@ -211,6 +217,18 @@ std::string MsgIfce::SendTransferFinishMessage(Producer &producer, const Transfe message["tr_timestamp_complete"] = json::Number(getTimestampMillisecs()); } + message["transfer_time"] = json::Number(tr_completed.transfer_time_ms); + message["operation_time"] = json::Number(tr_completed.operation_time_ms); + message["throughput"] = json::Number(tr_completed.throughput_bps); + + message["srm_preparation_time"] = json::Number(tr_completed.srm_preparation_time_ms); + message["srm_finalization_time"] = json::Number(tr_completed.srm_finalization_time_ms); + message["srm_overhead_time"] = json::Number(tr_completed.srm_overhead_time_ms); + message["srm_overhead_percentage"] = json::Number(tr_completed.srm_overhead_percentage); + + message["timestamp_checksum_src_diff"] = json::Number(tr_completed.checksum_source_time_ms); + message["timestamp_checksum_dst_diff"] = json::Number(tr_completed.checksum_dest_time_ms); + message["channel_type"] = json::String(tr_completed.channel_type); message["user_dn"] = json::String(tr_completed.user_dn); diff --git a/src/monitoring/msg-ifce.h b/src/monitoring/msg-ifce.h index dd527e26431f4982c94075b4ec33e22198de9521..a6dbb8443db0754604ff2bf0047d94f15b901009 100644 --- a/src/monitoring/msg-ifce.h +++ b/src/monitoring/msg-ifce.h @@ -38,17 +38,23 @@ public: TransferCompleted(): file_id(0), timestamp_transfer_started(0), timestamp_transfer_completed(0), timestamp_checksum_source_started(0), timestamp_checksum_source_ended(0), timestamp_checksum_dest_started(0), timestamp_checksum_dest_ended(0), - transfer_timeout(0), checksum_timeout(0), transfer_error_code(0), total_bytes_transferred(0), + transfer_timeout(0), checksum_timeout(0), transfer_error_code(0), final_transfer_state_flag(-1), + total_bytes_transferred(0), number_of_streams(0), tcp_buffer_size(0), - block_size(0), file_size(0), + block_size(0), file_size(0), throughput_bps(0), time_spent_in_srm_preparation_start(0), time_spent_in_srm_preparation_end(0), - time_spent_in_srm_finalization_start(0), time_spent_in_srm_finalization_end(0), tr_timestamp_start(0), - tr_timestamp_complete(0), + time_spent_in_srm_finalization_start(0), time_spent_in_srm_finalization_end(0), + tr_timestamp_start(0), tr_timestamp_complete(0), + transfer_time_ms(0), operation_time_ms(0), + srm_preparation_time_ms(0), srm_finalization_time_ms(0), + srm_overhead_time_ms(0), srm_overhead_percentage(0), + checksum_source_time_ms(0), checksum_dest_time_ms(0), retry(0), retry_max(0), job_m_replica(false), job_multihop(false), is_lasthop(false), is_recoverable(false), ipv6(false), eviction_code(-1) {} - ~TransferCompleted() {} + + ~TransferCompleted() = default; std::string transfer_id; std::string job_id; @@ -61,6 +67,9 @@ public: std::string dest_url; std::string source_hostname; std::string dest_hostname; + std::string source_se; + std::string dest_se; + std::string protocol; std::string source_site_name; std::string dest_site_name; std::string t_channel; @@ -78,11 +87,13 @@ public: std::string failure_phase; // (preparation, transfer, checksum, etc) std::string transfer_error_category; //permission, etc std::string final_transfer_state; //OK/Error/Abort + int final_transfer_state_flag; // 1/0/-1 off_t total_bytes_transferred; // (this will include the info retrieved from the performance markers) int number_of_streams; unsigned tcp_buffer_size; unsigned block_size; off_t file_size; + double throughput_bps; uint64_t time_spent_in_srm_preparation_start; uint64_t time_spent_in_srm_preparation_end; uint64_t time_spent_in_srm_finalization_start; @@ -91,6 +102,14 @@ public: std::string srm_space_token_dest; uint64_t tr_timestamp_start; uint64_t tr_timestamp_complete; + int64_t transfer_time_ms; + int64_t operation_time_ms; + int64_t srm_preparation_time_ms; + int64_t srm_finalization_time_ms; + uint64_t srm_overhead_time_ms; + double srm_overhead_percentage; + int64_t checksum_source_time_ms; + int64_t checksum_dest_time_ms; std::string channel_type; std::string user_dn; std::string file_metadata; diff --git a/src/url-copy/LegacyReporter.cpp b/src/url-copy/LegacyReporter.cpp index ed7a9d5310596b44fe6aa3d92d671633ac7bbb5d..5f1bc1689916c049c4c8796bd2867b6d20bb6933 100644 --- a/src/url-copy/LegacyReporter.cpp +++ b/src/url-copy/LegacyReporter.cpp @@ -58,37 +58,37 @@ void LegacyReporter::sendTransferStart(const Transfer &transfer, Gfal2TransferPa producer.runProducerStatus(status); - // Fill transfer start - TransferCompleted completed; + // Fill transfer started + TransferCompleted started; - completed.transfer_id = transfer.getTransferId(); - completed.job_id = transfer.jobId; - completed.file_id = transfer.fileId; - completed.endpoint = opts.alias; + started.transfer_id = transfer.getTransferId(); + started.job_id = transfer.jobId; + started.file_id = transfer.fileId; + started.endpoint = opts.alias; if (transfer.source.protocol == "srm") { - completed.source_srm_version = "2.2.0"; + started.source_srm_version = "2.2.0"; } if (transfer.destination.protocol == "srm") { - completed.destination_srm_version = "2.2.0"; + started.destination_srm_version = "2.2.0"; } - completed.vo = opts.voName; - completed.source_url = transfer.source.fullUri; - completed.dest_url = transfer.destination.fullUri; - completed.source_hostname = transfer.source.host; - completed.dest_hostname = transfer.destination.host; - completed.t_channel = transfer.getChannel(); - completed.channel_type = "urlcopy"; - completed.user_dn = replaceMetadataString(opts.userDn); - completed.file_metadata = replaceMetadataString(transfer.fileMetadata); - completed.job_metadata = replaceMetadataString(opts.jobMetadata); - completed.srm_space_token_source = transfer.sourceTokenDescription; - completed.srm_space_token_dest = transfer.destTokenDescription; - completed.tr_timestamp_start = millisecondsSinceEpoch(); + started.vo = opts.voName; + started.source_url = transfer.source.fullUri; + started.dest_url = transfer.destination.fullUri; + started.source_hostname = transfer.source.host; + started.dest_hostname = transfer.destination.host; + started.t_channel = transfer.getChannel(); + started.channel_type = "urlcopy"; + started.user_dn = replaceMetadataString(opts.userDn); + started.file_metadata = replaceMetadataString(transfer.fileMetadata); + started.job_metadata = replaceMetadataString(opts.jobMetadata); + started.srm_space_token_source = transfer.sourceTokenDescription; + started.srm_space_token_dest = transfer.destTokenDescription; + started.tr_timestamp_start = millisecondsSinceEpoch(); if (opts.enableMonitoring) { - std::string msgReturnValue = MsgIfce::getInstance()->SendTransferStartMessage(producer, completed); + std::string msgReturnValue = MsgIfce::getInstance()->SendTransferStartMessage(producer, started); FTS3_COMMON_LOGGER_NEWLOG(DEBUG) << "Transfer start message content: " << msgReturnValue << commit; } } @@ -188,11 +188,19 @@ void LegacyReporter::sendTransferCompleted(const Transfer &transfer, Gfal2Transf completed.destination_srm_version = "2.2.0"; } + std::string protocol = transfer.destination.protocol; + if (protocol.empty()) { + protocol = transfer.source.protocol; + } + completed.vo = opts.voName; completed.source_url = transfer.source.fullUri; completed.dest_url = transfer.destination.fullUri; completed.source_hostname = transfer.source.host; completed.dest_hostname = transfer.destination.host; + completed.source_se = transfer.source.getSeName(); + completed.dest_se = transfer.destination.getSeName(); + completed.protocol = protocol; completed.t_channel = transfer.getChannel(); completed.channel_type = "urlcopy"; completed.user_dn = replaceMetadataString(opts.userDn); @@ -249,6 +257,12 @@ void LegacyReporter::sendTransferCompleted(const Transfer &transfer, Gfal2Transf } } + if (completed.final_transfer_state == "Ok") { + completed.final_transfer_state_flag = 1; + } else if (completed.final_transfer_state == "Error") { + completed.final_transfer_state_flag = 0; + } + completed.total_bytes_transferred = transfer.transferredBytes; completed.number_of_streams = params.getNumberOfStreams(); completed.tcp_buffer_size = params.getTcpBuffersize(); @@ -268,6 +282,18 @@ void LegacyReporter::sendTransferCompleted(const Transfer &transfer, Gfal2Transf completed.tr_timestamp_start = transfer.stats.process.start; completed.tr_timestamp_complete = transfer.stats.process.end; + completed.transfer_time_ms = transfer.stats.transfer.end - transfer.stats.transfer.start; + completed.operation_time_ms = transfer.stats.process.end - transfer.stats.process.start; + completed.throughput_bps = (completed.transfer_time_ms > 0) ? ((double) completed.file_size / (completed.transfer_time_ms / 1000.0)) : -1; + + completed.srm_preparation_time_ms = transfer.stats.srmPreparation.end - transfer.stats.srmPreparation.start; + completed.srm_finalization_time_ms = transfer.stats.srmFinalization.end - transfer.stats.srmFinalization.start; + completed.srm_overhead_time_ms = completed.srm_preparation_time_ms + completed.srm_finalization_time_ms; + completed.srm_overhead_percentage = (completed.operation_time_ms > 0) ? ((double) completed.srm_overhead_time_ms * 100 / completed.operation_time_ms) : -1; + + completed.checksum_source_time_ms = transfer.stats.sourceChecksum.end - transfer.stats.sourceChecksum.start; + completed.checksum_dest_time_ms = transfer.stats.destChecksum.end - transfer.stats.destChecksum.start; + completed.ipv6 = transfer.stats.ipv6Used; completed.eviction_code = transfer.stats.evictionRetc; completed.final_destination = transfer.stats.finalDestination;