diff --git a/src/FileDmaInputFilter.cc b/src/FileDmaInputFilter.cc index 0e87762f35a6b4149d7867ba190f43d4f2290836..b340791b68d3656753699fcb61419dbe7cfeda08 100644 --- a/src/FileDmaInputFilter.cc +++ b/src/FileDmaInputFilter.cc @@ -1,60 +1,63 @@ #include "FileDmaInputFilter.h" -#include <fcntl.h> +#include <sys/fcntl.h> #include <sys/mman.h> #include <unistd.h> #include <cassert> #include <cerrno> +#include <cstdio> #include <cstring> -#include <sstream> +#include <string> #include <system_error> +#include "InputFilter.h" #include "log.h" -#define DAX_INPUT 1 +const bool USE_DAX_MEM = true; +const size_t DEV_PAGE_SIZE = 4096; // Presumably +const size_t DAX_START_OFFSET = 16777216; // Multiple of page size -- 4096 pages +constexpr size_t DAX_DATA_LENGTH = 16777216; // Note: might need to be multiple of huge page +// (((9050784 / DEV_PAGE_SIZE) + 1) * DEV_PAGE_SIZE); // 9052160 (2210 pages) -constexpr size_t DAX_LENGTH = 16777216; -constexpr size_t DAX_OFFSET = 16777216; +const std::string DAX_ADDRESS = "/dev/dax0.0"; FileDmaInputFilter::FileDmaInputFilter(const std::string &filename, size_t packetBufferSize, size_t nbPacketBuffers, ctrl &control) : InputFilter(packetBufferSize, nbPacketBuffers, control) { -#if DAX_INPUT == 1 - f_descriptor_ = open("/dev/dax0.0", O_RDONLY); - if (f_descriptor_ == -1) { - LOG(FATAL) << "Error opening DAX device file."; - throw std::invalid_argument("Invalid input file name: /dev/dax0.0"); - } - - else { - std::cout << "File descriptor = " << std::to_string(f_descriptor_) << std::endl; - } + if constexpr (USE_DAX_MEM) { + f_descriptor_ = open(DAX_ADDRESS.c_str(), O_RDONLY); + if (f_descriptor_ == -1) { + LOG(FATAL) << "Error opening DAX device file."; + throw std::invalid_argument("Invalid input file name: " + DAX_ADDRESS); + } - uint32_t const length = DAX_LENGTH; // Multiple of page size - uint32_t const offset = DAX_OFFSET; - mmap_buffer_ = - reinterpret_cast<char *>(mmap(NULL, length, PROT_READ, MAP_SHARED, f_descriptor_, offset)); + auto const dax_length = DAX_DATA_LENGTH; + auto const dax_offset = DAX_START_OFFSET; - if (mmap_buffer_ == MAP_FAILED) { - LOG(FATAL) << "Error mapping DAX region into memory."; - close(f_descriptor_); - throw std::invalid_argument("Could mmap DAX region. Offset = " + std::to_string(offset) + - ", length = " + std::to_string(length)); - } -#else - inputFile = fopen(filename.c_str(), "r"); - if (!inputFile) { - throw std::invalid_argument("Invalid input file name: " + filename); + void *addr = mmap(nullptr, dax_length, PROT_READ, MAP_SHARED, f_descriptor_, dax_offset); + if (addr == MAP_FAILED) { + LOG(FATAL) << "Error mapping DAX region (device " << DAX_ADDRESS << ", start " + << std::to_string(dax_offset) << ", length " << std::to_string(dax_length) << ")."; + close(f_descriptor_); + throw std::invalid_argument("Failed mmap of DAX device memory region"); + } + source_buffer_ = reinterpret_cast<char *>(addr); + } else { + inputFile = fopen(filename.c_str(), "r"); + if (!inputFile) { + throw std::invalid_argument("Invalid input file name: " + filename); + } } -#endif LOG(TRACE) << "Created file input filter"; } FileDmaInputFilter::~FileDmaInputFilter() { fclose(inputFile); - munmap(mmap_buffer_, DAX_LENGTH); + if constexpr (USE_DAX_MEM) { + munmap(source_buffer_, DAX_DATA_LENGTH); + } LOG(TRACE) << "Destroyed file input filter"; } @@ -148,23 +151,39 @@ static inline ssize_t read_dma_packet_buffer(char *src_buf, char *dst_buf, size_ } inline ssize_t FileDmaInputFilter::readPacket(char **buffer, ssize_t bufferSize) { - // Read from DMA + const int MAX_SKIPPED_PACKETS = 100; int skip = 0; ssize_t bytesRead = 0; -#if DAX_INPUT == 1 - // bufferSize must be multiple of page size - std::cout << "Current mmap offset: " << std::to_string(mmap_offset_) << " out of " - << std::to_string(DAX_LENGTH) << std::endl; - - if (mmap_offset_ >= 9050784) { - std::cout << "Mmap offset is at EOF, we start over" << std::endl; - mmap_offset_ = 0; + if constexpr (USE_DAX_MEM) { + // bufferSize must be multiple of page size + if (source_offset_ >= DAX_DATA_LENGTH) { + LOG(INFO) << "Reached end of data buffer content (" << DAX_DATA_LENGTH + << " bytes), starting over"; + source_offset_ = DAX_START_OFFSET; + } + bytesRead = read_dma_packet_buffer(&source_buffer_[source_offset_], *buffer, bufferSize); + source_offset_ += bytesRead; + } else { + bytesRead = read_dma_packet_from_file(inputFile, *buffer, bufferSize, nbReads()); } - bytesRead = read_dma_packet_buffer(mmap_buffer_ + mmap_offset_, *buffer, bufferSize); - mmap_offset_ += bytesRead; std::cout << "Bytes read: " << std::to_string(bytesRead) << std::endl; + while (bytesRead > (ssize_t)bufferSize) { + stats.nbOversizedPackets++; + skip++; + LOG(ERROR) << "#" << nbReads() << ": ERROR: Read returned " << bytesRead << " > buffer size " + << bufferSize << ". Skipping packet #" << skip << "."; + if (skip >= MAX_SKIPPED_PACKETS) { + throw std::runtime_error("FATAL: Read is still returning large packets."); + } + if constexpr (USE_DAX_MEM) { + bytesRead = read_dma_packet_buffer(&source_buffer_[source_offset_], *buffer, bufferSize); + } else { + bytesRead = read_dma_packet_from_file(inputFile, *buffer, bufferSize, nbReads()); + } + } + return bytesRead; // void *buffer = mmap(NULL, bufferSize, PROT_READ, MAP_PRIVATE, f_descriptor_, offset); // @@ -188,25 +207,24 @@ inline ssize_t FileDmaInputFilter::readPacket(char **buffer, ssize_t bufferSize) // bytesRead = read_dma_packet_buffer(f_descriptor_, *buffer, bufferSize, nbReads()); // } -#else - bytesRead = read_dma_packet_from_file(inputFile, *buffer, bufferSize, nbReads()); - std::cout << "Bytes read: " << std::to_string(bytesRead) << std::endl; - - // If large packet returned, skip and read again - while (bytesRead > (ssize_t)bufferSize) { - stats.nbOversizedPackets++; - skip++; - LOG(ERROR) << "#" << nbReads() << ": ERROR: Read returned " << bytesRead << " > buffer size " - << bufferSize << ". Skipping packet #" << skip << "."; - if (skip >= 100) { - throw std::runtime_error("FATAL: Read is still returning large packets."); - } - bytesRead = read_dma_packet_from_file(inputFile, *buffer, bufferSize, nbReads()); - } - -#endif - - return bytesRead; + // #else + // bytesRead = read_dma_packet_from_file(inputFile, *buffer, bufferSize, nbReads()); + // std::cout << "Bytes read: " << std::to_string(bytesRead) << std::endl; + // + // // If large packet returned, skip and read again + // while (bytesRead > (ssize_t)bufferSize) { + // stats.nbOversizedPackets++; + // skip++; + // LOG(ERROR) << "#" << nbReads() << ": ERROR: Read returned " << bytesRead << " > buffer size + // " + // << bufferSize << ". Skipping packet #" << skip << "."; + // if (skip >= 100) { + // throw std::runtime_error("FATAL: Read is still returning large packets."); + // } + // bytesRead = read_dma_packet_from_file(inputFile, *buffer, bufferSize, nbReads()); + // } + // + // #endif } /************************************************************************** diff --git a/src/FileDmaInputFilter.h b/src/FileDmaInputFilter.h index cd81f85e63c730069cf2cb8b69b25d76b10737be..23ad881a12878741af2175211040133d4100e73a 100644 --- a/src/FileDmaInputFilter.h +++ b/src/FileDmaInputFilter.h @@ -4,10 +4,14 @@ #include <tbb/pipeline.h> #include <tbb/tick_count.h> +#include <cstddef> +#include <cstdint> #include <memory> +#include <ostream> #include <string> #include "InputFilter.h" +#include "controls.h" class FileDmaInputFilter : public InputFilter { public: @@ -22,9 +26,8 @@ class FileDmaInputFilter : public InputFilter { private: ssize_t readPacket(char **buffer, ssize_t bufferSize); - private: - size_t mmap_offset_ = 0; - char *mmap_buffer_; + size_t source_offset_ = 0; + char *source_buffer_; FILE *inputFile; int f_descriptor_;