Skip to content
Snippets Groups Projects
Commit 1fb679d4 authored by Rosen Matev's avatar Rosen Matev :sunny:
Browse files

Add filter to select events that emitted a certain message.

parent 46b12de2
No related branches found
No related tags found
4 merge requests!4553Fix UT Decoder,!4539Synchronize master branch with 2024-patches,!4525Draft: Synchronize master branch with 2024-patches,!3883Error event handling
......@@ -26,8 +26,8 @@ public:
StatusCode initialize() override;
private:
/// Pick up late-attributed data outputs
Gaudi::Property<int> m_CFD{this, "CFD", 1, "ControlFlowDecision is true every Nth events"};
Gaudi::Property<int> m_CFD{this, "CFD", 1, "ControlFlowDecision is true every Nth events"};
Gaudi::Property<bool> m_decisionWarning{this, "DecisionWarning", false, "Emit a warning for false decisions"};
Gaudi::Property<std::vector<std::string>> m_inpKeys{this, "inpKeys", {}, ""};
Gaudi::Property<std::vector<std::string>> m_outKeys{this, "outKeys", {}, ""};
......@@ -93,6 +93,8 @@ StatusCode ConfigurableDummy::execute( EventContext const& context ) const // th
outputHandle->put( std::make_unique<DataObject>() );
}
if ( m_CFD > 0 && context.evt() % m_CFD == 0 ) return Gaudi::Functional::FilterDecision::PASSED;
return Gaudi::Functional::FilterDecision::FAILED;
bool decision = m_CFD > 0 && context.evt() % m_CFD == 0;
if ( m_decisionWarning && !decision ) { warning() << "Event did not pass" << endmsg; }
return decision ? Gaudi::Functional::FilterDecision::PASSED : Gaudi::Functional::FilterDecision::FAILED;
}
......@@ -66,6 +66,7 @@ gaudi_add_module(LHCbAlgs
src/TimingTool.cpp
src/TrajPoca.cpp
src/createODIN.cpp
src/MessageSvcEventFilter.cpp
LINK
AIDA::aida
Boost::filesystem
......
/*****************************************************************************\
* (c) Copyright 2022-2024 CERN for the benefit of the LHCb Collaboration *
* *
* This software is distributed under the terms of the GNU General Public *
* Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING". *
* *
* In applying this licence, CERN does not waive the privileges and immunities *
* granted to it by virtue of its status as an Intergovernmental Organization *
* or submit itself to any jurisdiction. *
\*****************************************************************************/
#include "LHCbAlgs/FilterPredicate.h"
/** @class MessageSvcEventFilter
*
* Pass when the processing of the event caused a certain message.
*
* This filter can be configured to filter on ERROR/FATAL messages, for example,
* and can be inserted in the control flow before a writer.
* In this way one can create an "error stream" and save events that
* exhibit errors in the processing.
*
* The algorithm hooks into the MessageSvc by modifying the default stream (at start).
* The modified ostream matches the formatted messages it sees with a regex pattern.
* (Take care to synchronise the MessageSvc format with the pattern.)
* If the message matches, a "flag" object is put on the TES (in the slot assigned to
* the algorithm's execution that emitted the message).
* Then, when the MessageSvcEventFilter algorithm is executed, it simply checks the existence of
* the flag object on the TES and passes only if it finds one.
*
* Note on implementation. The interface msgSvc()->insertStream() seems more appropriate but we
* cannot use it. Using it would look like this
* msgSvc()->insertStream( MSG::WARNING, "default_stream", msgSvc()->defaultStream() );
* msgSvc()->insertStream( MSG::WARNING, name() + "_spy_stream", m_spyStream.get() );
* ...
* msgSvc()->eraseStream( MSG::WARNING );
* The problem is that the MessageSvc as implemented does not apply formatting in the case
* of explicitly inserted streams. It is impossible to apply it here (we only see strings).
* TODO report/fix this in Gaudi
*
*/
namespace {
using DataHandle = DataObjectWriteHandle<DataObject>;
std::ostream* s_origStream = nullptr;
class spystreambuf : public std::stringbuf {
public:
explicit spystreambuf( std::string pattern, DataHandle& dh ) : m_pattern( pattern ), m_dataHandle( dh ) {}
private:
int sync() override {
std::ptrdiff_t n = pptr() - pbase();
std::string temp;
temp.assign( pbase(), n );
*s_origStream << temp << std::flush;
pbump( -n );
return 0;
}
std::streamsize xsputn( const char_type* s, std::streamsize count ) override {
std::string temp;
temp.assign( s, count );
*s_origStream << temp;
// FIXME actually use pattern here
if ( m_dataHandle.getIfExists() == nullptr ) {
try {
m_dataHandle.put( std::make_unique<DataObject>() );
} catch ( GaudiException& e ) { std::cerr << e.message() << std::endl; };
}
return count;
}
int_type overflow( int_type ch ) override {
*s_origStream << (char)ch;
return std::stringbuf::overflow( ch );
}
// spystreambuf( const spystreambuf& );
// spystreambuf& operator=( const spystreambuf& );
std::string m_pattern;
DataHandle& m_dataHandle;
};
} // namespace
class MessageSvcEventFilter : public LHCb::Algorithm::FilterPredicate<bool()> {
public:
using FilterPredicate::FilterPredicate;
StatusCode start() override;
StatusCode stop() override;
bool operator()() const override;
private:
Gaudi::Property<std::string> m_pattern{
this,
"PatternRegex",
"",
[this]( const auto& ) { this->m_regex = this->m_pattern.value(); },
"std::regex pattern that matches messages",
};
DataHandle m_flag{this, "FlagLocation", ""};
mutable Gaudi::Accumulators::BinomialCounter<> m_counter{this, "Accepted"};
std::regex m_regex;
std::unique_ptr<spystreambuf> m_spyBuf;
std::unique_ptr<std::ostream> m_spyStream;
};
DECLARE_COMPONENT( MessageSvcEventFilter )
StatusCode MessageSvcEventFilter::start() {
return FilterPredicate::start().andThen( [&] {
if ( !s_origStream ) {
s_origStream = msgSvc()->defaultStream();
// TODO treat missing (nullptr) default stream?
m_spyStream.reset(); // in case we're called a second time, destroy the stream before the buffer
m_spyBuf = std::make_unique<spystreambuf>( "pattern", m_flag );
m_spyStream = std::make_unique<std::ostream>( m_spyBuf.get() );
msgSvc()->setDefaultStream( m_spyStream.get() );
}
} );
}
StatusCode MessageSvcEventFilter::stop() {
if ( s_origStream ) {
msgSvc()->setDefaultStream( s_origStream );
s_origStream = nullptr;
}
return FilterPredicate::stop();
}
bool MessageSvcEventFilter::operator()() const {
bool pass = m_flag.getIfExists() != nullptr;
m_counter += pass;
return pass;
}
###############################################################################
# (c) Copyright 2022-2024 CERN for the benefit of the LHCb Collaboration #
# #
# This software is distributed under the terms of the GNU General Public #
# Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING". #
# #
# In applying this licence, CERN does not waive the privileges and immunities #
# granted to it by virtue of its status as an Intergovernmental Organization #
# or submit itself to any jurisdiction. #
###############################################################################
from PyConf.Algorithms import (
MessageSvcEventFilter,
ConfigurableDummy,
)
from PyConf.application import ApplicationOptions, configure, configure_input
from PyConf.control_flow import CompositeNode, NodeLogic
message_producer = ConfigurableDummy(
CFD=3, DecisionWarning=True) # 33% prescale
algs_node = CompositeNode(
"algs", [message_producer],
combine_logic=NodeLogic.LAZY_AND,
force_order=True)
event_filter = MessageSvcEventFilter()
writer_node = CompositeNode(
"writer", [event_filter],
combine_logic=NodeLogic.LAZY_AND,
force_order=True)
top = CompositeNode(
"top", [algs_node, writer_node],
combine_logic=NodeLogic.NONLAZY_AND,
force_order=True)
# Define the application environment and run it
options = ApplicationOptions(_enabled=False)
options.n_threads = 4
options.n_event_slots = 4
options.evt_max = 50
options.input_type = "NONE"
options.dddb_tag = "dummy"
options.conddb_tag = "dummy"
options.simulation = False
config2 = configure_input(options)
config = configure(options, top)
<?xml version="1.0" ?><!DOCTYPE extension PUBLIC '-//QM/2.3/Extension//EN' 'http://www.codesourcery.com/qm/dtds/2.3/-//qm/2.3/extension//en.dtd'>
<!--
(c) Copyright 2000-2024 CERN for the benefit of the LHCb Collaboration
This software is distributed under the terms of the GNU General Public
Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING".
In applying this licence, CERN does not waive the privileges and immunities
granted to it by virtue of its status as an Intergovernmental Organization
or submit itself to any jurisdiction.
-->
<extension class="GaudiTest.GaudiExeTest" kind="test">
<argument name="program"><text>gaudirun.py</text></argument>
<argument name="args"><set>
<text>../options/error_stream.py</text>
</set></argument>
<argument name="use_temp_dir"><enumeral>per-test</enumeral></argument>
<argument name="validator">
<text>
countErrorLines({"FATAL": 0, "ERROR": 0, "WARNING": 33})
# Being determinisitic, the emulator should always accept the same events and
# hence emulate the same accept fractions
findReferenceBlock("""
NONLAZY_AND: top #=50 Sum=0 Eff=|( 0.000000 +- 0.00000 )%|
LAZY_AND: algs #=50 Sum=17 Eff=|( 34.00000 +- 6.69925 )%|
ConfigurableDummy/ConfigurableDummy_b0635f1e #=50 Sum=17 Eff=|( 34.00000 +- 6.69925 )%|
LAZY_AND: writer #=50 Sum=33 Eff=|( 66.00000 +- 6.69925 )%|
MessageSvcEventFilter/MessageSvcEventFilter_56e8af36 #=50 Sum=33 Eff=|( 66.00000 +- 6.69925 )%|
""")
</text>
</argument>
</extension>
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment