From fc79bc59f1b8ccc7ea35fb4845a76f190ee3d9c5 Mon Sep 17 00:00:00 2001 From: Mario Shehu <mario.shehu@cern.ch> Date: Mon, 27 May 2024 18:23:21 +0200 Subject: [PATCH 1/2] New NetIO buffer data format The message headers have now a new format: 1 bit Status Flag 1 bit Tag Flag 30 bits message size ----------------------------------------- The new class BufferFormatter allows to instantiate an object with it and, given a pointer to a buffer, allows to write a message in the correct format. Also it exposes a static function that allows to decode a buffer, returning a vector cointaing BufferMsg structs per message --- CMakeLists.txt | 7 ++ netio3/BufferFormatter.hpp | 131 +++++++++++++++++++ src/BufferFormatter.cpp | 161 ++++++++++++++++++++++++ test/unit_tests_bufferformatter.cpp | 189 ++++++++++++++++++++++++++++ 4 files changed, 488 insertions(+) create mode 100644 netio3/BufferFormatter.hpp create mode 100644 src/BufferFormatter.cpp create mode 100644 test/unit_tests_bufferformatter.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index d795303..c75c6cf 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -23,6 +23,7 @@ endif() tdaq_add_library(netio3-lib src/NetioSender.cpp src/NetioReceiver.cpp src/NetioPublisher.cpp src/NetioSubscriber.cpp + src/BufferFormatter.cpp LINK_LIBRARIES netio3-backend-lib NOINSTALL) @@ -60,3 +61,9 @@ add_test(netio3-unit-publisher netio3-unit-publisher) #target_compile_options(netio3-unit-sender PRIVATE "-fno-access-control") #target_link_libraries(netio3-unit-sender Catch2Main Catch2 netio3-backend-lib netio3-lib) #add_test(netio3-unit-sender netio3-unit-sender) + +tdaq_add_executable(netio3-unit-bufferformatter test/unit_tests_bufferformatter.cpp) +target_include_directories(netio3-unit-bufferformatter PRIVATE src) +target_link_libraries(netio3-unit-bufferformatter Catch2Main Catch2 netio3-backend-lib) +target_compile_options(netio3-unit-bufferformatter PRIVATE "-fno-access-control") +add_test(netio3-unit-bufferformatter netio3-unit-bufferformatter) \ No newline at end of file diff --git a/netio3/BufferFormatter.hpp b/netio3/BufferFormatter.hpp new file mode 100644 index 0000000..c37f3e9 --- /dev/null +++ b/netio3/BufferFormatter.hpp @@ -0,0 +1,131 @@ +/** + * @file BufferFormatter.hpp + * @brief BufferFormatter class, BefferMsg struct, and FormatterStatus enum. + * + * This file contains the declarations of BufferFormatter class and its functions. + * The funtions responsible for correctly formatting data to write into a buffer (NetworkBuffer). + * and also provide a way to decode the buffer. + * + */ + +#pragma once + +#include <map> +#include "netio3-backend/NetworkBuffer.hpp" + +namespace netio3{ + + /** + * @brief Return struct for the BufferFormatter::decode function. + */ + struct BufferMsg{ + const uint8_t status; + const uint64_t tag; + std::unique_ptr<uint8_t[]> msg; + const uint32_t msg_size; + + BufferMsg() : status(0), tag(0), msg(nullptr), msg_size(0) {}; + + BufferMsg(uint8_t status, uint64_t tag, std::unique_ptr<uint8_t[]> msg, uint32_t msg_size) + : status(status), tag(tag), msg(std::move(msg)), msg_size(msg_size) {} + }; + + /** + * @brief Enum class used to check the status of the write operation. + */ + enum class FormatterStatus{BUFFER_OK, BUFFER_READY, MESSAGE_TOO_BIG}; + + class BufferFormatter{ + + public: + + /** + * @brief Helper function that fills the buffer with the header. + */ + [[nodiscard]] FormatterStatus fill_header(NetworkBuffer* buf, uint8_t status, uint64_t tag, uint32_t message_size); + + + /** + * @brief Writes data to the buffer with the specified STATUS, TAG, and DATA. + * + * This is the most general write function, which allows you to specify the status, tag, and data. + * data is a vector of iovec, which is a struct containing a pointer to the data and the size of the data. + * + * @param buf The pointer to the buffer. + * @param status The status value. + * @param tag The tag value, representing the ID of an e-link. + * @param data The data to be written. + * + * @return FormatterStatus::MESSAGE_TOO_BIG if the message is too big, FormatterStatus::BUFFER_OK otherwise. + */ + [[nodiscard]] FormatterStatus write(NetworkBuffer* buf, uint8_t status, uint64_t tag, std::vector<iovec> &data); + + + /** + * @brief Writes data to the buffer with the specified STATUS, TAG, and DATA. + * + * This is the most general write function, which allows you to specify the status, tag, and data. + * data is a span of uint8_t, which is a contiguous sequence of bytes. + * + * @param buf The pointer to the buffer. + * @param status The status value. + * @param tag The tag value, representing the ID of an e-link. + * @param data The data to be written. + * + * @return FormatterStatus::MESSAGE_TOO_BIG if the message is too big, FormatterStatus::BUFFER_OK otherwise. + */ + [[nodiscard]] FormatterStatus write(NetworkBuffer* buf, uint8_t status, uint64_t tag, std::span<const std::uint8_t> data); + + + /** + * @brief Writes data to the buffer with the specified TAG and DATA, status is set to 0. + * + * This is a simplified write function, which allows you to specify the tag and data. + * data is a vector of iovec, which is a struct containing a pointer to the data and the size of the data. + * + * @param buf The pointer to the buffer. + * @param tag The tag value, representing the ID of an e-link. + * @param data The data to be written. + * + * @return FormatterStatus::MESSAGE_TOO_BIG if the message is too big, FormatterStatus::BUFFER_OK otherwise. + */ + [[nodiscard]] FormatterStatus write(NetworkBuffer* buf, uint64_t tag, std::vector<iovec> &data); + + + /** + * @brief Writes data to the buffer with the specified STATUS, TAG, and DATA. + * + * This is a simplified write function, which allows you to specify the tag and data. + * data is a span of uint8_t, which is a contiguous sequence of bytes. + * + * @param buf The pointer to the buffer. + * @param tag The tag value, representing the ID of an e-link. + * @param data The data to be written. + * + * @return FormatterStatus::MESSAGE_TOO_BIG if the message is too big, FormatterStatus::BUFFER_OK otherwise. + */ + [[nodiscard]] FormatterStatus write(NetworkBuffer* buf, uint64_t tag, std::span<const std::uint8_t> data); + + + /** + * @brief Decodes the buffer and returns a vector of BufferMsg objects. + * + * @return A vector of BufferMsg objects. + */ + [[nodiscard]] static std::vector<BufferMsg> decode(std::span<const std::uint8_t> buf); + + + private: + constexpr static auto HDR_STATUS_FLAG_POS = 31; + constexpr static auto HDR_TAG_FLAG_POS = 30; + constexpr static auto NUM_BIT_PAYLOAD_SIZE = 30; + /* status size tag size */ + constexpr static auto MSG_MAX_SIZE = (1 << NUM_BIT_PAYLOAD_SIZE) - sizeof(uint8_t) - sizeof(uint64_t); + + //TODO: + //This might be changed to a single m_previous_tag, + //meaning that the association BufferFormatter to Networkbuffer will become 1:1 instead of 1:many + std::map<NetworkBuffer*, uint64_t> m_previous_tag; + }; + +} // namespace netio3 \ No newline at end of file diff --git a/src/BufferFormatter.cpp b/src/BufferFormatter.cpp new file mode 100644 index 0000000..5ec4be0 --- /dev/null +++ b/src/BufferFormatter.cpp @@ -0,0 +1,161 @@ +#include "netio3/BufferFormatter.hpp" +#include <cmath> + +using namespace netio3; + + +FormatterStatus BufferFormatter::fill_header(NetworkBuffer* buf, uint8_t status, uint64_t tag, uint32_t payload_size) { + if (payload_size >= MSG_MAX_SIZE) { + return FormatterStatus::MESSAGE_TOO_BIG; + } + + const bool has_status = status != 0; + const bool has_tag = not m_previous_tag.contains(buf) || tag != m_previous_tag.at(buf); + uint32_t msg_total_size = payload_size; + + if (has_status) { + msg_total_size += sizeof(status); + } + if (has_tag) { + msg_total_size += sizeof(tag); + } + if (buf->pos() + msg_total_size > buf->size()) { + return FormatterStatus::MESSAGE_TOO_BIG; + } + + const uint32_t header = msg_total_size | (has_status << HDR_STATUS_FLAG_POS) | (has_tag << HDR_TAG_FLAG_POS); + + buf->write(header); + + if (has_status) { + buf->write(status); + } + if (has_tag) { + buf->write(tag); + m_previous_tag[buf] = tag; + } + return FormatterStatus::BUFFER_OK; +} + + + +FormatterStatus BufferFormatter::write(NetworkBuffer* buf, uint8_t status, uint64_t tag, std::vector<iovec> &data){ + uint32_t payload_size = 0; + + for (const auto& iov : data) { + payload_size += iov.iov_len; + } + + FormatterStatus buffer_status = fill_header(buf, status, tag, payload_size); + + if(buffer_status == FormatterStatus::BUFFER_OK){ + // Fill buffer with the payload + for (const auto& iov : data) { + buf->write(iov); + } + return FormatterStatus::BUFFER_OK; + } + else{ + return FormatterStatus::MESSAGE_TOO_BIG; + } +}; + + + +FormatterStatus BufferFormatter::write(NetworkBuffer* buf, uint8_t status, uint64_t tag, std::span<const std::uint8_t> data){ + uint32_t payload_size = data.size(); + + FormatterStatus buffer_status = fill_header(buf, status, tag, payload_size); + + if(buffer_status == FormatterStatus::BUFFER_OK){ + buf->write(data); + return FormatterStatus::BUFFER_OK; + } + else{ + return FormatterStatus::MESSAGE_TOO_BIG; + } +}; + + + +FormatterStatus BufferFormatter::write(NetworkBuffer* buf, uint64_t tag, std::vector<iovec> &data){ + uint32_t payload_size = 0; + + for (const auto& iov : data) { + payload_size += iov.iov_len; + } + /* 0 = no status */ + FormatterStatus buffer_status = fill_header(buf, 0, tag, payload_size); + + if(buffer_status == FormatterStatus::BUFFER_OK){ + // Fill buffer with the payload + for (const auto& iov : data) { + buf->write(iov); + } + return FormatterStatus::BUFFER_OK; + } + else{ + return FormatterStatus::MESSAGE_TOO_BIG; + } +}; + + + +FormatterStatus BufferFormatter::write(NetworkBuffer* buf, uint64_t tag, std::span<const std::uint8_t> data){ + uint32_t payload_size = data.size(); + /* 0 = no status */ + FormatterStatus buffer_status = fill_header(buf, 0, tag, payload_size); + + if(buffer_status == FormatterStatus::BUFFER_OK){ + buf->write(data); + return FormatterStatus::BUFFER_OK; + } + else{ + return FormatterStatus::MESSAGE_TOO_BIG; + } +}; + + + +std::vector<BufferMsg> BufferFormatter::decode(std::span<const std::uint8_t> buf){ + //TODO: for performance reasons, consider starting with, say, 1000 elements + std::vector<BufferMsg> buffer_msgs; + std::size_t current_pos = 0; + const uint8_t* data_ptr = buf.data(); + + uint64_t previous_tag = UINT64_MAX; + while (current_pos < buf.size()) { + + uint32_t msg_header; + if(current_pos + sizeof(msg_header) > buf.size()) break; + memcpy(&msg_header, data_ptr + current_pos, sizeof(msg_header)); + current_pos += sizeof(msg_header); + + uint8_t status = 0; + if(current_pos + sizeof(status) > buf.size()) break; + if (msg_header & (1 << HDR_STATUS_FLAG_POS)) { + msg_header ^= (1 << HDR_STATUS_FLAG_POS); // Clear the most significant bit + memcpy(&status, data_ptr + current_pos, sizeof(status)); + current_pos += sizeof(status); + msg_header -= sizeof(status); + } + + uint64_t tag = previous_tag; + if(current_pos + sizeof(tag) > buf.size()) break; + if (msg_header & (1 << HDR_TAG_FLAG_POS)) { + msg_header ^= (1 << HDR_TAG_FLAG_POS); // Clear the second most significant bit + memcpy(&tag, data_ptr + current_pos, sizeof(tag)); + current_pos += sizeof(tag); + msg_header -= sizeof(tag); + previous_tag = tag; + } + + if(current_pos + msg_header > buf.size()) break; + // To avoid copying data, this might just become a pointer to the payload inside the buffer + std::unique_ptr<uint8_t[]> output_data(new uint8_t[msg_header]); + std::memcpy(output_data.get(), data_ptr + current_pos, msg_header); + current_pos += msg_header; + buffer_msgs.emplace_back(status, tag, std::move(output_data), msg_header); + } + return buffer_msgs; +}; \ No newline at end of file diff --git a/test/unit_tests_bufferformatter.cpp b/test/unit_tests_bufferformatter.cpp new file mode 100644 index 0000000..386119b --- /dev/null +++ b/test/unit_tests_bufferformatter.cpp @@ -0,0 +1,189 @@ +#define CATCH_CONFIG_MAIN +#define UNIT_TESTING +#include <catch2/catch_test_macros.hpp> +#include "netio3/BufferFormatter.hpp" +#include <iostream> +using namespace netio3; + +TEST_CASE("BufferFormatter::write", "[BUFFER_OK]") { + netio3::BufferFormatter formatter; + netio3::NetworkBuffer buffer(1024); + uint8_t status = 1; + uint64_t tag = 7; + std::vector<uint8_t> test_vector(40, 7); + std::vector<iovec> dataVec(40); + for (int i = 0; i < 40; i++) { + dataVec[i].iov_base = &test_vector.at(i); + dataVec[i].iov_len = 1; + } + std::span<const uint8_t> dataSpan(test_vector); + + SECTION("write with status, tag, and vector data") { + auto result = formatter.write(&buffer, status, tag, dataVec); + REQUIRE(result == netio3::FormatterStatus::BUFFER_OK); + buffer.reset(); + } + + SECTION("write with tag and vector data") { + auto result = formatter.write(&buffer, tag, dataVec); + REQUIRE(result == netio3::FormatterStatus::BUFFER_OK); + buffer.reset(); + } + + SECTION("write with status, tag, and span data") { + auto result = formatter.write(&buffer, status, tag, dataSpan); + REQUIRE(result == netio3::FormatterStatus::BUFFER_OK); + buffer.reset(); + } + + SECTION("write with tag and span data") { + auto result = formatter.write(&buffer, tag, dataSpan); + REQUIRE(result == netio3::FormatterStatus::BUFFER_OK); + buffer.reset(); + } +} + +TEST_CASE("BufferFormatter::write", "[MESSAGE_TOO_BIG]") { + netio3::BufferFormatter formatter; + netio3::NetworkBuffer buffer(40); + uint8_t status = 1; + uint64_t tag = 7; + std::vector<uint8_t> test_vector(40, 7); + std::vector<iovec> dataVec(40); + for (int i = 0; i < 40; i++) { + dataVec[i].iov_base = &test_vector.at(i); + dataVec[i].iov_len = 1; + } + std::span<const uint8_t> dataSpan(test_vector); + + SECTION("write with status, tag, and vector data: message too big") { + auto result = formatter.write(&buffer, status, tag, dataVec); + REQUIRE(result == netio3::FormatterStatus::MESSAGE_TOO_BIG); + buffer.reset(); + } + + SECTION("write with tag and vector data: message too big") { + auto result = formatter.write(&buffer, tag, dataVec); + REQUIRE(result == netio3::FormatterStatus::MESSAGE_TOO_BIG); + buffer.reset(); + } + + SECTION("write with status, tag, and span data: message too big") { + auto result = formatter.write(&buffer, status, tag, dataSpan); + REQUIRE(result == netio3::FormatterStatus::MESSAGE_TOO_BIG); + buffer.reset(); + } + + SECTION("write with tag and span data: message too big") { + auto result = formatter.write(&buffer, tag, dataSpan); + REQUIRE(result == netio3::FormatterStatus::MESSAGE_TOO_BIG); + buffer.reset(); + } +} + +TEST_CASE("BufferFormatter::decode", "[DECODE]") { + + SECTION("empty buffer") { + NetworkBuffer buffer(0); + std::span<const std::uint8_t> buf = buffer.data(); + auto msgs = BufferFormatter::decode(buf); + REQUIRE(msgs.empty()); + } + + SECTION("single message no status no tag") { + NetworkBuffer buffer(20); + BufferFormatter formatter; + uint32_t message = 0xC0FFEE; + std::span<const std::uint8_t> data(reinterpret_cast<const std::uint8_t*>(&message), sizeof(message)); + REQUIRE(formatter.write(&buffer, 0, data) == FormatterStatus::BUFFER_OK); + std::span<const std::uint8_t> buf = buffer.data(); + auto msgs = BufferFormatter::decode(buf); + REQUIRE(msgs.size() == 1); + CHECK(msgs[0].status == 0); + CHECK(msgs[0].tag == 0); + CHECK(std::memcmp(msgs[0].msg.get(), reinterpret_cast<const uint8_t*>(&message), sizeof(message)) == 0); + } + + SECTION("single message with status no tag") { + NetworkBuffer buffer(20); + BufferFormatter formatter; + uint32_t message = 0xC0FFEE; + std::span<const std::uint8_t> data(reinterpret_cast<const std::uint8_t*>(&message), sizeof(message)); + REQUIRE(formatter.write(&buffer, 1, 0, data) == FormatterStatus::BUFFER_OK); + std::span<const std::uint8_t> buf = buffer.data(); + auto msgs = BufferFormatter::decode(buf); + REQUIRE(msgs.size() == 1); + CHECK(msgs[0].status == 1); + CHECK(msgs[0].tag == 0); + CHECK(std::memcmp(msgs[0].msg.get(), reinterpret_cast<const uint8_t*>(&message), sizeof(message)) == 0); + } + + SECTION("single message no status with tag") { + NetworkBuffer buffer(20); + BufferFormatter formatter; + uint32_t message = 0xC0FFEE; + std::span<const std::uint8_t> data(reinterpret_cast<const std::uint8_t*>(&message), sizeof(message)); + REQUIRE(formatter.write(&buffer, 1, data) == FormatterStatus::BUFFER_OK); + std::span<const std::uint8_t> buf = buffer.data(); + auto msgs = BufferFormatter::decode(buf); + REQUIRE(msgs.size() == 1); + CHECK(msgs[0].status == 0); + CHECK(msgs[0].tag == 1); + CHECK(std::memcmp(msgs[0].msg.get(), reinterpret_cast<const uint8_t*>(&message), sizeof(message)) == 0); + } + + SECTION("single message with status and with tag") { + NetworkBuffer buffer(20); + BufferFormatter formatter; + uint32_t message = 0xC0FFEE; + std::span<const std::uint8_t> data(reinterpret_cast<const std::uint8_t*>(&message), sizeof(message)); + REQUIRE(formatter.write(&buffer, 1, 1, data) == FormatterStatus::BUFFER_OK); + std::span<const std::uint8_t> buf = buffer.data(); + auto msgs = BufferFormatter::decode(buf); + REQUIRE(msgs.size() == 1); + CHECK(msgs[0].status == 1); + CHECK(msgs[0].tag == 1); + CHECK(std::memcmp(msgs[0].msg.get(), reinterpret_cast<const uint8_t*>(&message), sizeof(message)) == 0); + } + + SECTION("multiple messages") { + NetworkBuffer buffer(40); + BufferFormatter formatter; + uint32_t message1 = 0xC0FFEE; + uint32_t message2 = 0xCAFE; + std::span<const std::uint8_t> data1(reinterpret_cast<const std::uint8_t*>(&message1), sizeof(message1)); + std::span<const std::uint8_t> data2(reinterpret_cast<const std::uint8_t*>(&message2), sizeof(message2)); + REQUIRE(formatter.write(&buffer, 0, 1, data1) == FormatterStatus::BUFFER_OK); + REQUIRE(formatter.write(&buffer, 1, 0, data2) == FormatterStatus::BUFFER_OK); + std::span<const std::uint8_t> buf = buffer.data(); + auto msgs = BufferFormatter::decode(buf); + REQUIRE(msgs.size() == 2); + CHECK(msgs[0].tag == 1); + CHECK(std::memcmp(msgs[0].msg.get(), reinterpret_cast<const uint8_t*>(&message1), sizeof(message1)) == 0); + CHECK(msgs[1].status == 1); + CHECK(msgs[1].tag == 0); + CHECK(std::memcmp(msgs[1].msg.get(), reinterpret_cast<const uint8_t*>(&message2), sizeof(message2)) == 0); + } + + SECTION("multiple buffers") { + NetworkBuffer buffer1(20); + NetworkBuffer buffer2(20); + BufferFormatter formatter; + uint32_t message = 0xC0FFEE; + std::span<const std::uint8_t> data(reinterpret_cast<const std::uint8_t*>(&message), sizeof(message)); + REQUIRE(formatter.write(&buffer1, 1, 1, data) == FormatterStatus::BUFFER_OK); + REQUIRE(formatter.write(&buffer2, 1, 1, data) == FormatterStatus::BUFFER_OK); + std::span<const std::uint8_t> buf1 = buffer1.data(); + std::span<const std::uint8_t> buf2 = buffer2.data(); + auto msgs1 = BufferFormatter::decode(buf1); + auto msgs2 = BufferFormatter::decode(buf2); + REQUIRE(msgs1.size() == 1); + CHECK(msgs1[0].status == 1); + CHECK(msgs1[0].tag == 1); + CHECK(std::memcmp(msgs1[0].msg.get(), reinterpret_cast<const uint8_t*>(&message), sizeof(message)) == 0); + REQUIRE(msgs2.size() == 1); + CHECK(msgs2[0].status == 1); + CHECK(msgs2[0].tag == 1); + CHECK(std::memcmp(msgs2[0].msg.get(), reinterpret_cast<const uint8_t*>(&message), sizeof(message)) == 0); + } +} \ No newline at end of file -- GitLab From 79f90b3aac6aa6cb3caec8889d27a49c551ce381 Mon Sep 17 00:00:00 2001 From: Mario Shehu <mario.shehu@cern.ch> Date: Tue, 28 May 2024 15:10:50 +0200 Subject: [PATCH 2/2] BufferMsg has std::span instead of pointer and length --- netio3/BufferFormatter.hpp | 9 ++++----- src/BufferFormatter.cpp | 6 ++---- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/netio3/BufferFormatter.hpp b/netio3/BufferFormatter.hpp index c37f3e9..cb110b1 100644 --- a/netio3/BufferFormatter.hpp +++ b/netio3/BufferFormatter.hpp @@ -21,13 +21,12 @@ namespace netio3{ struct BufferMsg{ const uint8_t status; const uint64_t tag; - std::unique_ptr<uint8_t[]> msg; - const uint32_t msg_size; + std::span<const uint8_t> payload; - BufferMsg() : status(0), tag(0), msg(nullptr), msg_size(0) {}; + BufferMsg() : status(0), tag(0), payload(std::span<const uint8_t>()) {}; - BufferMsg(uint8_t status, uint64_t tag, std::unique_ptr<uint8_t[]> msg, uint32_t msg_size) - : status(status), tag(tag), msg(std::move(msg)), msg_size(msg_size) {} + BufferMsg(uint8_t status, uint64_t tag, std::span<const uint8_t> payload) + : status(status), tag(tag), payload(payload) {} }; /** diff --git a/src/BufferFormatter.cpp b/src/BufferFormatter.cpp index 5ec4be0..8a86129 100644 --- a/src/BufferFormatter.cpp +++ b/src/BufferFormatter.cpp @@ -151,11 +151,9 @@ std::vector<BufferMsg> BufferFormatter::decode(std::span<const std::uint8_t> buf } if(current_pos + msg_header > buf.size()) break; - // To avoid copying data, this might just become a pointer to the payload inside the buffer - std::unique_ptr<uint8_t[]> output_data(new uint8_t[msg_header]); - std::memcpy(output_data.get(), data_ptr + current_pos, msg_header); + std::span<const uint8_t> payload = buf.subspan(current_pos, msg_header); current_pos += msg_header; - buffer_msgs.emplace_back(status, tag, std::move(output_data), msg_header); + buffer_msgs.emplace_back(status, tag, payload); } return buffer_msgs; }; \ No newline at end of file -- GitLab