diff --git a/CMakeLists.txt b/CMakeLists.txt index d7953036f1ab7183d0e58a53f4e5bc77aaea229f..c75c6cfa4d105ba473a2bcf4b395c77ddc833e32 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -23,6 +23,7 @@ endif() tdaq_add_library(netio3-lib src/NetioSender.cpp src/NetioReceiver.cpp src/NetioPublisher.cpp src/NetioSubscriber.cpp + src/BufferFormatter.cpp LINK_LIBRARIES netio3-backend-lib NOINSTALL) @@ -60,3 +61,9 @@ add_test(netio3-unit-publisher netio3-unit-publisher) #target_compile_options(netio3-unit-sender PRIVATE "-fno-access-control") #target_link_libraries(netio3-unit-sender Catch2Main Catch2 netio3-backend-lib netio3-lib) #add_test(netio3-unit-sender netio3-unit-sender) + +tdaq_add_executable(netio3-unit-bufferformatter test/unit_tests_bufferformatter.cpp) +target_include_directories(netio3-unit-bufferformatter PRIVATE src) +target_link_libraries(netio3-unit-bufferformatter Catch2Main Catch2 netio3-backend-lib) +target_compile_options(netio3-unit-bufferformatter PRIVATE "-fno-access-control") +add_test(netio3-unit-bufferformatter netio3-unit-bufferformatter) \ No newline at end of file diff --git a/netio3/BufferFormatter.hpp b/netio3/BufferFormatter.hpp new file mode 100644 index 0000000000000000000000000000000000000000..cb110b115eaee3b249b829dc7561d343294e0732 --- /dev/null +++ b/netio3/BufferFormatter.hpp @@ -0,0 +1,130 @@ +/** + * @file BufferFormatter.hpp + * @brief BufferFormatter class, BefferMsg struct, and FormatterStatus enum. + * + * This file contains the declarations of BufferFormatter class and its functions. + * The funtions responsible for correctly formatting data to write into a buffer (NetworkBuffer). + * and also provide a way to decode the buffer. + * + */ + +#pragma once + +#include <map> +#include "netio3-backend/NetworkBuffer.hpp" + +namespace netio3{ + + /** + * @brief Return struct for the BufferFormatter::decode function. + */ + struct BufferMsg{ + const uint8_t status; + const uint64_t tag; + std::span<const uint8_t> payload; + + BufferMsg() : status(0), tag(0), payload(std::span<const uint8_t>()) {}; + + BufferMsg(uint8_t status, uint64_t tag, std::span<const uint8_t> payload) + : status(status), tag(tag), payload(payload) {} + }; + + /** + * @brief Enum class used to check the status of the write operation. + */ + enum class FormatterStatus{BUFFER_OK, BUFFER_READY, MESSAGE_TOO_BIG}; + + class BufferFormatter{ + + public: + + /** + * @brief Helper function that fills the buffer with the header. + */ + [[nodiscard]] FormatterStatus fill_header(NetworkBuffer* buf, uint8_t status, uint64_t tag, uint32_t message_size); + + + /** + * @brief Writes data to the buffer with the specified STATUS, TAG, and DATA. + * + * This is the most general write function, which allows you to specify the status, tag, and data. + * data is a vector of iovec, which is a struct containing a pointer to the data and the size of the data. + * + * @param buf The pointer to the buffer. + * @param status The status value. + * @param tag The tag value, representing the ID of an e-link. + * @param data The data to be written. + * + * @return FormatterStatus::MESSAGE_TOO_BIG if the message is too big, FormatterStatus::BUFFER_OK otherwise. + */ + [[nodiscard]] FormatterStatus write(NetworkBuffer* buf, uint8_t status, uint64_t tag, std::vector<iovec> &data); + + + /** + * @brief Writes data to the buffer with the specified STATUS, TAG, and DATA. + * + * This is the most general write function, which allows you to specify the status, tag, and data. + * data is a span of uint8_t, which is a contiguous sequence of bytes. + * + * @param buf The pointer to the buffer. + * @param status The status value. + * @param tag The tag value, representing the ID of an e-link. + * @param data The data to be written. + * + * @return FormatterStatus::MESSAGE_TOO_BIG if the message is too big, FormatterStatus::BUFFER_OK otherwise. + */ + [[nodiscard]] FormatterStatus write(NetworkBuffer* buf, uint8_t status, uint64_t tag, std::span<const std::uint8_t> data); + + + /** + * @brief Writes data to the buffer with the specified TAG and DATA, status is set to 0. + * + * This is a simplified write function, which allows you to specify the tag and data. + * data is a vector of iovec, which is a struct containing a pointer to the data and the size of the data. + * + * @param buf The pointer to the buffer. + * @param tag The tag value, representing the ID of an e-link. + * @param data The data to be written. + * + * @return FormatterStatus::MESSAGE_TOO_BIG if the message is too big, FormatterStatus::BUFFER_OK otherwise. + */ + [[nodiscard]] FormatterStatus write(NetworkBuffer* buf, uint64_t tag, std::vector<iovec> &data); + + + /** + * @brief Writes data to the buffer with the specified STATUS, TAG, and DATA. + * + * This is a simplified write function, which allows you to specify the tag and data. + * data is a span of uint8_t, which is a contiguous sequence of bytes. + * + * @param buf The pointer to the buffer. + * @param tag The tag value, representing the ID of an e-link. + * @param data The data to be written. + * + * @return FormatterStatus::MESSAGE_TOO_BIG if the message is too big, FormatterStatus::BUFFER_OK otherwise. + */ + [[nodiscard]] FormatterStatus write(NetworkBuffer* buf, uint64_t tag, std::span<const std::uint8_t> data); + + + /** + * @brief Decodes the buffer and returns a vector of BufferMsg objects. + * + * @return A vector of BufferMsg objects. + */ + [[nodiscard]] static std::vector<BufferMsg> decode(std::span<const std::uint8_t> buf); + + + private: + constexpr static auto HDR_STATUS_FLAG_POS = 31; + constexpr static auto HDR_TAG_FLAG_POS = 30; + constexpr static auto NUM_BIT_PAYLOAD_SIZE = 30; + /* status size tag size */ + constexpr static auto MSG_MAX_SIZE = (1 << NUM_BIT_PAYLOAD_SIZE) - sizeof(uint8_t) - sizeof(uint64_t); + + //TODO: + //This might be changed to a single m_previous_tag, + //meaning that the association BufferFormatter to Networkbuffer will become 1:1 instead of 1:many + std::map<NetworkBuffer*, uint64_t> m_previous_tag; + }; + +} // namespace netio3 \ No newline at end of file diff --git a/src/BufferFormatter.cpp b/src/BufferFormatter.cpp new file mode 100644 index 0000000000000000000000000000000000000000..8a86129aebd747ed59f9bb2de22fcab778d22fed --- /dev/null +++ b/src/BufferFormatter.cpp @@ -0,0 +1,159 @@ +#include "netio3/BufferFormatter.hpp" +#include <cmath> + +using namespace netio3; + + +FormatterStatus BufferFormatter::fill_header(NetworkBuffer* buf, uint8_t status, uint64_t tag, uint32_t payload_size) { + if (payload_size >= MSG_MAX_SIZE) { + return FormatterStatus::MESSAGE_TOO_BIG; + } + + const bool has_status = status != 0; + const bool has_tag = not m_previous_tag.contains(buf) || tag != m_previous_tag.at(buf); + uint32_t msg_total_size = payload_size; + + if (has_status) { + msg_total_size += sizeof(status); + } + if (has_tag) { + msg_total_size += sizeof(tag); + } + if (buf->pos() + msg_total_size > buf->size()) { + return FormatterStatus::MESSAGE_TOO_BIG; + } + + const uint32_t header = msg_total_size | (has_status << HDR_STATUS_FLAG_POS) | (has_tag << HDR_TAG_FLAG_POS); + + buf->write(header); + + if (has_status) { + buf->write(status); + } + if (has_tag) { + buf->write(tag); + m_previous_tag[buf] = tag; + } + return FormatterStatus::BUFFER_OK; +} + + + +FormatterStatus BufferFormatter::write(NetworkBuffer* buf, uint8_t status, uint64_t tag, std::vector<iovec> &data){ + uint32_t payload_size = 0; + + for (const auto& iov : data) { + payload_size += iov.iov_len; + } + + FormatterStatus buffer_status = fill_header(buf, status, tag, payload_size); + + if(buffer_status == FormatterStatus::BUFFER_OK){ + // Fill buffer with the payload + for (const auto& iov : data) { + buf->write(iov); + } + return FormatterStatus::BUFFER_OK; + } + else{ + return FormatterStatus::MESSAGE_TOO_BIG; + } +}; + + + +FormatterStatus BufferFormatter::write(NetworkBuffer* buf, uint8_t status, uint64_t tag, std::span<const std::uint8_t> data){ + uint32_t payload_size = data.size(); + + FormatterStatus buffer_status = fill_header(buf, status, tag, payload_size); + + if(buffer_status == FormatterStatus::BUFFER_OK){ + buf->write(data); + return FormatterStatus::BUFFER_OK; + } + else{ + return FormatterStatus::MESSAGE_TOO_BIG; + } +}; + + + +FormatterStatus BufferFormatter::write(NetworkBuffer* buf, uint64_t tag, std::vector<iovec> &data){ + uint32_t payload_size = 0; + + for (const auto& iov : data) { + payload_size += iov.iov_len; + } + /* 0 = no status */ + FormatterStatus buffer_status = fill_header(buf, 0, tag, payload_size); + + if(buffer_status == FormatterStatus::BUFFER_OK){ + // Fill buffer with the payload + for (const auto& iov : data) { + buf->write(iov); + } + return FormatterStatus::BUFFER_OK; + } + else{ + return FormatterStatus::MESSAGE_TOO_BIG; + } +}; + + + +FormatterStatus BufferFormatter::write(NetworkBuffer* buf, uint64_t tag, std::span<const std::uint8_t> data){ + uint32_t payload_size = data.size(); + /* 0 = no status */ + FormatterStatus buffer_status = fill_header(buf, 0, tag, payload_size); + + if(buffer_status == FormatterStatus::BUFFER_OK){ + buf->write(data); + return FormatterStatus::BUFFER_OK; + } + else{ + return FormatterStatus::MESSAGE_TOO_BIG; + } +}; + + + +std::vector<BufferMsg> BufferFormatter::decode(std::span<const std::uint8_t> buf){ + //TODO: for performance reasons, consider starting with, say, 1000 elements + std::vector<BufferMsg> buffer_msgs; + std::size_t current_pos = 0; + const uint8_t* data_ptr = buf.data(); + + uint64_t previous_tag = UINT64_MAX; + while (current_pos < buf.size()) { + + uint32_t msg_header; + if(current_pos + sizeof(msg_header) > buf.size()) break; + memcpy(&msg_header, data_ptr + current_pos, sizeof(msg_header)); + current_pos += sizeof(msg_header); + + uint8_t status = 0; + if(current_pos + sizeof(status) > buf.size()) break; + if (msg_header & (1 << HDR_STATUS_FLAG_POS)) { + msg_header ^= (1 << HDR_STATUS_FLAG_POS); // Clear the most significant bit + memcpy(&status, data_ptr + current_pos, sizeof(status)); + current_pos += sizeof(status); + msg_header -= sizeof(status); + } + + uint64_t tag = previous_tag; + if(current_pos + sizeof(tag) > buf.size()) break; + if (msg_header & (1 << HDR_TAG_FLAG_POS)) { + msg_header ^= (1 << HDR_TAG_FLAG_POS); // Clear the second most significant bit + memcpy(&tag, data_ptr + current_pos, sizeof(tag)); + current_pos += sizeof(tag); + msg_header -= sizeof(tag); + previous_tag = tag; + } + + if(current_pos + msg_header > buf.size()) break; + std::span<const uint8_t> payload = buf.subspan(current_pos, msg_header); + current_pos += msg_header; + buffer_msgs.emplace_back(status, tag, payload); + } + return buffer_msgs; +}; \ No newline at end of file diff --git a/test/unit_tests_bufferformatter.cpp b/test/unit_tests_bufferformatter.cpp new file mode 100644 index 0000000000000000000000000000000000000000..386119b7e79bf62d68a3f9d4541ea75c0c3cc850 --- /dev/null +++ b/test/unit_tests_bufferformatter.cpp @@ -0,0 +1,189 @@ +#define CATCH_CONFIG_MAIN +#define UNIT_TESTING +#include <catch2/catch_test_macros.hpp> +#include "netio3/BufferFormatter.hpp" +#include <iostream> +using namespace netio3; + +TEST_CASE("BufferFormatter::write", "[BUFFER_OK]") { + netio3::BufferFormatter formatter; + netio3::NetworkBuffer buffer(1024); + uint8_t status = 1; + uint64_t tag = 7; + std::vector<uint8_t> test_vector(40, 7); + std::vector<iovec> dataVec(40); + for (int i = 0; i < 40; i++) { + dataVec[i].iov_base = &test_vector.at(i); + dataVec[i].iov_len = 1; + } + std::span<const uint8_t> dataSpan(test_vector); + + SECTION("write with status, tag, and vector data") { + auto result = formatter.write(&buffer, status, tag, dataVec); + REQUIRE(result == netio3::FormatterStatus::BUFFER_OK); + buffer.reset(); + } + + SECTION("write with tag and vector data") { + auto result = formatter.write(&buffer, tag, dataVec); + REQUIRE(result == netio3::FormatterStatus::BUFFER_OK); + buffer.reset(); + } + + SECTION("write with status, tag, and span data") { + auto result = formatter.write(&buffer, status, tag, dataSpan); + REQUIRE(result == netio3::FormatterStatus::BUFFER_OK); + buffer.reset(); + } + + SECTION("write with tag and span data") { + auto result = formatter.write(&buffer, tag, dataSpan); + REQUIRE(result == netio3::FormatterStatus::BUFFER_OK); + buffer.reset(); + } +} + +TEST_CASE("BufferFormatter::write", "[MESSAGE_TOO_BIG]") { + netio3::BufferFormatter formatter; + netio3::NetworkBuffer buffer(40); + uint8_t status = 1; + uint64_t tag = 7; + std::vector<uint8_t> test_vector(40, 7); + std::vector<iovec> dataVec(40); + for (int i = 0; i < 40; i++) { + dataVec[i].iov_base = &test_vector.at(i); + dataVec[i].iov_len = 1; + } + std::span<const uint8_t> dataSpan(test_vector); + + SECTION("write with status, tag, and vector data: message too big") { + auto result = formatter.write(&buffer, status, tag, dataVec); + REQUIRE(result == netio3::FormatterStatus::MESSAGE_TOO_BIG); + buffer.reset(); + } + + SECTION("write with tag and vector data: message too big") { + auto result = formatter.write(&buffer, tag, dataVec); + REQUIRE(result == netio3::FormatterStatus::MESSAGE_TOO_BIG); + buffer.reset(); + } + + SECTION("write with status, tag, and span data: message too big") { + auto result = formatter.write(&buffer, status, tag, dataSpan); + REQUIRE(result == netio3::FormatterStatus::MESSAGE_TOO_BIG); + buffer.reset(); + } + + SECTION("write with tag and span data: message too big") { + auto result = formatter.write(&buffer, tag, dataSpan); + REQUIRE(result == netio3::FormatterStatus::MESSAGE_TOO_BIG); + buffer.reset(); + } +} + +TEST_CASE("BufferFormatter::decode", "[DECODE]") { + + SECTION("empty buffer") { + NetworkBuffer buffer(0); + std::span<const std::uint8_t> buf = buffer.data(); + auto msgs = BufferFormatter::decode(buf); + REQUIRE(msgs.empty()); + } + + SECTION("single message no status no tag") { + NetworkBuffer buffer(20); + BufferFormatter formatter; + uint32_t message = 0xC0FFEE; + std::span<const std::uint8_t> data(reinterpret_cast<const std::uint8_t*>(&message), sizeof(message)); + REQUIRE(formatter.write(&buffer, 0, data) == FormatterStatus::BUFFER_OK); + std::span<const std::uint8_t> buf = buffer.data(); + auto msgs = BufferFormatter::decode(buf); + REQUIRE(msgs.size() == 1); + CHECK(msgs[0].status == 0); + CHECK(msgs[0].tag == 0); + CHECK(std::memcmp(msgs[0].msg.get(), reinterpret_cast<const uint8_t*>(&message), sizeof(message)) == 0); + } + + SECTION("single message with status no tag") { + NetworkBuffer buffer(20); + BufferFormatter formatter; + uint32_t message = 0xC0FFEE; + std::span<const std::uint8_t> data(reinterpret_cast<const std::uint8_t*>(&message), sizeof(message)); + REQUIRE(formatter.write(&buffer, 1, 0, data) == FormatterStatus::BUFFER_OK); + std::span<const std::uint8_t> buf = buffer.data(); + auto msgs = BufferFormatter::decode(buf); + REQUIRE(msgs.size() == 1); + CHECK(msgs[0].status == 1); + CHECK(msgs[0].tag == 0); + CHECK(std::memcmp(msgs[0].msg.get(), reinterpret_cast<const uint8_t*>(&message), sizeof(message)) == 0); + } + + SECTION("single message no status with tag") { + NetworkBuffer buffer(20); + BufferFormatter formatter; + uint32_t message = 0xC0FFEE; + std::span<const std::uint8_t> data(reinterpret_cast<const std::uint8_t*>(&message), sizeof(message)); + REQUIRE(formatter.write(&buffer, 1, data) == FormatterStatus::BUFFER_OK); + std::span<const std::uint8_t> buf = buffer.data(); + auto msgs = BufferFormatter::decode(buf); + REQUIRE(msgs.size() == 1); + CHECK(msgs[0].status == 0); + CHECK(msgs[0].tag == 1); + CHECK(std::memcmp(msgs[0].msg.get(), reinterpret_cast<const uint8_t*>(&message), sizeof(message)) == 0); + } + + SECTION("single message with status and with tag") { + NetworkBuffer buffer(20); + BufferFormatter formatter; + uint32_t message = 0xC0FFEE; + std::span<const std::uint8_t> data(reinterpret_cast<const std::uint8_t*>(&message), sizeof(message)); + REQUIRE(formatter.write(&buffer, 1, 1, data) == FormatterStatus::BUFFER_OK); + std::span<const std::uint8_t> buf = buffer.data(); + auto msgs = BufferFormatter::decode(buf); + REQUIRE(msgs.size() == 1); + CHECK(msgs[0].status == 1); + CHECK(msgs[0].tag == 1); + CHECK(std::memcmp(msgs[0].msg.get(), reinterpret_cast<const uint8_t*>(&message), sizeof(message)) == 0); + } + + SECTION("multiple messages") { + NetworkBuffer buffer(40); + BufferFormatter formatter; + uint32_t message1 = 0xC0FFEE; + uint32_t message2 = 0xCAFE; + std::span<const std::uint8_t> data1(reinterpret_cast<const std::uint8_t*>(&message1), sizeof(message1)); + std::span<const std::uint8_t> data2(reinterpret_cast<const std::uint8_t*>(&message2), sizeof(message2)); + REQUIRE(formatter.write(&buffer, 0, 1, data1) == FormatterStatus::BUFFER_OK); + REQUIRE(formatter.write(&buffer, 1, 0, data2) == FormatterStatus::BUFFER_OK); + std::span<const std::uint8_t> buf = buffer.data(); + auto msgs = BufferFormatter::decode(buf); + REQUIRE(msgs.size() == 2); + CHECK(msgs[0].tag == 1); + CHECK(std::memcmp(msgs[0].msg.get(), reinterpret_cast<const uint8_t*>(&message1), sizeof(message1)) == 0); + CHECK(msgs[1].status == 1); + CHECK(msgs[1].tag == 0); + CHECK(std::memcmp(msgs[1].msg.get(), reinterpret_cast<const uint8_t*>(&message2), sizeof(message2)) == 0); + } + + SECTION("multiple buffers") { + NetworkBuffer buffer1(20); + NetworkBuffer buffer2(20); + BufferFormatter formatter; + uint32_t message = 0xC0FFEE; + std::span<const std::uint8_t> data(reinterpret_cast<const std::uint8_t*>(&message), sizeof(message)); + REQUIRE(formatter.write(&buffer1, 1, 1, data) == FormatterStatus::BUFFER_OK); + REQUIRE(formatter.write(&buffer2, 1, 1, data) == FormatterStatus::BUFFER_OK); + std::span<const std::uint8_t> buf1 = buffer1.data(); + std::span<const std::uint8_t> buf2 = buffer2.data(); + auto msgs1 = BufferFormatter::decode(buf1); + auto msgs2 = BufferFormatter::decode(buf2); + REQUIRE(msgs1.size() == 1); + CHECK(msgs1[0].status == 1); + CHECK(msgs1[0].tag == 1); + CHECK(std::memcmp(msgs1[0].msg.get(), reinterpret_cast<const uint8_t*>(&message), sizeof(message)) == 0); + REQUIRE(msgs2.size() == 1); + CHECK(msgs2[0].status == 1); + CHECK(msgs2[0].tag == 1); + CHECK(std::memcmp(msgs2[0].msg.get(), reinterpret_cast<const uint8_t*>(&message), sizeof(message)) == 0); + } +} \ No newline at end of file