Skip to content
Snippets Groups Projects
Commit 27b83152 authored by Rosen Matev's avatar Rosen Matev :sunny:
Browse files

Merge branch 'fix-fsr-propagation' into 'master'

Fix detection and propagation of FSRs from input files

See merge request !4138
parents 485af3fe 5d9e2dd5
No related branches found
No related tags found
1 merge request!4138Fix detection and propagation of FSRs from input files
Pipeline #5744519 passed
......@@ -31,11 +31,18 @@ The content of FSRs fields from input files is reported to the output
FSR either by merging the data (if possible) or copied verbatim).
#]=======================================================================]
gaudi_add_header_only_library(FileSummaryRecord
LINK
Gaudi::GaudiKernel
nlohmann_json::nlohmann_json
)
gaudi_add_module(FileSummaryRecordModule
SOURCES
src/mod/FSRSink.cpp
src/mod/OpenFilesTracker.cpp
LINK
FileSummaryRecord
Boost::regex
fmt::fmt
Gaudi::GaudiKernel
......@@ -49,6 +56,7 @@ if(BUILD_TESTING)
SOURCES
tests/src/FSRTestAlgs.cpp
LINK
FileSummaryRecord
Gaudi::GaudiKernel
LHCb::DAQEventLib
LHCb::LHCbAlgsLib
......
/*****************************************************************************\
* (c) Copyright 2023 CERN for the benefit of the LHCb Collaboration *
* *
* This software is distributed under the terms of the GNU General Public *
* Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING". *
* *
* In applying this licence, CERN does not waive the privileges and immunities *
* granted to it by virtue of its status as an Intergovernmental Organization *
* or submit itself to any jurisdiction. *
\*****************************************************************************/
#include <GaudiKernel/Incident.h>
#include <nlohmann/json.hpp>
namespace LHCb {
/// Simple incident to notify listeners that a new FSR has been read from an input file.
struct FileSummaryRecordIncident final : Incident {
static constexpr const char* Type = "FileSummaryRecordIncident";
FileSummaryRecordIncident( const std::string& source, const nlohmann::json& data )
: Incident{source, Type}, data{data} {}
~FileSummaryRecordIncident() = default;
const nlohmann::json& data;
};
} // namespace LHCb
/*****************************************************************************\
* (c) Copyright 2022 CERN for the benefit of the LHCb Collaboration *
* (c) Copyright 2022-2023 CERN for the benefit of the LHCb Collaboration *
* *
* This software is distributed under the terms of the GNU General Public *
* Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING". *
......@@ -17,6 +17,7 @@ namespace Gaudi::Utils {
std::ostream& toStream( const boost::regex& obj, std::ostream& s ) { return s << std::quoted( obj.str() ); }
} // namespace Gaudi::Utils
#include <GAUDI_VERSION.h>
#include <Gaudi/Interfaces/IOptionsSvc.h>
#include <Gaudi/MonitoringHub.h>
#include <Gaudi/Property.h>
......@@ -24,11 +25,12 @@ namespace Gaudi::Utils {
#include <GaudiKernel/IIncidentListener.h>
#include <GaudiKernel/IIncidentSvc.h>
#include <GaudiKernel/Service.h>
#include <GaudiKernel/ServiceHandle.h>
#include <GaudiUtils/IIODataManager.h>
#include <LHCb/FileSummaryRecordIncident.h>
#include <TFile.h>
#include <algorithm>
#include <chrono>
#include <filesystem>
#include <fmt/format.h>
#include <fstream>
#include <future>
......@@ -44,6 +46,9 @@ namespace Gaudi::Utils {
#include <thread>
#include <tuple>
#include <utility>
#if GAUDI_VERSION < CALC_GAUDI_VERSION( 36, 13 )
# include <filesystem>
#endif
namespace {
std::string key_for( Gaudi::Monitoring::Hub::Entity const& ent ) {
......@@ -99,10 +104,17 @@ namespace LHCb::FSR {
return Service::initialize().andThen( [&] {
// declare ourself as a monitoring sink
serviceLocator()->monitoringHub().addSink( this );
// declare ourself as a incident linetener (to know when files are opened/closed)
if ( auto incSvc = service<IIncidentSvc>( "IncidentSvc" ) ) {
incSvc->addListener( this, "CONNECTED_OUTPUT" );
incSvc->addListener( this, IncidentType::BeginInputFile );
// declare ourself as a incident listener (to know when files are opened/closed)
if ( m_incSvc ) {
m_incSvc->addListener( this, "CONNECTED_OUTPUT" );
#if GAUDI_VERSION < CALC_GAUDI_VERSION( 36, 13 )
m_incSvc->addListener( this, IncidentType::BeginInputFile );
#else
m_incSvc->addListener( this, "CONNECTED_INPUT" );
#endif
} else {
warning() << "cannot access IIncidentSvc " << m_incSvc.name() << ": no access to FSRs from input files"
<< endmsg;
}
// add some special data to the FSR
if ( m_includeJobOptions ) {
......@@ -137,6 +149,7 @@ namespace LHCb::FSR {
}
void handle( const Incident& inc ) override {
#if GAUDI_VERSION < CALC_GAUDI_VERSION( 36, 13 )
if ( inc.type() == IncidentType::BeginInputFile ) {
namespace fs = std::filesystem;
// BeginInputFile is actually fired before really opening the file, so the only way
......@@ -163,25 +176,31 @@ namespace LHCb::FSR {
}
}
nlohmann::json incoming_fsr;
if ( is_root_file ) {
std::unique_ptr<TFile> f( TFile::Open( inc.source().c_str() ) );
if ( f ) {
std::string* s{nullptr};
f->GetObject( m_recordName.value().c_str(), s );
if ( s ) {
incoming_fsr = nlohmann::json::parse( *s );
} else {
incoming_fsr = nlohmann::json{{"name", inc.source()}, {"error", m_recordName.value() + " not found"}};
m_inputsFSRs.emplace_back( nlohmann::json::parse( *s ) );
if ( m_incSvc ) m_incSvc->fireIncident( FileSummaryRecordIncident( name(), m_inputsFSRs.back() ) );
}
} else {
incoming_fsr = nlohmann::json{{"name", inc.source()}, {"error", "could not open file"}};
}
} else {
incoming_fsr = nlohmann::json{{"name", inc.source()}, {"info", "not a root file"}};
}
m_inputsFSRs.emplace_back( std::move( incoming_fsr ) );
} else if ( inc.type() == "CONNECTED_OUTPUT" ) {
} else
#else
if ( inc.type() == "CONNECTED_INPUT" ) {
if ( auto ci = dynamic_cast<const ContextIncident<TFile*>*>( &inc ) ) {
std::string* s{nullptr};
ci->tag()->GetObject( m_recordName.value().c_str(), s );
if ( s ) {
m_inputsFSRs.emplace_back( nlohmann::json::parse( *s ) );
if ( m_incSvc ) m_incSvc->fireIncident( FileSummaryRecordIncident( name(), m_inputsFSRs.back() ) );
}
}
} else
#endif
if ( inc.type() == "CONNECTED_OUTPUT" ) {
if ( auto ci = dynamic_cast<const ContextIncident<TFile*>*>( &inc ) ) {
std::string guid;
if ( auto datamgr = service<Gaudi::IIODataManager>( "IODataManager" ) ) {
......@@ -201,7 +220,10 @@ namespace LHCb::FSR {
}
StatusCode finalize() override {
if ( auto incSvc = service<IIncidentSvc>( "IncidentSvc" ) ) { incSvc->removeListener( this ); }
if ( m_incSvc ) {
m_incSvc->removeListener( this );
m_incSvc.release().ignore();
}
m_flushThreadStop.set_value(); // tell the flush thread we are stopping
if ( m_flushThread.joinable() ) m_flushThread.join(); // and wait that it exits
......@@ -265,6 +287,8 @@ namespace LHCb::FSR {
std::vector<OutputFile> m_outputFiles;
std::list<nlohmann::json> m_inputsFSRs;
ServiceHandle<IIncidentSvc> m_incSvc{this, "IncidentSvc", "IncidentSvc"};
std::thread m_flushThread;
std::promise<void> m_flushThreadStop;
};
......
......@@ -113,12 +113,6 @@ def check(causes, result):
"writes": 5
}
},
"inputs": [{
"info":
"not a root file",
"name":
"mdf:root://eoslhcb.cern.ch//eos/lhcb/cern-swtest/lhcb/data/2018/RAW/FULL/LHCb/COLLISION18/209291/209291_0000000745.raw"
}],
}
try:
......
......@@ -12,7 +12,9 @@ import os
import json
from traceback import format_exc
from unittest import TestCase
import itertools
from pprint import pformat
from .write import GUID
FILENAMEFSR = f"{__name__}.fsr.json"
FILENAME = f"{__name__}.root"
......@@ -31,10 +33,14 @@ def config():
from PyConf.control_flow import CompositeNode
from PyConf.components import setup_component
from PyConf.Algorithms import LHCb__Tests__EventCountAlg, Gaudi__Examples__IntDataProducer, ReadTES
from Configurables import ApplicationMgr
from Configurables import ApplicationMgr, Gaudi__MultiFileCatalog
Gaudi__MultiFileCatalog("FileCatalog").Catalogs = [
"xmlcatalog_file:FSRTests.catalog.xml"
]
options = ApplicationOptions(_enabled=False)
options.input_files = ["FSRTests.write.root"]
options.input_files = ["LFN:FSRTests-write"]
options.input_type = 'ROOT'
options.output_file = FILENAME
options.output_type = 'ROOT'
......@@ -59,6 +65,12 @@ def config():
AcceptRegex=r"^(Evt|Other)Counter\.count$",
OutputFile=FILENAMEFSR,
)))
app.ExtSvc.append(
config.add(
setup_component(
"LHCb__Tests__InputFSRSpy",
instance_name="InputFSRSpy",
)))
producer = Gaudi__Examples__IntDataProducer()
......@@ -84,7 +96,7 @@ def config():
os.remove(name)
def check(causes, result):
def check(causes, result, stdout):
result["root_output_file"] = FILENAME
missing_files = [
......@@ -134,7 +146,8 @@ def check(causes, result):
expected["guid"] = guid # GUID is random
# let's get the FSR of the input file from the output of the upstream test
expected["inputs"].append(json.load(open("FSRTests.write.fsr.json")))
input_fsr = json.load(open("FSRTests.write.fsr.json"))
expected["inputs"].append(input_fsr)
tester = TestCase()
checking = "JSON dump"
......@@ -142,6 +155,20 @@ def check(causes, result):
checking = "ROOT file"
tester.assertEqual(expected, fsr_root)
checking = "input incident"
assert "got FSR from input file" in stdout, "missing message from InputFSRSpy"
lines = list(
itertools.islice(
itertools.dropwhile(
lambda l: "got FSR from input file" not in l,
stdout.splitlines()), 1, 4))
assert len(lines) == 3, \
"invalid output from InputFSRSpy (wrong length)"
assert (lines[0] == "```" and lines[2] == "```"), \
"invalid output from InputFSRSpy (missing ``` markers)"
tester.assertEqual(input_fsr, json.loads(lines[1]))
except AssertionError as err:
causes.append("FSR content")
result[f"FSR problem ({checking})"] = result.Quote(str(err))
......
......@@ -17,6 +17,7 @@ from pprint import pformat
FILENAMEFSR = f"{__name__}.fsr.json"
FILENAME = f"{__name__}.root"
FILENAMEJSON = f"{__name__}.json"
GUID = "D80398A9-70BB-4EEE-9AB8-0BEC4D976C13"
def checkDiff(a, b):
......@@ -31,13 +32,31 @@ def config():
from PyConf.control_flow import CompositeNode
from PyConf.components import setup_component
from PyConf.Algorithms import LHCb__Tests__EventCountAlg, Gaudi__Examples__IntDataProducer
from Configurables import ApplicationMgr
from Configurables import ApplicationMgr, Gaudi__MultiFileCatalog
# use a specific XML catalog
Gaudi__MultiFileCatalog("FileCatalog").Catalogs = [
"xmlcatalog_file:FSRTests.catalog.xml"
]
with open("FSRTests.catalog.xml", "w") as f:
f.write(f"""<?xml version="1.0" encoding="UTF-8" standalone="no" ?>
<!DOCTYPE POOLFILECATALOG SYSTEM "InMemory">
<POOLFILECATALOG>
<File ID="{GUID}">
<physical>
<pfn filetype="ROOT" name="{FILENAME}"/>
</physical>
<logical>
<lfn name="FSRTests-write"/>
</logical>
</File>
</POOLFILECATALOG>""")
options = ApplicationOptions(_enabled=False)
# No data from the input is used, but something should be there for the configuration
options.input_files = ["dummy_input_file_name.dst"]
options.input_type = 'ROOT'
options.output_file = FILENAME
options.output_file = "LFN:FSRTests-write"
options.output_type = 'ROOT'
options.data_type = 'Upgrade'
options.dddb_tag = 'upgrade/dddb-20220705'
......@@ -95,7 +114,8 @@ def check(causes, result):
"empty": False,
"nEntries": 5,
"type": "counter:Counter:m"
}
},
"guid": GUID
}
try:
......@@ -115,11 +135,6 @@ def check(causes, result):
checking = "no check yet"
try:
guid = fsr_dump.get("guid")
assert guid, "missing or invalid GUID in FSR dump"
expected["guid"] = guid # GUID is random
tester = TestCase()
checking = "JSON dump"
tester.assertEqual(expected, fsr_dump)
......
......@@ -15,7 +15,7 @@
<argument name="use_temp_dir"><enumeral>true</enumeral></argument>
<argument name="validator"><text>
from FSRTests.read import check
check(causes, result)
check(causes, result, stdout)
</text></argument>
<argument name="prerequisites"><set>
<tuple><text>FileSummaryRecord.write</text><enumeral>PASS</enumeral></tuple>
......
......@@ -11,6 +11,10 @@
#include <Event/LumiEventCounter.h>
#include <Event/ODIN.h>
#include <Gaudi/Accumulators.h>
#include <GaudiKernel/IIncidentListener.h>
#include <GaudiKernel/IIncidentSvc.h>
#include <GaudiKernel/Service.h>
#include <LHCb/FileSummaryRecordIncident.h>
#include <LHCbAlgs/Consumer.h>
namespace LHCb ::Tests {
......@@ -32,4 +36,25 @@ namespace LHCb ::Tests {
mutable LHCb::LumiEventCounter m_lumiCount{this, "eventsByRun"};
};
DECLARE_COMPONENT( RunEventCountAlg )
struct InputFSRSpy final : extends<Service, IIncidentListener> {
using extends::extends;
StatusCode initialize() override {
return extends::initialize().andThen( [&] { return m_incSvc.retrieve(); } ).andThen( [&] {
m_incSvc->addListener( this, FileSummaryRecordIncident::Type );
} );
}
StatusCode finalize() override {
m_incSvc->removeListener( this );
m_incSvc.release().ignore();
return extends::finalize();
}
void handle( const Incident& inc ) override {
if ( auto fsrInc = dynamic_cast<const FileSummaryRecordIncident*>( &inc ) ) {
info() << "got FSR from input file:\n```\n" << fsrInc->data << "\n```" << endmsg;
}
}
ServiceHandle<IIncidentSvc> m_incSvc{this, "IncidentSvc", "IncidentSvc"};
};
DECLARE_COMPONENT( InputFSRSpy )
} // namespace LHCb::Tests
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment