// FileDmaInputFilter.cc
#include "FileDmaInputFilter.h"

#include <fcntl.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <unistd.h>

#include <cassert>
#include <cerrno>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <iostream>
#include <sstream>
#include <stdexcept>
#include <string>
#include <system_error>

#include "log.h"

// Compile-time input selector: when set to 1 the filter reads packets from a
// memory mapping of /dev/dax0.0; any other value selects the stdio code path
// that reads packets from the file named in the constructor.
#define DAX_INPUT 1

// Size in bytes of the region of the DAX device that gets mapped (16 MiB).
constexpr size_t DAX_LENGTH = 16777216;
// Byte offset into the DAX device at which the mapping starts (16 MiB).
constexpr size_t DAX_OFFSET = 16777216;

/**
 * Opens the input source used by this filter.
 *
 * With DAX_INPUT == 1 the device /dev/dax0.0 is opened read-only and
 * DAX_LENGTH bytes starting at DAX_OFFSET are mapped into memory; `filename`
 * is ignored in that mode. Otherwise `filename` is opened with fopen() for
 * reading.
 *
 * @param filename          Path of the input file (only used when DAX_INPUT != 1).
 * @param packetBufferSize  Forwarded to the InputFilter base class.
 * @param nbPacketBuffers   Forwarded to the InputFilter base class.
 * @param control           Forwarded to the InputFilter base class.
 * @throws std::invalid_argument if the device/file cannot be opened or the
 *         DAX region cannot be mapped.
 */
FileDmaInputFilter::FileDmaInputFilter(const std::string &filename, size_t packetBufferSize,
                                       size_t nbPacketBuffers, ctrl &control)
    : InputFilter(packetBufferSize, nbPacketBuffers, control) {
#if DAX_INPUT == 1
  // The device path is fixed in this mode, so the file name is not consulted.
  static_cast<void>(filename);

  f_descriptor_ = open("/dev/dax0.0", O_RDONLY);
  if (f_descriptor_ == -1) {
    // Capture errno before logging can overwrite it.
    const int openErrno = errno;
    LOG(FATAL) << "Error opening DAX device file.";
    throw std::invalid_argument(std::string("Invalid input file name: /dev/dax0.0 (") +
                                std::strerror(openErrno) + ")");
  }
  std::cout << "File descriptor = " << std::to_string(f_descriptor_) << std::endl;

  // Use the types mmap() actually takes so that larger regions are never truncated.
  const size_t length = DAX_LENGTH;  // Multiple of page size
  const off_t offset = static_cast<off_t>(DAX_OFFSET);
  void *const mapping = mmap(nullptr, length, PROT_READ, MAP_SHARED, f_descriptor_, offset);

  if (mapping == MAP_FAILED) {
    // Capture errno before logging/close() can overwrite it.
    const int mmapErrno = errno;
    LOG(FATAL) << "Error mapping DAX region into memory.";
    close(f_descriptor_);
    throw std::invalid_argument("Could not mmap DAX region. Offset = " + std::to_string(offset) +
                                ", length = " + std::to_string(length) + " (" +
                                std::strerror(mmapErrno) + ")");
  }

  // Only publish the pointer once the mapping is known to be valid.
  mmap_buffer_ = static_cast<char *>(mapping);
#else
  inputFile = fopen(filename.c_str(), "r");
  if (!inputFile) {
    throw std::invalid_argument("Invalid input file name: " + filename);
  }
#endif

  LOG(TRACE) << "Created file input filter";
}

/**
 * Releases the input source acquired by the constructor.
 *
 * The cleanup mirrors the constructor's DAX_INPUT switch: only the resources
 * that were actually acquired in the active mode are released. The destructor
 * runs only after a successful construction, so the mapping and descriptor
 * (DAX mode) or the FILE handle (file mode) are valid here.
 */
FileDmaInputFilter::~FileDmaInputFilter() {
#if DAX_INPUT == 1
  // Unmap before closing the descriptor the mapping was created from.
  munmap(mmap_buffer_, DAX_LENGTH);
  close(f_descriptor_);
#else
  fclose(inputFile);
#endif
  LOG(TRACE) << "Destroyed file input filter";
}

