Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
processor.cc 16.47 KiB
#include "processor.h"

#include <algorithm>
#include <cmath>
#include <iomanip>
#include <vector>

#include "format.h"
#include "log.h"
#include "slice.h"

// Definition of the static data member `stats` of StreamProcessor.
// Because the member is static there is exactly one Statistics object, shared
// by every StreamProcessor instance; the counters updated in
// CheckFrameMultBlock() and process() all live here.
StreamProcessor::Statistics StreamProcessor::stats;

// Constructs a stream processor and registers it as a TBB filter in `parallel`
// mode.
//
// Parameters (each one is stored in the member of the same name without the
// trailing underscore):
//   max_size_            - operator() allocates each output slice with
//                          2 * max_size
//   doZS_                - when false, FillOrbitMuon keeps muons whose pt is 0
//   processorType_       - selects the branch taken in process()
//                          (PASS_THROUGH, BRIL, GMT or CALO)
//   nOrbitsPerDMAPacket_ - expected number of orbits per packet, used by
//                          CheckFrameMultBlock() and process()
//   prescaleFactor_      - for CALO only: when non-zero, process() handles a
//                          packet only if packet_count % prescaleFactor == 0
//   control_             - kept by reference; its `verbosity` field gates the
//                          detailed warnings
StreamProcessor::StreamProcessor(size_t max_size_, bool doZS_, ProcessorType processorType_,
                                 uint32_t nOrbitsPerDMAPacket_, uint32_t prescaleFactor_,
                                 ctrl &control_)
    : tbb::filter(parallel),
      max_size(max_size_),
      nbPackets(0),
      doZS(doZS_),
      processorType(processorType_),
      nOrbitsPerDMAPacket(nOrbitsPerDMAPacket_),
      prescaleFactor(prescaleFactor_),
      control(control_) {
  LOG(TRACE) << "Created transform filter at " << static_cast<void *>(this);
  // NOTE(review): opens "example.txt" relative to the working directory on
  // every construction. Nothing in the visible part of this file writes to
  // `myfile`, so this looks like leftover debugging -- confirm before removing.
  myfile.open("example.txt");
}

// Definition of the static data member `BrilQueue`. FillBril() pushes one
// histogram (an array of NBXPerOrbit + NFramesInHistoHeader 32-bit words) per
// entry into this queue. Being static, the queue is shared by all
// StreamProcessor instances.
BrilHistoQueue<std::array<uint32_t, constants::NBXPerOrbit + constants::NFramesInHistoHeader>>
    StreamProcessor::BrilQueue;

// Decodes one 32-bit word of the orbit-trailer BX hit map.
// For every set bit in `word`, scanning from the least significant bit upwards,
// the value (bit position + offset) is appended to *bx_vect.
void bit_check(std::vector<unsigned int> *bx_vect, uint32_t word, uint32_t offset) {
  // Shift the word down one bit per step; once no set bits remain there is
  // nothing further to append.
  for (uint32_t bit = 0; word != 0; ++bit, word >>= 1) {
    if ((word & 1u) != 0) {
      bx_vect->push_back(offset + bit);
    }
  }
}

// Destructor: closes the `myfile` stream that the constructor opened.
StreamProcessor::~StreamProcessor() {
  myfile.close();
}

