Commit f37e24f9 authored by Markus Frank's avatar Markus Frank
Browse files

Merge branch 'satellite' into 'master'

Update repository

See merge request !685
parents b36ce4c2 f95347a4
......@@ -63,7 +63,7 @@ add_subdirectory(Online/Dataflow)
add_subdirectory(Online/DataflowExample)
add_subdirectory(Online/Gaucho)
add_subdirectory(Online/DefHLTUtils)
add_subdirectory(Online/EventBuilding)
# add_subdirectory(Online/EventBuilding)
add_subdirectory(Online/EventData)
add_subdirectory(Online/FarmConfig)
add_subdirectory(Online/GaudiOnline)
......
......@@ -97,6 +97,11 @@ namespace Online {
/// Filled buffers to be worked down and fed to MBM
std::list<Buffer> m_todo { };
/// Mutex to lock run lists
std::mutex m_runlistLock { };
/// List of exhausted/bad runs when reading from HLT1 storage
std::set<std::string> m_bad_runs;
protected:
/// Runable implementation : Run the class implementation
virtual int i_run() override;
......@@ -175,13 +180,12 @@ namespace {
}
using namespace Online;
using namespace std;
// Instantiation of a static factory class used by clients to create instances of this service
DECLARE_DATAFLOW_NAMED_COMPONENT_NS(Online,Dataflow_StorageReader,StorageReader)
/// Initializing constructor
StorageReader::StorageReader(const string& nam, Context& ctxt)
StorageReader::StorageReader(const std::string& nam, Context& ctxt)
: DiskReader(nam, ctxt)
{
this->declareProperty("Server", this->m_server);
......@@ -199,9 +203,10 @@ int StorageReader::initialize() {
if ( this->DiskReader::initialize() != DF_SUCCESS )
return this->error("Failed to initialize DiskReader base class.");
if ( this->m_num_buffers > 0 ) {
for(size_t i = 0; i < this->m_num_buffers; ++i)
for(std::size_t i = 0; i < this->m_num_buffers; ++i)
this->m_free.emplace_back(Buffer());
}
this->m_bad_runs.clear();
return DF_SUCCESS;
}
......@@ -223,9 +228,10 @@ int StorageReader::start() {
this->m_loadEnabled = true;
this->m_receiveEvts = true;
this->m_goto_paused = false;
this->m_bad_runs.clear();
if ( this->m_num_buffers > 0 ) {
for(size_t i = 0; i < this->m_num_threads; ++i)
this->m_threads.emplace_back(make_unique<thread>([this]() { this->async_load_buffers(); }));
for(std::size_t i = 0; i < this->m_num_threads; ++i)
this->m_threads.emplace_back(std::make_unique<std::thread>([this]() { this->async_load_buffers(); }));
}
return DF_SUCCESS;
}
......@@ -261,31 +267,43 @@ int StorageReader::cancel() {
int StorageReader::open_file_net(Buffer& buffer) {
using namespace storage;
vector<string> bad_runs;
uri_t url(this->m_server);
fdb_client client(this->m_fdb_version);
//++m_num_file_open;
client.fdbclient = client::create<client::sync>(url.host, url.port, 10000, m_debugClient);
client.fdbclient = client::create<client::sync>(url.host, url.port, 10000, this->m_debugClient);
client.create_client = std::function(client::create<client::sync>);
for(const auto& run : this->m_allowedRuns ) {
time_t date = 0;
size_t length = 0;
string loc = run + '/'; /// interprete_file_name(m_filePrefix, run);
info("Checking for files matching pattern: '%s'", loc.c_str());
auto reply = client.next_object_delete(loc, date, length);
if ( reply.status == reply.ok ) {
if ( length < m_minFileSizeMB*MBYTE ) {
info("Drop file: %s %ld MB [too-small]", loc.c_str(), length/MBYTE);
}
buffer.name = loc;
buffer.data = move(reply.content);
buffer.runNumber = ::strtol(run.c_str(), 0, 10);
info("Loaded file: %s [%ld MB]", loc.c_str(), length/MBYTE);
return DF_SUCCESS;
bool is_good_run = false; {
std::lock_guard<std::mutex> lock(this->m_runlistLock);
is_good_run = this->m_bad_runs.find(run) == this->m_bad_runs.end();
}
else if ( reply.status == reply.not_found ) {
bad_runs.push_back(run);
if ( is_good_run ) {
std::time_t date = 0;
std::size_t length = 0;
std::string loc = run + '/'; /// interprete_file_name(m_filePrefix, run);
info("Checking for files matching pattern: '%s'", loc.c_str());
auto reply = client.next_object_delete(loc, date, length);
if ( reply.status == reply.ok ) {
if ( length < this->m_minFileSizeMB*MBYTE ) {
info("Drop file: %s %ld MB [too-small]", loc.c_str(), length/MBYTE);
}
buffer.name = loc;
buffer.data = std::move(reply.content);
buffer.runNumber = std::strtol(run.c_str(), 0, 10);
info("Loaded file: %s [%ld MB]", loc.c_str(), length/MBYTE);
return DF_SUCCESS;
}
else if ( reply.status == reply.gone ) {
std::lock_guard<std::mutex> lock(this->m_runlistLock);
this->m_bad_runs.insert(run);
}
#if 0
else if ( reply.status == reply.not_found ) {
std::lock_guard<std::mutex> lock(this->m_runlistLock);
this->m_bad_runs.insert(run);
}
#endif
}
}
buffer.runNumber = -1;
......@@ -293,7 +311,7 @@ int StorageReader::open_file_net(Buffer& buffer) {
}
int StorageReader::open_file_posix_root(const string& loc, Buffer& buffer) {
int StorageReader::open_file_posix_root(const std::string& loc, Buffer& buffer) {
TUrl url(loc.c_str());
TString prot = url.GetProtocol();
/// If there is a data protocol present, we invoke
......@@ -336,55 +354,54 @@ int StorageReader::open_file_posix_root(const string& loc, Buffer& buffer) {
return error("Failed to open TFile: %s [Protocol error]", loc.c_str());
}
int StorageReader::open_file_posix_raw(const string& loc, Buffer& buffer) {
int StorageReader::open_file_posix_raw(const std::string& loc, Buffer& buffer) {
RawFile input(loc);
if ( input.open() ) {
size_t file_length = input.data_size();
std::size_t file_length = input.data_size();
buffer.data.resize(file_length);
size_t read_length = input.read(&buffer.data.at(0), file_length);
std::size_t read_length = input.read(&buffer.data.at(0), file_length);
if ( read_length == file_length ) {
++this->m_filesClosed;
input.close();
return DF_SUCCESS;
}
return error("Failed to read data from file: %s", loc.c_str());
return this->error("Failed to read data from file: %s", loc.c_str());
}
return error("Failed to open file: %s [Protocol error]", loc.c_str());
return this->error("Failed to open file: %s [Protocol error]", loc.c_str());
}
int StorageReader::open_file_posix(Buffer& buffer) {
using namespace storage;
unique_ptr<client> client;
uri_t url(this->m_server);
string location;
std::unique_ptr<storage::client> client;
storage::uri_t url(this->m_server);
std::string location;
buffer.runNumber = -1;
for(auto i = begin(m_allowedRuns); i != end(m_allowedRuns); ++i ) {
for(auto i = std::begin(this->m_allowedRuns); i != std::end(this->m_allowedRuns); ++i ) {
const auto& run = *i;
storage::client::reqheaders_t hdrs;
string loc, file_name = interprete_file_name(m_filePrefix, run);
info("Checking for files matching pattern: '%s'", file_name.c_str());
std::string loc, file_name = interprete_file_name(this->m_filePrefix, run);
this->info("Checking for files matching pattern: '%s'", file_name.c_str());
loc = "/next?prefix="+file_name;
hdrs.emplace_back(http::constants::location, file_name);
client = client::create<client::sync>(url.host, url.port, 10000, m_debugClient);
reply_t reply = client->request(http::constants::del, loc, hdrs);
if( reply.status == reply.ok || reply.status == reply_t::temp_redirect ) {
client = storage::client::create<storage::client::sync>(url.host, url.port, 10000, m_debugClient);
storage::reply_t reply = client->request(http::constants::del, loc, hdrs);
if( reply.status == reply.ok || reply.status == reply.temp_redirect ) {
for ( const auto& h : reply.headers ) {
if ( h.name == http::constants::location ) {
string loc = "file://"+h.value;
int status = ( loc.find("://") == string::npos )
? open_file_posix_raw(loc, buffer)
: open_file_posix_root(loc, buffer);
std::string loc = "file://"+h.value;
int status = (loc.find("://") == std::string::npos)
? this->open_file_posix_raw(loc, buffer)
: this->open_file_posix_root(loc, buffer);
if ( status == DF_SUCCESS ) {
warning("Loaded file: %s [%ld MB]", loc.c_str(), buffer.data.size()/MBYTE);
this->warning("Loaded file: %s [%ld MB]", loc.c_str(), buffer.data.size()/MBYTE);
buffer.name = loc;
if ( run == "*" ) {
size_t idx = loc.rfind("/Run_")+1;
const char* ptr = loc.c_str() + ((idx != string::npos) ? idx : 0);
if ( 1 != ::sscanf(ptr, "%07d", &buffer.runNumber) ) {
if ( 1 != ::sscanf(ptr, "%08d", &buffer.runNumber) ) {
::sscanf(ptr, "%010d", &buffer.runNumber);
const char* ptr = loc.c_str() + ((idx != std::string::npos) ? idx : 0);
if ( 1 != std::sscanf(ptr, "%07d", &buffer.runNumber) ) {
if ( 1 != std::sscanf(ptr, "%08d", &buffer.runNumber) ) {
std::sscanf(ptr, "%010d", &buffer.runNumber);
}
}
}
......@@ -396,30 +413,30 @@ int StorageReader::open_file_posix(Buffer& buffer) {
}
}
}
else if ( reply.status == reply_t::not_found ) {
auto err = http::HttpReply::stock_status(reply.status);
warning("No more files for run: %s [%s]", run.c_str(),
err.substr(0,err.length()-2).c_str());
m_allowedRuns.erase(i);
i = begin(m_allowedRuns);
else if ( reply.status == storage::reply_t::not_found ) {
auto err = storage::reply_t::stock_status(reply.status);
this->warning("No more files for run: %s [%s]", run.c_str(),
err.substr(0,err.length()-2).c_str());
this->m_allowedRuns.erase(i);
i = begin(this->m_allowedRuns);
}
else {
auto err = http::HttpReply::stock_status(reply.status);
warning("Unknown fsDB error: %s [%d] run: %s",
err.substr(0,err.length()-2).c_str(),
int(reply.status), run.c_str());
auto err = storage::reply_t::stock_status(reply.status);
this->warning("Unknown fsDB error: %s [%d] run: %s",
err.substr(0,err.length()-2).c_str(),
int(reply.status), run.c_str());
}
client.reset();
::lib_rtl_sleep(100);
}
/// Error no location header!
return error("Failed to open new data file.");
return this->error("Failed to open new data file.");
}
int StorageReader::i_open_file(Buffer& buffer) {
int ret = DF_ERROR;
int partid = this->context.mbm->partitionID();
string typ = RTL::str_lower(this->m_dataType);
std::string typ = RTL::str_lower(this->m_dataType);
if ( this->m_requireConsumers ) {
int cons_status = this->waitForConsumers(m_buffer);
......@@ -465,9 +482,9 @@ void StorageReader::async_load_buffers() {
}
/// Consumers are present....start loading if buffers are present
{
lock_guard<mutex> lock(this->m_bufferLock);
std::lock_guard<std::mutex> lock(this->m_bufferLock);
if ( !this->m_free.empty() ) {
buffer = move(this->m_free.front());
buffer = std::move(this->m_free.front());
this->m_free.pop_front();
got_buffer = true;
}
......@@ -480,16 +497,16 @@ void StorageReader::async_load_buffers() {
continue;
}
{ // If we got a cancel in between: Stop reading
lock_guard<mutex> lock(this->m_bufferLock);
std::lock_guard<std::mutex> lock(this->m_bufferLock);
if ( !this->m_receiveEvts || this->m_goto_paused || !this->m_loadEnabled ) {
this->m_free.emplace_back(move(buffer));
this->m_free.emplace_back(std::move(buffer));
return;
}
++this->m_is_reading ;
}
if ( this->i_open_file(buffer) == DF_SUCCESS ) {
lock_guard<mutex> lock(this->m_bufferLock);
this->m_todo.emplace_back(move(buffer));
std::lock_guard<std::mutex> lock(this->m_bufferLock);
this->m_todo.emplace_back(std::move(buffer));
}
else {
this->m_goto_paused = true;
......@@ -498,16 +515,15 @@ void StorageReader::async_load_buffers() {
--this->m_is_reading ;
::lib_rtl_sleep(m_poll_tmo);
}
/// This is bad: All data buffered is lost.
/// However: The problem is on the client side:
/// They need to empty the MBM buffers. Otherwise this producer will block!
{
lock_guard<mutex> lock(this->m_bufferLock);
// If PAUSE was requested, we do not drop buffers
// (ONLY on stop: m_goto_paused = false && m_receiveEvts == false)
if ( !this->m_goto_paused && !this->m_receiveEvts ) {
std::lock_guard<std::mutex> lock(this->m_bufferLock);
while( !this->m_todo.empty() ) {
auto buffer = move(this->m_todo.back());
auto buffer = std::move(this->m_todo.back());
this->warning("Drop file: %s [%ld MB]", buffer.name.c_str(), buffer.data.size()/MBYTE);
buffer.name = "";
this->m_free.emplace_back(move(buffer));
this->m_free.emplace_back(std::move(buffer));
this->m_todo.pop_back();
}
}
......@@ -522,21 +538,21 @@ int StorageReader::open_file(Buffer& buffer) {
// Asynchronous reading: Wait until some buffer is ready
{
lock_guard<mutex> lock(this->m_bufferLock);
std::lock_guard<std::mutex> lock(this->m_bufferLock);
if ( !buffer.data.empty() ) {
buffer.name = "";
this->m_free.emplace_back(move(buffer));
this->m_free.emplace_back(std::move(buffer));
}
}
while( 1 ) {{
lock_guard<mutex> lock(this->m_bufferLock);
std::lock_guard<std::mutex> lock(this->m_bufferLock);
if ( m_todo.empty() ) {
if ( 0 == this->m_is_reading && !this->m_receiveEvts ) {
return DF_ERROR;
}
}
else {
buffer = move(this->m_todo.front());
buffer = std::move(this->m_todo.front());
this->m_todo.pop_front();
return DF_SUCCESS;
}
......@@ -604,7 +620,11 @@ int StorageReader::i_run() {
this->updateRunNumber(current_buffer.runNumber);
}
if ( current_input.isMapped() || current_input.isOpen() ) {
if ( !(current_input.isMapped() || current_input.isOpen()) ) {
current_input.reset();
continue;
}
else {
// Remember data position pointer to possibly recover the operation
auto pos = current_input.position();
// Handle request: access data and push to MBM
......@@ -612,15 +632,19 @@ int StorageReader::i_run() {
if ( status == DF_SUCCESS ) {
// All OK.
}
else if ( !(current_input.isOpen() || current_input.isMapped()) ) {
continue; // File is processed: Continue and ask for next file
}
else if ( status == DF_CONTINUE && this->m_receiveEvts ) { // Recoverable: missing consumer etc.
if ( (current_buffer.data.size() - pos) > sizeof(EventHeader) ) {
// **MSF**: Is this really correct ???
current_input.map_memory(&current_buffer.data.at(0), current_buffer.data.size());
current_input.position(pos, true);
}
continue;
}
else if ( status == DF_CONTINUE ) { // Stop requested in addiiton
break;
break; // File is processed: Continue with outer loop
}
else if ( status == DF_ERROR ) { // Unrecoverable failure: goto state ERROR after cleanup
this->m_receiveEvts = false;
......
......@@ -47,7 +47,6 @@ namespace {
static constexpr long MBYTE = (1024e0*1024e0);
}
using namespace std;
using namespace Online;
DECLARE_DATAFLOW_NAMED_COMPONENT_NS(Online,Dataflow_StorageWriter,StorageWriter)
......@@ -86,10 +85,10 @@ namespace {
struct StorageWriter::POSIX_FILE {
RawFile file { };
mutex lock;
std::mutex lock;
long length { 0 };
uint32_t run { 0 };
time_t last_write { 0 };
std::time_t last_write { 0 };
POSIX_FILE() = default;
POSIX_FILE(POSIX_FILE&& copy) = delete;
POSIX_FILE(const POSIX_FILE& copy) = delete;
......@@ -103,7 +102,7 @@ struct StorageWriter::POSIX_FILE {
this->run = 0;
}
int open(const std::string& fname, int verifyNFS) {
filesystem::path parent = filesystem::path(fname).parent_path();
std::filesystem::path parent = std::filesystem::path(fname).parent_path();
if ( verifyNFS ) {
if ( !_check_nfs_access(parent.string()) ) {
errno = ENODEV;
......@@ -131,11 +130,11 @@ struct StorageWriter::POSIX_FILE {
/// ROOT implementation
struct StorageWriter::ROOT_FILE {
unique_ptr<TFile> file;
long length = 0;
uint32_t run = 0;
time_t last_write = 0;
mutex lock;
std::unique_ptr<TFile> file;
long length = 0;
uint32_t run = 0;
std::time_t last_write = 0;
std::mutex lock;
ROOT_FILE() = default;
ROOT_FILE(ROOT_FILE&& copy) = delete;
ROOT_FILE(const ROOT_FILE& copy) = delete;
......@@ -149,7 +148,7 @@ struct StorageWriter::ROOT_FILE {
this->file.reset();
}
int open(const std::string& fname, int verifyNFS) {
filesystem::path parent = filesystem::path(fname).parent_path();
std::filesystem::path parent = std::filesystem::path(fname).parent_path();
if ( verifyNFS ) {
if ( !_check_nfs_access(parent.string()) ) {
errno = ENODEV;
......@@ -199,34 +198,34 @@ struct StorageWriter::ROOT_FILE {
};
/// Initializing constructor
StorageWriter::StorageWriter(const string& nam, Context& ctxt)
StorageWriter::StorageWriter(const std::string& nam, Context& ctxt)
: Component(nam, ctxt)
{
string fname = "/${PARTITION}/${RUN1000}/Run_${RUN}_${NODE}_${TIME}_${PID}_${SEQ}.mdf";
declareProperty("Server", m_server);
declareProperty("FDBVersion", m_fdb_version = 0);
declareProperty("BufferSizeMB", m_bufferSize = 1024);
declareProperty("WriteErrorRetry", m_write_error_retry = 10);
declareProperty("WriteErrorSleep", m_write_error_sleep = 2000);
declareProperty("PollTimeout", m_poll_tmo = 1000);
declareProperty("IdleTimeout", m_idle_tmo = 20);
declareProperty("CancelTimeout", m_cancel_tmo = 100);
declareProperty("NumBuffers", m_num_buffers = 2);
declareProperty("NumThreads", m_num_threads = 1);
declareProperty("MinFileSizeMB", m_minFileSizeMB = 5);
declareProperty("MaxFileSizeMB", m_maxFileSizeMB = 5000);
declareProperty("ForceMDF", m_force_mdf = 0);
declareProperty("DebugClient", m_debugClient = 0);
declareProperty("OutputType", m_outputType = "network");
declareProperty("HaveFileDB", m_haveFileDB = 0);
declareProperty("EnableWriting", m_enableWriting = 1);
declareProperty("VerifyNFS", m_verifyNFS = 1);
declareProperty("Stream", m_stream = "RAW");
declareProperty("RunType", m_runType = "");
declareProperty("PartitionName", m_partitionName = "LHCb");
declareProperty("FileName", m_fileName = fname);
declareProperty("ThreadFileQueues", m_threadFileQueues = false);
std::string fname = "/${PARTITION}/${RUN1000}/Run_${RUN}_${NODE}_${TIME}_${PID}_${SEQ}.mdf";
this->declareProperty("Server", m_server);
this->declareProperty("FDBVersion", m_fdb_version = 0);
this->declareProperty("BufferSizeMB", m_bufferSize = 1024);
this->declareProperty("WriteErrorRetry", m_write_error_retry = 10);
this->declareProperty("WriteErrorSleep", m_write_error_sleep = 2000);
this->declareProperty("PollTimeout", m_poll_tmo = 1000);
this->declareProperty("IdleTimeout", m_idle_tmo = 20);
this->declareProperty("CancelTimeout", m_cancel_tmo = 100);
this->declareProperty("NumBuffers", m_num_buffers = 2);
this->declareProperty("NumThreads", m_num_threads = 1);
this->declareProperty("MinFileSizeMB", m_minFileSizeMB = 5);
this->declareProperty("MaxFileSizeMB", m_maxFileSizeMB = 5000);
this->declareProperty("ForceMDF", m_force_mdf = 0);
this->declareProperty("DebugClient", m_debugClient = 0);
this->declareProperty("OutputType", m_outputType = "network");
this->declareProperty("HaveFileDB", m_haveFileDB = 0);
this->declareProperty("EnableWriting", m_enableWriting = 1);
this->declareProperty("VerifyNFS", m_verifyNFS = 1);
this->declareProperty("Stream", m_stream = "RAW");
this->declareProperty("RunType", m_runType = "");
this->declareProperty("PartitionName", m_partitionName = "LHCb");
this->declareProperty("FileName", m_fileName = fname);
this->declareProperty("ThreadFileQueues", m_threadFileQueues = false);
storage::error_check_enable(false);
}
......@@ -243,7 +242,7 @@ int StorageWriter::initialize() {
this->m_curr_run = 0;
this->m_cancelled = 0;
if ( this->m_fileName.find(":") != string::npos )
if ( this->m_fileName.find(":") != std::string::npos )
this->m_output_type = ROOT_STORAGE, m_outputType = "ROOT";
else if ( ::strcasecmp(this->m_outputType.c_str(), "NFS") == 0 )
this->m_output_type = POSIX_STORAGE;
......@@ -257,7 +256,7 @@ int StorageWriter::initialize() {
for(Buffer& b : this->m_free)
::free(b.buffer);
this->m_free.clear();
for(size_t i=0; i<this->m_num_buffers; ++i) {
for(std::size_t i=0; i<this->m_num_buffers; ++i) {
Buffer b;
b.buffer = (uint8_t*)::malloc(m_bufferSize+1024);
b.pointer = b.buffer;
......@@ -273,8 +272,8 @@ int StorageWriter::initialize() {
this->declareMonitor("BytesDropped", m_bytesDropped=0,"Number of bytes dropped");
this->declareMonitor("Errors", m_writeErrors=0, "Number of write errors");
this->m_shutdown = false;
for(size_t i=0; i<this->m_num_threads; ++i)
this->m_threads.emplace_back(make_unique<thread>([this]{ this->process_buffers(); }));
for(std::size_t i=0; i<this->m_num_threads; ++i)
this->m_threads.emplace_back(std::make_unique<std::thread>([this]{ this->process_buffers(); }));
storage::error_check_enable(this->outputLevel < INFO);
}
return sc;
......@@ -296,7 +295,7 @@ int StorageWriter::start() {
int StorageWriter::stop() {
this->m_cancelled = ::time(0);
if ( this->m_current.buffer != nullptr && this->m_current.pointer > this->m_current.buffer ) {
lock_guard<mutex> bufferLock(this->m_bufferLock);
std::lock_guard<std::mutex> bufferLock(this->m_bufferLock);
auto tmp = this->m_current;
this->m_current = {nullptr, nullptr};
this->m_todo.push_back(tmp);
......@@ -317,7 +316,7 @@ int StorageWriter::cancel() {
/// Finalize the MBM server
int StorageWriter::finalize() {
if ( this->m_current.buffer != nullptr && this->m_current.pointer > this->m_current.buffer ) {
lock_guard<mutex> bufferLock(this->m_bufferLock);
std::lock_guard<std::mutex> bufferLock(this->m_bufferLock);
auto tmp = this->m_current;
this->m_current = {nullptr, nullptr};
this->m_todo.push_back(tmp);
......@@ -341,7 +340,7 @@ int StorageWriter::finalize() {
StorageWriter::Buffer& StorageWriter::get_buffer() {
if ( nullptr == this->m_current.buffer ) {
while( !this->m_shutdown ) {{
lock_guard<mutex> bufferLock(this->m_bufferLock);
std::lock_guard<std::mutex> bufferLock(this->m_bufferLock);
if ( !this->m_free.empty() ) {
this->m_current = m_free.back();
this->m_free.pop_back();
......@@ -361,7 +360,7 @@ StorageWriter::Buffer& StorageWriter::get_buffer() {
/// Flush existing file buffer to disk and close file
void StorageWriter::flush_buffer(Buffer& buff) {
if ( buff.buffer != nullptr ) {
lock_guard<mutex> bufferLock(this->m_bufferLock);
std::lock_guard<std::mutex> bufferLock(this->m_bufferLock);
this->m_todo.emplace_back(buff);
buff = {nullptr, nullptr};
}
......@@ -427,10 +426,10 @@ int StorageWriter::save_pcie40_as_mdf(Buffer& buff, const uint8_t* start, int64_
decoder.decode(ev, mep);
/// Save the events one by one to the buffer
for( auto* e=ev->begin(); e != ev->end(); e=ev->next(e) ) {
uint32_t hdrvsn = 3;
size_t hdrlen = EventHeader::sizeOf(hdrvsn);
size_t length = e->total_length();
int sc = ensure_buffer(buff, length + hdrlen);
uint32_t hdrvsn = 3;
std::size_t hdrlen = EventHeader::sizeOf(hdrvsn);
std::size_t length = e->total_length();
int sc = ensure_buffer(buff, length + hdrlen);
if ( sc == DF_SUCCESS ) {
if ( 0 == m_curr_run ) {
const auto* odin = e->bank_collection(0)->at(0);
......@@ -456,7 +455,7 @@ int StorageWriter::save_pcie40_as_mdf(Buffer& buff, const uint8_t* start, int64_
h.H1->setOrbitNumber(m_curr_orbit);
h.H1->setBunchID(m_curr_bunch);
buff.pointer += hdrlen;
for( size_t i=0, n=e->num_bank_collections(); i<n; ++i) {
for( std::size_t i=0, n=e->num_bank_collections(); i<n; ++i) {
buff.pointer = (uint8_t*)e->bank_collection(i)->copy_data(buff.pointer);
}
if ( buff.pointer-b_beg != hdr->recordSize() ) {
......@@ -535,9 +534,9 @@ int StorageWriter::execute(const Context::EventData& event) {
}
else if ( this->m_enableWriting ) {
try {
size_t len = event.length;
auto* start = (uint8_t*)event.data;