An error occurred while loading the file. Please try again.
-
Giovanna Lazzari Miotto authoredGiovanna Lazzari Miotto authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
FileDmaInputFilter.cc 7.36 KiB
#include "FileDmaInputFilter.h"
#include <fcntl.h>
#include <sys/mman.h>
#include <unistd.h>
#include <cassert>
#include <cerrno>
#include <cstring>
#include <sstream>
#include <system_error>
#include "log.h"
// Compile-time input selector: when 1 the filter reads from a memory-mapped DAX device,
// otherwise it reads from a regular file opened with fopen().
#define DAX_INPUT 1
// Number of bytes of the DAX device that get mapped into memory (16 MiB).
constexpr size_t DAX_LENGTH = 16 * 1024 * 1024;
// Byte offset into the DAX device at which the mapping starts (16 MiB).
constexpr size_t DAX_OFFSET = 16 * 1024 * 1024;
/**
 * Opens the input source for this filter.
 *
 * When DAX_INPUT is 1, `filename` is ignored: the device /dev/dax0.0 is opened read-only and
 * DAX_LENGTH bytes starting at DAX_OFFSET are mapped into memory as a shared, read-only mapping.
 * Otherwise `filename` is opened with fopen() in read mode.
 *
 * @param filename          Path of the input file (only used when DAX_INPUT is not 1).
 * @param packetBufferSize  Forwarded to the InputFilter base class.
 * @param nbPacketBuffers   Forwarded to the InputFilter base class.
 * @param control           Forwarded to the InputFilter base class.
 * @throws std::invalid_argument if the device/file cannot be opened or the mapping fails.
 */
FileDmaInputFilter::FileDmaInputFilter(const std::string &filename, size_t packetBufferSize,
                                       size_t nbPacketBuffers, ctrl &control)
    : InputFilter(packetBufferSize, nbPacketBuffers, control) {
#if DAX_INPUT == 1
  (void)filename;  // Not used in DAX mode: the device path is fixed.

  f_descriptor_ = open("/dev/dax0.0", O_RDONLY);
  if (f_descriptor_ == -1) {
    const int openErrno = errno;  // Capture before logging has a chance to overwrite it.
    LOG(FATAL) << "Error opening DAX device file.";
    throw std::invalid_argument(std::string("Invalid input file name: /dev/dax0.0 (") +
                                std::strerror(openErrno) + ")");
  }
  std::cout << "File descriptor = " << std::to_string(f_descriptor_) << std::endl;

  // Use the types mmap() actually takes instead of narrowing to 32 bits.
  const size_t length = DAX_LENGTH;  // Multiple of page size
  const off_t offset = static_cast<off_t>(DAX_OFFSET);

  void *const mapping = mmap(nullptr, length, PROT_READ, MAP_SHARED, f_descriptor_, offset);
  if (mapping == MAP_FAILED) {
    const int mmapErrno = errno;  // Capture before logging/close() can overwrite it.
    LOG(FATAL) << "Error mapping DAX region into memory.";
    close(f_descriptor_);
    f_descriptor_ = -1;
    throw std::invalid_argument("Could not mmap DAX region. Offset = " + std::to_string(offset) +
                                ", length = " + std::to_string(length) + " (" +
                                std::strerror(mmapErrno) + ")");
  }
  mmap_buffer_ = static_cast<char *>(mapping);
#else
  inputFile = fopen(filename.c_str(), "r");
  if (!inputFile) {
    throw std::invalid_argument("Invalid input file name: " + filename);
  }
#endif
  LOG(TRACE) << "Created file input filter";
}
/**
 * Releases the input resource that the constructor acquired for the active DAX_INPUT mode.
 *
 * Only the resource belonging to the compiled-in mode is released: in DAX mode the stdio stream
 * was never opened, and in file mode nothing was mapped, so touching the other one would operate
 * on a handle the constructor never set up.
 */
FileDmaInputFilter::~FileDmaInputFilter() {
#if DAX_INPUT == 1
  // The constructor throws on failure, so a constructed object always holds a valid mapping.
  munmap(mmap_buffer_, DAX_LENGTH);
  // The descriptor opened by the constructor is still open; close it so it does not leak.
  if (f_descriptor_ != -1) {
    close(f_descriptor_);
  }
#else
  if (inputFile) {
    fclose(inputFile);
  }
#endif
  LOG(TRACE) << "Destroyed file input filter";
}
/*
 * Reads one packet from a file, 32 bytes at a time, in order to simulate DMA reads.
 *
 * A packet ends with a 32-byte word whose first 8 bytes equal 0xdeadbeefdeadbeef; that word is
 * copied to the buffer and counted in the returned length. If the very first read of a call hits
 * a clean end of file, the file is rewound and reading restarts from the beginning.
 *
 * inputFile  Stream to read from.
 * buffer     Destination; at most `size` bytes are written, always in whole 32-byte words.
 * size       Capacity of `buffer` in bytes.
 * nbReads    Read-call counter, used only in diagnostic messages.
 *
 * Returns the number of bytes stored, including the end-of-packet word.
 *
 * Throws std::system_error on a stream error, and std::runtime_error if the file is empty, ends in
 * the middle of a 32-byte word, or no end-of-packet word fits within `size` bytes.
 */
