Skip to content
Snippets Groups Projects

Run3 File Summary Record

Merged Marco Clemencic requested to merge fsr-for-run3 into master
Compare and
20 files
+ 1739
0
Compare changes
  • Side-by-side
  • Inline
Files
20
+ 269
0
/*****************************************************************************\
* (c) Copyright 2022 CERN for the benefit of the LHCb Collaboration *
* *
* This software is distributed under the terms of the GNU General Public *
* Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING". *
* *
* In applying this licence, CERN does not waive the privileges and immunities *
* granted to it by virtue of its status as an Intergovernmental Organization *
* or submit itself to any jurisdiction. *
\*****************************************************************************/
// This preamble is needed to make `genconf` pick up the right implementation of `toStream`
// See https://gitlab.cern.ch/gaudi/Gaudi/-/issues/241
#include <boost/regex.hpp>
#include <iomanip>
#include <ostream>
namespace Gaudi::Utils {
  /// Stream a `boost::regex` as its pattern string wrapped by `std::quoted`.
  ///
  /// @param obj regular expression to print
  /// @param s   destination stream
  /// @return the stream `s`, to allow chaining
  std::ostream& toStream( const boost::regex& obj, std::ostream& s ) {
    s << std::quoted( obj.str() );
    return s;
  }
} // namespace Gaudi::Utils
#include <Gaudi/Interfaces/IOptionsSvc.h>
#include <Gaudi/MonitoringHub.h>
#include <Gaudi/Property.h>
#include <GaudiKernel/DataIncident.h>
#include <GaudiKernel/IIncidentListener.h>
#include <GaudiKernel/IIncidentSvc.h>
#include <GaudiKernel/Service.h>
#include <GaudiUtils/IIODataManager.h>
#include <TFile.h>
#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <filesystem>
#include <fmt/format.h>
#include <fstream>
#include <iterator>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <nlohmann/json.hpp>
#include <string>
#include <string_view>
#include <system_error>
#include <thread>
#include <tuple>
#include <utility>
namespace {
  /// Build the key under which a monitoring entity is stored.
  ///
  /// @param ent entity to compute the key for
  /// @return `ent.name` when the entity has no component, otherwise "<component>.<name>"
  std::string key_for( Gaudi::Monitoring::Hub::Entity const& ent ) {
    if ( ent.component.empty() ) { return ent.name; }
    return fmt::format( "{}.{}", ent.component, ent.name );
  }
} // namespace
namespace Gaudi::Parsers {
  /// Parse a `boost::regex` from its textual option representation.
  ///
  /// The input is first parsed as a plain string (delegating to the string
  /// overload of `parse`); only if that succeeds the string is assigned to
  /// `result`.
  ///
  /// @param result regular expression to fill
  /// @param input  textual representation to parse
  /// @return the status of the string parsing step
  StatusCode parse( boost::regex& result, const std::string& input ) {
    std::string pattern;
    return parse( pattern, input ).andThen( [&pattern, &result] { result = pattern; } );
  }
} // namespace Gaudi::Parsers
namespace {
  /// Convert the content of a job options service to JSON.
  ///
  /// Kept as a free function rather than a lambda because of a spurious
  /// compiler warning (gcc-11.1 -Og), see https://godbolt.org/z/3vGMecsYj
  ///
  /// @param ptr type-erased pointer, reinterpreted as `const Gaudi::Interfaces::IOptionsSvc*`
  /// @return a JSON object with "type" set to "JobOptions" and "options" holding
  ///         the map built from the first two fields of every item
  nlohmann::json opts2json( const void* ptr ) {
    const auto* optsSvc = reinterpret_cast<const Gaudi::Interfaces::IOptionsSvc*>( ptr );
    // keep the items alive for as long as the views into them are in use
    const auto items = optsSvc->items();

    std::map<std::string_view, std::string_view> options;
    for ( const auto& item : items ) { options.emplace( std::get<0>( item ), std::get<1>( item ) ); }

    return {{"type", "JobOptions"}, {"options", options}};
  }
} // namespace
namespace Gaudi::Monitoring {
// Specialization of Hub::Entity constructor to be able to "monitor" the job options service
//
// The entity keeps the address of the options service and is wired so that
// only the JSON export does real work (through `opts2json`): the reset and
// merge callbacks are empty and no merge-from-JSON callback is provided.
template <>
Hub::Entity::Entity( std::string component, std::string name, std::string type, Interfaces::IOptionsSvc& ent )
: component{std::move( component )}
, name{std::move( name )}
, type{std::move( type )}
// type-erased pointer to the options service, handed back to the callbacks below
, m_ptr{&ent}
// report the dynamic type of the pointed-to options service
, m_typeIndex{[]( const void* ptr ) {
return std::type_index( typeid( *reinterpret_cast<const Gaudi::Interfaces::IOptionsSvc*>( ptr ) ) );
}}
// resetting is a no-op for this entity
, m_reset{[]( void* ) {}}
// merging is a no-op for this entity
, m_mergeAndReset{[]( void*, void* ) {}}
// JSON representation is produced by opts2json
, m_getJSON{opts2json}
// no merge-from-JSON callback
, m_mergeAndResetFromJSON{nullptr} {}
} // namespace Gaudi::Monitoring
namespace LHCb::FSR {
  /// Service collecting the File Summary Record (FSR) of a job.
  ///
  /// The service registers itself as a monitoring sink and keeps the entities
  /// whose key matches `AcceptRegex`. It also listens to incidents to
  ///  - read the FSR of every input file (`BeginInputFile`), and
  ///  - remember the output ROOT files (`CONNECTED_OUTPUT`).
  ///
  /// On `finalize` the collected data is serialized to JSON and written as a
  /// string object named `RecordName` in every known output file and, if
  /// `OutputFile` is set, to that JSON file too. When `AutoFlushPeriod` is
  /// non-zero (and `OutputFile` is set) a background thread rewrites the JSON
  /// file periodically.
  struct Sink : extends<Service, IIncidentListener>, Gaudi::Monitoring::Hub::Sink {
    using Entity = Gaudi::Monitoring::Hub::Entity;

