diff --git a/Online/Dataflow/src/Storage/StorageWriter.cpp b/Online/Dataflow/src/Storage/StorageWriter.cpp index a0595f8e3a63542a2b7cba4d1f5d58a41f4e0fd5..fc78e99b9546f5dbe0268e8803cec003397d1699 100644 --- a/Online/Dataflow/src/Storage/StorageWriter.cpp +++ b/Online/Dataflow/src/Storage/StorageWriter.cpp @@ -204,7 +204,7 @@ StorageWriter::StorageWriter(const std::string& nam, Context& ctxt) std::string fname = "/${PARTITION}/${RUN1000}/Run_${RUN}_${NODE}_${TIME}_${PID}_${SEQ}.mdf"; this->declareProperty("Server", m_server); this->declareProperty("FDBVersion", m_fdb_version = 0); - this->declareProperty("BufferSizeMB", m_bufferSize = 1024); + this->declareProperty("BufferSizeMB", m_buffer_size = 1024); this->declareProperty("WriteErrorRetry", m_write_error_retry = 10); this->declareProperty("WriteErrorSleep", m_write_error_sleep = 2000); this->declareProperty("PollTimeout", m_poll_tmo = 1000); @@ -212,19 +212,21 @@ StorageWriter::StorageWriter(const std::string& nam, Context& ctxt) this->declareProperty("CancelTimeout", m_cancel_tmo = 100); this->declareProperty("NumBuffers", m_num_buffers = 2); this->declareProperty("NumThreads", m_num_threads = 1); - this->declareProperty("MinFileSizeMB", m_minFileSizeMB = 5); - this->declareProperty("MaxFileSizeMB", m_maxFileSizeMB = 5000); - this->declareProperty("DebugClient", m_debugClient = 0); - this->declareProperty("OutputType", m_outputType = "network"); - this->declareProperty("HaveFileDB", m_haveFileDB = 0); - this->declareProperty("EnableWriting", m_enableWriting = 1); - this->declareProperty("VerifyNFS", m_verifyNFS = 1); + this->declareProperty("MinFileSizeMB", m_min_file_size_MB = 5); + this->declareProperty("MaxFileSizeMB", m_max_file_size_MB = 5000); + this->declareProperty("DebugClient", m_debug_client = 0); + this->declareProperty("OutputType", m_output_type = "network"); + this->declareProperty("HaveFileDB", m_have_file_db = 0); + this->declareProperty("EnableWriting", m_enable_writing = 1); + this->declareProperty("VerifyNFS", m_verify_nfs = 1); + this->declareProperty("MEP2MDF", m_mep2mdf = 1); + this->declareProperty("MaxEvents", m_max_events = std::numeric_limits::max()); this->declareProperty("Stream", m_stream = "RAW"); - this->declareProperty("RunType", m_runType = ""); - this->declareProperty("RunList", m_runList); - this->declareProperty("PartitionName", m_partitionName = "LHCb"); - this->declareProperty("FileName", m_fileName = fname); + this->declareProperty("RunType", m_run_type = ""); + this->declareProperty("RunList", m_run_list); + this->declareProperty("PartitionName", m_partition_name = "LHCb"); + this->declareProperty("FileName", m_file_name = fname); this->declareProperty("ThreadFileQueues", m_threadFileQueues = false); storage::error_check_enable(false); } @@ -238,32 +240,32 @@ int StorageWriter::initialize() { int sc = this->Component::initialize(); if ( sc == DF_SUCCESS ) { storage::uri_t url(this->m_server); - this->m_bufferSize *= MBYTE; + this->m_buffer_size *= MBYTE; this->m_cancelled = 0; this->m_curr_run = 0; - if ( this->m_fileName.find(":") != std::string::npos ) - this->m_output_type = ROOT_STORAGE, m_outputType = "ROOT"; - else if ( ::strcasecmp(this->m_outputType.c_str(), "NFS") == 0 ) - this->m_output_type = POSIX_STORAGE; - else if ( ::strcasecmp(this->m_outputType.c_str(), "ROOT") == 0 ) - this->m_output_type = ROOT_STORAGE; - else if ( ::strcasecmp(this->m_outputType.c_str(), "POSIX") == 0 ) - this->m_output_type = POSIX_STORAGE; - else if ( ::strcasecmp(this->m_outputType.c_str(), "NETWORK") == 0 
) - this->m_output_type = NETWORK_STORAGE; + if ( this->m_file_name.find(":") != std::string::npos ) + this->m_output_type_id = ROOT_STORAGE, m_output_type = "ROOT"; + else if ( ::strcasecmp(this->m_output_type.c_str(), "NFS") == 0 ) + this->m_output_type_id = POSIX_STORAGE; + else if ( ::strcasecmp(this->m_output_type.c_str(), "ROOT") == 0 ) + this->m_output_type_id = ROOT_STORAGE; + else if ( ::strcasecmp(this->m_output_type.c_str(), "POSIX") == 0 ) + this->m_output_type_id = POSIX_STORAGE; + else if ( ::strcasecmp(this->m_output_type.c_str(), "NETWORK") == 0 ) + this->m_output_type_id = NETWORK_STORAGE; for(Buffer& b : this->m_free) ::free(b.buffer); this->m_free.clear(); for(std::size_t i=0; im_num_buffers; ++i) { Buffer b; - b.buffer = (uint8_t*)::malloc(m_bufferSize+1024); + b.buffer = (uint8_t*)::malloc(m_buffer_size+1024); b.pointer = b.buffer; this->m_free.push_back(b); } - this->declareMonitor("Events","OUT", m_eventsOUT=0, "Number of events processed"); - this->declareMonitor("Events","DROPPED", m_eventsDROP=0, "Number of events processed"); + this->declareMonitor("Events","OUT", m_events_OUT=0, "Number of events processed"); + this->declareMonitor("Events","DROPPED", m_events_DROP=0, "Number of events processed"); this->declareMonitor("Bursts","OUT", m_burstsOUT=0, "Number of bursts processed"); this->declareMonitor("FilesOpen", m_filesOpen=0, "Number of output files currently open"); this->declareMonitor("FilesOpened", m_filesOpened=0, "Number of output files opened"); @@ -302,7 +304,7 @@ int StorageWriter::start() { int StorageWriter::stop() { this->m_cancelled = ::time(0); if ( !this->m_active.empty() ) { - std::lock_guard bufferLock(this->m_bufferLock); + std::lock_guard bufferLock(this->m_buffer_lock); while ( !this->m_active.empty() ) { auto buff = std::move(m_active.begin()->second); m_active.erase(m_active.begin()); @@ -325,7 +327,7 @@ int StorageWriter::cancel() { /// Finalize the MBM server int StorageWriter::finalize() { if ( !this->m_active.empty() ) { - std::lock_guard bufferLock(this->m_bufferLock); + std::lock_guard bufferLock(this->m_buffer_lock); while ( !this->m_active.empty() ) { auto buff = std::move(m_active.begin()->second); m_active.erase(m_active.begin()); @@ -353,7 +355,7 @@ int StorageWriter::finalize() { int StorageWriter::decode_run_list() { int status = DF_SUCCESS; this->m_run_partitions.clear(); - for( const auto& run_part : this->m_runList ) { + for( const auto& run_part : this->m_run_list ) { int run_num = 0; auto items = RTL::str_split(run_part,"/"); if ( items.size() == 2 && 1 == ::sscanf(items[1].c_str(),"%010d",&run_num) ) { @@ -365,7 +367,7 @@ int StorageWriter::decode_run_list() { // If after a STOP_RUN new runs are selected this will inevitably lead to an inconsistency! // This only fixes the problem if runs from the SAME partition are selected! // - this->m_partitionName = part; + this->m_partition_name = part; continue; } this->error("Invalid run-list encountered. Failed to decode. 
Item: %s", run_part.c_str());
@@ -379,22 +381,28 @@ StorageWriter::Buffer& StorageWriter::get_buffer(uint32_t run, int64_t length)
 static Buffer empty;
 std::time_t now = ::time(0);
 {
- std::lock_guard bufferLock(this->m_bufferLock);
+ std::lock_guard bufferLock(this->m_buffer_lock);
 if ( run > this->m_curr_run ) {
 this->m_curr_run = run;
 }
+ // Check for outdated buffers from previous runs
 for(auto ib = this->m_active.begin(); ib != this->m_active.end(); ++ib ) {
- if ( (*ib).first < this->m_curr_run && (m_active.size() > 2 || (now- (*ib).second.lastWrite) > 120) ) {
- this->m_todo.emplace_back((*ib).second);
+ const auto& buff = (*ib).second;
+ if ( (*ib).first < this->m_curr_run && (m_active.size() > 2 || (now- buff.last_write) > 120) ) {
+ this->m_todo.emplace_back(buff);
+ this->m_active.erase(ib);
+ }
+ if ( buff.num_events >= this->m_max_events ) {
+ this->m_todo.emplace_back(buff);
 this->m_active.erase(ib);
- break; // Should be enough to move them one-by-one
 }
 }
+ // Check if there are already active buffers available with sufficient space
 for(auto ib = this->m_active.begin(); ib != this->m_active.end(); ++ib ) {
 if ( (*ib).first == run ) {
 auto& buff = (*ib).second;
 if ( (buff.pointer - buff.buffer) + length < this->maxBufferSize() ) {
- buff.lastWrite = now;
+ buff.last_write = now;
 return buff;
 }
 this->m_todo.emplace_back(buff);
@@ -404,15 +412,16 @@ StorageWriter::Buffer& StorageWriter::get_buffer(uint32_t run, int64_t length)
 }
 }
 while( !this->m_shutdown ) {{
- std::lock_guard bufferLock(this->m_bufferLock);
+ std::lock_guard bufferLock(this->m_buffer_lock);
 if ( !this->m_free.empty() ) {
 this->m_active.emplace(run, m_free.back());
 this->m_free.pop_back();
 for(auto& b : this->m_active) {
 if ( b.first == run ) {
 b.second.pointer = b.second.buffer;
- b.second.lastWrite = ::time(0);
- b.second.runNumber = run;
+ b.second.last_write = ::time(0);
+ b.second.run_number = run;
+ b.second.num_events = 0;
 return b.second;
 }
 }
@@ -423,9 +432,9 @@ StorageWriter::Buffer& StorageWriter::get_buffer(uint32_t run, int64_t length)
 {
 now = ::time(0);
- std::lock_guard bufferLock(this->m_bufferLock);
+ std::lock_guard bufferLock(this->m_buffer_lock);
 for(auto ib = this->m_active.begin(); ib != this->m_active.end(); ++ib ) {
- if ( (*ib).first < this->m_curr_run && (m_active.size() > 2 || (now- (*ib).second.lastWrite) > 120) ) {
+ if ( (*ib).first < this->m_curr_run && (m_active.size() > 2 || (now- (*ib).second.last_write) > 120) ) {
 this->m_todo.emplace_back((*ib).second);
 this->m_active.erase(ib);
 break; // Should be enough to move them one-by-one
@@ -442,7 +451,7 @@ int StorageWriter::save_buffer(uint32_t run, const void* data, int64_t length)
 if ( this->maxBufferSize() < length ) {
 this->error("Execute: Failed to allocate buffer [%ld bytes]. " "BUFFER SIZE TOO SMALL! Change options!", length);
- ++this->m_eventsDROP;
+ ++this->m_events_DROP;
 return DF_ERROR;
 }
@@ -450,18 +459,19 @@ int StorageWriter::save_buffer(uint32_t run, const void* data, int64_t length)
 if ( this->m_shutdown && buff.buffer == nullptr ) {
 info("Execute: Failed to allocate buffer. Drop event. [Shutdown requested]");
- ++this->m_eventsDROP;
+ ++this->m_events_DROP;
 return DF_SUCCESS;
 }
 if ( buff.buffer == nullptr ) {
 info("Execute: Failed to allocate buffer. Drop event. 
[Internal error ?????]"); - ++this->m_eventsDROP; + ++this->m_events_DROP; return DF_CONTINUE; } - this->m_last_event_stamp = buff.lastWrite; + this->m_last_event_stamp = buff.last_write; ::memcpy(buff.pointer, data, length); buff.pointer += length; - ++this->m_eventsOUT; + ++buff.num_events; + ++this->m_events_OUT; return DF_SUCCESS; } @@ -487,20 +497,20 @@ int StorageWriter::save_pcie40_as_mdf(const uint8_t* start, int64_t len) { const auto* odin = e->bank_collection(0)->at(0); if ( odin && odin->data() ) { - const auto* sodin = odin->begin(); + const auto* sodin = odin->begin(); int32_t curr_run = sodin->run_number(); int32_t curr_orbit = sodin->orbit_id(); int32_t curr_bunch = sodin->bunch_id(); - Buffer& buff = get_buffer(curr_run, length + hdrlen); + Buffer& buff = this->get_buffer(curr_run, length + hdrlen); if ( this->m_shutdown && buff.buffer == nullptr ) { info("Execute: Failed to allocate buffer. Drop event. [Shutdown requested]"); - ++this->m_eventsDROP; + ++this->m_events_DROP; return DF_SUCCESS; } if (buff.buffer == nullptr ) { info("Execute: Failed to allocate buffer. Drop event. [Internal error ?????]"); - ++this->m_eventsDROP; + ++this->m_events_DROP; return DF_CONTINUE; } @@ -526,13 +536,14 @@ int StorageWriter::save_pcie40_as_mdf(const uint8_t* start, int64_t len) { this->error("++ Event length inconsistency: %ld <> %ld %ld %ld", buff.pointer-b_beg, hdr->recordSize(), length, hdrlen); } - ++this->m_eventsOUT; + ++buff.num_events; + ++this->m_events_OUT; continue; } - ++this->m_eventsDROP; + ++this->m_events_DROP; } /// Move to the next MEP if any - if ( m=mep->next(); m > mep ) { + if ( m = mep->next(); m > mep ) { mep = m; continue; } @@ -542,6 +553,61 @@ int StorageWriter::save_pcie40_as_mdf(const uint8_t* start, int64_t len) { return nev > 0 ? DF_SUCCESS : DF_ERROR; } +/// Save PCIE40 MEP as is +int StorageWriter::save_pcie40_as_mep(const uint8_t* start, int64_t len) { + auto* mep_start = (pcie40::mep_header_t*)start; + auto* mep_end = pcie40::add_ptr(start, len); + + /// For each MEP in the frame: Search for the odin bank and dump the MEP frame + for(const pcie40::mep_header_t *m, *mep = mep_start; mep < mep_end; ) { + if ( !mep->is_valid() ) { + /// If we do not have a MEP header: stop processing + ++this->m_badHeader; + return DF_SUCCESS; + } + std::size_t num_fragments = mep->num_source; + for( uint32_t i = 0; i < num_fragments; ++i ) { + const pcie40::multi_fragment_t *mfp = mep->multi_fragment(i); + const pcie40::frontend_data_t *curr = mfp->data(); + const std::size_t align = mfp->header.alignment; + const uint8_t *typs = mfp->types(); + const uint16_t *lens = mfp->sizes(); + + for (std::size_t cnt=0, n=mfp->packingFactor(); cntrun_number(); + Buffer& buff = this->get_buffer(run, len); + if ( this->m_shutdown && buff.buffer == nullptr ) { + this->info("Execute: Failed to allocate buffer. Drop event. [Shutdown requested]"); + ++this->m_events_DROP; + return DF_SUCCESS; + } + if (buff.buffer == nullptr ) { + this->info("Execute: Failed to allocate buffer. Drop event. 
[Internal error ?????]"); + ++this->m_events_DROP; + return DF_CONTINUE; + } + int sc = this->save_buffer(run, mep, length); + if ( DF_SUCCESS != sc ) { + return DF_SUCCESS; + } + break; + } + curr = curr->next(length, align); + } + } + /// Move to the next MEP if any + if ( m = mep->next(); m > mep ) { + mep = m; + continue; + } + } + return DF_SUCCESS; +} + /// Save MDF frame or MDF burst int StorageWriter::save_mdf_buffer(const uint8_t* start, int64_t len) { long num_bad_headers = 0; @@ -561,7 +627,7 @@ int StorageWriter::save_mdf_buffer(const uint8_t* start, int64_t len) { start += length; ++num_bad_headers; ++this->m_badHeader; - ++this->m_eventsDROP; + ++this->m_events_DROP; continue; } if ( run > this->m_curr_run ) { @@ -588,9 +654,9 @@ int StorageWriter::execute(const Context::EventData& event) { /// Extend idle time if there are still events coming if ( this->m_cancelled > 0 ) { this->m_cancelled = ::time(0); - ++this->m_eventsDROP; + ++this->m_events_DROP; } - else if ( this->m_enableWriting ) { + else if ( this->m_enable_writing ) { try { std::size_t len = event.length; auto* start = (uint8_t*)event.data; @@ -603,9 +669,16 @@ int StorageWriter::execute(const Context::EventData& event) { /// Auto detect data type: first check for PCIE40 MEP format auto* mep_hdr = (pcie40::mep_header_t*)start; if( mep_hdr->is_valid() ) { - status = this->save_pcie40_as_mdf(start, int64_t(mep_hdr->size*sizeof(uint32_t))); - if ( status == DF_SUCCESS ) ++this->m_burstsOUT; - return status; + if ( this->m_mep2mdf ) { + status = this->save_pcie40_as_mdf(start, int64_t(mep_hdr->size*sizeof(uint32_t))); + if ( status == DF_SUCCESS ) ++this->m_burstsOUT; + return status; + } + else { + status = this->save_pcie40_as_mep(start, int64_t(mep_hdr->size*sizeof(uint32_t))); + if ( status == DF_SUCCESS ) ++this->m_burstsOUT; + return status; + } } /// Auto detect data type: first check for MDF/BURST format @@ -626,7 +699,7 @@ int StorageWriter::execute(const Context::EventData& event) { } } else { - ++this->m_eventsDROP; + ++this->m_events_DROP; } return status; } @@ -635,12 +708,12 @@ int StorageWriter::execute(const Context::EventData& event) { std::string StorageWriter::makeFileName(int run) { char text[128]; struct timeval tv; - std::string file_name = this->m_fileName; - std::string part = this->m_partitionName; + std::string file_name = this->m_file_name; + std::string part = this->m_partition_name; ::gettimeofday(&tv, nullptr); struct tm *timeinfo = ::localtime(&tv.tv_sec); - ++this->m_sequenceNumber; + ++this->m_sequence_number; if ( !this->m_run_partitions.empty() ) { auto irun = this->m_run_partitions.find(run); if ( irun != this->m_run_partitions.end() ) @@ -651,13 +724,13 @@ std::string StorageWriter::makeFileName(int run) { file_name = RTL::str_replace(file_name, "${TIME}", text); file_name = RTL::str_replace(file_name, "${NODE}", RTL::nodeNameShort()); file_name = RTL::str_replace(file_name, "${STREAM}", this->m_stream); - file_name = RTL::str_replace(file_name, "${RUNTYPE}", this->m_runType); + file_name = RTL::str_replace(file_name, "${RUNTYPE}", this->m_run_type); file_name = RTL::str_replace(file_name, "${PARTITION}", part); ::snprintf(text, sizeof(text), "%010d", int((run/1000)*1000)); file_name = RTL::str_replace(file_name, "${RUN1000}", text); ::snprintf(text, sizeof(text), "%010d", run); file_name = RTL::str_replace(file_name, "${RUN}", text); - ::snprintf(text, sizeof(text), "%04ld", m_sequenceNumber); + ::snprintf(text, sizeof(text), "%04ld", m_sequence_number); file_name = 
RTL::str_replace(file_name, "${SEQ}", text); ::snprintf(text, sizeof(text), "%06d", ::lib_rtl_pid()); file_name = RTL::str_replace(file_name, "${PID}", text); @@ -668,7 +741,7 @@ std::string StorageWriter::makeFileName(int run) { template int StorageWriter::process_posix_buffers(std::mutex& queue_lock, std::map >& open_files) { auto add_db_entry = [this] (size_t len, const std::string& fname) { - if ( this->m_haveFileDB ) { + if ( this->m_have_file_db ) { http::HttpReply reply; try { std::string url; @@ -676,8 +749,8 @@ int StorageWriter::process_posix_buffers(std::mutex& queue_lock, std::mapm_server); storage::client::reqheaders_t hdrs; cl.fdbclient = - storage::client::create(srv.host, srv.port, 10000, this->m_debugClient); - if ( this->m_haveFileDB == 2 ) { + storage::client::create(srv.host, srv.port, 10000, this->m_debug_client); + if ( this->m_have_file_db == 2 ) { hdrs.emplace_back(http::constants::location, fname); } reply = cl.save_object_record(fname, url, len, hdrs); @@ -732,7 +805,7 @@ int StorageWriter::process_posix_buffers(std::mutex& queue_lock, std::map bufferLock(this->m_bufferLock); + std::lock_guard bufferLock(this->m_buffer_lock); if ( this->m_free.size() == this->m_num_buffers && this->m_shutdown ) { std::lock_guard outputLock(queue_lock); /// Shutdown (reset) requested. Close output and register file. If not possible -- nothing we can do! @@ -776,10 +849,10 @@ int StorageWriter::process_posix_buffers(std::mutex& queue_lock, std::maplength > 0 && output->length + len > m_maxFileSizeMB * MBYTE ) + if ( output.get() && output->length > 0 && output->length + len > m_max_file_size_MB * MBYTE ) close_output(output, "size-limit", 1); if ( !output.get() ) { @@ -789,14 +862,14 @@ int StorageWriter::process_posix_buffers(std::mutex& queue_lock, std::map file_lock(output->lock); now = ::time(0); if ( !output->isOpen() ) { - std::string fname = this->makeFileName(b.runNumber); - if ( output->open(fname, this->m_verifyNFS) <= 0 ) { + std::string fname = this->makeFileName(b.run_number); + if ( output->open(fname, this->m_verify_nfs) <= 0 ) { this->error("Failed to open output %s. [%s]", fname.c_str(), RTL::errorString(errno).c_str()); this->fireIncident("DAQ_ERROR"); return DF_ERROR; } this->warning("Opened %s", output->name()); - output->run = b.runNumber; + output->run = b.run_number; ++this->m_filesOpened; ++this->m_filesOpen; } @@ -806,7 +879,7 @@ int StorageWriter::process_posix_buffers(std::mutex& queue_lock, std::maplast_write = now; output->length += len; - std::lock_guard counterLock(this->m_counterLock); + std::lock_guard counterLock(this->m_counter_lock); this->m_bytesOut += len; } else { @@ -814,14 +887,14 @@ int StorageWriter::process_posix_buffers(std::mutex& queue_lock, std::maplock.unlock(); close_output(output, "write-failed", 1); - std::lock_guard counterLock(this->m_counterLock); + std::lock_guard counterLock(this->m_counter_lock); this->m_bytesDropped += len; ++this->m_writeErrors; //this->fireIncident("DAQ_ERROR"); //return DF_ERROR; } { - std::lock_guard bufferLock(this->m_bufferLock); + std::lock_guard bufferLock(this->m_buffer_lock); b.pointer = b.buffer; this->m_free.push_back(b); } @@ -840,7 +913,7 @@ int StorageWriter::process_posix_buffers(std::mutex& queue_lock, std::map bufferLock(this->m_bufferLock); + std::lock_guard bufferLock(this->m_buffer_lock); if ( this->m_free.size() == this->m_num_buffers && this->m_shutdown ) { this->info("process: Exit condition detected. 
Leaving submit thread."); return DF_SUCCESS; @@ -853,10 +926,10 @@ int StorageWriter::process_network_buffers() { if ( b.buffer ) { long len = b.pointer - b.buffer; /// Skip files with less than a minimum bytes - if ( len > this->m_minFileSizeMB*MBYTE ) { - if ( this->writeBuffer(b) != DF_SUCCESS ) { + if ( len > this->m_min_file_size_MB*MBYTE ) { + if ( this->write_buffer(b) != DF_SUCCESS ) { /// Set the writer into error state and inhibit all further consumption of events - std::lock_guard counterLock(this->m_counterLock); + std::lock_guard counterLock(this->m_counter_lock); this->m_bytesDropped += len; ++this->m_writeErrors; this->fireIncident("DAQ_ERROR"); @@ -864,16 +937,16 @@ int StorageWriter::process_network_buffers() { std::error_condition(errno,std::system_category()).message().c_str()); return DF_ERROR; } - std::lock_guard counterLock(this->m_counterLock); + std::lock_guard counterLock(this->m_counter_lock); this->m_bytesOut += len; } else { this->info("Skip mini-file with only %ld bytes",int64_t(len)); - std::lock_guard counterLock(this->m_counterLock); + std::lock_guard counterLock(this->m_counter_lock); this->m_bytesDropped += len; } { - std::lock_guard bufferLock(this->m_bufferLock); + std::lock_guard bufferLock(this->m_buffer_lock); b.pointer = b.buffer; this->m_free.push_back(b); } @@ -888,7 +961,7 @@ int StorageWriter::process_network_buffers() { /// Thread invocation routine to save assembled buffers to the disk server int StorageWriter::process_buffers() { - switch(this->m_output_type) { + switch(this->m_output_type_id) { case NETWORK_STORAGE: return this->process_network_buffers(); case ROOT_STORAGE: @@ -897,7 +970,7 @@ int StorageWriter::process_buffers() { std::map > files; return this->process_posix_buffers(queue_lock, files); } - return this->process_posix_buffers(this->m_outputLock,this->m_open_root_files); + return this->process_posix_buffers(this->m_output_lock,this->m_open_root_files); case POSIX_STORAGE: default: if ( this->m_threadFileQueues ) { @@ -905,7 +978,7 @@ int StorageWriter::process_buffers() { std::map > files; return this->process_posix_buffers(queue_lock, files); } - return this->process_posix_buffers(this->m_outputLock,this->m_open_posix_files); + return this->process_posix_buffers(this->m_output_lock,this->m_open_posix_files); } } @@ -917,7 +990,7 @@ void StorageWriter::print_reply(const char* prefix, const http::HttpReply& reply reply.print(str,""); do { getline(str, line); - error("writeBuffer-reply<%s>: %s", prefix, line.c_str()); + error("write_buffer-reply<%s>: %s", prefix, line.c_str()); } while( str.good() && !str.eof() ); #endif switch(reply.status) { @@ -925,7 +998,7 @@ void StorageWriter::print_reply(const char* prefix, const http::HttpReply& reply case http::HttpReply::accepted: case http::HttpReply::permanent_redirect: line = http::HttpReply::stock_status(reply.status); - this->info("writeBuffer-reply<%s>: %s", prefix, line.c_str()); + this->info("write_buffer-reply<%s>: %s", prefix, line.c_str()); break; case http::HttpReply::internal_server_error: @@ -939,14 +1012,14 @@ void StorageWriter::print_reply(const char* prefix, const http::HttpReply& reply case http::HttpReply::no_content: default: line = http::HttpReply::stock_status(reply.status); - this->error("writeBuffer-reply<%s>: %s", prefix, line.c_str()); + this->error("write_buffer-reply<%s>: %s", prefix, line.c_str()); break; } } /// Write multi event buffer to file. Eventually open a new file.... 
-int StorageWriter::writeBuffer(const Buffer& buff) { - std::string fname = this->makeFileName(buff.runNumber); +int StorageWriter::write_buffer(const Buffer& buff) { + std::string fname = this->makeFileName(buff.run_number); int num_retries = this->m_write_error_retry; std::size_t len = buff.pointer - buff.buffer; bool process = (this->m_cancelled == 0) || (::time(0) - this->m_cancelled < this->m_cancel_tmo); @@ -959,7 +1032,7 @@ int StorageWriter::writeBuffer(const Buffer& buff) { client_t cl(this->m_fdb_version); storage::uri_t srv(this->m_server); cl.fdbclient = - storage::client::create(srv.host, srv.port, 10000, this->m_debugClient); + storage::client::create(srv.host, srv.port, 10000, this->m_debug_client); reply = cl.save_object_record(fname, url, len); @@ -976,18 +1049,18 @@ int StorageWriter::writeBuffer(const Buffer& buff) { double kB = double(len)/1024e0; void (Component::*prt)(const char*,...) const = &Component::info; if ( this->m_cancelled > 0 ) prt = &Component::warning; - ((*this).*prt)("writeBuffer: Saved '%s' %.2f %cB", url.c_str(), + ((*this).*prt)("write_buffer: Saved '%s' %.2f %cB", url.c_str(), kB>1e4 ? kB/1024e0 : kB, kB>1e4 ? 'M' : 'k'); ++this->m_filesClosed; return DF_SUCCESS; } } catch(const std::exception& e) { - this->error("writeBuffer: Exception while sending data: %s",e.what()); + this->error("write_buffer: Exception while sending data: %s",e.what()); reply.status = http::HttpReply::bad_gateway; } catch(...) { - this->error("writeBuffer: Exception while sending data: UNKNOWN Exception"); + this->error("write_buffer: Exception while sending data: UNKNOWN Exception"); reply.status = http::HttpReply::bad_gateway; } switch(reply.status) { @@ -1026,11 +1099,11 @@ int StorageWriter::writeBuffer(const Buffer& buff) { } } catch(const std::exception& e) { - this->error("writeBuffer: Exception while connecting: %s",e.what()); + this->error("write_buffer: Exception while connecting: %s",e.what()); reply.status = http::HttpReply::bad_gateway; } catch(...) 
{
- this->error("writeBuffer: Exception while connecting: UNKNOWN Exception");
+ this->error("write_buffer: Exception while connecting: UNKNOWN Exception");
 reply.status = http::HttpReply::bad_gateway;
 }
diff --git a/Online/Dataflow/src/Storage/StorageWriter.h b/Online/Dataflow/src/Storage/StorageWriter.h
index aae65ebdf5396b116543e3488f63804afc0188e9..a9b537520edc5908da1216aef6a56a06d03b0aff 100644
--- a/Online/Dataflow/src/Storage/StorageWriter.h
+++ b/Online/Dataflow/src/Storage/StorageWriter.h
@@ -24,6 +24,7 @@
 /// C/C++ include files
 #include
 #include
+#include
 #include
 #include
 #include
@@ -52,13 +53,15 @@ namespace Online {
 struct Buffer {
 /// Pointer to the start of the event buffer
- uint8_t* buffer = nullptr;
+ uint8_t* buffer { nullptr };
 /// Current pointer inside the event buffer
- uint8_t* pointer = nullptr;
+ uint8_t* pointer { nullptr };
 /// Associated run number
- uint32_t runNumber {0};
+ uint32_t run_number { 0 };
+ /// Number of accumulated events
+ uint32_t num_events { 0 };
 /// Last access time
- std::time_t lastWrite {0};
+ std::time_t last_write { 0 };
 /// Default constructor
 Buffer() = default;
 };
@@ -72,100 +75,104 @@ namespace Online {
 /// Property: FDB server name
 std::string m_server;
 /// Property: Partition name string
- std::string m_partitionName;
+ std::string m_partition_name { };
 /// Property: Run-type string
- std::string m_runType;
+ std::string m_run_type { };
 /// Property: Stream identifier
- std::string m_stream;
+ std::string m_stream { };
 /// File Name Pattern
- std::string m_fileName;
+ std::string m_file_name { };
 /// Steer disk type processing (nfs, network, ...)
- std::string m_outputType;
+ std::string m_output_type { };
 /// Property: Runlist to determine the input partition by run number
- std::vector m_runList;
+ std::vector m_run_list { };
 /// Property: Buffer size
- int64_t m_bufferSize;
+ int64_t m_buffer_size;
 /// Property: Number of parallel event buffers
- std::size_t m_num_buffers {0};
+ std::size_t m_num_buffers { 0 };
 /// Property: Number of event sender threads
- std::size_t m_num_threads {0};
+ std::size_t m_num_threads { 0 };
 /// Property: Minimal file size in MBytes
- long m_minFileSizeMB {0};
+ long m_min_file_size_MB { 0 };
 /// Property: Maximal file size in MBytes (used if writing to posix file system)
- long m_maxFileSizeMB {512};
+ long m_max_file_size_MB { 512 };
 /// Property: Sleep in milliseconds between retries when write connection fails
- int m_write_error_sleep {0};
+ int m_write_error_sleep { 0 };
 /// Property: Number of retries when write connection fails
- int m_write_error_retry {0};
+ int m_write_error_retry { 0 };
 /// Property: Poll timeout to detect transfer buffers [microseconds]
- int m_poll_tmo {100};
+ int m_poll_tmo { 100 };
 /// Property: Cancel timeout to empty pending buffers [milli-seconds]
- int m_cancel_tmo {100};
+ int m_cancel_tmo { 100 };
 /// Property: Idle timeout to close potentially open files [seconds]
- int m_idle_tmo {20};
+ int m_idle_tmo { 20 };
 /// Property: Use FDB client when writing POSIX/ROOT
- int m_haveFileDB {0};
+ int m_have_file_db { 0 };
 /// Property: Debug FDB client
- int m_debugClient {0};
+ int m_debug_client { 0 };
 /// Property: FDB version
- int m_fdb_version {0};
+ int m_fdb_version { 0 };
 /// Property: Enable/disable output to file
- int m_enableWriting {1};
+ int m_enable_writing { 1 };
 /// Property: Verify that the output directory is NFS/CEPH mounted
- int m_verifyNFS {1};
+ int m_verify_nfs { 1 };
+ /// Property: On occurrence of a MEP: convert it 
automatically to MDF format + int m_mep2mdf { 1 }; + /// Property: Maximum events allowed per file + uint32_t m_max_events { std::numeric_limits::max() }; /// Property: Have threaded file queues - bool m_threadFileQueues {false}; + bool m_threadFileQueues { false }; /// Monitoring quantity: Number of events written to output - long m_eventsOUT {0}; + long m_events_OUT { 0 }; /// Monitoring quantity: Number of events not written and dropped - long m_eventsDROP {0}; + long m_events_DROP { 0 }; /// Monitoring quantity: Number of bursts submitted to output - long m_burstsOUT {0}; + long m_burstsOUT { 0 }; /// Monitoring quantity: Number of files currently open to write output - long m_filesOpen {0}; + long m_filesOpen { 0 }; /// Monitoring quantity: Number of files opened to write output - long m_filesOpened {0}; + long m_filesOpened { 0 }; /// Monitoring quantity: Number of files closed to write output - long m_filesClosed {0}; + long m_filesClosed { 0 }; /// Monitoring quantity: Number of writte errors - long m_writeErrors {0}; + long m_writeErrors { 0 }; /// Monitoring quantity: Number of bytes written to output - long m_bytesOut {0}; + long m_bytesOut { 0 }; /// Monitoring quantity: Number of bytes dropped from output - long m_bytesDropped {0}; + long m_bytesDropped { 0 }; /// Monitoring quantity: Number of events with a bad header structure - long m_badHeader {0}; + long m_badHeader { 0 }; /// Buffer handling thread std::vector m_threads; /// Mutex to lock the event buffer queues when filling/saving - std::mutex m_bufferLock; + std::mutex m_buffer_lock; /// Mutex to lock the output queues when filling/saving - std::mutex m_outputLock; + std::mutex m_output_lock; /// Mutex to lock counters - std::mutex m_counterLock; + std::mutex m_counter_lock; /// Decoded runlist for proper partition names std::map m_run_partitions; /// Free buffers to be filled when writing - std::vector m_free; + std::vector m_free { }; /// List of filled buffers to be dumped to storage device - std::vector m_todo; + std::vector m_todo { }; /// Active buffers being filled - std::map m_active; + std::map m_active { }; - uint32_t m_curr_run {0}; - std::size_t m_sequenceNumber {0}; - time_t m_last_event_stamp {0}; + uint32_t m_curr_run { 0 }; + std::size_t m_sequence_number { 0 }; + time_t m_last_event_stamp { 0 }; enum output_type_t { NETWORK_STORAGE = 1, POSIX_STORAGE = 2, ROOT_STORAGE = 3 }; /// Flag with preprocessed output type - output_type_t m_output_type { NETWORK_STORAGE }; + output_type_t m_output_type_id { NETWORK_STORAGE }; /// Flag to detect cancellation in the event processing thread - time_t m_cancelled {0}; + time_t m_cancelled { 0 }; /// Flag to indicate the ongoing shutdown process - bool m_shutdown {false}; + bool m_shutdown { false }; protected: /// Decode the run-list to determine proper file names for HLT2 @@ -186,13 +193,15 @@ namespace Online { int save_buffer(uint32_t run, const void* data, int64_t length); /// Convert PCIE40 MEP to MDF and save it. int save_pcie40_as_mdf(const uint8_t* start, int64_t length); + /// Save PCIE40 MEP as is + int save_pcie40_as_mep(const uint8_t* start, int64_t length); /// Save MDF frame or MDF burst int save_mdf_buffer(const uint8_t* start, int64_t length); /// Maximum available buffer size - int64_t maxBufferSize() const { return m_bufferSize; } + int64_t maxBufferSize() const { return this->m_buffer_size; } /// Write multi event buffer to file. Eventually open a new file.... 
- int writeBuffer(const Buffer& buffer); + int write_buffer(const Buffer& buffer); /// Print server's HttpReply structure void print_reply(const char* prefix, const http::HttpReply& reply) const; diff --git a/Online/Dataflow/src/components/MBMServer.cpp b/Online/Dataflow/src/components/MBMServer.cpp index e76daa619368d119c3466055929370747341a826..36adeb719c734e95dddaddd60911edde047bb51a 100644 --- a/Online/Dataflow/src/components/MBMServer.cpp +++ b/Online/Dataflow/src/components/MBMServer.cpp @@ -18,27 +18,27 @@ /// Framework includes #include "MBMServer.h" #include -#include #include +#include // C/C++ include files #include #include #include -using namespace std; using namespace Online; /// Initializing constructor MBMServer::MBMServer(const std::string& nam, Context& ctxt) - : Component(nam, ctxt), m_partitionID(0x103) + : Component(nam, ctxt), m_partition_id(0x103) { - this->m_procName = RTL::processName(); - this->declareProperty("PartitionID", this->m_partitionID); - this->declareProperty("PartitionName", this->m_partitionName=""); - this->declareProperty("InitFlags", this->m_initFlags); - this->declareProperty("PartitionBuffers", this->m_partitionBuffers=false); - this->declareProperty("ConsumerRequirements", this->m_consRequirements); + this->declareProperty("PartitionID", this->m_partition_id); + this->declareProperty("PartitionName", this->m_partition_name=""); + this->declareProperty("InitFlags", this->m_init_flags); + this->declareProperty("PartitionBuffers", this->m_partition_buffers=false); + this->declareProperty("ConsumerRequirements", this->m_consumer_requirements); + this->declareProperty("AccidentBuffers", this->m_accident_buffers); + this->declareProperty("AccidentOutput", this->m_accident_output); } /// Default destructor @@ -49,30 +49,29 @@ MBMServer::~MBMServer() { int MBMServer::initialize() { info("MBM Server: in initialize."); this->Component::initialize(); - this->m_procName = RTL::processName(); this->info("Initializing MBM Buffers."); - if ( !this->m_initFlags.empty() ) { - size_t ikey = 0; - char *items[64], part_id[16]; - string tmp = this->m_initFlags; - ::snprintf(part_id,sizeof(part_id),"%X",this->m_partitionID); - for(char* tok=::strtok((char*)tmp.c_str()," "); tok; tok=::strtok(NULL," ")) { - if ( this->m_partitionBuffers && ::toupper(tok[1]) == 'I' ) { - string bm_name = this->bufferName(tok); - items[ikey++] = strcpy(new char[bm_name.length()+1],bm_name.c_str()); + if ( !this->m_init_flags.empty() ) { + std::size_t ikey = 0; + char *items[64], part_id[16]; + std::string tmp = this->m_init_flags; + std::snprintf(part_id, sizeof(part_id), "%X", this->m_partition_id); + for(char* tok = ::strtok((char*)tmp.c_str()," "); tok; tok = ::strtok(NULL," ")) { + if ( this->m_partition_buffers && ::toupper(tok[1]) == 'I' ) { + std::string bm_name = this->bufferName(tok); + items[ikey++] = std::strcpy(new char[bm_name.length()+1],bm_name.c_str()); continue; } - items[ikey++] = strcpy(new char[strlen(tok)+1],tok); + items[ikey++] = std::strcpy(new char[std::strlen(tok)+1],tok); } - for(size_t i=0; im_srvBMIDs = ::mbm_multi_install(ikey, items); - for(size_t j=0; jm_server_bm = ::mbm_multi_install(ikey, items); + for(std::size_t j=0; jm_srvBMIDs.empty() ) { + if ( this->m_server_bm.empty() ) { return this->error("Failed to initialize MBM buffers."); } info("MBM Server: done initializing."); @@ -85,6 +84,7 @@ int MBMServer::initialize() { /// Start MBM server int MBMServer::start() { this->info("MBM Server: in start."); + this->subscribeAccidents(); int sc = 
this->Component::start(); this->context.manager.setRunNumber(100); this->info("MBM Server: ending start."); @@ -93,6 +93,7 @@ int MBMServer::start() { /// Stop MBM server int MBMServer::stop() { + this->unsubscribeAccidents(); this->unsubscribeIncidents(); this->undeclareMonitors(); return Component::stop(); @@ -100,31 +101,31 @@ int MBMServer::stop() { /// Finalize the MBM server int MBMServer::finalize() { - for(auto i=this->m_srvBMIDs.begin(); i!=this->m_srvBMIDs.end(); ++i) { - ServerBMID bmid = (*i).second; - ::mbmsrv_stop_dispatch(bmid); - } - for(auto i=this->m_srvBMIDs.begin(); i!=this->m_srvBMIDs.end(); ++i) { - ServerBMID bmid = (*i).second; - ::mbmsrv_unlink_memory(bmid); - ::mbmsrv_destroy(bmid); + /// Stop MBM servers + for(const auto& bm : this->m_server_bm) + ::mbmsrv_stop_dispatch(bm.second); + + /// Unlink from shared memory + for(const auto& bm : this->m_server_bm) { + ::mbmsrv_unlink_memory(bm.second); + ::mbmsrv_destroy(bm.second); } - this->m_srvBMIDs.clear(); + this->m_server_bm.clear(); return this->Component::finalize(); } /// Create proper buffer name depending on partitioning std::string MBMServer::bufferName(const std::string& nam) const { - string bm_name = nam; - if ( this->m_partitionBuffers ) { + std::string bm_name = nam; + if ( this->m_partition_buffers ) { bm_name += "_"; - if ( this->m_partitionName.empty() ) { + if ( this->m_partition_name.empty() ) { char part_id[16]; - ::snprintf(part_id,sizeof(part_id),"%X",this->m_partitionID); + ::snprintf(part_id,sizeof(part_id),"%X",this->m_partition_id); bm_name += part_id; } else { - bm_name += this->m_partitionName; + bm_name += this->m_partition_name; } } return bm_name; @@ -132,25 +133,25 @@ std::string MBMServer::bufferName(const std::string& nam) const { /// Apply consumer requirements to MBM buffers int MBMServer::setConsumerRequirements() { - const string& part = this->m_partitionName; - string node = RTL::str_upper(RTL::nodeNameShort()); - for(auto i=begin(this->m_consRequirements); i!=end(this->m_consRequirements);++i) { - string buf = this->bufferName((*i).first); - const auto& rq = (*i).second; - ServedBuffers::const_iterator j=this->m_srvBMIDs.find(buf); - if ( j == this->m_srvBMIDs.end() ) { + const auto& part = this->m_partition_name; + auto node = RTL::str_upper(RTL::nodeNameShort()); + for(const auto& requirement : this->m_consumer_requirements) { + auto buf = this->bufferName(requirement.first); + const auto& rq = requirement.second; + ServedBuffers::const_iterator j = this->m_server_bm.find(buf); + if ( j == this->m_server_bm.end() ) { this->warning("Buffer "+buf+" does not exist. Failed to set consumer requirement [Ignored]."); continue; } else if ( rq.size() < 2 ) { - return this->error("Insufficient information from job-option. Failed to set consumer requirement."); + return this->error("Insufficient information from job options. Failed to set consumer requirement."); } else if ( ((rq.size())%2) != 0 ) { - return this->error("Inconsistent job-option. Failed to set consumer requirement."); + return this->error("Inconsistent job options. 
Failed to set consumer requirement."); } - for(size_t k=0; k",part); req = RTL::str_replace(req,"",node); @@ -171,11 +172,108 @@ int MBMServer::setConsumerRequirement(ServerBMID srvBM, const MBM::Requirement& r) { int status = ::mbmsrv_require_consumer(srvBM, task.c_str(), - this->m_partitionID, + this->m_partition_id, r.evtype, r.trmask); - if ( status != MBM_NORMAL ) + if ( status != MBM_NORMAL ) { return this->error("Failed to set BM consumer requirements."); + } return DF_SUCCESS; } +namespace { + /// Static Accident handler + void server_accidents(void* param, ServerBMID bmid, long offset, std::size_t len, int typ, const unsigned int mask[]) { + void* address = nullptr; + MBMServer* srv = (MBMServer*)param; + ::mbmsrv_event_address(bmid, offset, &address); + srv->handleAccident(bmid, address, len, typ, mask); + } +} + +/// Subscribe to 'accident' events (client crashes etc.) +int MBMServer::subscribeAccidents() { + if ( !this->m_accident_buffers.empty() ) { + + /// Subscribe to accidents for the requested buffers + for( const auto& b : this->m_accident_buffers ) { + auto buf = this->bufferName(b); + auto j = this->m_server_bm.find(buf); + if ( j == this->m_server_bm.end() ) { + this->warning("Buffer "+buf+" does not exist. Failed to attach accident handler [ignored]."); + continue; + } + ::mbmsrv_subscribe_accidents((*j).second, this, server_accidents); + } + + /// Map output buffer to copy accident events + if ( !this->m_accident_output.empty() ) { + auto buf = this->bufferName(this->m_accident_output); + auto j = this->m_server_bm.find(buf); + if ( j == this->m_server_bm.end() ) { + this->warning("Accident output buffer "+buf+" does not exist. Cannot save failed events [ignored]."); + } + else { + int com = mbmsrv_communication_type((*j).second); + int pid = this->m_partition_id; + auto nam = RTL::processName(); + this->m_output_prod = std::make_unique(buf, nam, pid, com); + if ( this->m_output_prod->id() == MBM_INV_DESC ) { + this->warning("FAILED to map accident output buffer "+buf+" [ignored]."); + this->m_output_prod.reset(); + } + else { + this->warning("Successfully connected to accident output buffer "+buf+" [ignored]."); + } + } + } + } + return DF_SUCCESS; +} + +/// Unsubscribe from 'accident' events (client crashes etc.) 
+int MBMServer::unsubscribeAccidents() { + /// Unsubscribe from accidents of the requested buffers + for( const auto& b : this->m_accident_buffers ) { + auto buf = this->bufferName(b); + auto j = this->m_server_bm.find(buf); + if ( j != this->m_server_bm.end() ) { + ::mbmsrv_unsubscribe_accidents((*j).second); + } + } + /// Release producer for output buffer + if ( this->m_output_prod ) { + this->m_output_prod.reset(); + } + return DF_SUCCESS; +} + +/// Handle accident events +void MBMServer::handleAccident(ServerBMID bmid, const void* address, std::size_t len, int typ, const unsigned int mask[]) { + if ( bmid ) { + char bm_name[128]; + ::mbmsrv_buffer_name(bmid, bm_name, sizeof(bm_name)); + this->error("[%s] Client accident event: %p len: %9ld typ: %3d mask: [%08X,%08X,%08X,%08X]", + bm_name, address, len, typ, mask[0], mask[1], mask[2], mask[3]); + if ( this->m_output_prod ) { + try { + std::lock_guard lock(this->m_output_lock); + int sc = this->m_output_prod->getSpace(len); + if ( sc != MBM_REQ_CANCEL ) { + MBM::EventDesc& e = this->m_output_prod->event(); + e.type = EVENT_TYPE_ERROR; + e.len = len; + ::memcpy(e.mask, mask, sizeof(e.mask)); + ::memcpy(e.data, address, len); + this->m_output_prod->sendEvent(); + } + } + catch(const std::exception& e) { + this->error("Error declaring accident event: %s.", e.what()); + } + catch(...) { + this->error("Unknown exception declaring accident event."); + } + } + } +} diff --git a/Online/Dataflow/src/components/MBMServer.h b/Online/Dataflow/src/components/MBMServer.h index 4ccd68009beabd330efb420a6e73b262527b8a10..9efadb3d061c89de41241b2dfadfff2fd9c1849f 100644 --- a/Online/Dataflow/src/components/MBMServer.h +++ b/Online/Dataflow/src/components/MBMServer.h @@ -19,11 +19,13 @@ // Framework include files #include +#include #include -#include // C/C++ include files #include +#include +#include // Forward declarations namespace MBM { @@ -41,26 +43,35 @@ namespace Online { * @version 1.0 */ class MBMServer : public DataflowComponent { + public: typedef std::vector ConsRequirement; + typedef std::vector AccidentBuffers; typedef std::map ConsRequirements; - typedef std::map ServedBuffers; + typedef std::map ServedBuffers; private: - /// Property: Process name used to include into MEP/MBM buffers - std::string m_procName; /// Property: Initialization flags to possibly install MBM/MEP buffers - std::string m_initFlags; + std::string m_init_flags { }; + /// Property: partition name used to connect to buffer managers (If set overrides partition identifier) + std::string m_partition_name { }; + /// Property: Buffer name to save accident events to + std::string m_accident_output { }; /// Property: Container of consumer requirements. Only valid if the buffers are held! 
- ConsRequirements m_consRequirements; + ConsRequirements m_consumer_requirements { }; + /// Property: Buffer names with accident handlers attached + AccidentBuffers m_accident_buffers { }; /// Property: partition identifier used to connect to buffer managers - int m_partitionID; - /// Property: partition name used to connect to buffer managers (If set overrides partition identifier) - std::string m_partitionName; + int m_partition_id { -1 }; /// Property: Flag to indicate if buffer names should contain partition ID - bool m_partitionBuffers; + bool m_partition_buffers { false }; + /// Map of server BMIDs - ServedBuffers m_srvBMIDs; + ServedBuffers m_server_bm { }; + /// BMID of output buffer + std::unique_ptr m_output_prod { }; + /// Lock of output buffer to handle multiple accident buffers + std::mutex m_output_lock { }; public: /// Initializing constructor @@ -83,7 +94,12 @@ namespace Online { int setConsumerRequirement(ServerBMID srvBM, const std::string& task_match, const MBM::Requirement& req); + /// Subscribe to 'accident' events (client crashes etc.) + int subscribeAccidents(); + /// Unsubscribe from 'accident' events (client crashes etc.) + int unsubscribeAccidents(); + /// Handle accident events + void handleAccident(ServerBMID bmid, const void* address, size_t len, int typ, const unsigned int mask[]); }; } // end namespace Online - #endif // ONLINE_DATAFLOW_MBMSERVER_H diff --git a/Online/FarmConfig/job/EBAlignWR.sh b/Online/FarmConfig/job/EBAlignWR.sh index 55600fdb7ce3fd351671d261bc87dd3542bd3124..0530d51da234b932929de81e24fde2388e7c9505 100755 --- a/Online/FarmConfig/job/EBAlignWR.sh +++ b/Online/FarmConfig/job/EBAlignWR.sh @@ -9,4 +9,4 @@ # # ========================================================================= # -execute `dataflow_task Class1` -opts=${STATIC_OPTS}/EBAlignWR.opts ${AUTO_STARTUP} ${DEBUG_STARTUP}; +execute `dataflow_task Class2` -opts=${STATIC_OPTS}/EBAlignWR.opts ${AUTO_STARTUP} ${DEBUG_STARTUP}; diff --git a/Online/FarmConfig/options/EBAlignWR.opts b/Online/FarmConfig/options/EBAlignWR.opts index 8bd3120f650ab75bcbb3e82177bf485f9f0ee2ab..baca3a75c78a1941cff41159840efd6d1c630123 100755 --- a/Online/FarmConfig/options/EBAlignWR.opts +++ b/Online/FarmConfig/options/EBAlignWR.opts @@ -18,8 +18,13 @@ MBM.PartitionName = @OnlineEnv.PartitionName; MBM.PartitionID = @OnlineEnv.PartitionID; MBM.Buffers = { "Output" }; // +// Writer manager should not be a VIP consumer: May on 'stop' block Allen/EBPass if these are declared as Class1! 
+// WrManager.Input = "Output"; -Param.FileSizeLimit = 10000; +WrManager.Requirement = "EvType=2;TriggerMask=0x0,0x0,0x0,0x0;VetoMask=0xFFFFFFFF,0xFFFFFFFF,0xFFFFFFFF,0xFFFFFFFF;MaskType=ANY;UserType=USER;Frequency=PERC;Perc=100.0"; +// +// File size limit in kB (50000 = 500 MB) +Param.FileSizeLimit = 50000; Param.FilePrefix = "/alignment/${PARTITION}/${STREAM}/${RUN}/Run_${RUN}_${NODE}_${TIME}.dat"; Param.DeviceList = { "/hlt2" }; Param.DimSteering = 0; diff --git a/Online/FarmConfig/options/EBMBM.opts b/Online/FarmConfig/options/EBMBM.opts index 91ac15e12ad6e3edfbcf68dcd1432333345fcb35..f652f1d95b5221900f2a52fff25cd1468f232313 100644 --- a/Online/FarmConfig/options/EBMBM.opts +++ b/Online/FarmConfig/options/EBMBM.opts @@ -6,10 +6,14 @@ Manager.Setup = {"Dataflow_MBMServer/MEPManager"}; Manager.Services = {"Dataflow_UI/UI"}; MEPManager.PartitionBuffers = true; MEPManager.PartitionName = @OnlineEnv.PartitionName; +// +//MEPManager.AccidentBuffers = {"Events_0", "Events_1"}; +//MEPManager.AccidentOutput = {"Output"}; +// // Input buffer size (Events): 2 * 16 GB MEPManager.InitFlags = "-s=16000000 -e=150 -u=20 -b=12 -t=1 -n=0 -f -i=Events_0 -c -s=16000000 -e=150 -u=20 -b=12 -t=1 -n=1 -f -i=Events_1 -c -s=500000 -e=150 -u=5 -b=12 -t=1 -f -i=Output -c"; MEPManager.InitFlags = "-s=5000000 -e=150 -u=20 -b=12 -t=1 -n=0 -f -i=Events_0 -c -s=5000000 -e=150 -u=20 -b=12 -t=1 -n=1 -f -i=Events_1 -c -s=500000 -e=150 -u=7 -b=12 -t=1 -f -i=Output -c"; /// -/// !!! The buffer flags are set in EBMBM.sh +/// !!! The buffer flags are set in EBMBM.sh !!! /// MEPManager.InitFlags = "$MBMINIT_FLAGS"; diff --git a/Online/FarmConfig/options/EBStorageERR.opts b/Online/FarmConfig/options/EBStorageERR.opts new file mode 100644 index 0000000000000000000000000000000000000000..60124238a6b8629bc6c1c8f33d617d3fbd54c18e --- /dev/null +++ b/Online/FarmConfig/options/EBStorageERR.opts @@ -0,0 +1,18 @@ +#include "$FARMCONFIGROOT/options/StorageWriter.opts" +Monitoring.CounterUpdateInterval = 3; +Manager.TaskType = "Dataflow_Class2"; +MBM.ConnectWhen = "initialize"; +// +EventProc.REQ2 = "EvType=4;TriggerMask=0xffffffff,0xffffffff,0xffffffff,0xffffffff;VetoMask=0,0,0,0;MaskType=ANY;UserType=VIP;Frequency=PERC;Perc=100.0"; +// +// NFS writing +Writer.IdleTimeout = 2; +Writer.HaveFileDB = 0; +Writer.NumThreads = 3; +Writer.NumBuffers = 4; +Writer.MaxFileSizeMB = 1000; +Writer.BufferSizeMB = 1000; +Writer.MaxEvents = 1; +Writer.MEP2MDF = 0; +Writer.OutputType = "POSIX"; +Writer.FileName = "/hlt2/errors/${PARTITION}/${RUN}/Error_${RUN}_${NODE}.${SEQ}.mep"; diff --git a/Online/Gaucho/src/components/UpdateAndReset.cpp b/Online/Gaucho/src/components/UpdateAndReset.cpp index 20b122453222b3ef16d5f2f71f7610fbbab07ecc..8e0a12116c1bbaa594e6f5ace5615a0049d5ed99 100644 --- a/Online/Gaucho/src/components/UpdateAndReset.cpp +++ b/Online/Gaucho/src/components/UpdateAndReset.cpp @@ -31,10 +31,10 @@ #include #include -#include -#include #include #include +#include +#include #include #include @@ -49,11 +49,14 @@ #include #include + using namespace Online; // Static Factory declaration DECLARE_COMPONENT( UpdateAndReset ) +typedef std::vector > evt_data_t; + namespace { static const std::string s_statusNoUpdated( "NO_UPDATED" ); static const std::string s_statusProcessingUpdate( "PROCESSINGUPDATE" ); @@ -181,7 +184,7 @@ StatusCode UpdateAndReset::start() { StatusCode UpdateAndReset::execute( EventContext const& /* ctxt */ ) const { UpdateAndReset* This = const_cast( this ); /// Otherwise normal execution: retrieve the run number from ODIN - const 
evt_data_t* event = m_rawData.get(); + const evt_data_t* event = (evt_data_t*)m_rawData.get(); int runno = 100; if ( event ) { for ( const auto& b : *event ) { diff --git a/Online/Gaucho/src/components/UpdateAndReset.h b/Online/Gaucho/src/components/UpdateAndReset.h index 423002d04beec330d604caea5e387917d16ab73d..f49fb3ff158caa655a21ec46ac85dc62a4ac6a57 100644 --- a/Online/Gaucho/src/components/UpdateAndReset.h +++ b/Online/Gaucho/src/components/UpdateAndReset.h @@ -23,11 +23,13 @@ #include #include +/// Forward declarations class TDirectory; class DimService; +namespace LHCb { class RawBank; } +/// Online namespace declaration namespace Online { - class Tell1Bank; class UpdateAndReset : public Gaudi::Algorithm { public: @@ -57,9 +59,8 @@ namespace Online { ServiceHandle m_sinkSvc{this, "MonitorSink", "OnlMonitorSink", "Service for accessing monitoring entities to save"}; SmartIF m_rootSinkSvc; ///< RootHistogramSink - typedef Tell1Bank BankHeader; - typedef std::vector > evt_data_t; - DataObjectReadHandle m_rawData{this,"RawData","Banks/RawData"}; + typedef std::vector > lb_evt_data_t; + DataObjectReadHandle m_rawData{this,"RawData","Banks/RawData"}; std::shared_ptr m_dimSvcSaveSetLoc; std::mutex m_mutex; diff --git a/Online/GaudiOnline/components/OnlineEventApp.cpp b/Online/GaudiOnline/components/OnlineEventApp.cpp index 884bf0150453e75c17f102d6aa3a0216e229086c..f0f8df24b5e0f36e8748c4f615d5e924c298d0ce 100644 --- a/Online/GaudiOnline/components/OnlineEventApp.cpp +++ b/Online/GaudiOnline/components/OnlineEventApp.cpp @@ -390,6 +390,9 @@ int OnlineEventApp::stop() { for ( auto& t : m_eventAccess ) stop_thread( t ); m_eventAccess.clear(); stop_thread( m_eventLoop ); + /// Event loop finally finished. Declare it as stopped and wait until + /// the status thread has finished. 
+ m_halt = EVENTLOOP_STOPPED; for ( auto& t : m_eventStatus ) stop_thread( t ); m_eventStatus.clear(); m_ioSvc->access.reset(); @@ -397,7 +400,7 @@ int OnlineEventApp::stop() { m_events->close(); m_events.reset(); } - m_halt = EVENTLOOP_STOPPED; + m_halt = EVENTLOOP_ENDED; return OnlineApplication::stop(); } @@ -641,7 +644,7 @@ ReStart: this_thread::sleep_for( chrono::microseconds( 100 ) ); } while ( !ev_que->empty() || ( this->m_events->eventsBuffered() > 0 ) || - ( this->m_halt != EVENTLOOP_STOP ) ); + ( this->m_halt != EVENTLOOP_STOPPED ) ); } catch ( const exception& e ) { this->m_logger->error( "+++ queued_event_result: Event result exception: %s", e.what() ); goto ReStart; diff --git a/Online/GaudiOnline/components/OnlineEventApp.h b/Online/GaudiOnline/components/OnlineEventApp.h index 8c3bc42fc1f94b1b281a4f437718abe2216084f9..ece63f5ebdca5e980421f162167010ec41b0b1c4 100644 --- a/Online/GaudiOnline/components/OnlineEventApp.h +++ b/Online/GaudiOnline/components/OnlineEventApp.h @@ -53,7 +53,8 @@ namespace Online { EVENTLOOP_EXECUTE = 1 << 0, EVENTLOOP_PAUSE = 1 << 1, EVENTLOOP_STOP = 1 << 2, - EVENTLOOP_STOPPED = 1 << 3 + EVENTLOOP_STOPPED = 1 << 3, + EVENTLOOP_ENDED = 1 << 4 }; /// Forward declaration to the DIM command changing the number of processing threads class instance_command_t; diff --git a/Online/GaudiOnline/src/QueuedFlowManager.cpp b/Online/GaudiOnline/src/QueuedFlowManager.cpp index 1e4aecf9f7353387ecdef232fbedbac883cc0930..4baf983c67363de8c311218aba20777cd85628ab 100644 --- a/Online/GaudiOnline/src/QueuedFlowManager.cpp +++ b/Online/GaudiOnline/src/QueuedFlowManager.cpp @@ -76,7 +76,9 @@ void QueuedFlowManager::push(EventContext&& ctx) { /// IQueueingEventProcessor override: Tell if the processor has events in the queues. bool QueuedFlowManager::empty() const { - return m_inFlight == 0 && m_done.size() == 0; + std::size_t evt_in_flight = m_inFlight.load(); + std::size_t evt_in_done = m_done.size(); + return evt_in_flight == 0 && evt_in_done == 0; } /// IQueueingEventProcessor override: Get the next available result. diff --git a/Online/OnlineBase/include/MBM/bmserver.h b/Online/OnlineBase/include/MBM/bmserver.h index 443c7fccc2a0b3685368d148b35f58e8e45e9e69..c90f04829ea419e5401078bc28ed1eacd63d4ea3 100644 --- a/Online/OnlineBase/include/MBM/bmserver.h +++ b/Online/OnlineBase/include/MBM/bmserver.h @@ -29,6 +29,13 @@ std::map mbm_multi_install(int argc , char** argv); extern "C" { ServerBMID mbm_install_server(int argc , char** argv); + /// Access buffer name from BMID + int mbmsrv_buffer_name(ServerBMID bm, char* buff, size_t buff_len); + /// Access buffer address from offset + int mbmsrv_event_address(ServerBMID bm, long offset, void** address); + /// Access server communication type + int mbmsrv_communication_type(ServerBMID bm); + /// Add consumer requirement to server int mbmsrv_require_consumer(ServerBMID bm, const char* name, int partid, int evtype, const unsigned int mask[]); /// Deregister consumer requirement from server @@ -70,7 +77,12 @@ extern "C" { /// Check existing clients with cleanup void mbmsrv_check_clients(ServerBMID bm); /// Call mbmsrv_check_pending_tasks and mbmsrv_check_clients if necessary - int mbmsrv_client_watch_cycle(ServerBMID bm); -} + int mbmsrv_client_watch_cycle(ServerBMID bm); + typedef void (*mbmsrv_accident_callback_t)(void* param, ServerBMID bmid, long offset, size_t len, int typ, const unsigned int mask[]); + /// Subscribe to 'accident' events (client crashes etc.) 
+ int mbmsrv_subscribe_accidents(ServerBMID bm, void* param, mbmsrv_accident_callback_t callback); + /// Unsubscribe from 'accident' events (client crashes etc.) + int mbmsrv_unsubscribe_accidents(ServerBMID bm); +} #endif // _MBM_MBMSERVER_H diff --git a/Online/OnlineBase/src/MBM/mbmlib_client.cpp b/Online/OnlineBase/src/MBM/mbmlib_client.cpp index 53d42de47c778f2913ef93438b636c8a21b3262b..48524e4ae36b44691bd92f0d66e16f978bd456d9 100755 --- a/Online/OnlineBase/src/MBM/mbmlib_client.cpp +++ b/Online/OnlineBase/src/MBM/mbmlib_client.cpp @@ -130,7 +130,7 @@ int _mbm_cons_message(BMID bm, int code) { int _mbm_shutdown (void* /* param */) { qentry_t *q, *bmq = desc_head; BMID bm, ids[32]; - MSG msg(MSG::EXCLUDE); + MSG msg(MSG::FORCE_SHUTDOWN); int cnt = 0, sc = 1, len=sizeof(ids)/sizeof(ids[0]); if ( 0 == reference_count ) { @@ -152,6 +152,7 @@ int _mbm_shutdown (void* /* param */) { bm = ids[i]; if ( bm->connection.server.request>0 ) { // Send EXCLUDE message to server msg.user = bm->user; // if this is still possible. + msg.data.exclude.pid = bm->pid; bm->communication.send_request_server(msg,false); // Otherwise it could also not become worse. } } @@ -471,6 +472,8 @@ BMID mbm_include_write (const char* bm_name, const char* name, int partid, int c int mbm_exclude (BMID bm) { MBM_CHECK_CONS(bm); MSG msg(MSG::EXCLUDE,bm->user); + MSG::exclude_t& exclude = msg.data.exclude; + exclude.pid = bm->pid; if ( bm->communication.communicate_server(msg,0) != MBM_NORMAL ) { ::_mbm_printf("++bm_client++ communication problem while unlinking the connection"); } diff --git a/Online/OnlineBase/src/MBM/mbmlib_message.h b/Online/OnlineBase/src/MBM/mbmlib_message.h index 6f86f6dd8289fdb6d87d5ebf95199d2c1436b9a9..4d1d22731bea0f0e505128f5fa7b52346a57d584 100755 --- a/Online/OnlineBase/src/MBM/mbmlib_message.h +++ b/Online/OnlineBase/src/MBM/mbmlib_message.h @@ -34,6 +34,7 @@ struct MBMMessage { INCLUDE = 1, EXCLUDE = 2, RECONNECT = 3, + FORCE_SHUTDOWN = 4, // Consummer commands ADD_REQUEST = 103, DEL_REQUEST = 104, @@ -79,6 +80,7 @@ struct MBMMessage { int pid; int partid; }; + typedef include_t exclude_t; typedef include_t process_exists_t; struct requirement_t { // size: 16+16+20=52 @@ -127,6 +129,7 @@ struct MBMMessage { union msg_structs { include_t include; + exclude_t exclude; requirement_t requirement; cons_requirement_t cons_requirement; get_space_t get_space; diff --git a/Online/OnlineBase/src/MBM/mbmlib_server.cpp b/Online/OnlineBase/src/MBM/mbmlib_server.cpp index 65fe50b67f2ff38e69c86968fea23f780fe6f966..d2c0baecb68ab8c8b85150c5853a8dbbe1d70c34 100755 --- a/Online/OnlineBase/src/MBM/mbmlib_server.cpp +++ b/Online/OnlineBase/src/MBM/mbmlib_server.cpp @@ -94,7 +94,7 @@ typedef MBMMessage MSG; int mbmsrv_include (ServerBMID bm, void* connection, MSG& msg); /// Reconnect after moving to different worker int mbmsrv_reconnect (ServerBMID bm, void* connection, MSG& msg); -/// Exclude client from buffer manager +/// Exclude client from buffer manager (locked) int mbmsrv_exclude (ServerBMID bm, MSG& msg); /// Consumer interface: Free event after processing int mbmsrv_free_event(ServerBMID bm, MSG& msg); @@ -125,11 +125,19 @@ int mbmsrv_req_consumer(ServerBMID bm, MSG& msg); /// Deregister consumer requirement from server int mbmsrv_unreq_consumer(ServerBMID bm, MSG& msg); +/// Exclude client from buffer manager (unlocked) +int _mbmsrv_exclude (ServerBMID bm, MSG& msg); /// Evaluate consumer rules int _mbmsrv_evaluate_rules(ServerBMID bm); /// Match single consumer requirement int _mbmsrv_match_cons_req(const 
MSG::cons_requirement_t& rq, const ServerBMID_t::ConsumerREQ& cr); +/// Access buffer name from BMID +int mbmsrv_buffer_name(ServerBMID bm, char* buff, size_t buff_len); +/// Access buffer address from offset +int mbmsrv_event_address(ServerBMID bm, long offset, void** address); +/// Access server communication type +int mbmsrv_communication_type(ServerBMID bm); /// Server interface: Connect server process int _mbmsrv_connect(ServerBMID bm); @@ -180,6 +188,8 @@ int _mbmsrv_check_freqmode (ServerBMID bm); void _mbmsrv_rel_event (ServerBMID bm, USER* u); /// Get event pending / subscribe to event eventually pending int _mbmsrv_get_ev(ServerBMID bm, USER* u); +/// Perform extra actions on held events ect. on shutdown +int _mbmsrv_handle_accident(ServerBMID bm, USER* u); ServerBMID_t::ServerBMID_t() : BufferMemory() { @@ -705,7 +715,7 @@ void _mbmsrv_rel_event (ServerBMID bm, USER* u) { // Release event if held by user if ( u->held_eid != EVTID_NONE ) { EVENT *e = bm->event + u->held_eid; - _mbmsrv_evt_clear(e,u); + _mbmsrv_evt_clear(e, u); if ( _mbmsrv_evt_held(e) ) { return; } @@ -726,7 +736,7 @@ void _mbmsrv_uclean(ServerBMID bm, USER* u) { u->space_size = 0; } - _mbmsrv_rel_event(bm, u); // free the held event + _mbmsrv_rel_event(bm, u); // free the held event if (u->state == S_wevent ) _mbmsrv_del_wev(bm, u); @@ -871,16 +881,204 @@ int _mbmsrv_add_req_consumer(ServerBMID bm, const char* name, int partid, int ev return MBM_ERROR; } +/// Exclude client from buffer manager (unlocked) +int _mbmsrv_exclude (ServerBMID bm, MSG& msg) { + USER* user = msg.user; + if ( user ) { + bm->communication.poll_del(bm->clients,user->connection); + msg.status = MBM_NORMAL; + bm->communication.send_response(user->connection,msg); + if ( user->busy == 1 ) _mbmsrv_uclean(bm, user); + else bm->communication.close(user->connection); + } + return MBM_NO_REPLY; +} + +/// Perform extra actions on held events ect. on shutdown +int _mbmsrv_handle_accident(ServerBMID bm, USER* user) { + if ( user && bm ) { + auto call_back = bm->accident_call; + void* param = bm->accident_param; + if ( call_back ) { + MBMScanner que(bm->usDesc,-USER_next_off); + for(USER* u=que.get(); u; u=que.get()) { + if ( user == u ) { + if ( u->busy && u->state == S_active && u->held_eid >= 0 ) { + EVENT *e = bm->event + u->held_eid; + (*call_back)(param, bm, e->ev_add, e->ev_size, e->ev_type, e->tr_mask.bits()); + } + return MBM_NORMAL; + } + } + } + } + return MBM_NORMAL; +} + +/// Check wait consumer event queue +void _mbmsrv_check_wev (ServerBMID bm, EVENT* e) { + int count = 0; + MBMScanner que(&bm->usDesc->wev_head, -USER_we_off); + for(USER* u = que.get(); u != 0; u = que.get(), ++count ) { + if ( u->state == S_wevent ) { + int uid = u->uid; + bool req_pending = e->umask0.test(uid) || e->umask1.test(uid); + for(size_t i=0; !req_pending && ione_mask[i].test(uid) ) { + // BM_REQ_ONE consumer: No other from the same group may see this event + // and of course: the same client should not see it twice! + req_pending = true; + e->one_mask[i].clear(); + //break; + } + } + if ( req_pending ) { + u->ev_ptr = e->ev_add; + u->ev_size = e->ev_size; + u->ev_type = e->ev_type; + u->ev_trmask = e->tr_mask; + u->held_eid = e->eid; + e->umask0.clear(uid); + e->umask1.clear(uid); + e->held_mask.set(uid); +#if 0 + // BM_REQ_ONE consumer: No other from the same group may see this event + // and of course: the same client should not see it twice! 
+ for(size_t i=0; ione_mask[i].test(uid) ) { + e->one_mask[i].clear(); + } + } +#endif + u->ev_seen++; + ++bm->ctrl->tot_seen; + _mbmsrv_del_wev(bm, u); + // Event is ready: inform the client with the information where to find it. + MSG msg(MSG::GET_EVENT, u, MBM_NORMAL); + MSG::get_event_t& evt = msg.data.get_event; + evt.size = u->ev_size; + evt.type = u->ev_type; + evt.offset = u->ev_ptr; + ::memcpy(evt.trmask,&u->ev_trmask,sizeof(evt.trmask)); + u->state = S_active; + msg.status=bm->communication.send_response(u->connection,msg); + return; + } + } + if ( count == (bm->ctrl->p_umax+1) ) { + // Something is really odd here. + // The BM internal structure looks corrupted.... + ::lib_rtl_output(LIB_RTL_ERROR, + "++bm_server++ structures of '%s' look corrupted. Something ugly happened!", + bm->bm_name); + return; + } + } +} + +/// Check consumer if (some) clients are waiting for events +int _mbmsrv_check_cons(ServerBMID bm, USER* u) { + int owner = u->uid; + if ( owner == -1 ) { + return MBM_INTERNAL; + } + MBMScanner que(bm->evDesc, -EVENT_next_off); + for(EVENT* e=que.get(); e; e=que.get() ) { + if (e->busy != 2) { + continue; + } + if ( e->held_mask.test(owner) ) { + e->held_mask.clear(owner); + e->busy = 1; + if ( bm->ctrl->wait_event_count > 0 ) { + _mbmsrv_check_wev(bm, e); // check wev queue + } + } + } + return MBM_NORMAL; +} + +/// Get event pending / subscribe to event eventually pending +int _mbmsrv_get_ev(ServerBMID bm, USER* u) { + int uid = u->uid; + MBMScanner que(bm->evDesc, -EVENT_next_off); + for(EVENT* e = que.get(); e != 0; e = que.get() ) { + if ( (e->busy != 2) && (e->busy != 0) ) { + bool req_pending = e->umask0.test(uid) || e->umask1.test(uid); + for(size_t i=0; ione_mask[i].test(uid) ) { + // BM_REQ_ONE consumer: No other from the same group may see this event + // and of course: the same client should not see it twice! + e->one_mask[i].clear(); + req_pending = true; + //break; + } + } + if ( req_pending ) { + u->ev_ptr = e->ev_add; + u->ev_size = e->ev_size; + u->ev_type = e->ev_type; + u->ev_trmask = e->tr_mask; + u->held_eid = e->eid; + e->held_mask.set(uid); + e->umask0.clear(uid); + e->umask1.clear(uid); +#if 0 + // BM_REQ_ONE consumer: No other from the same group may see this event + // and of course: the same client should not see it twice! 
+ for(size_t i=0; ione_mask[i].test(uid) ) { + e->one_mask[i].clear(); + } + } +#endif + u->ev_seen++; + bm->ctrl->tot_seen++; + return MBM_NORMAL; + } + } + } + return MBM_NO_EVENT; +} + +/// Access buffer name from BMID +int mbmsrv_buffer_name(ServerBMID bm, char* buff, size_t buff_len) { + if ( bm && buff ) { + ::strncpy(buff, bm->bm_name, buff_len); + buff[buff_len-1] = 0; + return MBM_NORMAL; + } + return MBM_ERROR; +} + +/// Access buffer address from offset +int mbmsrv_event_address(ServerBMID bm, long offset, void** address) { + if ( bm && bm->buffer_add ) { + char* ev_add = bm->buffer_add + offset; + *address = ev_add; + return MBM_NORMAL; + } + return MBM_ERROR; +} + +/// Access server communication type +int mbmsrv_communication_type(ServerBMID bm) { + if ( bm ) { + return bm->communication.type; + } + return BM_COM_NONE; +} + /// Add consumer requirement to server int mbmsrv_req_consumer(ServerBMID bm, MSG& msg) { const MSG::cons_requirement_t& rq = msg.data.cons_requirement; - return mbmsrv_require_consumer(bm,rq.name,rq.partid,rq.evtype,rq.mask); + return mbmsrv_require_consumer(bm, rq.name, rq.partid, rq.evtype, rq.mask); } /// Deregister consumer requirement from server int mbmsrv_unreq_consumer(ServerBMID bm, MSG& msg) { const MSG::cons_requirement_t& rq = msg.data.cons_requirement; - return mbmsrv_unrequire_consumer(bm,rq.name,rq.partid,rq.evtype,rq.mask); + return mbmsrv_unrequire_consumer(bm, rq.name, rq.partid, rq.evtype, rq.mask); } /// Add consumer requirement to server @@ -997,19 +1195,19 @@ int mbmsrv_reconnect (ServerBMID bmid, void* connection, MSG& msg) { return MBM_NORMAL; } -/// Exclude client from buffer manager +/// Exclude client from buffer manager (locked) int mbmsrv_exclude (ServerBMID bmid, MSG& msg) { ServerBMID bm = CHECK_BMID(bmid); ServerBMID_t::LOCK lock(bm->lockid); - USER* u = msg.user; - if ( u ) { - bm->communication.poll_del(bm->clients,u->connection); - msg.status = MBM_NORMAL; - bm->communication.send_response(u->connection,msg); - if ( u->busy == 1 ) _mbmsrv_uclean(bm, u); - else bm->communication.close(u->connection); - } - return MBM_NO_REPLY; + return _mbmsrv_exclude(bm, msg); +} + +/// Exclude client from buffer manager (locked) +int mbmsrv_shutdown_client (ServerBMID bmid, MSG& msg) { + ServerBMID bm = CHECK_BMID(bmid); + ServerBMID_t::LOCK lock(bm->lockid); + _mbmsrv_handle_accident(bm, msg.user); + return _mbmsrv_exclude(bm, msg); } /* @@ -1042,49 +1240,6 @@ int mbmsrv_pause(ServerBMID bmid, MSG& msg) { return MBM_NORMAL; } -/// Get event pending / subscribe to event eventually pending -int _mbmsrv_get_ev(ServerBMID bm, USER* u) { - int uid = u->uid; - MBMScanner que(bm->evDesc, -EVENT_next_off); - for(EVENT* e = que.get(); e != 0; e = que.get() ) { - if ( (e->busy != 2) && (e->busy != 0) ) { - bool req_pending = e->umask0.test(uid) || e->umask1.test(uid); - for(size_t i=0; ione_mask[i].test(uid) ) { - // BM_REQ_ONE consumer: No other from the same group may see this event - // and of course: the same client should not see it twice! - e->one_mask[i].clear(); - req_pending = true; - //break; - } - } - if ( req_pending ) { - u->ev_ptr = e->ev_add; - u->ev_size = e->ev_size; - u->ev_type = e->ev_type; - u->ev_trmask = e->tr_mask; - u->held_eid = e->eid; - e->held_mask.set(uid); - e->umask0.clear(uid); - e->umask1.clear(uid); -#if 0 - // BM_REQ_ONE consumer: No other from the same group may see this event - // and of course: the same client should not see it twice! 
- for(size_t i=0; ione_mask[i].test(uid) ) { - e->one_mask[i].clear(); - } - } -#endif - u->ev_seen++; - bm->ctrl->tot_seen++; - return MBM_NORMAL; - } - } - } - return MBM_NO_EVENT; -} - /// Consumer interface: Get event pending / subscribe to event eventually pending int mbmsrv_get_event(ServerBMID bm, MSG& msg) { ServerBMID_t::LOCK lock(bm->lockid); @@ -1406,89 +1561,6 @@ int mbmsrv_free_space(ServerBMID bm, MSG& msg) { return MBM_NO_EVENT; } -/// Check wait consumer event queue -void _mbmsrv_check_wev (ServerBMID bm, EVENT* e) { - int count = 0; - MBMScanner que(&bm->usDesc->wev_head, -USER_we_off); - for(USER* u = que.get(); u != 0; u = que.get(), ++count ) { - if ( u->state == S_wevent ) { - int uid = u->uid; - bool req_pending = e->umask0.test(uid) || e->umask1.test(uid); - for(size_t i=0; !req_pending && ione_mask[i].test(uid) ) { - // BM_REQ_ONE consumer: No other from the same group may see this event - // and of course: the same client should not see it twice! - req_pending = true; - e->one_mask[i].clear(); - //break; - } - } - if ( req_pending ) { - u->ev_ptr = e->ev_add; - u->ev_size = e->ev_size; - u->ev_type = e->ev_type; - u->ev_trmask = e->tr_mask; - u->held_eid = e->eid; - e->umask0.clear(uid); - e->umask1.clear(uid); - e->held_mask.set(uid); -#if 0 - // BM_REQ_ONE consumer: No other from the same group may see this event - // and of course: the same client should not see it twice! - for(size_t i=0; ione_mask[i].test(uid) ) { - e->one_mask[i].clear(); - } - } -#endif - u->ev_seen++; - ++bm->ctrl->tot_seen; - _mbmsrv_del_wev(bm, u); - // Event is ready: inform the client with the information where to find it. - MSG msg(MSG::GET_EVENT, u, MBM_NORMAL); - MSG::get_event_t& evt = msg.data.get_event; - evt.size = u->ev_size; - evt.type = u->ev_type; - evt.offset = u->ev_ptr; - ::memcpy(evt.trmask,&u->ev_trmask,sizeof(evt.trmask)); - u->state = S_active; - msg.status=bm->communication.send_response(u->connection,msg); - return; - } - } - if ( count == (bm->ctrl->p_umax+1) ) { - // Something is really odd here. - // The BM internal structure looks corrupted.... - ::lib_rtl_output(LIB_RTL_ERROR, - "++bm_server++ structures of '%s' look corrupted. 
Something ugly happened!", - bm->bm_name); - return; - } - } -} - -/// Check consumer if (some) clients are waiting for events -int _mbmsrv_check_cons(ServerBMID bm, USER* u) { - int owner = u->uid; - if ( owner == -1 ) { - return MBM_INTERNAL; - } - MBMScanner que(bm->evDesc, -EVENT_next_off); - for(EVENT* e=que.get(); e; e=que.get() ) { - if (e->busy != 2) { - continue; - } - if ( e->held_mask.test(owner) ) { - e->held_mask.clear(owner); - e->busy = 1; - if ( bm->ctrl->wait_event_count > 0 ) { - _mbmsrv_check_wev(bm, e); // check wev queue - } - } - } - return MBM_NORMAL; -} - /// Producer interface: Notify consumers about the presence of an event int mbmsrv_send_space(ServerBMID bm, MSG& msg) { USER* u = CHECKED_CLIENT(msg.user); @@ -1659,11 +1731,12 @@ void mbmsrv_check_clients(ServerBMID bm) { int ret = ::kill(u->pid,0); if ( -1 == ret ) { ServerBMID_t::LOCK lock(bm->lockid); + _mbmsrv_handle_accident(bm, u); ret = bm->communication.poll_del(bm->clients,u->connection); if ( ret == MBM_NORMAL ) { ::fprintf(stdout,"[ERROR] MBM server removing dead client '%s'.pid:%d\n",u->name,u->pid); } - _mbmsrv_uclean(bm,u); + _mbmsrv_uclean(bm, u); client_removed = true; } } @@ -1672,7 +1745,7 @@ void mbmsrv_check_clients(ServerBMID bm) { ServerBMID_t::LOCK lock(bm->lockid); MBMScanner que(bm->evDesc, -EVENT_next_off); for(EVENT* e=que.get(); e; e=que.get() ) { - _mbmsrv_check_wev(bm,e); // check wev queue + _mbmsrv_check_wev(bm, e); // check wev queue } } if ( bm->ctrl->wait_space_count > 0 ) { @@ -1687,7 +1760,10 @@ int mbmsrv_handle_request(ServerBMID bm, void* connection, MBMMessage& msg) { mbmsrv_check_clients(bm); return msg.status = mbmsrv_include(bm,connection,msg); case MSG::EXCLUDE: - return msg.status = mbmsrv_exclude(bm,msg); + //return msg.status = mbmsrv_exclude(bm,msg); + return msg.status = mbmsrv_shutdown_client(bm,msg); + case MSG::FORCE_SHUTDOWN: + return msg.status = mbmsrv_shutdown_client(bm,msg); case MSG::RECONNECT: return msg.status = mbmsrv_reconnect(bm,connection,msg); @@ -1731,6 +1807,26 @@ int mbmsrv_handle_request(ServerBMID bm, void* connection, MBMMessage& msg) { } return msg.status; } + +/// Subscribe to 'accident' events (client crashes etc.) +int mbmsrv_subscribe_accidents(ServerBMID bm, void* param, mbmsrv_accident_callback_t callback) { + if ( bm ) { + bm->accident_param = param; + bm->accident_call = callback; + return MBM_NORMAL; + } + return MBM_ERROR; +} + +/// Unsubscribe from 'accident' events (client crashes etc.) 
+int mbmsrv_unsubscribe_accidents(ServerBMID bm) { + if ( bm ) { + bm->accident_param = nullptr; + bm->accident_call = nullptr; + return MBM_NORMAL; + } + return MBM_ERROR; +} /// Call mbmsrv_check_pending_tasks and mbmsrv_check_clients if necessary int mbmsrv_client_watch_cycle(ServerBMID bm) { diff --git a/Online/OnlineBase/src/MBM/mbmlib_server.h b/Online/OnlineBase/src/MBM/mbmlib_server.h index 2b9c9f86123e5265d67a66ab81b26d7b772020a3..dbdde3cf5d660ef664432aadc7d1dba496a404cf 100644 --- a/Online/OnlineBase/src/MBM/mbmlib_server.h +++ b/Online/OnlineBase/src/MBM/mbmlib_server.h @@ -9,8 +9,8 @@ // Author : M.Frank // //========================================================================== -#ifndef _MBM_MBMLIB_TIMER_H -#define _MBM_MBMLIB_TIMER_H +#ifndef _MBM_MBMLIB_SERVER_H +#define _MBM_MBMLIB_SERVER_H #include #include @@ -105,6 +105,7 @@ struct ServerBMID_t : public BufferMemory { int threaded_cleanup { 0 }; int stop { 0 }; int allow_declare { 1 }; + int save_events { 0 }; RTL_ast_t free_event { nullptr }; void* free_event_param { nullptr }; RTL_ast_t alloc_event { nullptr }; @@ -126,6 +127,11 @@ struct ServerBMID_t : public BufferMemory { lib_rtl_gbl_t comm_add { nullptr }; SHMCOMM_gbl* comm { nullptr }; + /// Callback setup for accident events (client crashes etc.) + typedef void (*accident_callback_t)(void* param, ServerBMID bmid, long offset, size_t len, int typ, const unsigned int mask[]); + accident_callback_t accident_call { nullptr }; + void* accident_param { nullptr }; + struct Server { MBMConnection connection; #ifndef MBM_HAVE_STD_THREAD @@ -161,4 +167,4 @@ static inline size_t mbm_section_length(size_t bytes) { return size_t((bytes+SECTION_ALIGNMENT-1)/SECTION_ALIGNMENT)*SECTION_ALIGNMENT; } -#endif // _MBM_MBMLIB_TIMER_H +#endif // _MBM_MBMLIB_SERVER_H diff --git a/Online/RawBankSizes/include/RawBankSizes/RawBankSizes.h b/Online/RawBankSizes/include/RawBankSizes/RawBankSizes.h index c0ef0270167110626ac446aeb1b0ce33c5d2a330..6cbd5bdb9550ac78020dcbbc4525a137282d3ddf 100644 --- a/Online/RawBankSizes/include/RawBankSizes/RawBankSizes.h +++ b/Online/RawBankSizes/include/RawBankSizes/RawBankSizes.h @@ -29,6 +29,10 @@ #include #include +/// Forward declarations +namespace LHCb { class RawBank; } + +/// Online namespace declaration namespace Online { class RawBankSizes : public Gaudi::Algorithm { @@ -39,8 +43,8 @@ namespace Online { typedef dethmap::iterator dethmiter; typedef std::pair dethinsrtp; - typedef Tell1Bank BankHeader; - typedef std::vector > evt_data_t; + typedef std::vector > evt_data_t; + typedef std::vector > lb_evt_data_t; protected: StatusCode fill_histos(const evt_data_t& data, int runno); @@ -60,7 +64,7 @@ namespace Online { protected: - DataObjectReadHandle m_rawData{this,"RawData","Banks/RawData"}; + DataObjectReadHandle m_rawData{this,"RawData","Banks/RawData"}; std::mutex m_mutex; std::vector m_detectorNames; diff --git a/Online/RawBankSizes/src/EventSize.cpp b/Online/RawBankSizes/src/EventSize.cpp index 099efecddbd6f1d3f8d114a9aa5d9e5ea1e3fbe5..7100a393a429edaf32821ca292173f22be57bfa5 100644 --- a/Online/RawBankSizes/src/EventSize.cpp +++ b/Online/RawBankSizes/src/EventSize.cpp @@ -20,11 +20,13 @@ #include #include -/* - * Online namespace declaration - */ +/// Forward declarations +namespace LHCb { class RawBank; } + +/// Online namespace declaration namespace Online { + /// Small example algorithm histogramming the event size /**@class EventSize EventSize.cpp * * Accumulate the total event size and fill a histogram @@ -35,15 +37,14 @@ namespace 
Online { class GAUDI_API EventSize: public Gaudi::Algorithm { private: - typedef Tell1Bank BankHeader; - typedef std::vector > evt_data_t; + typedef std::vector > lb_evt_data_t; private: - DataObjectReadHandle m_rawData {this,"RawData","Banks/RawData"}; - Gaudi::Property m_histPath{this,"HistogramPath", "EventSize/TotalSize","Place of histogram"}; - Gaudi::Property m_bins {this,"Bins", 100, "Number of histogram bins"}; - Gaudi::Property m_low {this,"Low", 0.0, "Low edge"}; - Gaudi::Property m_high {this,"High", 200e3, "High edge"}; + DataObjectReadHandle m_rawData {this,"RawData","Banks/RawData"}; + Gaudi::Property m_histPath{this,"HistogramPath", "EventSize/TotalSize","Place of histogram"}; + Gaudi::Property m_bins {this,"Bins", 100, "Number of histogram bins"}; + Gaudi::Property m_low {this,"Low", 0.0, "Low edge"}; + Gaudi::Property m_high {this,"High", 200e3, "High edge"}; AIDA::IHistogram1D* m_eveLen = 0; public: @@ -72,15 +73,15 @@ namespace Online { /// Algorithm overload: Event execution routine StatusCode execute(const EventContext& /* context */ ) const override final { - using namespace std; + typedef std::vector > evt_data_t; if ( !m_eveLen ) { error() << "Where the hack did my histogram go?" << endmsg; return StatusCode::FAILURE; } - const evt_data_t* event = m_rawData.get(); + const evt_data_t* event = reinterpret_cast(m_rawData.get()); if ( event ) { std::size_t len = 0; - for_each(begin(*event),end(*event),[&len](const auto& b) { len += b.first->totalSize(); }); + std::for_each(begin(*event),end(*event),[&len](const auto& b) { len += b.first->totalSize(); }); m_eveLen->fill(double(len), 1.0); return StatusCode::SUCCESS; } diff --git a/Online/RawBankSizes/src/ODINMonitor.cpp b/Online/RawBankSizes/src/ODINMonitor.cpp index 492f791e7db6bdc0fe1eaaf0a10647f791a6985d..3ef0fae1de95b19cf52527ea2f1850fc1b484965 100644 --- a/Online/RawBankSizes/src/ODINMonitor.cpp +++ b/Online/RawBankSizes/src/ODINMonitor.cpp @@ -23,10 +23,10 @@ #include #include +/// Forward declarations +namespace LHCb { class RawBank; } -/* - * Online namespace declaration - */ +/// Online namespace declaration namespace Online { /**@class ODINMonitor ODINMonitor.cpp @@ -37,15 +37,14 @@ namespace Online { class GAUDI_API ODINMonitor: public Gaudi::Algorithm { private: - typedef Tell1Bank BankHeader; - typedef std::vector > evt_data_t; + typedef std::vector > lb_evt_data_t; AIDA::IHistogram1D* m_bcidsHisto = 0; AIDA::IHistogram1D* m_bxtypeHisto = 0; AIDA::IHistogram1D* m_trgtypeHisto = 0; AIDA::IHistogram2D* m_bxtypeHistovsBxid = 0; AIDA::IHistogram2D* m_trgtypeHistovsBxid = 0; - DataObjectReadHandle m_rawData {this,"RawData","Banks/RawData"}; + DataObjectReadHandle m_rawData {this,"RawData","Banks/RawData"}; public: /// Standard Algorithm Constructor(s) @@ -94,7 +93,7 @@ namespace Online { /// Algorithm overload: Event execution routine StatusCode execute(const EventContext& /* context */ ) const override final { - using namespace std; + typedef std::vector > evt_data_t; if ( !m_bcidsHisto ) { error() << "Where the heck did my histogram go?" << endmsg; return StatusCode::FAILURE; @@ -115,9 +114,9 @@ namespace Online { error() << "Where the heck did my histogram go?" 
<< endmsg; return StatusCode::FAILURE; } - const evt_data_t* event = m_rawData.get(); + const evt_data_t* event = reinterpret_cast(m_rawData.get()); if ( event ) { - for_each(begin(*event),end(*event),[this](const auto& bank) { + std::for_each(begin(*event),end(*event),[this](const auto& bank) { if ( bank.first->type() == Tell1Bank::ODIN ) { if ( bank.first->version() >= 7 ) { auto* sodin = (pcie40::sodin_t*)bank.second; diff --git a/Online/RawBankSizes/src/RawBankSizes.cpp b/Online/RawBankSizes/src/RawBankSizes.cpp index 8cca05735021b40b02352552f37fa5f642e6d68b..8a2305322d3d42645fef2b589b73d1072ed3f71f 100644 --- a/Online/RawBankSizes/src/RawBankSizes.cpp +++ b/Online/RawBankSizes/src/RawBankSizes.cpp @@ -236,7 +236,7 @@ StatusCode RawBankSizes::finalize() { // Main execution //============================================================================= StatusCode RawBankSizes::execute(const EventContext& /* context */) const { - const evt_data_t* event = m_rawData.get(); + const evt_data_t* event = reinterpret_cast(m_rawData.get()); int runno = 100; if ( !event ) { warning() << "Raw data not found at location '" << m_rawData << endmsg; @@ -294,7 +294,7 @@ StatusCode RawBankSizes::fill_histos(const evt_data_t& data, int /* runno */) } } for( const auto& dsc : data ) { - const BankHeader* b = dsc.first; + const Tell1Bank* b = dsc.first; int id = b->type(); double siz = b->size(); double src = b->sourceID(); diff --git a/build_standalone.sh b/build_standalone.sh index dd88a1bfc5c72fdb268106f553128efc899cb72a..7ec2609f7a346b584e46d6429729add706b9fb5c 100755 --- a/build_standalone.sh +++ b/build_standalone.sh @@ -36,31 +36,37 @@ do_echo() usage() { echo " do_make -opt [-opt]" - echo " -t --type Supply build type Debug0|Debug|Release. Default: Debug0"; - echo " -s --source Source directory for cmake"; + echo " -t --type Supply build type Debug0|Debug|Release. Default: Debug0"; + echo " -s --source Source directory for cmake"; echo " Default: `pwd`"; echo " CMakeLists.txt required!"; - echo " -b --build-dir Build directory for cmake"; - echo " Default: `pwd`/dataflow_build.${binary_tag}"; + echo " -b --build-dir Build directory for cmake"; + echo " Default: `pwd`/build_dataflow."; echo " -i --install-dir Installation directory for cmake"; - echo " Default: `pwd`/dataflow_install.${binary_tag}"; - echo " -v --view Path to the LCG view directory."; - echo " Setup file for LCG view required!"; - echo " -T --tag Binary tag to be used. Default: ${BINARY_TAG}"; - echo " Setup file for LCG view required!"; + echo " Default: `pwd`/install_dataflow."; + echo " -v --view Path to the LCG view directory."; + echo " -T --tag Binary tag to be used. Current: '${binary_tag}'"; + echo " ==> Must be an existing build tag of the LCG view!"; echo " -c --cmake Execute only cmake step"; echo " -B --build Only build: no cmake, no install"; echo " -I --install Only install: no cmake, no build"; - echo " -p --parallel Number of threads for parallel build (make -j )"; - echo " -e --ebonly Build event builder only"; + echo " -p --parallel Number of threads for parallel build (make -j )"; + echo " -e --ebonly Build event builder only "; + echo " changes defaults to: "; + echo " build-dir: `pwd`/build_ebonly."; + echo " install-dir: `pwd`/install_ebonly."; echo ""; echo ""; - echo " Example:"; - echo ""; + echo " ===> Example to use the view from Gaudi (/cvmfs/lhcb.cern.ch):"; echo " cd /group/online/dataflow/cmtuser/ONLINE/ONLINE_v7r19"; echo " . 
/cvmfs/lhcb.cern.ch/lib/lcg/releases/LCG_101/gcc/11.1.0/x86_64-centos7/setup.sh"; echo " ./build_standalone.sh -t Debug0 --tag x86_64-centos7-gcc11-dbg --view /cvmfs/lhcb.cern.ch/lib/lcg/releases/LCG_101 -c -B -I"; echo ""; + echo " ===> Example to use the view from SFT (/cvmfs/sft.cern.ch):"; + echo " cd /group/online/dataflow/cmtuser/ONLINE/ONLINE_v7r19"; + echo " . /cvmfs/sft.cern.ch/lcg/releases/gcc/11.1.0/x86_64-centos7/setup.sh"; + echo " ./build_standalone.sh -t Debug0 --tag x86_64-centos7-gcc11-dbg --view /cvmfs/sft.cern.ch/lcg/releases/LCG_101 -c -B -I"; + echo ""; exit 1; } #===============================================================================
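
Editor's note: the MBM server hunks above introduce an "accident" hook. When a client crashes or is force-excluded, _mbmsrv_handle_accident invokes the registered callback once for the event the client still held, passing the event's buffer offset, size, type and trigger mask, before the client is cleaned up. The sketch below illustrates how server-side code might use the new entry points from this patch (mbmsrv_subscribe_accidents, mbmsrv_unsubscribe_accidents, mbmsrv_buffer_name, mbmsrv_event_address). It is a minimal sketch, not part of the patch: the include path MBM/mbmserver.h, the fprintf logging, the 64-byte name buffer and the s_tag parameter are assumptions, and mbmsrv_accident_callback_t is assumed to match the accident_callback_t signature added to ServerBMID_t.

#include <MBM/mbmserver.h>   // assumed location of the mbmsrv_* declarations added above
#include <cstddef>
#include <cstdio>

// Callback with the signature of ServerBMID_t::accident_callback_t:
// invoked for an event still held by a client that crashed or was force-excluded.
static void on_client_accident(void* param, ServerBMID bmid, long offset,
                               size_t len, int typ, const unsigned int mask[]) {
  char buffer_name[64];                 // buffer length is an illustrative assumption
  void* address = nullptr;
  if ( mbmsrv_buffer_name(bmid, buffer_name, sizeof(buffer_name)) == MBM_NORMAL &&
       mbmsrv_event_address(bmid, offset, &address) == MBM_NORMAL ) {
    // 'param' is whatever was handed to mbmsrv_subscribe_accidents; here a plain tag string.
    ::fprintf(stdout, "[WARN] %s: orphaned event in buffer '%s' type:%d size:%ld mask[0]:%08X at %p\n",
              (const char*)param, buffer_name, typ, long(len), mask[0], address);
    // A real handler could copy the event payload at 'address' to safe storage here.
  }
}

static const char* s_tag = "accident-monitor";   // illustrative user parameter

// Register the hook, e.g. right after the buffer manager server has been started.
int setup_accident_monitor(ServerBMID bm) {
  return mbmsrv_subscribe_accidents(bm, (void*)s_tag, on_client_accident);
}

// Remove the hook again before the server shuts down.
int remove_accident_monitor(ServerBMID bm) {
  return mbmsrv_unsubscribe_accidents(bm);
}

Usage note: the callback is invoked from mbmsrv_shutdown_client and mbmsrv_check_clients while the server holds ServerBMID_t::LOCK, so it should return quickly and must not call back into locked mbmsrv_* entry points.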