/*
 * Reads one packet from a file, 32 bytes at a time, in order to simulate
 * DMA reads.
 *
 * A packet ends with a 32-byte word whose first 8 bytes equal
 * 0xdeadbeefdeadbeef (compared in host byte order); that word is part of the
 * returned data. When the file position is exactly at the end of the file at
 * the start of a packet, the file is rewound and reading starts over.
 *
 * inputFile  Open stream to read from.
 * buffer     Destination for the packet data; must hold at least `size` bytes.
 * size       Capacity of `buffer` in bytes. Only whole 32-byte words are ever
 *            written, so no byte at or beyond `size` is touched.
 * nbReads    Read counter, used only in diagnostic messages.
 *
 * Returns the number of bytes stored in `buffer` (a multiple of 32).
 *
 * Throws std::system_error on a stream error, and std::runtime_error when the
 * file is empty, ends in the middle of a packet, or the packet does not fit
 * into `size` bytes.
 */
static inline ssize_t read_dma_packet_from_file(FILE *inputFile, char *buffer, ssize_t size,
                                                uint64_t nbReads) {
  static constexpr uint64_t deadbeef = 0xdeadbeefdeadbeefULL;
  static constexpr size_t wordSize = 32;
  ssize_t bytesRead = 0;
  bool rewound = false;

  // Only start a read when a whole word still fits; this keeps a `size` that
  // is not a multiple of 32 from writing past the end of the buffer.
  while (!feof(inputFile) && bytesRead + static_cast<ssize_t>(wordSize) <= size) {
    // Expecting 32 byte alignment
    const size_t rc = fread(buffer, 1, wordSize, inputFile);

    if (ferror(inputFile)) {
      throw std::system_error(errno, std::system_category(), "File error");
    }

    if (rc != wordSize) {
      if (feof(inputFile) && rc == 0 && bytesRead == 0) {
        // A second clean EOF right after rewinding means the file holds no
        // data at all; bail out instead of rewinding forever.
        if (rewound) {
          throw std::runtime_error("#" + std::to_string(nbReads) +
                                   ": Input file is empty, no packet can be read.");
        }

        // We have reached the perfect end of the file, let's start again
        std::cout << "Reached EOF, rewinding file! It's been " << std::to_string(nbReads)
                  << " read calls." << std::endl;
        if (fseek(inputFile, 0, SEEK_SET) != 0) {
          throw std::system_error(errno, std::system_category(), "File rewind error");
        }
        rewound = true;
        continue;
      }

      // Misaligned data
      throw std::runtime_error("#" + std::to_string(nbReads) +
                               ": File read ends prematurely, missing " +
                               std::to_string(wordSize - rc) +
                               " bytes. Something is probably misaligned.");
    }

    bytesRead += static_cast<ssize_t>(wordSize);

    // Copy the first 8 bytes out instead of casting the pointer: `buffer`
    // carries no alignment guarantee for uint64_t.
    uint64_t head = 0;
    std::memcpy(&head, buffer, sizeof head);
    if (head == deadbeef) {
      // End of the packet was found
      return bytesRead;
    }

    buffer += wordSize;
  }

  // We are here either
  //   because we found EOF earlier than the end of the packet
  //   or the packet cannot fit into the buffer
  // We can deal with both conditions but for the moment we throw an error

  if (feof(inputFile)) {
    throw std::runtime_error("EOF reached but no end of the packet found.");
  }

  // TODO: Read and discard data until deadbeef is found
  throw std::runtime_error("Packet is too big and cannot fit into preallocated memory");
}

/*
 * Copies one packet from `src_buf` to `dst_buf` in 32-byte words.
 *
 * A packet ends with a 32-byte word whose first 8 bytes equal
 * 0xdeadbeefdeadbeef (compared in host byte order); that word is copied too.
 *
 * src_buf  Source memory; must be readable for up to `size` bytes.
 * dst_buf  Destination memory; must be writable for `size` bytes.
 * size     Maximum number of bytes to copy. Expected to be a multiple of 32
 *          (asserted in debug builds); in any case only whole words that fit
 *          into `size` are copied.
 *
 * Returns the number of bytes copied, including the end-of-packet word.
 * Throws std::runtime_error if no end-of-packet word is found within `size`
 * bytes.
 */