    Sink( std::string name, ISvcLocator* svcloc ) : extends( std::move( name ), svcloc ) {}

    /// Register as monitoring sink and incident listener, and apply defaults.
    StatusCode initialize() override {
      return Service::initialize().andThen( [&] {
        // declare ourself as a monitoring sink
        serviceLocator()->monitoringHub().addSink( this );
        // declare ourself as an incident listener (to know when files are opened/closed)
        if ( auto incSvc = service<IIncidentSvc>( "IncidentSvc" ) ) {
          incSvc->addListener( this, "CONNECTED_OUTPUT" );
          incSvc->addListener( this, IncidentType::BeginInputFile );
        }
        // add some special data to the FSR
        if ( m_includeJobOptions ) {
          m_entities.emplace( "jobOptions", Entity{"", "", "JobOptions", serviceLocator()->getOptsSvc()} );
        }
        // warn about inconsistent settings if needed
        if ( m_autoFlushPeriod.value() != 0. && m_outputFile.empty() ) {
          warning() << "non-zero AutoFlushPeriod option makes sense only in conjunction with OutputFile" << endmsg;
        }
        // if there is no explicit RecordName, use the instance name
        if ( m_recordName.empty() ) { m_recordName = name(); }
      } );
    }

    /// Start the periodic flush thread if `AutoFlushPeriod` and `OutputFile` are set.
    StatusCode start() override {
      return Service::start().andThen( [&] {
        // enable periodic output file flush if requested
        if ( m_autoFlushPeriod.value() != 0. && !m_outputFile.empty() ) {
          // assigning to a joinable std::thread calls std::terminate, so make sure
          // a thread left over from a previous start() is gone
          stopFlushThread();
          {
            std::lock_guard lk( m_flushCondMutex );
            m_flushStop = false;
          }
          m_flushThread = std::thread{[this]() {
            using namespace std::chrono_literals;
            std::unique_lock lk( m_flushCondMutex );
            // The predicate makes the loop robust against spurious wake-ups and
            // against a stop request issued while we are busy writing the file:
            // wait_for returns false only when the period elapsed without a stop request.
            while ( !m_flushCond.wait_for( lk, m_autoFlushPeriod.value() * 1s, [this] { return m_flushStop; } ) ) {
              writeOutputFile( collect() );
            }
          }};
        }
      } );
    }