static inline ssize_t read_dma_packet_from_file(FILE *inputFile, char *buffer, ssize_t size,
                                                uint64_t nbReads) {
  static constexpr uint64_t deadbeef = 0xdeadbeefdeadbeefL;
  static constexpr ssize_t wordBytes = 32;

  ssize_t bytesRead = 0;
  bool rewound = false;  // Set once this call has wrapped around to the start of the file.

  // Only read while a whole word still fits, so a capacity that is not a multiple of 32 can never
  // make fread() write past the end of the buffer.
  while (!feof(inputFile) && size - bytesRead >= wordBytes) {
    // Expecting 32 byte alignment
    const size_t rc = fread(buffer, 1, static_cast<size_t>(wordBytes), inputFile);

    if (ferror(inputFile)) {
      throw std::system_error(errno, std::system_category(), "File error");
    }

    if (rc != static_cast<size_t>(wordBytes)) {
      if (feof(inputFile) && rc == 0 && bytesRead == 0) {
        if (rewound) {
          // We already rewound and still got nothing: the file has no data at all. Without this
          // check the loop would rewind forever.
          throw std::runtime_error("#" + std::to_string(nbReads) +
                                   ": Input file is empty, no packet can be read.");
        }
        // We have reached the perfect end of the file, let's start again
        std::cout << "Reached EOF, rewinding file! It's been " << std::to_string(nbReads)
                  << " read calls." << std::endl;
        if (fseek(inputFile, 0, SEEK_SET) != 0) {
          throw std::system_error(errno, std::system_category(), "File error");
        }
        rewound = true;
        continue;
      }

      // Misaligned data
      throw std::runtime_error("#" + std::to_string(nbReads) +
                               ": File read ends prematurely, missing " +
                               std::to_string(static_cast<size_t>(wordBytes) - rc) +
                               " bytes. Something is probably misaligned.");
    }

    bytesRead += wordBytes;

    // Copy the leading 8 bytes out instead of dereferencing a casted pointer: the buffer is a
    // char array with no alignment guarantee for uint64_t.
    uint64_t head = 0;
    std::memcpy(&head, buffer, sizeof(head));
    if (head == deadbeef) {
      // End of the packet was found
      return bytesRead;
    }

    buffer += wordBytes;
  }

  // We are here either
  //   because we found EOF earlier than the end of the packet
  //   or the packet cannot fit into the buffer
  // We can deal with both conditions but for the moment we throw an error
  if (feof(inputFile)) {
    throw std::runtime_error("EOF reached but no end of the packet found.");
  }

  // TODO: Read and discard data until deadbeef is found
  throw std::runtime_error("Packet is too big and cannot fit into preallocated memory");
}
/*
 * Copies one packet from a memory buffer into a destination buffer, 32 bytes at a time.
 *
 * A packet ends with a 32-byte word whose first 8 bytes equal 0xdeadbeefdeadbeef; that word is
 * copied as well and counted in the returned length.
 *
 * src_buf  Start of the packet in the source buffer; at most `size` bytes are read from it.
 * dst_buf  Destination; at most `size` bytes are written.
 * size     Maximum number of bytes to copy; must be a multiple of 32 (asserted).
 *
 * Returns the number of bytes copied, including the end-of-packet word.
 *
 * Throws std::runtime_error if no end-of-packet word is found within `size` bytes.
 */