// Checks that the packet size, once the framing overhead has been removed, is
// an integer multiple of the BX block size (sizeof(blockMuon)).
//
// The overhead is, for each of the nOrbitsPerDMAPacket orbits, one 32-byte
// orbit header plus one orbit trailer (constants::orbit_trailer_size), and one
// 32-byte packet trailer.
//
// Side effects on the shared `stats` object:
//   - inconsistent packet: n_consistent_sized_packets is reset to 0 and
//     packet_skipped_inconsistent_size is incremented once; a throttled
//     warning is logged (1st, then every 10th below 100, every 100th below
//     1000, every 1000th below 10000, then every 10000th).
//   - consistent packet: n_consistent_sized_packets is incremented, and when
//     it reaches 6000000 the skipped counter is reset to 0.
//
// Returns true when the size is consistent, false when the packet must be
// skipped. A packet smaller than the overhead is reported as inconsistent.
bool StreamProcessor::CheckFrameMultBlock(size_t inputSize) {
  const size_t bsize = sizeof(blockMuon);
  const size_t overhead =
      static_cast<size_t>(nOrbitsPerDMAPacket) *
          (static_cast<size_t>(constants::orbit_trailer_size) + 32) +
      32;

  // Guard the subtraction: with unsigned arithmetic a too-small packet would
  // wrap around to a huge value that could accidentally pass the modulo test.
  const bool consistent = (inputSize >= overhead) && ((inputSize - overhead) % bsize == 0);

  if (!consistent) {
    stats.n_consistent_sized_packets = 0;
    // Count each skipped packet exactly once.
    stats.packet_skipped_inconsistent_size++;
    const uint64_t nSkipped = stats.packet_skipped_inconsistent_size;

    const bool logThisOne = (nSkipped == 1) || ((nSkipped < 100) && (nSkipped % 10 == 0)) ||
                            ((nSkipped < 1000) && (nSkipped % 100 == 0)) ||
                            ((nSkipped < 10000) && (nSkipped % 1000 == 0)) ||
                            (nSkipped % 10000 == 0);
    if (logThisOne) {
      LOG(WARNING) << "Frame size not a multiple of block size after headers "
                      "and trailers have been subtracted. Counted "
                   << stats.packet_skipped_inconsistent_size << " packets skipped.";
    }
    if (control.verbosity != 0) {
      LOG(WARNING) << "Frame size not a multiple of block size after orbit headers "
                      "(32B*nOrbitsPerPacket), orbit trailers (544B*nOrbitsPerPacket), "
                      "and packet trailer (32B) have been subtracted. \n Frame size = "
                   << inputSize << ", block size = " << bsize << ", packet will be skipped";
    }
    return false;
  }

  stats.n_consistent_sized_packets++;
  if (stats.n_consistent_sized_packets == 6000000) {  // every ~10 mins
    LOG(WARNING) << "Resetting packet_skipped_inconsistent_size counter to 0 "
                    "after 6000000 consistent packets. Count was at "
                 << stats.packet_skipped_inconsistent_size;
    stats.packet_skipped_inconsistent_size = 0;
  }
  return true;
}

// Scans forward from the orbit header at rd_ptr until the orbit trailer (a
// block whose first orbit word equals constants::beefdead) is found, then
// decodes the trailer's BX hit map into a list of non-empty bunch crossings.
//
// Parameters:
//   input        - slice being processed; only input.end() is used, as the
//                  upper bound of the scan
//   rd_ptr       - points at the 32-byte orbit header of the orbit to scan
//   trailerError - output flag: false when a trailer was found and decoded,
//                  true when the data ran out first (the header, a block or
//                  the trailer itself does not fit in the remaining bytes)
//
// Returns the BX numbers in hit-map order (counting starts at 1); the vector is
// empty when trailerError is set.
std::vector<unsigned int> StreamProcessor::CountBX(Slice &input, char *rd_ptr, bool &trailerError) {
  std::vector<unsigned int> bx_vect;
  char *const end = input.end();
  trailerError = false;

  // The orbit header itself must fit before we can step over it.
  if ((rd_ptr > end) || (static_cast<size_t>(end - rd_ptr) < 32)) {
    trailerError = true;
    return bx_vect;
  }
  rd_ptr += 32;  // +32 to account for orbit header

  // Only inspect a block when a whole block is available. A plain
  // `rd_ptr != end` test can step over `end` and read out of bounds, because
  // the remaining length need not be a multiple of sizeof(blockMuon).
  while (static_cast<size_t>(end - rd_ptr) >= sizeof(blockMuon)) {
    const blockMuon *bl = reinterpret_cast<const blockMuon *>(rd_ptr);
    if (bl->orbit[0] == constants::beefdead) {  // found orbit trailer
      if (static_cast<size_t>(end - rd_ptr) < sizeof(orbit_trailer)) {
        break;  // truncated trailer: reported as a trailer error below
      }
      const orbit_trailer *ot = reinterpret_cast<const orbit_trailer *>(rd_ptr);
      for (unsigned int k = 0; k < (14 * 8); k++) {  // 14*8 = 14 frames, 8 links of orbit trailer
                                                     // containing BX hitmap
        bit_check(&bx_vect, ot->bx_map[k],
                  (k * 32 + 1));  // +1 added to account for BX counting starting at 1
      }
      return bx_vect;
    }
    rd_ptr += sizeof(blockMuon);
  }
  trailerError = true;
  return bx_vect;
}

