-
Thomas Owen James authoredThomas Owen James authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
processor.cc 16.47 KiB
#include "processor.h"
#include <algorithm>
#include <cmath>
#include <iomanip>
#include <vector>
#include "format.h"
#include "log.h"
#include "slice.h"
// Definition of the static member stats. Being a static member it is a single
// object shared by every StreamProcessor instance; the processing methods in
// this file update its counters (packet counts, skipped packets, trailer
// errors).
// NOTE(review): the filter is constructed as a parallel TBB filter -- confirm
// that the counters inside Statistics are safe to update concurrently.
StreamProcessor::Statistics StreamProcessor::stats;
// Constructs the processor as a parallel TBB filter.
//
// Every argument is stored in the member of the same name (without the
// trailing underscore) and the packet counter nbPackets starts at 0:
//   max_size_            - stored in max_size; operator() allocates output
//                          slices of twice this size
//   doZS_                - stored in doZS; when false FillOrbitMuon keeps every
//                          muon slot, when true only slots with pt > 0
//   processorType_       - selects the branch taken in process()
//   nOrbitsPerDMAPacket_ - number of orbits expected in one DMA packet
//   prescaleFactor_      - if non-zero, process() only handles CALO packets
//                          whose packet count is a multiple of this value
//   control_             - reference to the control object (its verbosity
//                          field gates some of the warnings in this file)
//
// The constructor also opens "example.txt" through myfile.
// NOTE(review): this looks like a debugging leftover -- confirm the file is
// still needed.
StreamProcessor::StreamProcessor(size_t max_size_, bool doZS_, ProcessorType processorType_,
uint32_t nOrbitsPerDMAPacket_, uint32_t prescaleFactor_,
ctrl &control_)
: tbb::filter(parallel),
max_size(max_size_),
nbPackets(0),
doZS(doZS_),
processorType(processorType_),
nOrbitsPerDMAPacket(nOrbitsPerDMAPacket_),
prescaleFactor(prescaleFactor_),
control(control_) {
LOG(TRACE) << "Created transform filter at " << static_cast<void *>(this);
myfile.open("example.txt");
}
// Definition of the static member BrilQueue. Each queue element is one
// histogram: an array of constants::NBXPerOrbit +
// constants::NFramesInHistoHeader 32-bit words. FillBril pushes the histograms
// it has unpacked into this queue.
BrilHistoQueue<std::array<uint32_t, constants::NBXPerOrbit + constants::NFramesInHistoHeader>>
StreamProcessor::BrilQueue;
// Decodes one 32-bit word of the orbit trailer BX map.
//
// For every bit that is set in `word`, (bit position + offset) is appended to
// *bx_vect, lowest bit first. Nothing is appended for a zero word.
void bit_check(std::vector<unsigned int> *bx_vect, uint32_t word, uint32_t offset) {
  // Shift the word down one bit per step; once no set bits remain there is
  // nothing left to report, so the scan can stop.
  for (uint32_t bit = 0; word != 0; ++bit, word >>= 1) {
    if ((word & 1u) != 0u) {
      bx_vect->push_back(offset + bit);
    }
  }
}
// Closes myfile (opened on "example.txt" by the constructor).
StreamProcessor::~StreamProcessor() { myfile.close(); }
// Checks that the packet size is an integer multiple of the BX block size once
// the orbit headers (32 B per orbit), the orbit trailers
// (constants::orbit_trailer_size per orbit) and the packet trailer (32 B) have
// been subtracted.
//
// inputSize: size of the incoming packet in bytes.
//
// Returns true when the size is consistent. Otherwise the skip counter in
// stats is incremented (exactly once per packet), a rate-limited warning is
// logged and false is returned so the caller can drop the packet.
bool StreamProcessor::CheckFrameMultBlock(size_t inputSize) {
  const size_t bsize = sizeof(blockMuon);
  const size_t overhead =
      static_cast<size_t>(nOrbitsPerDMAPacket) * constants::orbit_trailer_size +
      static_cast<size_t>(32) * nOrbitsPerDMAPacket + 32;

  // A packet smaller than its own framing can never be valid. Testing this
  // first also keeps the unsigned subtraction below from wrapping around,
  // which could otherwise make a truncated packet look consistent.
  const bool consistent = (inputSize >= overhead) && ((inputSize - overhead) % bsize == 0);

  if (!consistent) {
    stats.n_consistent_sized_packets = 0;
    // Count the packet once; the same value drives both the rate limiting and
    // the number reported in the log message.
    const auto skipped = ++stats.packet_skipped_inconsistent_size;
    // Rate limiting: report the 1st skipped packet, then every 10th below 100,
    // every 100th below 1000, every 1000th below 10000 and every 10000th
    // afterwards.
    const bool report = (skipped == 1) || ((skipped < 100) && (skipped % 10 == 0)) ||
                        ((skipped < 1000) && (skipped % 100 == 0)) ||
                        ((skipped < 10000) && (skipped % 1000 == 0)) ||
                        (skipped % 10000 == 0);
    if (report) {
      LOG(WARNING) << "Frame size not a multiple of block size after headers "
                      "and trailers have been subtracted. Counted "
                   << skipped << " packets skipped.";
    }
    if (control.verbosity != 0) {
      LOG(WARNING) << "Frame size not a multiple of block size after orbit headers "
                      "(32B*nOrbitsPerPacket), orbit trailers (544B*nOrbitsPerPacket), "
                      "and packet trailer (32B) have been subtracted. \n Frame size = "
                   << inputSize << ", block size = " << bsize << ", packet will be skipped";
    }
    return false;
  }

  stats.n_consistent_sized_packets++;
  if (stats.n_consistent_sized_packets == 6000000) {  // every ~10 mins
    // A long run of good packets: forget about earlier inconsistent ones so
    // that the rate-limited warning starts from scratch next time.
    LOG(WARNING) << "Resetting packet_skipped_inconsistent_size counter to 0 "
                    "after 6000000M consistnent packets. Count was at "
                 << stats.packet_skipped_inconsistent_size;
    stats.packet_skipped_inconsistent_size = 0;
  }
  return true;
}
// Looks for the orbit trailer of the orbit whose header starts at rd_ptr and
// decodes the BX hit map it carries.
//
// input:        slice holding the packet; only its end() is used, as the upper
//               bound of the search
// rd_ptr:       start of the orbit header (32 bytes, skipped before scanning)
// trailerError: set to false when a complete trailer was found, to true when
//               the end of the input was reached first (or the trailer's hit
//               map does not fit into the remaining input)
//
// Returns the BX numbers (counting from 1) whose bit is set in the hit map;
// the vector is empty when trailerError is true.
std::vector<unsigned int> StreamProcessor::CountBX(Slice &input, char *rd_ptr, bool &trailerError) {
  // 14 frames x 8 links of the orbit trailer contain the BX hit map
  constexpr unsigned int kBxMapWords = 14 * 8;

  char *const data_end = input.end();
  std::vector<unsigned int> bx_vect;
  trailerError = false;
  rd_ptr += 32;  // +32 to account for orbit header

  // Use '<' rather than '!=': if the remaining size is not a whole number of
  // blocks the pointer would step over the end and the scan would run off the
  // buffer.
  while (rd_ptr < data_end) {
    blockMuon *bl = reinterpret_cast<blockMuon *>(rd_ptr);
    if (bl->orbit[0] == constants::beefdead) {  // found orbit trailer
      orbit_trailer *ot = reinterpret_cast<orbit_trailer *>(rd_ptr);
      // Make sure the part of the trailer that is read below lies inside the
      // input; a truncated trailer is reported as a trailer error.
      const char *const map_begin = reinterpret_cast<const char *>(&ot->bx_map[0]);
      const size_t needed =
          static_cast<size_t>(map_begin - rd_ptr) + kBxMapWords * sizeof(ot->bx_map[0]);
      if (static_cast<size_t>(data_end - rd_ptr) < needed) {
        break;
      }
      for (unsigned int k = 0; k < kBxMapWords; k++) {
        // +1 added to account for BX counting starting at 1
        bit_check(&bx_vect, ot->bx_map[k], (k * 32 + 1));
      }
      return bx_vect;
    }
    rd_ptr += sizeof(blockMuon);
  }
  trailerError = true;
  return bx_vect;
}
// Decodes the first word of the orbit header located at rd_ptr.
//
// Bit 31 of that word is the warning_test_enable flag, the remaining 31 bits
// are the orbit number.
//
// Returns {orbit number with the flag bit removed, warning_test_enable}.
inline std::pair<uint32_t, bool> StreamProcessor::ProcessOrbitHeader(char *rd_ptr) {
  // Unsigned mask: shifting a signed 1 into bit 31 would rely on signed
  // overflow behaviour.
  constexpr uint32_t kWarningTestEnableMask = uint32_t{1} << 31;

  // blockMuon.orbit is identical to blockCalo.frame0, so the muon layout can be
  // used to read the header for either data type
  const blockMuon *bl_pre = reinterpret_cast<const blockMuon *>(rd_ptr);
  const uint32_t raw = bl_pre->orbit[0];

  const bool warning_test_enable = (raw & kWarningTestEnableMask) != 0;
  const uint32_t orbitN = raw & ~kWarningTestEnableMask;
  return {orbitN, warning_test_enable};
}
// Unpacks one orbit of calo data.
//
// bx_vect: non-empty bunch crossings of this orbit (one data block each)
// rd_ptr:  start of the orbit header in the input
// wr_ptr:  where the unpacked data is written
//
// For every BX three header words are written (warning-test-enable flag, BX,
// orbit number), followed for each of the 8 links by the link number and the
// six calo words of that link. Unpacking stops early if the orbit trailer
// marker is met.
//
// Returns the number of BXs written and the orbit number from the header.
StreamProcessor::fillOrbitMetadata StreamProcessor::FillOrbitCalo(
    std::vector<unsigned int> &bx_vect, char *rd_ptr, char *wr_ptr) {
  // .second is the warning test enable bit
  const std::pair<uint32_t, bool> decoded = ProcessOrbitHeader(rd_ptr);
  const uint32_t orbit = uint32_t{decoded.first};
  const uint32_t header = uint32_t{decoded.second};  // header can be added to later
  rd_ptr += 32;                                      // step over the orbit header

  // Copies one 4-byte word to the output and moves the write position on.
  auto emit4 = [&wr_ptr](const void *src) {
    memcpy(wr_ptr, src, 4);
    wr_ptr += 4;
  };

  uint32_t counts = 0;
  // bx_vect.size() is the total number of non-empty BXs in the orbit
  for (uint32_t idx = 0; idx < bx_vect.size(); ++idx, rd_ptr += sizeof(blockCalo)) {
    blockCalo *block = reinterpret_cast<blockCalo *>(rd_ptr);
    if (block->calo0[0] == constants::beefdead) {
      break;  // orbit trailer has been reached, end of orbit data
    }

    const uint32_t bx = uint32_t{bx_vect[idx]};
    // bx 3555 - 3564 belong to the previous orbit
    const uint32_t bx_orbit = (bx > 3554) ? orbit - 1 : orbit;

    emit4(&header);
    emit4(&bx);
    emit4(&bx_orbit);

    for (uint32_t link = 0; link < 8; ++link) {
      emit4(&link);  // link number, can later be used to find object type
      emit4(&block->calo0[link]);
      emit4(&block->calo1[link]);
      emit4(&block->calo2[link]);
      emit4(&block->calo3[link]);
      emit4(&block->calo4[link]);
      emit4(&block->calo5[link]);
    }
    ++counts;
  }

  return {counts, orbit};
}
uint32_t StreamProcessor::FillBril(char *rd_ptr, char *wr_ptr, char *end_ptr) {
static constexpr uint32_t NHistosPerPacket = 20; // should not be changed, any more and SB852
// crashes, would need to re-upload bitfile
uint32_t histo_i, histo_word_i = 0;
std::array<std::array<uint32_t, constants::NBXPerOrbit + constants::NFramesInHistoHeader>,
NHistosPerPacket>
histo_arr;
// uint32_t histo_arr[NHistosPerPacket][constants::NBXPerOrbit +
// NFramesInHistoHeader];
// BrilHistoQueue<std::array<uint32_t, constants::NBXPerOrbit +
// constants::NFramesInHistoHeader>> BrilQueue;
while ((rd_ptr != end_ptr) && (histo_i < NHistosPerPacket)) {
brilFrame *fr = reinterpret_cast<brilFrame *>(rd_ptr);
if (fr->word == constants::bril_header) {
rd_ptr += 32;
histo_i++;
histo_word_i = 0;
continue;
}
if (histo_i == 0) {
continue;
} // Currently appears to be a bug where first histogram of packet is
// truncated, need to understand and fix, for now skip
if (histo_word_i < constants::NFramesInHistoHeader) {
histo_arr[histo_i][histo_word_i] = (fr->word >> 0) & 0xffffffff;
} else {
histo_arr[histo_i][(histo_word_i * 2) - constants::NFramesInHistoHeader] =
(fr->word >> 0) & 0xffff;
histo_arr[histo_i][(histo_word_i * 2) + 1 - constants::NFramesInHistoHeader] =
(fr->word >> 16) & 0xffff;
}
rd_ptr += sizeof(brilFrame);
histo_word_i++;
}
uint32_t packed_size = sizeof(uint32_t) * NHistosPerPacket *
(constants::NBXPerOrbit + constants::NFramesInHistoHeader);
memcpy(wr_ptr, (char *)&histo_arr, packed_size);
wr_ptr += packed_size;
for (std::array<uint32_t, constants::NBXPerOrbit + constants::NFramesInHistoHeader> &hist :
histo_arr) {
BrilQueue.push(hist);
}
return packed_size;
}
// Goes through an orbit worth of data and fills the output memory with the
// muons corresponding to the non-empty bunch crossings, as marked in bx_vect.
//
// bx_vect: non-empty bunch crossings of this orbit (one data block each)
// rd_ptr:  start of the orbit header in the input
// wr_ptr:  where the packed muons are written
//
// For every BX three header words are written (muon counts + flag, BX, orbit
// number), followed by three words (first word, second word, extra word) for
// every selected muon: first the selected slots of mu1f/mu1s, then those of
// mu2f/mu2s. A slot is selected when its pt is non-zero or when zero
// suppression (doZS) is off.
//
// Returns the number of muons written and the orbit number from the header.
StreamProcessor::fillOrbitMetadata StreamProcessor::FillOrbitMuon(
    std::vector<unsigned int> &bx_vect, char *rd_ptr, char *wr_ptr) {
  // .second is the warning test enable bit
  const std::pair<uint32_t, bool> orbit_header = ProcessOrbitHeader(rd_ptr);
  rd_ptr += 32;  // +32 to account for orbit header
  const uint32_t orbit = uint32_t{orbit_header.first};
  uint32_t relbx = uint32_t{0};
  uint32_t counts = uint32_t{0};

  // Copies one 4-byte word to the output and moves the write position on.
  auto emit4 = [&wr_ptr](const void *src) {
    memcpy(wr_ptr, src, 4);
    wr_ptr += 4;
  };

  while (relbx < bx_vect.size()) {  // total number of non-empty BXs in orbit is
                                    // given by bx_vect.size()
    blockMuon *bl = reinterpret_cast<blockMuon *>(rd_ptr);
    if (bl->orbit[0] == constants::beefdead) {
      break;
    }  // orbit trailer has been reached, end of orbit data

    int mAcount = 0;
    int mBcount = 0;
    bool AblocksOn[8];
    bool BblocksOn[8];
    const uint32_t bx = uint32_t{bx_vect[relbx]};

    // bx 3555 - 3564 are from the previous orbit. The correction is applied to
    // a per-BX copy (as FillOrbitCalo does): decrementing the orbit number
    // itself would subtract one more for every such BX and would also change
    // the orbit number returned to the caller.
    uint32_t orbit_ = orbit;
    if (bx > 3554) {
      orbit_--;
    }

    for (unsigned int i = 0; i < 8; i++) {
      const uint32_t bxA = (bl->bx[i] >> shifts::bx) & masks::bx;
      if ((bxA != bx) && (i == 0) && (bx < 3555) &&
          control.verbosity) {  // only prints warning when BX < 3555 i.e from
                                // the same orbit
        LOG(WARNING) << "BX mismatch, uGMT data word BX = " << std::hex << bxA
                     << ", BX extracted from trailer = " << bx << ", orbitN is " << std::dec
                     << orbit;
      }
      uint32_t pt = uint32_t{(bl->mu1f[i] >> shifts::pt) & masks::pt};
      AblocksOn[i] = ((pt > 0) || (doZS == 0));
      if (AblocksOn[i]) {
        mAcount++;
      }
      pt = (bl->mu2f[i] >> shifts::pt) & masks::pt;
      BblocksOn[i] = ((pt > 0) || (doZS == 0));
      if (BblocksOn[i]) {
        mBcount++;
      }
    }

    const uint32_t bxcount = std::max(mAcount, mBcount);
    if (bxcount == 0) {
      // NOTE(review): the block is skipped without moving on to the next entry
      // of bx_vect, so the following block is matched against the same BX --
      // confirm this is intended.
      rd_ptr += sizeof(blockMuon);
      LOG(WARNING) << '#' << nbPackets
                   << ": Detected a bx with zero muons, this should not "
                      "happen. Packet is skipped.";
      continue;
    }

    // Header word of the packed muon data: mAcount (selected mu1 slots) from
    // bit 16 upwards, the warning test enable flag at bit 8 and mBcount
    // (selected mu2 slots) in the low bits.
    const uint32_t header =
        uint32_t{(mAcount << 16) + ((static_cast<uint32_t>(orbit_header.second)) << 8) + mBcount};
    counts += mAcount;
    counts += mBcount;

    emit4(&header);
    emit4(&bx);
    emit4(&orbit_);

    for (unsigned int i = 0; i < 8; i++) {
      if (AblocksOn[i]) {
        emit4(&bl->mu1f[i]);
        emit4(&bl->mu1s[i]);
        // mu.extra is a copy of bl->bx with bit 0 set to 0 for the first muon;
        // built in a local so the input buffer is left untouched
        const uint32_t extra = bl->bx[i] & ~uint32_t{1};
        emit4(&extra);
      }
    }
    for (unsigned int i = 0; i < 8; i++) {
      if (BblocksOn[i]) {
        emit4(&bl->mu2f[i]);
        emit4(&bl->mu2s[i]);
        // mu.extra is a copy of bl->bx with bit 0 set to 1 for the second muon
        const uint32_t extra = bl->bx[i] | uint32_t{1};
        emit4(&extra);
      }
    }

    rd_ptr += sizeof(blockMuon);
    relbx++;
  }

  StreamProcessor::fillOrbitMetadata meta = {
      counts,
      orbit,
  };
  return meta;
}
// Processes one input packet into the output slice `out`.
//
//  - PASS_THROUGH: the input is copied unchanged.
//  - BRIL:         the histograms are unpacked by FillBril.
//  - GMT / CALO:   the packet is walked orbit by orbit; for each orbit the BX
//                  hit map is read from the orbit trailer (CountBX) and the
//                  data is unpacked (FillOrbitMuon / FillOrbitCalo) until the
//                  DMA trailer word (constants::deadbeef) is reached.
//
// Whenever a packet is skipped (prescaled away, inconsistent size, missing
// orbit or DMA trailer, too many orbits) the function returns without setting
// the end or counts of `out`.
//
// Throws std::invalid_argument for an unknown processor type.
void StreamProcessor::process(Slice &input, Slice &out) {
  nbPackets++;
  stats.packet_count++;
  // Implement prescale - only for CALO
  if ((processorType == ProcessorType::CALO) && (prescaleFactor != 0)) {
    if (stats.packet_count % prescaleFactor != 0) {
      return;
    }
  }
  char *rd_ptr = input.begin();
  char *wr_ptr = out.begin();
  char *end_ptr = input.end();
  uint32_t counts = 0;
  uint32_t orbit_per_packet_count = 0;
  bool firstOrbit = true;
  StreamProcessor::fillOrbitMetadata meta{
      0,
      0,
  };
  if (processorType == ProcessorType::PASS_THROUGH) {
    memcpy(wr_ptr, rd_ptr, input.size());
    out.set_end(out.begin() + input.size());
    out.set_counts(1);
    return;
  }
  if (processorType == ProcessorType::BRIL) {
    counts = FillBril(rd_ptr, wr_ptr, end_ptr);
    out.set_end(out.begin() + counts);
    out.set_counts(counts);
    return;
  }
  if (!CheckFrameMultBlock(input.size())) {
    return;
  }
  // One iteration per orbit; every exit from the loop is a return.
  while (true) {
    std::vector<unsigned int> bx_vect;
    uint32_t orbitCount = 0;
    bool trailerError = false;
    bx_vect = CountBX(input, rd_ptr, trailerError);
    if (trailerError == true) {
      stats.orbit_trailer_error_count++;
      LOG(WARNING) << "Orbit trailer error: orbit trailer not found before end of data "
                      "packet. Packet will be skipped. Orbit trailer error count = "
                   << stats.orbit_trailer_error_count;
      return;
    }
    std::sort(bx_vect.begin(), bx_vect.end());
    if (processorType == ProcessorType::GMT) {
      meta = FillOrbitMuon(bx_vect, rd_ptr, wr_ptr);
      orbitCount = meta.counts;
      ++orbit_per_packet_count;
      wr_ptr += orbitCount * 12 + 12 * bx_vect.size();  // 12 bytes for each muon/count then 12
                                                        // bytes for each bx header
    } else if (processorType == ProcessorType::CALO) {
      meta = FillOrbitCalo(bx_vect, rd_ptr, wr_ptr);
      orbitCount = meta.counts;
      ++orbit_per_packet_count;
      // size of calo packet is 4bytes*(8links*7dataWords + 3headerWords)=236
      // bytes Note 7 data words per link because we have the "link number" word
      // + 6 words from calo L2
      wr_ptr += 4 * ((8 * 7) + 3) * bx_vect.size();
    } else {
      LOG(ERROR) << "UNKNOWN PROCESSOR_TYPE, EXITING";
      throw std::invalid_argument("ERROR: PROCESSOR_TYPE NOT RECOGNISED");
    }
    rd_ptr += 32 + bx_vect.size() * sizeof(blockMuon) +
              constants::orbit_trailer_size;  // 32 for orbit header, + nBXs +
                                              // orbit trailer
    counts += orbitCount;
    if (firstOrbit) {
      out.set_firstOrbitN(meta.orbit);
      firstOrbit = false;
    }
    bx_vect.clear();

    if (rd_ptr >= end_ptr) {
      // The input is exhausted but no DMA trailer was seen. Looking for yet
      // another orbit would read beyond the end of the packet, so give up on
      // this packet instead.
      LOG(WARNING) << "DMA trailer word deadbeef not found before end of data packet. "
                      "Packet will be skipped.";
      return;
    }

    uint32_t *dma_trailer_word = reinterpret_cast<uint32_t *>(rd_ptr);
    if (*dma_trailer_word == constants::deadbeef) {
      // end of packet: publish what has been written
      out.set_end(wr_ptr);
      out.set_counts(counts);
      return;
    }
    if (orbit_per_packet_count > nOrbitsPerDMAPacket) {
      if (control.verbosity) {
        LOG(WARNING) << "expected DMA trailer word deadbeef, found " << std::hex
                     << *dma_trailer_word << ". Orbits per packet count "
                     << orbit_per_packet_count << ", > expected, (" << nOrbitsPerDMAPacket
                     << ") skipping packet.";
      }
      stats.excess_orbits_per_packet_count++;
      if (stats.excess_orbits_per_packet_count % 10000 == 0) {
        LOG(WARNING) << "count of packets with excess # orbits "
                     << stats.excess_orbits_per_packet_count;
      }
      return;
    }
  }
}
// Filter entry point: `item` is the input Slice. A fresh output slice of twice
// max_size is allocated and filled by process(); the input slice is then
// handed back through Slice::giveAllocated and the output slice is returned.
void *StreamProcessor::operator()(void *item) {
  Slice *const in_slice = static_cast<Slice *>(item);
  Slice *const out_slice = Slice::allocate(2 * max_size);

  process(*in_slice, *out_slice);

  Slice::giveAllocated(in_slice);
  return out_slice;
}