    /// React to input files being announced and output files being connected.
    ///
    /// For `BeginInputFile` an entry is always appended to the list of input
    /// FSRs: either the FSR found in the file or a small JSON object with the
    /// file name and an "error"/"info" message.
    void handle( const Incident& inc ) override {
      if ( inc.type() == IncidentType::BeginInputFile ) {
        namespace fs = std::filesystem;
        // BeginInputFile is actually fired before really opening the file, so the only way
        // to access the content is to temporarily open it.
        // First we have to check if it makes sense to try to open it
        bool is_root_file = false; // let's prepare for the worse
        // - is it a local file?
        std::string_view filename( inc.source() );
        if ( filename.substr( 0, 5 ) == "file:" ) filename.remove_prefix( 5 );
        const fs::path  path( filename );
        std::error_code ec;
        if ( fs::exists( path, ec ) ) {
          // ROOT files start with the magic string "root"
          char buff[5] = {0};
          std::ifstream( path ).read( buff, 4 );
          is_root_file = std::string_view( buff ) == "root";
        } else if ( filename.substr( 0, 4 ) != "mdf:" ) {
          // it's definitely not a local file, so let's try a guess based on the filename
          // (as of today I know only that .mdf or .raw files are *not* ROOT files and
          // if the protocol prefix is mdf: also it's not a ROOT file)
          //
          // a name without any '.' has no extension: substr(npos) would throw std::out_of_range
          const auto dot = filename.find_last_of( '.' );
          const auto ext = ( dot == std::string_view::npos ) ? std::string_view{} : filename.substr( dot );
          is_root_file   = !( ext == ".mdf" || ext == ".raw" );
        }
        nlohmann::json incoming_fsr;
        if ( is_root_file ) {
          std::unique_ptr<TFile> f( TFile::Open( inc.source().c_str() ) );
          if ( f ) {
            // NOTE(review): `s` is never deleted; it looks like the caller owns non-TObject
            // instances returned by GetObject - confirm against the ROOT documentation
            std::string* s{nullptr};
            f->GetObject( m_recordName.value().c_str(), s );
            if ( s ) {
              try {
                incoming_fsr = nlohmann::json::parse( *s );
              } catch ( const nlohmann::json::parse_error& err ) {
                // a corrupted record is reported like the other failures instead of
                // letting the exception escape from the incident handler
                incoming_fsr = nlohmann::json{
                    {"name", inc.source()}, {"error", m_recordName.value() + " is not valid JSON: " + err.what()}};
              }
            } else {
              incoming_fsr = nlohmann::json{{"name", inc.source()}, {"error", m_recordName.value() + " not found"}};
            }
          } else {
            incoming_fsr = nlohmann::json{{"name", inc.source()}, {"error", "could not open file"}};
          }
        } else {
          incoming_fsr = nlohmann::json{{"name", inc.source()}, {"info", "not a root file"}};
        }
        m_inputsFSRs.emplace_back( std::move( incoming_fsr ) );
      } else if ( inc.type() == "CONNECTED_OUTPUT" ) {
        if ( auto ci = dynamic_cast<const ContextIncident<TFile*>*>( &inc ) ) {
          // the GUID stays empty if the data manager does not know the connection
          std::string guid;
          if ( auto datamgr = service<Gaudi::IIODataManager>( "IODataManager" ) ) {
            if ( auto conn = datamgr->connection( ci->source() ) ) guid = conn->fid();
          }
          m_outputFiles.push_back( {ci->source(), ci->tag(), guid} );
        }
      }
    }

    /// Keep track of the entity if its key matches `AcceptRegex`.
    void registerEntity( Entity ent ) override {
      auto key = key_for( ent );
      if ( boost::regex_match( key, m_acceptRegex.value() ) ) { m_entities.emplace( key, std::move( ent ) ); }
    }

    /// Forget the entity stored under the key of `ent` (if any).
    void removeEntity( Entity const& ent ) override {
      if ( auto it = m_entities.find( key_for( ent ) ); it != m_entities.end() ) { m_entities.erase( it ); }
    }