inline std::pair<uint32_t, bool> StreamProcessor::ProcessOrbitHeader(char *rd_ptr) {
  // get orbit from orbit header
  blockMuon *bl_pre =
      reinterpret_cast<blockMuon *>(rd_ptr);  // blockMuon.orbit is identical to blockCalo.frame0
  uint32_t orbitN = uint32_t{bl_pre->orbit[0]};
  // save warning_test_enable bit
  bool warning_test_enable = bool{(orbitN & (1 << 31)) == (1 << 31)};
  // remove warning_test_enable bit from orbit header
  orbitN &= 0x7fffffff;
  std::pair<uint32_t, bool> orbit_header = std::pair<uint32_t, bool>{orbitN, warning_test_enable};
  return orbit_header;
}

// Copies one orbit of calo data into the output buffer, one record per
// non-empty bunch crossing listed in bx_vect.
//
// Each record is made of 32-bit words: a header word (the warning-test-enable
// flag), the BX number, the orbit number, and then for each of the 8 links the
// link index followed by that link's calo0..calo5 words.
//
// Parameters:
//   bx_vect - BX numbers of the blocks in this orbit, in block order
//   rd_ptr  - points at the orbit header
//   wr_ptr  - where the records are written (the caller tracks how far the
//             output advanced; this function does not report it)
//
// Returns {number of BX records written, orbit number from the header}.
StreamProcessor::fillOrbitMetadata StreamProcessor::FillOrbitCalo(
    std::vector<unsigned int> &bx_vect, char *rd_ptr, char *wr_ptr) {
  const std::pair<uint32_t, bool> headerInfo = ProcessOrbitHeader(rd_ptr);
  const uint32_t headerOrbit = headerInfo.first;
  const uint32_t flagWord = headerInfo.second ? 1u : 0u;  // header can be added to later
  rd_ptr += 32;                                            // step over the orbit header

  // Appends the 4 bytes found at `src` to the output and advances the cursor.
  auto emitWord = [&wr_ptr](const void *src) {
    memcpy(wr_ptr, src, 4);
    wr_ptr += 4;
  };

  uint32_t nRecords = 0;
  for (size_t idx = 0; idx < bx_vect.size(); ++idx) {
    const blockCalo *block = reinterpret_cast<const blockCalo *>(rd_ptr);
    if (block->calo0[0] == constants::beefdead) {
      break;  // orbit trailer has been reached, end of orbit data
    }

    const uint32_t bx = bx_vect[idx];
    // bx 3555 - 3564 are from the previous orbit
    const uint32_t bxOrbit = (bx > 3554) ? headerOrbit - 1 : headerOrbit;

    emitWord(&flagWord);
    emitWord(&bx);
    emitWord(&bxOrbit);
    for (uint32_t link = 0; link < 8; ++link) {
      emitWord(&link);  // link number, can later be used to find object type
      emitWord(&block->calo0[link]);
      emitWord(&block->calo1[link]);
      emitWord(&block->calo2[link]);
      emitWord(&block->calo3[link]);
      emitWord(&block->calo4[link]);
      emitWord(&block->calo5[link]);
    }

    ++nRecords;
    rd_ptr += sizeof(blockCalo);
  }

  StreamProcessor::fillOrbitMetadata meta = {
      nRecords,
      headerOrbit,
  };
  return meta;
}

uint32_t StreamProcessor::FillBril(char *rd_ptr, char *wr_ptr, char *end_ptr) {
  static constexpr uint32_t NHistosPerPacket = 20;  // should not be changed, any more and SB852
                                                    // crashes, would need to re-upload bitfile
  uint32_t histo_i, histo_word_i = 0;
  std::array<std::array<uint32_t, constants::NBXPerOrbit + constants::NFramesInHistoHeader>,
             NHistosPerPacket>
      histo_arr;
  // uint32_t histo_arr[NHistosPerPacket][constants::NBXPerOrbit +
  // NFramesInHistoHeader];

  // BrilHistoQueue<std::array<uint32_t, constants::NBXPerOrbit +
  // constants::NFramesInHistoHeader>> BrilQueue;

  while ((rd_ptr != end_ptr) && (histo_i < NHistosPerPacket)) {
    brilFrame *fr = reinterpret_cast<brilFrame *>(rd_ptr);
    if (fr->word == constants::bril_header) {
      rd_ptr += 32;
      histo_i++;
      histo_word_i = 0;
      continue;
    }

    if (histo_i == 0) {
      continue;
    }  // Currently appears to be a bug where first histogram of packet is
       // truncated, need to understand and fix, for now skip

    if (histo_word_i < constants::NFramesInHistoHeader) {
      histo_arr[histo_i][histo_word_i] = (fr->word >> 0) & 0xffffffff;
    } else {
      histo_arr[histo_i][(histo_word_i * 2) - constants::NFramesInHistoHeader] =
          (fr->word >> 0) & 0xffff;
      histo_arr[histo_i][(histo_word_i * 2) + 1 - constants::NFramesInHistoHeader] =
          (fr->word >> 16) & 0xffff;
    }
    rd_ptr += sizeof(brilFrame);
    histo_word_i++;
  }
  uint32_t packed_size = sizeof(uint32_t) * NHistosPerPacket *
                         (constants::NBXPerOrbit + constants::NFramesInHistoHeader);
  memcpy(wr_ptr, (char *)&histo_arr, packed_size);
  wr_ptr += packed_size;

  for (std::array<uint32_t, constants::NBXPerOrbit + constants::NFramesInHistoHeader> &hist :
       histo_arr) {
    BrilQueue.push(hist);
  }

  return packed_size;
}