static inline ssize_t read_dma_packet_buffer(char *src_buf, char *dst_buf, size_t size) {
  constexpr size_t align_by = 32;
  static constexpr uint64_t deadbeef = 0xdeadbeefdeadbeefL;

  assert(size % align_by == 0);

  std::cout << "read_dma_packet_buffer: reading up to " << std::to_string(size) << " bytes."
            << std::endl;

  size_t bytes_read = 0;

  // Only copy while a whole word still fits: in builds where the assert above is compiled out, a
  // size that is not a multiple of 32 must not overrun either buffer.
  while (size - bytes_read >= align_by) {
    // Expecting 32 byte alignment
    std::memcpy(dst_buf + bytes_read, src_buf + bytes_read, align_by);

    // Must find end of packet ('deadbeef') within the destination buffer's allocated space.
    // The leading 8 bytes are copied out rather than read through a casted pointer, because a
    // char buffer carries no alignment guarantee for uint64_t.
    uint64_t head = 0;
    std::memcpy(&head, src_buf + bytes_read, sizeof(head));
    if (head == deadbeef) {
      std::cout << "Found deadbeef after " << std::to_string(bytes_read) << " bytes." << std::endl;
      return static_cast<ssize_t>(bytes_read + align_by);
    }

    bytes_read += align_by;
  }

  if (bytes_read == size) {
    std::cout << "Filled buffer but no deadbeef" << std::endl;
  }

  // We are here because the packet cannot fit into the buffer
  throw std::runtime_error("Packet is too big and cannot fit into preallocated memory");
}
/**
 * Reads one packet from the active input source into `*buffer`.
 *
 * In DAX mode the packet is copied out of the memory-mapped region at the current offset, and the
 * offset is advanced by the number of bytes copied; once the offset reaches the end of the stored
 * data it wraps back to 0. In file mode the packet is read from the input file.
 *
 * @param buffer      Points to the destination buffer pointer; the packet is written to `*buffer`.
 * @param bufferSize  Capacity of `*buffer` in bytes.
 * @return Number of bytes stored in `*buffer`.
 * @throws std::runtime_error (from the read helpers) if no complete packet fits in the buffer.
 */
inline ssize_t FileDmaInputFilter::readPacket(char **buffer, ssize_t bufferSize) {
  // Read from DMA
  ssize_t bytesRead = 0;

#if DAX_INPUT == 1
  // Offset at which reading wraps back to the start of the mapping.
  // NOTE(review): presumably the size of the data stored in the DAX region; confirm and derive it
  // from the input instead of hard-coding it.
  constexpr size_t daxDataEnd = 9050784;
  static_assert(daxDataEnd <= DAX_LENGTH, "wrap-around point must lie inside the mapping");

  // bufferSize must be multiple of page size
  std::cout << "Current mmap offset: " << std::to_string(mmap_offset_) << " out of "
            << std::to_string(DAX_LENGTH) << std::endl;

  if (static_cast<size_t>(mmap_offset_) >= daxDataEnd) {
    std::cout << "Mmap offset is at EOF, we start over" << std::endl;
    mmap_offset_ = 0;
  }

  // Never let the helper look beyond the end of the mapping: limit the window to what is left
  // between the current offset and DAX_LENGTH.
  const size_t offset = static_cast<size_t>(mmap_offset_);
  const size_t remaining = DAX_LENGTH - offset;
  const size_t requested = static_cast<size_t>(bufferSize);
  const size_t window = requested < remaining ? requested : remaining;

  bytesRead = read_dma_packet_buffer(mmap_buffer_ + offset, *buffer, window);
  mmap_offset_ += bytesRead;
  std::cout << "Bytes read: " << std::to_string(bytesRead) << std::endl;

  // TODO: skipping of oversized packets (as done for file input below) is not implemented for DAX
  // input; read_dma_packet_buffer throws when a packet does not fit.
#else
  int skip = 0;

  bytesRead = read_dma_packet_from_file(inputFile, *buffer, bufferSize, nbReads());
  std::cout << "Bytes read: " << std::to_string(bytesRead) << std::endl;

  // If large packet returned, skip and read again
  while (bytesRead > (ssize_t)bufferSize) {
    stats.nbOversizedPackets++;
    skip++;
    LOG(ERROR) << "#" << nbReads() << ": ERROR: Read returned " << bytesRead << " > buffer size "
               << bufferSize << ". Skipping packet #" << skip << ".";
    if (skip >= 100) {
      throw std::runtime_error("FATAL: Read is still returning large packets.");
    }
    bytesRead = read_dma_packet_from_file(inputFile, *buffer, bufferSize, nbReads());
  }
#endif

  return bytesRead;
}
/**************************************************************************
* Entry points are here
* Overriding virtual functions
*/
// Appends this filter's extra statistic, the oversized-packet counter, to `out`.
void FileDmaInputFilter::print(std::ostream &out) const {
  const auto &oversizedPackets = stats.nbOversizedPackets;
  out << ", oversized packets ";
  out << oversizedPackets;
}
// Reads one packet into `*buffer` by delegating to readPacket(), which takes the buffer capacity
// as a signed size.
ssize_t FileDmaInputFilter::readInput(char **buffer, size_t bufferSize) {
  const auto capacity = static_cast<ssize_t>(bufferSize);
  const ssize_t packetBytes = readPacket(buffer, capacity);
  return packetBytes;
}