static inline ssize_t read_dma_packet_buffer(char *src_buf, char *dst_buf, size_t size) {
  static constexpr uint64_t deadbeef = 0xdeadbeefdeadbeefULL;
  const size_t align_by = 32;
  assert(size % align_by == 0);
  std::cout << "read_dma_packet_buffer: reading up to " << std::to_string(size) << " bytes."
            << std::endl;

  size_t bytes_read = 0;

  // Only copy a word when it fits entirely: with assertions compiled out, a
  // `size` that is not a multiple of 32 must not overrun the destination.
  while (bytes_read + align_by <= size) {
    // Expecting 32 byte alignment
    std::memcpy(dst_buf + bytes_read, src_buf + bytes_read, align_by);

    // Must find end of packet ('deadbeef') within the destination buffer's allocated space.
    // The marker is copied out rather than read through a casted pointer so
    // that no uint64_t alignment of the source is required.
    uint64_t head = 0;
    std::memcpy(&head, src_buf + bytes_read, sizeof head);
    if (head == deadbeef) {
      std::cout << "Found deadbeef after " << std::to_string(bytes_read) << " bytes." << std::endl;
      return static_cast<ssize_t>(bytes_read + align_by);
    }
    bytes_read += align_by;
  }

  std::cout << "Filled buffer but no deadbeef" << std::endl;

  // We are here because the packet cannot fit into the buffer
  throw std::runtime_error("Packet is too big and cannot fit into preallocated memory");
}

/**
 * Reads one packet into `*buffer`.
 *
 * With DAX_INPUT == 1 the packet is copied out of the memory-mapped DAX
 * region at the current mapping offset, and the offset is advanced by the
 * number of bytes copied. Otherwise the packet is read from the input file.
 *
 * @param buffer      Pointer to the destination buffer pointer.
 * @param bufferSize  Capacity of the destination buffer in bytes.
 * @return Number of bytes stored in `*buffer`.
 * @throws std::runtime_error (from the read helpers) if no complete packet
 *         fits into the buffer; the file path may also throw
 *         std::system_error on stream errors.
 */
inline ssize_t FileDmaInputFilter::readPacket(char **buffer, ssize_t bufferSize) {
  // Read from DMA
  ssize_t bytesRead = 0;

#if DAX_INPUT == 1
  // Offset at which reading wraps around to the start of the mapping.
  // NOTE(review): hard-coded; presumably the size of the data set currently
  // loaded into the DAX region - confirm and make configurable.
  constexpr auto daxDataEnd = 9050784;

  // bufferSize must be multiple of page size
  std::cout << "Current mmap offset: " << std::to_string(mmap_offset_) << " out of "
            << std::to_string(DAX_LENGTH) << std::endl;

  if (mmap_offset_ >= daxDataEnd) {
    std::cout << "Mmap offset is at EOF, we start over" << std::endl;
    mmap_offset_ = 0;
  }

  // Never let the copy run past the end of the mapped region: cap the read at
  // the number of bytes left in the mapping.
  const size_t requested = static_cast<size_t>(bufferSize);
  const size_t remaining = DAX_LENGTH - static_cast<size_t>(mmap_offset_);
  const size_t readable = requested < remaining ? requested : remaining;

  bytesRead = read_dma_packet_buffer(mmap_buffer_ + mmap_offset_, *buffer, readable);
  mmap_offset_ += bytesRead;
  std::cout << "Bytes read: " << std::to_string(bytesRead) << std::endl;

#else
  int skip = 0;

  bytesRead = read_dma_packet_from_file(inputFile, *buffer, bufferSize, nbReads());
  std::cout << "Bytes read: " << std::to_string(bytesRead) << std::endl;

  // If large packet returned, skip and read again
  while (bytesRead > (ssize_t)bufferSize) {
    stats.nbOversizedPackets++;
    skip++;
    LOG(ERROR) << "#" << nbReads() << ": ERROR: Read returned " << bytesRead << " > buffer size "
               << bufferSize << ". Skipping packet #" << skip << ".";
    if (skip >= 100) {
      throw std::runtime_error("FATAL: Read is still returning large packets.");
    }
    bytesRead = read_dma_packet_from_file(inputFile, *buffer, bufferSize, nbReads());
  }

#endif

  return bytesRead;
}

/**************************************************************************
 * Entry points are here
 * Overriding virtual functions
 */

// Appends this filter's extra statistic (number of oversized packets) to `out`.
void FileDmaInputFilter::print(std::ostream &out) const {
  out << ", oversized packets ";
  out << stats.nbOversizedPackets;
}

// Reads one packet into `*buffer` by delegating to readPacket(); the unsigned
// buffer size is converted to the signed type readPacket() expects.
ssize_t FileDmaInputFilter::readInput(char **buffer, size_t bufferSize) {
  const auto signedSize = static_cast<ssize_t>(bufferSize);
  return readPacket(buffer, signedSize);
}