// Copies one orbit of muon data into the output buffer, one record per
// non-empty bunch crossing listed in bx_vect.
//
// Each record is made of 32-bit words: a header word, the BX number, the orbit
// number, then three words (first word, second word, "extra") for every kept
// muon of the first set (mu1*), followed by the same for the second set (mu2*).
// A muon is kept when its pt field is non-zero or when zero suppression (doZS)
// is off. The "extra" word is a copy of the block's bx word with bit 0 cleared
// for first-set muons and set for second-set muons.
//
// Parameters:
//   bx_vect - BX numbers of the blocks in this orbit, in block order
//   rd_ptr  - points at the orbit header
//   wr_ptr  - where the records are written (the caller tracks how far the
//             output advanced; this function does not report it)
//
// Returns {total number of muons written, orbit number from the header}.
StreamProcessor::fillOrbitMetadata StreamProcessor::FillOrbitMuon(
    std::vector<unsigned int> &bx_vect, char *rd_ptr, char *wr_ptr) {
  const std::pair<uint32_t, bool> orbit_header =
      ProcessOrbitHeader(rd_ptr);  //.second is the warning test enable bit
  rd_ptr += 32;                    // +32 to account for orbit header
  const uint32_t orbit = orbit_header.first;
  uint32_t relbx = 0;
  uint32_t counts = 0;
  while (relbx < bx_vect.size()) {  // total number of non-empty BXs in orbit is
                                    // given by bx_vect.size()
    const blockMuon *bl = reinterpret_cast<const blockMuon *>(rd_ptr);
    if (bl->orbit[0] == constants::beefdead) {
      break;
    }  // orbit trailer has been reached, end of orbit data
    int mAcount = 0;
    int mBcount = 0;
    bool AblocksOn[8];
    bool BblocksOn[8];
    const uint32_t bx = bx_vect[relbx];

    // bx 3555 - 3564 are from the previous orbit. The correction is applied to
    // a per-BX copy (as FillOrbitCalo does): decrementing the shared orbit
    // number would accumulate, giving orbit-2, orbit-3, ... for successive late
    // BXs and corrupting the orbit number returned to the caller.
    uint32_t orbit_ = orbit;
    if (bx > 3554) {
      orbit_--;
    }

    for (unsigned int i = 0; i < 8; i++) {
      const uint32_t bxA = (bl->bx[i] >> shifts::bx) & masks::bx;
      if ((bxA != bx) && (i == 0) && (bx < 3555) &&
          control.verbosity) {  // only prints warning when BX < 3555 i.e from
                                // the same orbit
        LOG(WARNING) << "BX mismatch, uGMT data word BX = " << std::hex << bxA
                     << ", BX extracted from trailer = " << bx << ", orbitN is " << std::dec
                     << orbit_;
      }

      const uint32_t ptA = (bl->mu1f[i] >> shifts::pt) & masks::pt;
      AblocksOn[i] = ((ptA > 0) || (doZS == 0));
      if (AblocksOn[i]) {
        mAcount++;
      }
      const uint32_t ptB = (bl->mu2f[i] >> shifts::pt) & masks::pt;
      BblocksOn[i] = ((ptB > 0) || (doZS == 0));
      if (BblocksOn[i]) {
        mBcount++;
      }
    }
    const uint32_t bxcount = std::max(mAcount, mBcount);
    if (bxcount == 0) {
      // NOTE(review): relbx is not advanced here, so the next block is labelled
      // with this BX, and the caller still reserves a 12-byte header for it.
      // Behaviour kept as in the original -- confirm whether relbx should also
      // be incremented.
      rd_ptr += sizeof(blockMuon);
      LOG(WARNING) << '#' << nbPackets
                   << ": Detected a bx with zero muons, this should not "
                      "happen. Packet is skipped.";
      continue;
    }

    // Header word of the packed muon data: mAcount (number of first-set muons)
    // from bit 16 up, the warning test enable flag at bit 8, and mBcount
    // (number of second-set muons) in the low bits.
    const uint32_t header =
        uint32_t{(mAcount << 16) + ((static_cast<uint32_t>(orbit_header.second)) << 8) + mBcount};

    counts += mAcount;
    counts += mBcount;
    memcpy(wr_ptr, &header, 4);
    wr_ptr += 4;
    memcpy(wr_ptr, &bx, 4);
    wr_ptr += 4;
    memcpy(wr_ptr, &orbit_, 4);
    wr_ptr += 4;
    for (unsigned int i = 0; i < 8; i++) {
      if (AblocksOn[i]) {
        memcpy(wr_ptr, &bl->mu1f[i], 4);
        wr_ptr += 4;
        memcpy(wr_ptr, &bl->mu1s[i], 4);
        wr_ptr += 4;
        // mu.extra: copy of bl->bx with bit 0 set to 0 for the first muon. A
        // local copy is used so the input buffer is left untouched.
        const uint32_t extra = bl->bx[i] & ~uint32_t{1};
        memcpy(wr_ptr, &extra, 4);
        wr_ptr += 4;
      }
    }

    for (unsigned int i = 0; i < 8; i++) {
      if (BblocksOn[i]) {
        memcpy(wr_ptr, &bl->mu2f[i], 4);
        wr_ptr += 4;
        memcpy(wr_ptr, &bl->mu2s[i], 4);
        wr_ptr += 4;
        // mu.extra: copy of bl->bx with bit 0 set to 1 for the second muon.
        const uint32_t extra = bl->bx[i] | uint32_t{1};
        memcpy(wr_ptr, &extra, 4);
        wr_ptr += 4;
      }
    }

    rd_ptr += sizeof(blockMuon);

    relbx++;
  }
  StreamProcessor::fillOrbitMetadata meta = {
      counts,
      orbit,
  };
  return meta;
}

