Commit 93271da9 authored by Remi Mommsen's avatar Remi Mommsen
Browse files

references #136: squash all commits from branch feature_136_evb_superfragment...

references #136: squash all commits from branch feature_136_evb_superfragment into new branch feature_136_evb_fragmenthandling, which is based on baseline_phosphorus_15
parent 95f5f2ed
......@@ -42,6 +42,7 @@ Sources=\
InfoSpaceItems.cc \
readoutunit/DummyFragment.cc \
readoutunit/FedFragment.cc \
readoutunit/FedSuperFragment.cc \
readoutunit/MetaData.cc \
readoutunit/MetaDataRetriever.cc \
readoutunit/SuperFragment.cc \
......
......@@ -3,18 +3,25 @@
#include <algorithm>
#include <arpa/inet.h>
#include <cstdint>
#include <cstring>
#include <iomanip>
#include <iterator>
#include <netdb.h>
#include <sstream>
#include <cstdint>
#include <string>
#include <time.h>
#include <vector>
#include "i2o/i2oDdmLib.h"
namespace evb {
using RUtids = std::vector<I2O_TID>;
using FedIds = std::vector<uint16_t>;
using FedSizes = std::vector<uint32_t>;
const uint16_t GTPe_FED_ID = 814; //0x32e
const uint16_t TCDS_FED_ID = 1024;
const uint16_t SOFT_FED_ID = 1022;
......
......@@ -4,6 +4,7 @@
#include <iostream>
#include <list>
#include <cstdint>
#include <vector>
#include "i2o/i2o.h"
......@@ -93,6 +94,7 @@ namespace evb {
// Note that using uint64_t with shift operations is undefined
};
using EvBids = std::vector<EvBid>;
inline bool EvBid::operator< (const EvBid& other) const
{
......
......@@ -7,16 +7,13 @@
#include "i2o/i2o.h"
#include "i2o/i2oDdmLib.h"
#include "evb/Constants.h"
#include "evb/EvBid.h"
namespace evb {
namespace msg {
using EvBids = std::vector<EvBid>;
using RUtids = std::vector<I2O_TID>;
using FedIds = std::vector<uint16_t>;
/**
* Event request for one or more events identified by the event-builder ids to be sent
* to the BU TID specified. The EVM ignores the evbIds and sends the next nbRequests
......@@ -30,8 +27,8 @@ namespace evb {
uint16_t priority; // Priority of the request (0 is highest)
uint64_t timeStampNS; // time stamp in ns set by the BU when sending request
uint16_t buResourceId; // Index of BU resource used to built the event
uint16_t nbRequests; // Number of requested EvBids
uint16_t nbDiscards; // Number of previously sent EvBids to discard
uint16_t nbEvents; // Number of events in this request
uint16_t nbDiscards; // Number of events fully built on BU
uint16_t nbRUtids; // Number of RU TIDs
EvBid evbIds[0]; // EvBids
I2O_TID ruTids[0]; // List of RU TIDs participating in the event building
......@@ -54,19 +51,13 @@ namespace evb {
/**
* A super-fragment containing data from all FEDs connected to the RU
* A FED super-fragment contains the data from the FED for N consecutive L1 triggers
*/
struct SuperFragment
struct FedSuperFragment
{
uint16_t headerSize; // Size of the message header
uint16_t superFragmentNb; // Index of the super fragment
uint32_t totalSize; // Total size of the super fragment
uint32_t partSize; // Partial size of the super-fragment contained in this message
uint16_t nbDroppedFeds; // Number of FEDs dropped from the super fragment
uint16_t fedIds[]; // List of dropped FED ids
void appendFedIds(FedIds&) const;
uint16_t fedId; // FED (source) ID
uint32_t fedSizes[0]; // List of FED sizes
};
......@@ -81,7 +72,7 @@ namespace evb {
uint32_t nbBlocks; // Total number of I2O blocks
uint32_t blockNb; // Index of the this block
uint64_t timeStampNS; // time stamp in ns set by the BU when sending request
uint16_t nbSuperFragments; // Total number of super fragments
uint16_t eventCount; // Total number of events
uint16_t nbRUtids; // Number of RU TIDs
EvBid evbIds[0]; // The EvBids of the super fragments
I2O_TID ruTids[0]; // List of RU TIDs participating in the event building
......@@ -128,13 +119,6 @@ namespace evb {
);
std::ostream& operator<<
(
std::ostream&,
const evb::msg::SuperFragment
);
std::ostream& operator<<
(
std::ostream&,
......
#ifndef _evb_SharedBufferReference_h_
#define _evb_SharedBufferReference_h_
#include <memory>
#include <vector>
#include "toolbox/mem/Reference.h"
namespace evb
{
class SharedBufferReference : public std::shared_ptr<toolbox::mem::Reference>
{
public:
SharedBufferReference() : shared_ptr(nullptr) {};
SharedBufferReference(toolbox::mem::Reference* p) : shared_ptr(p, [](toolbox::mem::Reference* object) { if (object) object->release(); }) {};
};
using SharedBufferReferences = std::vector<SharedBufferReference>;
}
#endif // _evb_SharedBufferReference_h_
/// emacs configuration
/// Local Variables: -
/// mode: c++ -
/// c-basic-offset: 2 -
/// indent-tabs-mode: nil -
/// End: -
......@@ -5,7 +5,7 @@
#include <cstdint>
#include <limits>
#include <memory>
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include "evb/bu/EventInfo.h"
......@@ -13,6 +13,7 @@
#include "evb/DataLocations.h"
#include "evb/EvBid.h"
#include "evb/I2OMessages.h"
#include "evb/SharedBufferReference.h"
#include "i2o/i2oDdmLib.h"
#include "toolbox/mem/Reference.h"
......@@ -35,7 +36,7 @@ namespace evb {
Event
(
const EvBid&,
const msg::RUtids&,
const RUtids&,
const uint16_t buResourceId,
const bool checkCRC,
const bool calculateCRC32
......@@ -44,21 +45,25 @@ namespace evb {
~Event();
/**
* Append a super fragment to the event.
* Return true if this completes the event.
* Append a FED fragment to the event.
*/
bool appendSuperFragment
void appendFedFragment
(
const I2O_TID ruTid,
toolbox::mem::Reference*,
unsigned char* payload
const uint16_t fedId,
SharedBufferReference,
iovec&& dataLocation
);
/**
* Return true if all super fragments have been received
* Data from the given RU TID has been received
*/
void receivedDataFromRU(const I2O_TID ruTid);
/**
* Return true if data from all RUs has been received
*/
bool isComplete() const
{ return ruSizes_.empty(); }
{ return missingRuTids_.empty(); }
/**
* Check the complete event for integrity of the data
......@@ -75,7 +80,7 @@ namespace evb {
const EventInfoPtr& getEventInfo() const { return eventInfo_; }
const DataLocations& getDataLocations() const { return dataLocations_; }
bool isMissingData() const { return ( ! missingFedIds_.empty() ); }
const msg::FedIds& getMissingFedIds() const { return missingFedIds_; }
const FedIds& getMissingFedIds() const { return missingFedIds_; }
bool hasCRCerrors() const { return ( ! crcErrors_.empty() ); }
std::string getCRCerrorMessage() const;
......@@ -85,17 +90,15 @@ namespace evb {
EventInfoPtr eventInfo_;
using BufferReferences = std::vector<toolbox::mem::Reference*>;
BufferReferences myBufRefs_;
SharedBufferReferences sharedBufferReferences_;
const EvBid evbId_;
const bool checkCRC_;
const bool calculateCRC32_;
const uint16_t buResourceId_;
using RUsizes = std::unordered_map<I2O_TID,uint32_t>;
RUsizes ruSizes_;
std::unordered_set<I2O_TID> missingRuTids_;
msg::FedIds missingFedIds_;
FedIds missingFedIds_;
std::vector<std::string> crcErrors_;
}; // Event
......
......@@ -6,7 +6,7 @@
#include <cstdint>
#include <vector>
#include "toolbox/mem/Reference.h"
#include "evb/SharedBufferReference.h"
namespace evb {
......@@ -26,15 +26,15 @@ namespace evb {
~FragmentChain();
/**
* Append the toolbox::mem::Reference to the fragment.
* Append the SharedBufferReference to the fragment.
*/
bool append(toolbox::mem::Reference*);
bool append(SharedBufferReference&&);
/**
* Return the head of the toolbox::mem::Reference chain
* Return the vector of SharedBufferReference
*/
toolbox::mem::Reference* head() const
{ return head_; }
const SharedBufferReferences& getSharedBufferReferences() const
{ return sharedBufferReferences_; }
/**
* Return the size of the super fragment
......@@ -46,17 +46,14 @@ namespace evb {
* Return the number of missing blocks for this super fragment
*/
size_t getMissingBlocks() const
{ return missingBlocks_; }
{ return blockCount_ - sharedBufferReferences_.size(); }
private:
void chainFragment(toolbox::mem::Reference*);
uint32_t missingBlocks_;
toolbox::mem::Reference* head_;
toolbox::mem::Reference* tail_;
const uint32_t blockCount_;
size_t size_;
SharedBufferReferences sharedBufferReferences_;
}; // FragmentChain
......
......@@ -15,6 +15,7 @@
#include "evb/I2OMessages.h"
#include "evb/InfoSpaceItems.h"
#include "evb/OneToOneQueue.h"
#include "evb/SharedBufferReference.h"
#include "evb/bu/Configuration.h"
#include "evb/bu/FragmentChain.h"
#include "toolbox/lang/Class.h"
......@@ -62,7 +63,7 @@ namespace evb {
/**
* Callback for I2O message containing a super fragment
*/
void superFragmentCallback(toolbox::mem::Reference*);
void superFragmentCallback(SharedBufferReference&&);
/**
* Send request for N trigger data fragments to the RUs
......@@ -127,7 +128,7 @@ namespace evb {
void startRequestFragmentsWorkLoop();
bool eventFragment(toolbox::task::WorkLoop*);
bool requestFragments(toolbox::task::WorkLoop*);
void handleEventFragment(toolbox::mem::Reference*);
void handleEventFragment(SharedBufferReference&&);
void sendRequests();
uint64_t getTimeStamp() const;
void getApplicationDescriptorForEVM();
......@@ -140,7 +141,7 @@ namespace evb {
toolbox::mem::Pool* msgPool_;
const ConfigurationPtr& configuration_;
using EventFragmentFIFO = OneToOneQueue<toolbox::mem::Reference*>;
using EventFragmentFIFO = OneToOneQueue<SharedBufferReference>;
EventFragmentFIFO eventFragmentFIFO_;
std::mutex eventFragmentFIFOmutex_;
......@@ -229,11 +230,11 @@ namespace evb {
std::ostringstream* out
)
{
toolbox::mem::Reference* bufRef = fragmentChain.head();
if ( bufRef )
for (auto const& sharedBufferReference : fragmentChain.getSharedBufferReferences() )
if ( sharedBufferReference )
{
msg::DataBlockMsg* msg =
(msg::DataBlockMsg*)bufRef->getDataLocation();
(msg::DataBlockMsg*)sharedBufferReference->getDataLocation();
*out << *msg;
}
else
......
......@@ -25,9 +25,9 @@
#include "evb/PerformanceMonitor.h"
#include "evb/readoutunit/BUposter.h"
#include "evb/readoutunit/Configuration.h"
#include "evb/readoutunit/FedSuperFragment.h"
#include "evb/readoutunit/FragmentRequest.h"
#include "evb/readoutunit/StateMachine.h"
#include "evb/readoutunit/SuperFragment.h"
#include "i2o/i2oDdmLib.h"
#include "i2o/Method.h"
#include "interface/shared/fed_header.h"
......@@ -143,25 +143,17 @@ namespace evb {
private:
using SuperFragments = std::vector<SuperFragmentPtr>;
using FedSuperFragments = std::vector<FedSuperFragmentPtr>;
void resetMonitoringCounters();
void startProcessingWorkLoop();
void createProcessingWorkLoops();
bool process(toolbox::task::WorkLoop*);
bool processRequest(FragmentRequestPtr&, SuperFragments&);
bool processRequest(FragmentRequestPtr&, FedSuperFragments&);
void handleRequest(const msg::EventRequest*, FragmentRequestPtr&);
void sendData(const FragmentRequestPtr&, const SuperFragments&);
void sendData(const FragmentRequestPtr&, const FedSuperFragments&);
toolbox::mem::Reference* getNextBlock(const uint32_t blockNb) const;
void fillSuperFragmentHeader
(
unsigned char*& payload,
uint32_t& remainingPayloadSize,
const uint32_t superFragmentNb,
const SuperFragmentPtr& superFragment,
const uint32_t currentFragmentSize
) const;
void waitForNextTrigger(const uint16_t nbRequests);
void waitForNextTrigger(const uint16_t nbEvents);
bool isEmpty();
void doLumiSectionTransition() {};
std::string getHelpTextForBuRequests() const;
......@@ -316,7 +308,6 @@ void evb::readoutunit::BUproxy<ReadoutUnit>::readoutMsgCallback(toolbox::mem::Re
msg::ReadoutMsg* readoutMsg = (msg::ReadoutMsg*)stdMsg;
uint32_t nbDiscards = 0;
uint32_t nbRequests = 0;
uint32_t nbRequestMsg = 0;
unsigned char* payload = (unsigned char*)&readoutMsg->requests[0];
......@@ -326,16 +317,14 @@ void evb::readoutunit::BUproxy<ReadoutUnit>::readoutMsgCallback(toolbox::mem::Re
nbDiscards += eventRequest->nbDiscards;
if ( eventRequest->nbRequests > 0 )
if ( eventRequest->buResourceId > 0 ) // there's a resource requesting events
{
nbRequests += eventRequest->nbRequests;
++nbRequestMsg;
// The request is shared by the RUproxy and BUproxy
FragmentRequestPtr fragmentRequest = std::make_shared<FragmentRequest>();
fragmentRequest->buTid = eventRequest->buTid;
fragmentRequest->buResourceId = eventRequest->buResourceId;
fragmentRequest->timeStampNS = eventRequest->timeStampNS;
fragmentRequest->nbRequests = eventRequest->nbRequests;
handleRequest(eventRequest, fragmentRequest);
requestMonitoring_.buTimestamps[eventRequest->buTid] = eventRequest->timeStampNS;
}
......@@ -354,7 +343,7 @@ void evb::readoutunit::BUproxy<ReadoutUnit>::readoutMsgCallback(toolbox::mem::Re
const uint32_t msgSize = stdMsg->MessageSize << 2;
requestMonitoring_.perf.sumOfSizes += msgSize;
requestMonitoring_.perf.sumOfSquares += msgSize*msgSize;
requestMonitoring_.perf.logicalCount += nbRequests;
requestMonitoring_.perf.logicalCount += readoutMsg->nbRequests;
++requestMonitoring_.perf.i2oCount;
requestMonitoring_.activeRequests += nbRequestMsg;
}
......@@ -382,12 +371,12 @@ bool evb::readoutunit::BUproxy<ReadoutUnit>::process(toolbox::task::WorkLoop* wl
try
{
FragmentRequestPtr fragmentRequest;
SuperFragments superFragments;
FedSuperFragments fedSuperFragments;
while ( processRequest(fragmentRequest,superFragments) )
while ( processRequest(fragmentRequest,fedSuperFragments) )
{
sendData(fragmentRequest, superFragments);
superFragments.clear();
sendData(fragmentRequest,fedSuperFragments);
fedSuperFragments.clear();
}
}
catch(xcept::Exception& e)
......@@ -434,19 +423,18 @@ template<class ReadoutUnit>
void evb::readoutunit::BUproxy<ReadoutUnit>::sendData
(
const FragmentRequestPtr& fragmentRequest,
const SuperFragments& superFragments
const FedSuperFragments& fedSuperFragments
)
{
uint32_t blockNb = 1;
const uint16_t nbSuperFragments = superFragments.size();
assert( nbSuperFragments == fragmentRequest->evbIds.size() );
const uint16_t nbRUtids = (nbSuperFragments>0)?fragmentRequest->ruTids.size():0;
const uint16_t eventCount = fragmentRequest->evbIds.size();
const uint16_t nbRUtids = (eventCount>0)?fragmentRequest->ruTids.size():0;
toolbox::mem::Reference* head = getNextBlock(blockNb);
toolbox::mem::Reference* tail = head;
const uint32_t blockHeaderSize = sizeof(msg::DataBlockMsg)
+ nbSuperFragments * sizeof(EvBid)
+ eventCount * sizeof(EvBid)
+ ((nbRUtids+1)&~1) * sizeof(I2O_TID); // always have an even number of 32-bit I2O_TIDs to keep 64-bit alignment
assert( blockHeaderSize % 8 == 0 );
......@@ -454,35 +442,18 @@ void evb::readoutunit::BUproxy<ReadoutUnit>::sendData
unsigned char* payload = (unsigned char*)head->getDataLocation() + blockHeaderSize;
uint32_t remainingPayloadSize = readoutUnit_->getConfiguration()->blockSize - blockHeaderSize;
for (uint32_t i=0; i < nbSuperFragments; ++i)
for (auto& fedSuperFragment : fedSuperFragments)
{
const SuperFragmentPtr& superFragment = superFragments[i];
uint32_t remainingSuperFragmentSize = superFragment->getSize();
fillSuperFragmentHeader(payload,remainingPayloadSize,i+1,superFragment,remainingSuperFragmentSize);
const SuperFragment::FedFragments& fedFragments = superFragment->getFedFragments();
for (auto& fragment : fedFragments)
while ( ! fedSuperFragment->fillData(payload,remainingPayloadSize) )
{
uint32_t copiedSize = 0;
while ( ! fragment->fillData(payload,remainingPayloadSize,copiedSize) )
{
// not all data fit into the remainingPayloadSize
// get a new block
remainingSuperFragmentSize -= copiedSize;
toolbox::mem::Reference* nextBlock = getNextBlock(++blockNb);
payload = (unsigned char*)nextBlock->getDataLocation() + sizeof(msg::DataBlockMsg);
remainingPayloadSize = readoutUnit_->getConfiguration()->blockSize - sizeof(msg::DataBlockMsg);
fillSuperFragmentHeader(payload,remainingPayloadSize,i+1,superFragment,remainingSuperFragmentSize);
tail->setNextReference(nextBlock);
tail = nextBlock;
}
payload += copiedSize;
remainingPayloadSize -= copiedSize;
remainingSuperFragmentSize -= copiedSize;
const fedt_t* trailer = (fedt_t*)(payload - sizeof(fedt_t));
assert ( FED_TCTRLID_EXTRACT(trailer->eventsize) == FED_SLINK_END_MARKER );
// not all data fit into the remainingPayloadSize
// get a new block
tail->setDataSize( readoutUnit_->getConfiguration()->blockSize - remainingPayloadSize );
toolbox::mem::Reference* nextBlock = getNextBlock(++blockNb);
payload = (unsigned char*)nextBlock->getDataLocation() + sizeof(msg::DataBlockMsg);
remainingPayloadSize = readoutUnit_->getConfiguration()->blockSize - sizeof(msg::DataBlockMsg);
tail->setNextReference(nextBlock);
tail = nextBlock;
}
}
tail->setDataSize( readoutUnit_->getConfiguration()->blockSize - remainingPayloadSize );
......@@ -493,7 +464,6 @@ void evb::readoutunit::BUproxy<ReadoutUnit>::sendData
uint32_t lastEventNumberToBUs = 0;
uint32_t lastLumiSectionToBUs = 0;
// Prepare each event data block for the BU
while (bufRef)
{
......@@ -515,25 +485,25 @@ void evb::readoutunit::BUproxy<ReadoutUnit>::sendData
dataBlockMsg->buResourceId = fragmentRequest->buResourceId;
dataBlockMsg->timeStampNS = fragmentRequest->timeStampNS;
dataBlockMsg->nbBlocks = blockNb;
dataBlockMsg->nbSuperFragments = nbSuperFragments;
dataBlockMsg->eventCount = eventCount;
dataBlockMsg->nbRUtids = nbRUtids;
if ( dataBlockMsg->blockNb == 1 )
{
dataBlockMsg->headerSize = blockHeaderSize;
unsigned char* payload = (unsigned char*)&dataBlockMsg->evbIds;
size_t size = nbSuperFragments*sizeof(EvBid);
memcpy(payload,&fragmentRequest->evbIds[0],size);
payload += size;
if ( nbSuperFragments > 0 )
unsigned char* loc = (unsigned char*)&dataBlockMsg->evbIds;
size_t size = eventCount*sizeof(EvBid);
memcpy(loc,&fragmentRequest->evbIds[0],size);
loc += size;
if ( eventCount > 0 )
{
lastEventNumberToBUs = fragmentRequest->evbIds[nbSuperFragments-1].eventNumber();
lastLumiSectionToBUs = fragmentRequest->evbIds[nbSuperFragments-1].lumiSection();
lastEventNumberToBUs = fragmentRequest->evbIds[eventCount-1].eventNumber();
lastLumiSectionToBUs = fragmentRequest->evbIds[eventCount-1].lumiSection();
}
size = nbRUtids*sizeof(I2O_TID);
memcpy(payload,&fragmentRequest->ruTids[0],size);
memcpy(loc,&fragmentRequest->ruTids[0],size);
}
else
{
......@@ -558,11 +528,11 @@ void evb::readoutunit::BUproxy<ReadoutUnit>::sendData
}
dataMonitoring_.lastEventNumberToBUs = lastEventNumberToBUs;
dataMonitoring_.outstandingEvents += nbSuperFragments;
dataMonitoring_.outstandingEvents += eventCount;
dataMonitoring_.perf.i2oCount += i2oCount;
dataMonitoring_.perf.sumOfSizes += payloadSize;
dataMonitoring_.perf.sumOfSquares += payloadSize*payloadSize;
dataMonitoring_.perf.logicalCount += nbSuperFragments;
dataMonitoring_.perf.logicalCount += eventCount;
}
if ( lumiTransition )
......@@ -585,7 +555,6 @@ toolbox::mem::Reference* evb::readoutunit::BUproxy<ReadoutUnit>::getNextBlock
const uint32_t blockSize = readoutUnit_->getConfiguration()->blockSize;
bufRef = toolbox::mem::getMemoryPoolFactory()->
getFrame(msgPool_,blockSize);
bufRef->setDataSize(blockSize);
}
catch(toolbox::mem::exception::Exception&)
{
......@@ -601,45 +570,6 @@ toolbox::mem::Reference* evb::readoutunit::BUproxy<ReadoutUnit>::getNextBlock
}
template<class ReadoutUnit>
void evb::readoutunit::BUproxy<ReadoutUnit>::fillSuperFragmentHeader
(
unsigned char*& payload,
uint32_t& remainingPayloadSize,
const uint32_t superFragmentNb,
const SuperFragmentPtr& superFragment,
const uint32_t currentFragmentSize
) const
{
const SuperFragment::MissingFedIds& missingFedIds = superFragment->getMissingFedIds();
const uint16_t nbDroppedFeds = missingFedIds.size();
const uint32_t headerSize = sizeof(msg::SuperFragment)
+ ((nbDroppedFeds-1 + 3) / 4) * sizeof(uint64_t);
// Keep 64-bit alignment if nbDroppedFeds > 1.
// ceil(x/y) can be expressed as (x+y-1)/y for positive integers
assert( headerSize % 8 == 0 );
if ( remainingPayloadSize < headerSize )
{
remainingPayloadSize = 0;
return;
}
msg::SuperFragment* superFragmentMsg = (msg::SuperFragment*)payload;
payload += headerSize;
remainingPayloadSize -= headerSize;
superFragmentMsg->headerSize = headerSize;
superFragmentMsg->superFragmentNb = superFragmentNb;
superFragmentMsg->totalSize = superFragment->getSize();
superFragmentMsg->partSize = currentFragmentSize > remainingPayloadSize ? remainingPayloadSize : currentFragmentSize;
superFragmentMsg->nbDroppedFeds = nbDroppedFeds;
memcpy(&superFragmentMsg->fedIds[0],&missingFedIds[0],nbDroppedFeds*sizeof(uint16_t));
}
template<class ReadoutUnit>
void evb