    /// Stop the flush thread and write the final FSR to `OutputFile` and to all output ROOT files.
    StatusCode finalize() override {
      if ( auto incSvc = service<IIncidentSvc>( "IncidentSvc" ) ) { incSvc->removeListener( this ); }
      // tell the flush thread we are stopping and wait that it exits
      stopFlushThread();
      nlohmann::json fsr = collect();
      writeOutputFile( fsr );
      for ( const auto& outputFile : m_outputFiles ) {
        // each output file gets its own GUID in the record
        fsr["guid"] = outputFile.guid;
        auto s      = fsr.dump();
        outputFile.file_ptr->WriteObject( &s, m_recordName.value().c_str() );
      }
      return extends::finalize();
    }

    /// Build the FSR JSON object from the tracked entities and the input files FSRs.
    ///
    /// @return a JSON object with one field per entity plus, if any input file
    ///         was seen, the list "inputs"
    nlohmann::json collect() const {
      nlohmann::json fsr = nlohmann::json::object();
      for ( const auto& [key, entity] : m_entities ) { fsr[key] = entity.toJSON(); }
      if ( !m_inputsFSRs.empty() ) fsr["inputs"] = m_inputsFSRs;
      return fsr;
    }

    /// Write `data` to the JSON file `OutputFile` (no-op if the property is not set).
    ///
    /// With exactly one output ROOT file its GUID is added as "guid", otherwise
    /// name and GUID of every output file are listed in "output_files".
    void writeOutputFile( const nlohmann::json& data ) const {
      if ( !m_outputFile.empty() ) {
        nlohmann::json out = data;
        if ( m_outputFiles.size() == 1 ) {
          out["guid"] = m_outputFiles.front().guid;
        } else {
          for ( const auto& outputFile : m_outputFiles ) {
            out["output_files"].push_back( {{"name", outputFile.name}, {"guid", outputFile.guid}} );
          }
        }
        std::ofstream o{m_outputFile.value()};
        o << std::setw( 4 ) << out << '\n';
      }
    }

    /// Ask the flush thread (if any) to exit and wait for it.
    ///
    /// The stop flag is set under the mutex so that the request cannot be
    /// missed by a thread that is not currently waiting on the condition.
    void stopFlushThread() {
      {
        std::lock_guard lk( m_flushCondMutex );
        m_flushStop = true;
      }
      m_flushCond.notify_all();
      if ( m_flushThread.joinable() ) m_flushThread.join();
    }

    Gaudi::Property<std::string> m_recordName{
        this, "RecordName", "",
        "name for the file summary record object in the ROOT file (by default use the name of the service)"};
    Gaudi::Property<boost::regex> m_acceptRegex{
        this, "AcceptRegex", "^$",
        "regular expression to match for the entities to be included in the file summary record"};
    Gaudi::Property<std::string> m_outputFile{this, "OutputFile", "",
                                              "if set, write the JSON FSR data to the give file"};
    Gaudi::Property<float>       m_autoFlushPeriod{
        this, "AutoFlushPeriod", 0.,
        "if different from 0, indicates every how many seconds to force a write of the FSR data to OutputFile (this "
        "parameter makes sense only if used in conjunction with OutputFile)"};
    Gaudi::Property<bool> m_includeJobOptions{this, "IncludeJobOptions", false,
                                              "if set to true, job options are added to the file summary record"};

    /// Description of an output ROOT file the FSR has to be written to.
    struct OutputFile {
      std::string name;
      TFile*      file_ptr{nullptr}; // not owned
      std::string guid;
    };

    std::map<std::string, Entity> m_entities;    // tracked entities by key
    std::vector<OutputFile>       m_outputFiles; // output files seen via CONNECTED_OUTPUT
    std::list<nlohmann::json>     m_inputsFSRs;  // one entry per BeginInputFile incident
    std::thread                   m_flushThread;
    std::condition_variable       m_flushCond;
    std::mutex                    m_flushCondMutex;
    bool                          m_flushStop{false}; // stop request for the flush thread, guarded by m_flushCondMutex
  };
  DECLARE_COMPONENT( Sink )
} // namespace LHCb::FSR
Loading