// Transforms one input packet into `out` according to processorType.
//
//   PASS_THROUGH - the input bytes are copied unchanged; counts is set to 1.
//   BRIL         - FillBril() unpacks the histograms; both the output length
//                  and counts are set to the byte count it returns.
//   GMT / CALO   - the packet is walked orbit by orbit: CountBX() decodes the
//                  orbit trailer, FillOrbitMuon()/FillOrbitCalo() write the
//                  records, and the walk ends when the DMA trailer word
//                  (constants::deadbeef) follows an orbit.
//   anything else throws std::invalid_argument.
//
// For CALO, a non-zero prescaleFactor makes the function handle only packets
// for which packet_count % prescaleFactor == 0.
//
// A packet is skipped (the function returns without calling out.set_end() or
// out.set_counts()) when it is prescaled away, when its size is inconsistent,
// when an orbit trailer is missing, when the data ends without a DMA trailer,
// or when it holds more orbits than nOrbitsPerDMAPacket.
void StreamProcessor::process(Slice &input, Slice &out) {
  nbPackets++;
  stats.packet_count++;

  // Implement prescale - only for CALO
  if ((processorType == ProcessorType::CALO) && (prescaleFactor != 0)) {
    if (stats.packet_count % prescaleFactor != 0) {
      return;
    }
  }
  char *rd_ptr = input.begin();
  char *wr_ptr = out.begin();
  char *end_ptr = input.end();
  uint32_t counts = 0;
  uint32_t orbit_per_packet_count = 0;
  bool firstOrbit = true;
  StreamProcessor::fillOrbitMetadata meta{
      0,
      0,
  };
  if (processorType == ProcessorType::PASS_THROUGH) {
    memcpy(wr_ptr, rd_ptr, input.size());
    out.set_end(out.begin() + input.size());
    out.set_counts(1);
    return;
  }
  if (processorType == ProcessorType::BRIL) {
    counts = FillBril(rd_ptr, wr_ptr, end_ptr);
    out.set_end(out.begin() + counts);
    out.set_counts(counts);
    return;
  }

  if (!CheckFrameMultBlock(input.size())) {
    return;
  }

  // One iteration per orbit; every exit from the loop is a return.
  while (true) {
    bool trailerError = false;
    std::vector<unsigned int> bx_vect = CountBX(input, rd_ptr, trailerError);
    if (trailerError) {
      stats.orbit_trailer_error_count++;
      LOG(WARNING) << "Orbit trailer error: orbit trailer not found before end of data "
                      "packet. Packet will be skipped. Orbit trailer error count = "
                   << stats.orbit_trailer_error_count;
      return;
    }
    std::sort(bx_vect.begin(), bx_vect.end());

    uint32_t orbitCount = 0;
    if (processorType == ProcessorType::GMT) {
      meta = FillOrbitMuon(bx_vect, rd_ptr, wr_ptr);
      orbitCount = meta.counts;
      ++orbit_per_packet_count;
      // 12 bytes for each muon/count then 12 bytes for each bx header.
      // NOTE(review): this assumes FillOrbitMuon wrote a header for every entry
      // of bx_vect, which does not hold if it skipped a zero-muon BX.
      wr_ptr += orbitCount * 12 + 12 * bx_vect.size();
    } else if (processorType == ProcessorType::CALO) {
      meta = FillOrbitCalo(bx_vect, rd_ptr, wr_ptr);
      orbitCount = meta.counts;
      ++orbit_per_packet_count;
      // size of calo packet is 4bytes*(8links*7dataWords + 3headerWords)=236
      // bytes Note 7 data words per link because we have the "link number" word
      // + 6 words from calo L2
      wr_ptr += 4 * ((8 * 7) + 3) * bx_vect.size();
    } else {
      LOG(ERROR) << "UNKNOWN PROCESSOR_TYPE, EXITING";
      throw std::invalid_argument("ERROR: PROCESSOR_TYPE NOT RECOGNISED");
    }

    rd_ptr += 32 + bx_vect.size() * sizeof(blockMuon) +
              constants::orbit_trailer_size;  // 32 for orbit header, + nBXs +
                                              // orbit trailer
    counts += orbitCount;
    if (firstOrbit) {
      out.set_firstOrbitN(meta.orbit);
      firstOrbit = false;
    }

    // The data ended without a DMA trailer word. Stop here: going round the
    // loop again would hand CountBX a read pointer at or beyond the end of the
    // input.
    if ((rd_ptr >= end_ptr) || (static_cast<size_t>(end_ptr - rd_ptr) < sizeof(uint32_t))) {
      LOG(WARNING) << "DMA trailer word deadbeef not found before end of data "
                      "packet. Packet will be skipped.";
      return;
    }

    uint32_t dma_trailer_word = 0;
    memcpy(&dma_trailer_word, rd_ptr, sizeof(dma_trailer_word));
    if (dma_trailer_word == constants::deadbeef) {
      out.set_end(wr_ptr);
      out.set_counts(counts);
      return;
    }

    if (orbit_per_packet_count > nOrbitsPerDMAPacket) {
      if (control.verbosity) {
        // std::dec restores decimal output so the counters are not printed in hex.
        LOG(WARNING) << "expected DMA trailer word deadbeef, found " << std::hex
                     << dma_trailer_word << std::dec << ". Orbits per packet count "
                     << orbit_per_packet_count << ", > expected, (" << nOrbitsPerDMAPacket
                     << ") skipping packet.";
      }
      stats.excess_orbits_per_packet_count++;
      if (stats.excess_orbits_per_packet_count % 10000 == 0) {
        LOG(WARNING) << "count of packets with excess # orbits "
                     << stats.excess_orbits_per_packet_count;
      }
      return;
    }
  }
}
// TBB filter entry point. `item` is the Slice handed over by the previous
// pipeline stage. A new output slice of 2 * max_size is allocated, the input is
// processed into it, the input slice is given back via Slice::giveAllocated(),
// and the output slice is returned for the next stage.
void *StreamProcessor::operator()(void *item) {
  Slice *const inSlice = static_cast<Slice *>(item);
  Slice *const outSlice = Slice::allocate(2 * max_size);

  process(*inSlice, *outSlice);

  Slice::giveAllocated(inSlice);
  return outSlice;
}