Skip to content
Snippets Groups Projects
Commit fdf6b083 authored by Dinyar Rabady's avatar Dinyar Rabady Committed by Dinyar Rabady
Browse files

Perform BX map decoding on the fly

Belongs to #35.
parent 9b29772d
No related branches found
No related tags found
No related merge requests found
......@@ -26,19 +26,6 @@ StreamProcessor::StreamProcessor(size_t max_size_, bool doZS_, ProcessorType pro
BrilHistoQueue<std::array<uint32_t, constants::NBXPerOrbit + constants::NFramesInHistoHeader>> StreamProcessor::BrilQueue;
// Loops over each word in the orbit trailer BX map and fills a vector with the non-empty BX values
void bit_check(std::vector<unsigned int>* bx_vect, uint32_t word, uint32_t offset)
{
for (uint32_t i = 0; i<32; i++)
{
if (word & 1){
bx_vect->push_back(i + offset);
}
word >>= 1;
}
return;
}
StreamProcessor::~StreamProcessor(){
myfile.close();
}
......@@ -76,26 +63,16 @@ bool StreamProcessor::CheckFrameMultBlock(size_t inputSize){
return true;
}
// Looks for orbit trailer then counts the non-empty bunch crossings and fills a vector with their values
// The bool (.second) is used to determine valididy of the BX count
std::vector<unsigned int> StreamProcessor::CountBX(Slice& input, char* rd_ptr, bool& trailerError){
bool StreamProcessor::GetTrailer(Slice& input, char*& rd_ptr) {
rd_ptr += 32; // +32 to account for orbit header
std::vector<unsigned int> bx_vect;
trailerError = false;
while ( rd_ptr != input.end()){
blockMuon *bl = reinterpret_cast<blockMuon*>(rd_ptr);
if (bl->orbit[0]==constants::beefdead){ // found orbit trailer
orbit_trailer *ot = (orbit_trailer*)(rd_ptr);
for (unsigned int k = 0; k < (14*8); k++){ // 14*8 = 14 frames, 8 links of orbit trailer containing BX hitmap
bit_check(&bx_vect, ot->bx_map[k], (k*32 + 1));// +1 added to account for BX counting starting at 1
}
return bx_vect;
orbit_trailer* ot = reinterpret_cast<orbit_trailer*>(rd_ptr);
if (ot->beefdead[0]==constants::beefdead){ // found orbit trailer
return true;
}
rd_ptr+=sizeof(blockMuon);
}
trailerError = true;
return bx_vect;
return false;
}
inline std::pair<uint32_t, bool> StreamProcessor::ProcessOrbitHeader(char* rd_ptr){
......@@ -111,38 +88,58 @@ inline std::pair<uint32_t, bool> StreamProcessor::ProcessOrbitHeader(char* rd_pt
}
// Goes through orbit worth of data and fills the output memory with the calo data corresponding to the non-empty bunchcrossings, as marked in bx_vect
StreamProcessor::fillOrbitMetadata StreamProcessor::FillOrbitCalo(const std::vector<unsigned int>& bx_vect, char* rd_ptr, char* wr_ptr){
std::pair<uint32_t, bool> orbit_header = std::pair<uint32_t, bool> {ProcessOrbitHeader(rd_ptr)};//.second is the warning test enable bit
rd_ptr += 32; // +32 to account for orbit header
uint32_t relbx = uint32_t{0};
uint32_t counts = uint32_t{0};
uint32_t orbit = uint32_t{orbit_header.first};
while(relbx < bx_vect.size()){ //total number of non-empty BXs in orbit is given by bx_vect.size()
blockCalo *bl = reinterpret_cast<blockCalo*>(rd_ptr);
if (bl->calo0[0]==constants::beefdead){break;} // orbit trailer has been reached, end of orbit data
uint32_t bx = uint32_t{bx_vect[relbx]};
uint32_t orbit_ = uint32_t{orbit_header.first};
if (bx > 3554){--orbit_;} //fix for the fact that bx 3555 - 3564 are from the previous orbit
uint32_t header = uint32_t{orbit_header.second}; //header can be added to later
memcpy(wr_ptr,(char*)&header,4); wr_ptr+=4;
memcpy(wr_ptr,(char*)&bx,4); wr_ptr+=4;
memcpy(wr_ptr,(char*)&orbit_,4); wr_ptr+=4;
for(uint32_t i = 0; i < 8; i++){
memcpy(wr_ptr,(char*)&i,4); wr_ptr+=4; //gives link number, can later be used to find object type
memcpy(wr_ptr,(char*)&bl->calo0[i],4); wr_ptr+=4;
memcpy(wr_ptr,(char*)&bl->calo1[i],4); wr_ptr+=4;
memcpy(wr_ptr,(char*)&bl->calo2[i],4); wr_ptr+=4;
memcpy(wr_ptr,(char*)&bl->calo3[i],4); wr_ptr+=4;
memcpy(wr_ptr,(char*)&bl->calo4[i],4); wr_ptr+=4;
memcpy(wr_ptr,(char*)&bl->calo5[i],4); wr_ptr+=4;
}
counts += 1;
rd_ptr+=sizeof(blockCalo);
relbx++;
}
StreamProcessor::fillOrbitMetadata StreamProcessor::FillOrbitCalo(orbit_trailer* trailer, char* rd_ptr, char* wr_ptr){
std::pair<uint32_t, bool> orbit_header = std::pair<uint32_t, bool> {ProcessOrbitHeader(rd_ptr)};//.second is the warning test enable bit
rd_ptr += 32; // +32 to account for orbit header
uint32_t orbit = uint32_t{orbit_header.first} - 1; // Starting with orbit number one lower than what is in the header because the "link orbit" contains a few BXs of the previous orbit
uint32_t counts = uint32_t{0};
uint32_t filled_bxs = 0;
// We loop over the BX map from the orbit trailer and then match the filled BXs to the data we got.
// The logic below is annoyingly convoluted:
// The first BX we get in the data stream is from BX 3555, however the BX map starts at BX 1,
// we therefore need to start reading the BX map from the 3555th bit.
// 3555//32 = 111, so we start at the 111th word; 3555 mod 32 = 3, however we start counting at BX1, so the start bit is 2.
uint32_t bx = 3554; // We start at 3554 here, because we increment by 1 immediately after starting the loops.
size_t word = 111;
size_t bit = 1; // Will be incremented immediately after starting the loop
for (size_t pseudo_bx = 0; pseudo_bx < 3564; ++pseudo_bx) { // Looping over at most an entire orbit here.
if (word == 14*8-1 && bit == 11) { // Bit 11 in word 111 is the last valid BX (3564), so we roll over afterwards. (== 11 because that's the one we worked on in the previous loop iteration)
word = 0;
bit = 0;
bx = 0; // Will be immediately incremented to 1.
++orbit;
} else if(bit < 31) {
++bit;
} else {
bit = 0;
++word;
}
++bx;
if((trailer->bx_map[word] & (1 << bit)) == 0) {
continue; // If the bit is zero that BX was empty and we continue.
}
++filled_bxs;
blockCalo *bl = reinterpret_cast<blockCalo*>(rd_ptr);
if (bl->calo0[0]==constants::beefdead){break;} // orbit trailer has been reached, end of orbit data
uint32_t header = uint32_t{orbit_header.second}; //header can be added to later
memcpy(wr_ptr,(char*)&header,4); wr_ptr+=4;
memcpy(wr_ptr,(char*)&bx,4); wr_ptr+=4;
memcpy(wr_ptr,(char*)&orbit,4); wr_ptr+=4;
for(uint32_t i = 0; i < 8; i++){
memcpy(wr_ptr,(char*)&i,4); wr_ptr+=4; //gives link number, can later be used to find object type
memcpy(wr_ptr,(char*)&bl->calo0[i],4); wr_ptr+=4;
memcpy(wr_ptr,(char*)&bl->calo1[i],4); wr_ptr+=4;
memcpy(wr_ptr,(char*)&bl->calo2[i],4); wr_ptr+=4;
memcpy(wr_ptr,(char*)&bl->calo3[i],4); wr_ptr+=4;
memcpy(wr_ptr,(char*)&bl->calo4[i],4); wr_ptr+=4;
memcpy(wr_ptr,(char*)&bl->calo5[i],4); wr_ptr+=4;
}
counts += 1;
rd_ptr+=sizeof(blockCalo);
}
StreamProcessor::fillOrbitMetadata meta = {counts, orbit,};
return meta;
StreamProcessor::fillOrbitMetadata meta = {counts, orbit, filled_bxs};
return meta;
}
......@@ -187,81 +184,101 @@ uint32_t StreamProcessor::FillBril(char* rd_ptr, char* wr_ptr, char* end_ptr){
return packed_size;
}
// Goes through orbit worth of data and fills the output memory with the muons corresponding to the non-empty bunchcrossings, as marked in bx_vect
StreamProcessor::fillOrbitMetadata StreamProcessor::FillOrbitMuon(const std::vector<unsigned int>& bx_vect, char* rd_ptr, char* wr_ptr){
std::pair<uint32_t, bool> orbit_header = std::pair<uint32_t, bool>{ProcessOrbitHeader(rd_ptr)};//.second is the warning test enable bit
rd_ptr += 32; // +32 to account for orbit header
uint32_t orbit = uint32_t{orbit_header.first};
uint32_t relbx = uint32_t{0};
uint32_t counts = uint32_t{0};
while (relbx < bx_vect.size()){ //total number of non-empty BXs in orbit is given by bx_vect.size()
blockMuon *bl = reinterpret_cast<blockMuon*>(rd_ptr);
if (bl->orbit[0]==constants::beefdead){ break; } // orbit trailer has been reached, end of orbit data
int mAcount = 0;
int mBcount = 0;
bool AblocksOn[8];
bool BblocksOn[8];
uint32_t bx = uint32_t {bx_vect[relbx]};
if (bx > 3554){--orbit;} //fix for the fact that bx 3555 - 3564 are from the previous orbit
for (unsigned int i = 0; i < 8; i++){
uint32_t bxA = (bl->bx[i] >> shifts::bx) & masks::bx;
if ((bxA != bx) && (i == 0) && (bx<3555) && control.verbosity){ //only prints warning when BX < 3555 i.e from the same orbit
LOG(WARNING) << "BX mismatch, uGMT data word BX = " << std::hex << bxA <<
", BX extracted from trailer = "<< bx << ", orbitN is " << std::dec << orbit;
}
uint32_t pt = uint32_t{(bl->mu1f[i] >> shifts::pt) & masks::pt};
AblocksOn[i]=((pt>0) || (doZS==0));
if ((pt>0) || (doZS==0)){
mAcount++;
}
pt = (bl->mu2f[i] >> shifts::pt) & masks::pt;
BblocksOn[i]=((pt>0) || (doZS==0));
if ((pt>0) || (doZS==0)){
mBcount++;
}
}
uint32_t bxcount = std::max(mAcount,mBcount);
if (bxcount == 0) {
rd_ptr+=sizeof(blockMuon);
LOG(WARNING) << '#' << nbPackets << ": Detected a bx with zero muons, this should not happen. Packet is skipped.";
continue;
}
//header word of packed muon data contains mAcount (number of muons in words 3,4) and mBcount (number of muons in words 5,5), as well as the warning test enaable flag.
uint32_t header = uint32_t{(mAcount << 16) + ((static_cast<uint32_t>(orbit_header.second))<<8) + mBcount};
// Goes through orbit worth of data and fills the output memory with the muons corresponding to the non-empty bunchcrossings, as marked in bx_vect
StreamProcessor::fillOrbitMetadata StreamProcessor::FillOrbitMuon(orbit_trailer* trailer, char* rd_ptr, char* wr_ptr){
std::pair<uint32_t, bool> orbit_header = std::pair<uint32_t, bool>{ProcessOrbitHeader(rd_ptr)};//.second is the warning test enable bit
rd_ptr += 32; // +32 to account for orbit header
uint32_t orbit = uint32_t{orbit_header.first} - 1; // Starting with orbit number one lower than what is in the header because the "link orbit" contains a few BXs of the previous orbit
uint32_t counts = uint32_t{0};
uint32_t filled_bxs = 0;
// We loop over the BX map from the orbit trailer and then match the filled BXs to the data we got.
// The logic below is annoyingly convoluted:
// The first BX we get in the data stream is from BX 3555, however the BX map starts at BX 1,
// we therefore need to start reading the BX map from the 3555th bit.
// 3555//32 = 111, so we start at the 111th word; 3555 mod 32 = 3, however we start counting at BX1, so the start bit is 2.
uint32_t bx = 3554; // We start at 3554 here, because we increment by 1 immediately after starting the loops.
size_t word = 111;
size_t bit = 1; // Will be incremented immediately after starting the loop
for (size_t pseudo_bx = 0; pseudo_bx < 3564; ++pseudo_bx) { // Looping over at most an entire orbit here.
if (word == 14*8-1 && bit == 11) { // Bit 11 in word 111 is the last valid BX (3564), so we roll over afterwards. (== 11 because that's the one we worked on in the previous loop iteration)
word = 0;
bit = 0;
bx = 0; // Will be immediately incremented to 1.
++orbit;
} else if(bit < 31) {
++bit;
} else {
bit = 0;
++word;
}
++bx;
if((trailer->bx_map[word] & (1 << bit)) == 0) {
continue; // If the bit is zero that BX was empty and we continue.
}
++filled_bxs;
blockMuon *bl = reinterpret_cast<blockMuon*>(rd_ptr);
if (bl->orbit[0]==constants::beefdead){ break; } // orbit trailer has been reached, end of orbit data
int mAcount = 0;
int mBcount = 0;
bool AblocksOn[8];
bool BblocksOn[8];
for (unsigned int i = 0; i < 8; i++) {
uint32_t bxA = (bl->bx[i] >> shifts::bx) & masks::bx;
if ((bxA != bx) && (i == 0) && control.verbosity) {
LOG(WARNING) << "BX mismatch, uGMT data word BX = " << std::hex << bxA <<
", BX extracted from trailer = "<< bx << ", orbitN is " << std::dec << orbit;
}
uint32_t orbitA = bl->orbit[i];
counts += mAcount;
counts += mBcount;
memcpy(wr_ptr,(char*)&header,4); wr_ptr+=4;
memcpy(wr_ptr,(char*)&bx,4); wr_ptr+=4;
memcpy(wr_ptr,(char*)&orbit,4); wr_ptr+=4;
for (unsigned int i = 0; i < 8; i++){
if (AblocksOn[i]){
memcpy(wr_ptr,(char*)&bl->mu1f[i],4); wr_ptr+=4;
memcpy(wr_ptr,(char*)&bl->mu1s[i],4); wr_ptr+=4;
// next creating mu.extra which is a copy of bl->bx with a change to the first bit
memcpy(wr_ptr,(char*)&(bl->bx[i] &= ~0x1),4); wr_ptr+=4; //set bit 0 to 0 for first muon
}
}
uint32_t pt = uint32_t{(bl->mu1f[i] >> shifts::pt) & masks::pt};
AblocksOn[i]=((pt>0) || (doZS==0));
if ((pt>0) || (doZS==0)) {
mAcount++;
}
pt = (bl->mu2f[i] >> shifts::pt) & masks::pt;
BblocksOn[i]=((pt>0) || (doZS==0));
if ((pt>0) || (doZS==0)) {
mBcount++;
}
}
uint32_t bxcount = std::max(mAcount,mBcount);
if (bxcount == 0) {
rd_ptr+=sizeof(blockMuon);
LOG(WARNING) << '#' << nbPackets << ": Detected a bx with zero muons, this should not happen. Packet is skipped.";
continue;
}
//header word of packed muon data contains mAcount (number of muons in words 3,4) and mBcount (number of muons in words 5,5), as well as the warning test enaable flag.
uint32_t header = uint32_t{(mAcount << 16) + ((static_cast<uint32_t>(orbit_header.second))<<8) + mBcount};
for (unsigned int i = 0; i < 8; i++){
if (BblocksOn[i]){
memcpy(wr_ptr,(char*)&bl->mu2f[i],4); wr_ptr+=4;
memcpy(wr_ptr,(char*)&bl->mu2s[i],4); wr_ptr+=4;
// next creating mu.extra which is a copy of bl->bx with a change to the first bit
memcpy(wr_ptr,(char*)&(bl->bx[i] |= 0x1),4); wr_ptr+=4; //set bit 0 to 1 for second muon
}
}
rd_ptr+=sizeof(blockMuon);
counts += mAcount;
counts += mBcount;
memcpy(wr_ptr,(char*)&header,4); wr_ptr+=4;
memcpy(wr_ptr,(char*)&bx,4); wr_ptr+=4;
memcpy(wr_ptr,(char*)&orbit,4); wr_ptr+=4;
for (unsigned int i = 0; i < 8; i++){
if (AblocksOn[i]){
memcpy(wr_ptr,(char*)&bl->mu1f[i],4); wr_ptr+=4;
memcpy(wr_ptr,(char*)&bl->mu1s[i],4); wr_ptr+=4;
// next creating mu.extra which is a copy of bl->bx with a change to the first bit
memcpy(wr_ptr,(char*)&(bl->bx[i] &= ~0x1),4); wr_ptr+=4; //set bit 0 to 0 for first muon
}
}
relbx++;
}
StreamProcessor::fillOrbitMetadata meta = {counts, orbit,};
return meta;
for (unsigned int i = 0; i < 8; i++){
if (BblocksOn[i]){
memcpy(wr_ptr,(char*)&bl->mu2f[i],4); wr_ptr+=4;
memcpy(wr_ptr,(char*)&bl->mu2s[i],4); wr_ptr+=4;
// next creating mu.extra which is a copy of bl->bx with a change to the first bit
memcpy(wr_ptr,(char*)&(bl->bx[i] |= 0x1),4); wr_ptr+=4; //set bit 0 to 1 for second muon
}
}
rd_ptr+=sizeof(blockMuon);
}
StreamProcessor::fillOrbitMetadata meta = {counts, orbit, filled_bxs};
return meta;
}
void StreamProcessor::process(Slice& input, Slice& out)
......@@ -273,6 +290,7 @@ void StreamProcessor::process(Slice& input, Slice& out)
if ((processorType == ProcessorType::CALO) && (prescaleFactor!=0)) {
if (stats.packet_count%prescaleFactor != 0){return;}
}
char* rd_ptr = input.begin();
char* wr_ptr = out.begin();
......@@ -281,7 +299,7 @@ void StreamProcessor::process(Slice& input, Slice& out)
bool endofpacket = false;
uint32_t orbit_per_packet_count = 0;
bool firstOrbit = true;
StreamProcessor::fillOrbitMetadata meta{0,0,};
StreamProcessor::fillOrbitMetadata meta{0,0,0};
if (processorType == ProcessorType::PASS_THROUGH) {
memcpy(wr_ptr,rd_ptr,input.size());
out.set_end(out.begin() + input.size());
......@@ -298,38 +316,36 @@ void StreamProcessor::process(Slice& input, Slice& out)
if (!CheckFrameMultBlock(input.size())){ return; }
while (endofpacket == false){
uint32_t orbitCount = 0;
bool trailerError = false;
std::vector<unsigned int> bx_vect = CountBX(input, rd_ptr, trailerError);
if(trailerError == true){
char* trailer_ptr = rd_ptr;
bool trailerFound = GetTrailer(input, trailer_ptr);
if(!trailerFound){
stats.orbit_trailer_error_count++;
LOG(WARNING) << "Orbit trailer error: orbit trailer not found before end of data packet. Packet will be skipped. Orbit trailer error count = " << stats.orbit_trailer_error_count;
return;
}
std::sort(bx_vect.begin(), bx_vect.end());
orbit_trailer* trailer = reinterpret_cast<orbit_trailer*>(trailer_ptr);
if (processorType == ProcessorType::GMT) {
meta = FillOrbitMuon(bx_vect, rd_ptr, wr_ptr);
meta = FillOrbitMuon(trailer, rd_ptr, wr_ptr);
orbitCount = meta.counts;
++orbit_per_packet_count;
wr_ptr+= orbitCount*12 + 12*bx_vect.size(); // 12 bytes for each muon/count then 12 bytes for each bx header
wr_ptr+= meta.counts*12 + 12*meta.filled_bxs; // 12 bytes for each muon/count then 12 bytes for each bx header
} else if (processorType == ProcessorType::CALO){
meta = FillOrbitCalo(bx_vect, rd_ptr, wr_ptr);
meta = FillOrbitCalo(trailer, rd_ptr, wr_ptr);
orbitCount = meta.counts;
++orbit_per_packet_count;
// size of calo packet is 4bytes*(8links*7dataWords + 3headerWords)=236 bytes
// Note 7 data words per link because we have the "link number" word + 6 words from calo L2
wr_ptr+= 4*((8*7) + 3)*bx_vect.size();
wr_ptr+= 4*((8*7) + 3)*meta.filled_bxs;
} else {
LOG(ERROR) << "UNKNOWN PROCESSOR_TYPE, EXITING";
throw std::invalid_argument("ERROR: PROCESSOR_TYPE NOT RECOGNISED");
LOG(ERROR) << "UNKNOWN PROCESSOR_TYPE, EXITING";
throw std::invalid_argument("ERROR: PROCESSOR_TYPE NOT RECOGNISED");
}
rd_ptr+= 32 + bx_vect.size()*sizeof(blockMuon) + constants::orbit_trailer_size; // 32 for orbit header, + nBXs + orbit trailer
rd_ptr+= 32 + meta.filled_bxs*sizeof(blockMuon) + constants::orbit_trailer_size; // 32 for orbit header, + nBXs + orbit trailer
counts += orbitCount;
if (firstOrbit){
out.set_firstOrbitN(meta.orbit);
firstOrbit = false;
};
bx_vect.clear();
out.set_firstOrbitN(meta.orbit);
firstOrbit = false;
}
if (rd_ptr < input.end()){
......
......@@ -25,15 +25,16 @@ public:
private:
struct fillOrbitMetadata{
uint32_t counts;
uint32_t orbit;
};
uint32_t counts;
uint32_t orbit;
uint32_t filled_bxs;
};
void process(Slice& input, Slice& out);
bool CheckFrameMultBlock(size_t inputSize);
std::vector<unsigned int> CountBX(Slice& input, char* rd_ptr, bool& trailerError);
bool GetTrailer(Slice& input, char*& rd_ptr);
inline std::pair<uint32_t, bool> ProcessOrbitHeader(char* rd_ptr);
fillOrbitMetadata FillOrbitMuon(std::vector<unsigned int>& bx_vect, char* rd_ptr, char* wr_ptr);
fillOrbitMetadata FillOrbitCalo(std::vector<unsigned int>& bx_vect, char* rd_ptr, char* wr_ptr);
fillOrbitMetadata FillOrbitMuon(orbit_trailer* trailer, char* rd_ptr, char* wr_ptr);
fillOrbitMetadata FillOrbitCalo(orbit_trailer* trailer, char* rd_ptr, char* wr_ptr);
uint32_t FillBril(char* rd_ptr, char* wr_ptr, char* end_ptr);
std::ofstream myfile;
size_t max_size;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment