Skip to content
Snippets Groups Projects
Commit 56082ebe authored by Carl Gwilliam's avatar Carl Gwilliam
Browse files

Merge branch 'derivation_code' into 'master'

Add first version of derivation (i.e. streaming code)

See merge request faser/calypso!279
parents 4ef045f6 315cbb0b
No related branches found
No related tags found
No related merge requests found
Showing
with 722 additions and 0 deletions
################################################################################
# Package: DerivationAlgs
################################################################################
# Declare the package name:
atlas_subdir( DerivationAlgs )
atlas_add_component( DerivationAlgs
src/*.cxx src/*.h
src/components/*.cxx
LINK_LIBRARIES ${ROOT_LIBRARIES} AthenaBaseComps GaudiKernel StoreGateLib DerivationToolsLib)
atlas_install_python_modules( python/*.py )
from AthenaConfiguration.ComponentAccumulator import ComponentAccumulator
from AthenaConfiguration.ComponentFactory import CompFactory
from OutputStreamAthenaPool.OutputStreamConfig import OutputStreamCfg
def DerivationAlgCfg(flags, stream, tools, **kwargs):
acc = ComponentAccumulator()
kwargs.setdefault("Tools", tools)
##print ("SETTING", kwargs["Tools"], type(kwargs["Tools"][0]))
acc.addEventAlgo(CompFactory.Derive(stream + "_DerivationAlg", **kwargs))
return acc
def DerivationOutputCfg(flags, stream, accept, items = [], extra_items = [], exclude_items = [], **kwargs):
acc = ComponentAccumulator()
if not items:
items = [ "xAOD::EventInfo#*"
, "xAOD::EventAuxInfo#*"
, "xAOD::FaserTriggerData#*"
, "xAOD::FaserTriggerDataAux#*"
, "FaserSiHitCollection#*" # Strip hits, do we want this?
, "FaserSCT_RDO_Container#*"
, "xAOD::WaveformHitContainer#*"
, "xAOD::WaveformHitAuxContainer#*"
, "xAOD::WaveformClock#*"
, "xAOD::WaveformClockAuxInfo#*"
]
if not items and extra_items:
items.append(extra_items)
# from PrimaryDPDMaker/python/Primary_DPD_OutputDefinitions.py in athena
# Once can use TakeItemsFromInput = True probably need to use
# acc.getEventAlgo(f"OutputStream{stream}").RemoveItem(exclude_list)
exclude_list = []
if exclude_items:
for ex in exclude_items:
if ex in items:
exclude_list.append(ex)
if ex.endswith("*"):
for it in items:
if it.startswith(ex.rstrip("*")):
exclude_list.append(it)
# items = list(set(items) - set(exclude_list))
#flags.unlock()
#flags.addFlag(f"Output.AOD{stream}FileName", f"my.{stream}.xAOD.root")
#flags.lock()
acc.merge(OutputStreamCfg(flags, stream, items))
acc.getEventAlgo(f"OutputStream{stream}").AcceptAlgs = [accept]
## if not items:
##acc.getEventAlgo(f"OutputStream{stream}").TakeItemsFromInput = True # crashes
## if extra_items:
##cc.getEventAlgo(f"OutputStream{stream}").RemoveItem(extra_items)
## if exclude_list:
##acc.getEventAlgo(f"OutputStream{stream}").AddItem(exclude_list)
return acc
def FullyConfiguredStream(flags, stream, tools, items = [], extra_items = [], **kwargs):
# TODO:
# - get items from input + why crash
acc = ComponentAccumulator()
acc.merge(DerivationAlgCfg(flags, stream, tools, **kwargs))
acc.merge(DerivationOutputCfg(flags, stream, stream + "_DerivationAlg", items, extra_items))
return acc
from AthenaConfiguration.ComponentAccumulator import ComponentAccumulator
from AthenaConfiguration.ComponentFactory import CompFactory
from MagFieldServices.MagFieldServicesConfig import MagneticFieldSvcCfg
def DerivationAlgCfg(flags, name, frac, **kwargs):
acc = ComponentAccumulator()
tool = CompFactory.ExampleDerivationTool(name + "_TestTool", SaveFraction = frac)
print ("ZEBRA", tool.SaveFraction)
kwargs.setdefault("Tools", [tool])
acc.addEventAlgo(CompFactory.Derive(name, **kwargs))
return acc
def DerivationAlgCfg2(flags, name, **kwargs):
acc = ComponentAccumulator()
tool = CompFactory.TriggerStreamTool(name + "_TriggerSteamTool")
kwargs.setdefault("Tools", [tool])
acc.addEventAlgo(CompFactory.Derive(name, **kwargs))
return acc
if __name__ == "__main__":
import sys
from AthenaCommon.Logging import log, logging
from AthenaCommon.Constants import DEBUG, VERBOSE, INFO
from AthenaCommon.Configurable import Configurable
from CalypsoConfiguration.AllConfigFlags import ConfigFlags
from AthenaConfiguration.TestDefaults import defaultTestFiles
from CalypsoConfiguration.MainServicesConfig import MainServicesCfg
from AthenaPoolCnvSvc.PoolReadConfig import PoolReadCfg
from AthenaPoolCnvSvc.PoolWriteConfig import PoolWriteCfg
# Set up logging and new style config
log.setLevel(DEBUG)
Configurable.configurableRun3Behavior = True
ConfigFlags.Input.Files = [
"/eos/experiment/faser/rec/2022/p0008/007984/Faser-Physics-007984-00000-p0008-xAOD.root"
#"/bundle/data/FASER/Ti12data/filter/r0008/007983/Faser-Physics-007983-TrigMask08-r0008-xAOD.root"
]
ConfigFlags.IOVDb.GlobalTag = "OFLCOND-FASER-02" # Always needed; must match FaserVersionS
ConfigFlags.IOVDb.DatabaseInstance = "OFLP200" # Use MC conditions for now
ConfigFlags.Input.ProjectName = "data21" # Needed to bypass autoconfig
ConfigFlags.Input.isMC = False # Needed to bypass autoconfig
ConfigFlags.GeoModel.FaserVersion = "FASERNU-03" # FASER geometry
ConfigFlags.Common.isOnline = False
ConfigFlags.GeoModel.Align.Dynamic = False
ConfigFlags.Beam.NumberOfCollisions = 0.
ConfigFlags.Detector.GeometryFaserSCT = True
ConfigFlags.addFlag("Output.AODSTREAM1FileName", "my.STREAM1.xAOD.root")
ConfigFlags.addFlag("Output.AODSTREAM2FileName", "my.STREAM2.xAOD.root")
ConfigFlags.addFlag("Output.AODSTREAM3FileName", "my.STREAM3.xAOD.root")
#ConfigFlags.Output.STREAM1FileName = fileName
ConfigFlags.lock()
# Core components
cfg = MainServicesCfg(ConfigFlags)
cfg.merge(PoolReadCfg(ConfigFlags))
cfg.merge(PoolWriteCfg(ConfigFlags))
# Derivation alg
cfg.merge(DerivationAlgCfg(ConfigFlags, "DerivationAlg1", 10))
cfg.merge(DerivationAlgCfg(ConfigFlags, "DerivationAlg2", 90))
cfg.merge(DerivationAlgCfg2(ConfigFlags, "DerivationAlg3"))
# Writing
from OutputStreamAthenaPool.OutputStreamConfig import OutputStreamCfg
streamName1 = "AODSTREAM1" # Needs to have AOD in name
itemList1 = [ "xAOD::EventInfo#*"
, "xAOD::EventAuxInfo#*"
, "xAOD::FaserTriggerData#*"
, "xAOD::FaserTriggerDataAux#*"
, "FaserSiHitCollection#*" # Strip hits, do we want this?
, "FaserSCT_RDO_Container#*"
, "xAOD::WaveformHitContainer#*"
, "xAOD::WaveformHitAuxContainer#*"
, "xAOD::WaveformClock#*"
, "xAOD::WaveformClockAuxInfo#*"
# , "FaserSCT_SpacePointContainer#*" # Crashes
# , "Tracker::FaserSCT_ClusterContainer#*"
# , "TrackCollection#*"
]
cfg.merge(OutputStreamCfg(ConfigFlags, streamName1, itemList1)) #, disableEventTag = True))
cfg.getEventAlgo("OutputStreamAODSTREAM1").AcceptAlgs = ["DerivationAlg1"]
#cfg.getEventAlgo("OutputStreamAODSTREAM1").TakeItemsFromInput = True
# Writing
from OutputStreamAthenaPool.OutputStreamConfig import OutputStreamCfg
streamName2 = "AODSTREAM2" # Needs to have AOD in name
itemList2 = [ "xAOD::EventInfo#*"
, "xAOD::EventAuxInfo#*"
, "xAOD::FaserTriggerData#*"
, "xAOD::FaserTriggerDataAux#*"
]
cfg.merge(OutputStreamCfg(ConfigFlags, streamName2, itemList2)) #, disableEventTag = True))
cfg.getEventAlgo("OutputStreamAODSTREAM2").AcceptAlgs = ["DerivationAlg2"]
# Writing
from OutputStreamAthenaPool.OutputStreamConfig import OutputStreamCfg
streamName3 = "AODSTREAM3" # Needs to have AOD in name
itemList3 = [ "xAOD::EventInfo#*"
, "xAOD::EventAuxInfo#*"
, "xAOD::FaserTriggerData#*"
, "xAOD::FaserTriggerDataAux#*"
]
cfg.merge(OutputStreamCfg(ConfigFlags, streamName3, itemList3)) #, disableEventTag = True))
cfg.getEventAlgo("OutputStreamAODSTREAM3").AcceptAlgs = ["DerivationAlg3"]
# from OutputStreamAthenaPool.MultipleStreamManager import MSMgr
# streamName = "STREAM1"
# fileName = "streaming.STREAM1.root"
# testStream = MSMgr.NewPoolRootStream(streamName, fileName)
# testStream.AcceptAlgs(["DerivationAlg1"])
# cfg.addEventAlgo(testStream)
# # Hack to avoid problem with our use of MC databases when isMC = False
# replicaSvc = acc.getService("DBReplicaSvc")
# replicaSvc.COOLSQLiteVetoPattern = ""
# replicaSvc.UseCOOLSQLite = True
# replicaSvc.UseCOOLFrontier = False
# replicaSvc.UseGeomSQLite = True
# Execute and finish
cfg.printConfig()
sc = cfg.run(maxEvents=1000)
# Success should be 0
sys.exit(not sc.isSuccess())
from AthenaConfiguration.ComponentAccumulator import ComponentAccumulator
from AthenaConfiguration.ComponentFactory import CompFactory
from MagFieldServices.MagFieldServicesConfig import MagneticFieldSvcCfg
if __name__ == "__main__":
import sys
from AthenaCommon.Logging import log, logging
from AthenaCommon.Constants import DEBUG, VERBOSE, INFO
from AthenaCommon.Configurable import Configurable
from CalypsoConfiguration.AllConfigFlags import ConfigFlags
from AthenaConfiguration.TestDefaults import defaultTestFiles
from CalypsoConfiguration.MainServicesConfig import MainServicesCfg
from AthenaPoolCnvSvc.PoolReadConfig import PoolReadCfg
from AthenaPoolCnvSvc.PoolWriteConfig import PoolWriteCfg
# Set up logging and new style config
log.setLevel(DEBUG)
Configurable.configurableRun3Behavior = True
ConfigFlags.Input.Files = [
"/eos/experiment/faser/rec/2022/p0008/007984/Faser-Physics-007984-00000-p0008-xAOD.root"
]
ConfigFlags.IOVDb.GlobalTag = "OFLCOND-FASER-02" # Always needed; must match FaserVersionS
ConfigFlags.IOVDb.DatabaseInstance = "OFLP200" # Use MC conditions for now
ConfigFlags.Input.ProjectName = "data21" # Needed to bypass autoconfig
ConfigFlags.Input.isMC = False # Needed to bypass autoconfig
ConfigFlags.GeoModel.FaserVersion = "FASERNU-03" # FASER geometry
ConfigFlags.Common.isOnline = False
ConfigFlags.GeoModel.Align.Dynamic = False
ConfigFlags.Beam.NumberOfCollisions = 0.
ConfigFlags.Detector.GeometryFaserSCT = True
for stream in ["STREAM1", "STREAM2", "STREAM3"]:
ConfigFlags.addFlag(f"Output.AOD{stream}FileName", f"my.{stream}.xAOD.root")
ConfigFlags.lock()
# Core components
cfg = MainServicesCfg(ConfigFlags)
cfg.merge(PoolReadCfg(ConfigFlags))
cfg.merge(PoolWriteCfg(ConfigFlags))
# Derivations
from DerivationAlgs.DerivationAlgsConfig import FullyConfiguredStream
name = "STREAM1"
cfg.merge(FullyConfiguredStream(ConfigFlags, stream = name,
tools = [CompFactory.ExampleDerivationTool(name + "_TestTool", SaveFraction = 10.)])
)
name = "STREAM2"
cfg.merge(FullyConfiguredStream(ConfigFlags, stream = name,
tools = [ CompFactory.ExampleDerivationTool(name + "_TestTool", SaveFraction = 90.)],
items = [ "xAOD::EventInfo#*"
, "xAOD::EventAuxInfo#*"
, "xAOD::FaserTriggerData#*"
, "xAOD::FaserTriggerDataAux#*"
])
)
name = "STREAM3"
cfg.merge(FullyConfiguredStream(ConfigFlags, stream = name,
tools = [CompFactory.TriggerStreamTool(name + "_TriggerTool")],
items = [ "xAOD::EventInfo#*"
, "xAOD::EventAuxInfo#*"
, "xAOD::FaserTriggerData#*"
, "xAOD::FaserTriggerDataAux#*"
])
)
# # Hack to avoid problem with our use of MC databases when isMC = False
# replicaSvc = acc.getService("DBReplicaSvc")
# replicaSvc.COOLSQLiteVetoPattern = ""
# replicaSvc.UseCOOLSQLite = True
# replicaSvc.UseCOOLFrontier = False
# replicaSvc.UseGeomSQLite = True
# Execute and finish
cfg.printConfig(withDetails = True, summariseProps = True, printDefaults = True)
sc = cfg.run(maxEvents=1000)
# Success should be 0
sys.exit(not sc.isSuccess())
#include "Derive.h"
Derive::Derive(const std::string& name,
ISvcLocator* pSvcLocator)
: AthFilterAlgorithm(name, pSvcLocator) {
//declareProperty("Tools", m_tools);
}
StatusCode
Derive::initialize() {
ATH_MSG_INFO(name() << "::initalize()" );
ATH_CHECK( m_tools.retrieve() );
return StatusCode::SUCCESS;
}
StatusCode
Derive::finalize() {
ATH_MSG_INFO(name() << "::finalize()");
ATH_MSG_INFO("Derivation" << name() << " accepted " << m_passed << " out of " << m_events << " events");
return StatusCode::SUCCESS;
}
StatusCode
Derive::execute() {
ATH_MSG_DEBUG("Executing ... ");
m_events++;
bool acceptEvent(true);
for (auto& tool : m_tools) {
// Skimming - remove events
if (!tool->passed()) acceptEvent = false;
// Thinning - remove info from event
ATH_CHECK(tool->removeBranch());
// Augmenting - add info to an event
ATH_CHECK(tool->addBranch());
}
setFilterPassed(acceptEvent);
if (acceptEvent) m_passed++;
return StatusCode::SUCCESS;
}
#ifndef DERIVATIONALGS_DERIVE_H
#define DERIVATIONALGS_DERIVE_H
// Base class
#include "AthenaBaseComps/AthFilterAlgorithm.h"
// FASER
#include "DerivationTools/IDerivationTool.h"
// Gaudi
#include "GaudiKernel/ServiceHandle.h"
#include "GaudiKernel/ToolHandle.h"
#include "GaudiKernel/IAlgTool.h"
// Athena
#include "xAODEventInfo/EventInfo.h"
#include "StoreGate/ReadHandleKey.h"
// ROOT
#include <TRandom3.h>
// STL
class Derive : public AthFilterAlgorithm {
public:
// Constructor
Derive(const std::string& name, ISvcLocator* pSvcLocator);
virtual ~Derive() = default;
/** @name Usual algorithm methods */
//@{
virtual StatusCode initialize() override;
virtual StatusCode execute() override;
virtual StatusCode finalize() override;
//@}
private:
/** @name Disallow default instantiation, copy, assignment */
//@{
Derive() = delete;
Derive(const Derive&) = delete;
Derive &operator=(const Derive&) = delete;
//@}
///
/** @name Steerable tools */
//@
ToolHandleArray<IDerivationTool> m_tools {this, "Tools", {}, "List of tools"};
//@}
/** Number of events processed */
int m_events {0};
/** Number of events selected */
int m_passed {0};
};
#endif // DERIVATIONALGS_DERIVE_H
#include "../Derive.h"
DECLARE_COMPONENT( Derive )
################################################################################
# Package: DerivationTools
################################################################################
# Declare the package name:
atlas_subdir( DerivationTools )
# External dependencies:
find_package( ROOT )
# Component(s) in the package:
atlas_add_library( DerivationToolsLib
DerivationTools/*.h src/*.cxx src/*.h
PUBLIC_HEADERS DerivationTools
PRIVATE_INCLUDE_DIRS ${ROOT_INCLUDE_DIRS}
LINK_LIBRARIES AthenaBaseComps AthenaKernel xAODFaserTrigger
PRIVATE_LINK_LIBRARIES ${ROOT_LIBRARIES}
)
atlas_add_component( DerivationTools
src/components/*.cxx
INCLUDE_DIRS ${ROOT_INCLUDE_DIRS}
LINK_LIBRARIES ${ROOT_LIBRARIES} AthenaBaseComps GaudiKernel xAODEventInfo xAODFaserTrigger DerivationToolsLib)
/*
Copyright (C) 2021 CERN for the benefit of the FASER collaboration
*/
/**
* @file IDerivationTool.h
* Header file for the IDerivationTool class
* @author Carl Gwilliam, 2021
*/
#ifndef DERIVATIONTOOLS_IDERIVATIONTOOL_H
#define DERIVATIONTOOLS_IDERIVATIONTOOL_H
// Gaudi
#include "GaudiKernel/IAlgTool.h"
#include "GaudiKernel/ToolHandle.h"
///Interface for derivation tools
class IDerivationTool : virtual public IAlgTool
{
public:
// InterfaceID
DeclareInterfaceID(IDerivationTool, 1, 0);
virtual ~IDerivationTool() = default;
// Apply skimming
virtual bool passed() = 0;
/// Apply thinning
virtual StatusCode removeBranch() = 0;
/// Apply augmentation
virtual StatusCode addBranch() = 0;
private:
// None
};
#endif //DERIVATIONTOOLS_IDERIVATIONTOOL_H
/*
Copyright (C) 2021 CERN for the benefit of the FASER collaboration
*/
/**
* @file ExamplederivationTool.cxx
* Implementation file for the ExamplederivationTool class
* @ author C. Gwilliam, 2021
**/
#include "ExampleDerivationTool.h"
// Constructor
ExampleDerivationTool::ExampleDerivationTool(const std::string& type, const std::string& name, const IInterface* parent) :
base_class(type, name, parent)
{
//std::cout << "CTOR " << name << " with SaveFraction = " << m_fraction << std::endl;
}
// Initialization
StatusCode
ExampleDerivationTool::initialize() {
ATH_MSG_INFO( name() << "::initalize()" );
//std::cout << "INIT " << name() << " with SaveFraction = " << m_fraction << std::endl;
return StatusCode::SUCCESS;
}
bool
ExampleDerivationTool::passed(){
bool accept(false);
m_events++;
float frac = ((float)(m_passed+1))/(float)m_events * 100.0;
if (frac > m_fraction) {
ATH_MSG_DEBUG("Filter failed " << m_passed << " " << m_events << " " << frac << " " << m_fraction);
accept = false;
} else {
ATH_MSG_DEBUG("Filter passed " << m_passed << " " << m_events << " " << frac << " " << m_fraction);
accept = true;
m_passed++;
}
return accept;
}
/*
Copyright (C) 2021 CERN for the benefit of the FASER collaboration
*/
/** @file TriggerStreamTool.h
* Header file for TriggerStreamTool.h
*
*/
#ifndef DERIVATIONTOOLS_EXAMPLEDERIVATIONTOOL_H
#define DERIVATIONTOOLS_EXAMPLEDERIVATIONTOOL_H
//Athena
#include "AthenaBaseComps/AthAlgTool.h"
#include "DerivationTools/IDerivationTool.h"
//Gaudi
#include "GaudiKernel/ToolHandle.h"
//STL
class ExampleDerivationTool: public extends<AthAlgTool, IDerivationTool> {
public:
/// Normal constructor for an AlgTool; 'properties' are also declared here
ExampleDerivationTool(const std::string& type,
const std::string& name, const IInterface* parent);
/// Retrieve the necessary services in initialize
StatusCode initialize();
// Apply skimming
bool passed();
/// Apply thinning
StatusCode removeBranch() {return StatusCode::SUCCESS;}
/// Apply augmentation
StatusCode addBranch() {return StatusCode::SUCCESS;}
private:
/** Fraction of events to save */
Gaudi::Property<float> m_fraction {this, "SaveFraction", 100, "Fraction of events to save"};
/** Number of events processed */
int m_events {0};
/** Number of events selected */
int m_passed {0};
};
#endif // WAVEDIGITOOLS_WAVEFORMDIGITISATIONTOOL_H
/*
Copyright (C) 2021 CERN for the benefit of the FASER collaboration
*/
/**
* @file TriggerStreamTool.cxx
* Implementation file for the TriggerStreamTool class
* @ author C. Gwilliam, 2021
**/
#include "TriggerStreamTool.h"
// Constructor
TriggerStreamTool::TriggerStreamTool(const std::string& type, const std::string& name, const IInterface* parent) :
base_class(type, name, parent)
{
}
// Initialization
StatusCode
TriggerStreamTool::initialize() {
ATH_MSG_INFO( name() << "::initalize() with mask = " << m_mask );
ATH_CHECK( m_triggerDataKey.initialize() );
return StatusCode::SUCCESS;
}
bool
TriggerStreamTool::passed() {
SG::ReadHandle<xAOD::FaserTriggerData> triggerData(m_triggerDataKey);
return triggerData->tap() & m_mask;
}
/*
Copyright (C) 2021 CERN for the benefit of the FASER collaboration
*/
/** @file TriggerStreamTool.h
* Header file for TriggerStreamTool.h
*
*/
#ifndef DERIVATIONTOOLS_TRIGGERSTREAMTOOL_H
#define DERIVATIONTOOLS_TRIGGERSTREAMTOOL_H
// FASER
#include "StoreGate/ReadHandleKey.h"
#include "xAODFaserTrigger/FaserTriggerData.h"
//Athena
#include "AthenaBaseComps/AthAlgTool.h"
#include "DerivationTools/IDerivationTool.h"
//Gaudi
#include "GaudiKernel/ToolHandle.h"
//STL
class TriggerStreamTool: public extends<AthAlgTool, IDerivationTool> {
public:
/// Normal constructor for an AlgTool; 'properties' are also declared here
TriggerStreamTool(const std::string& type,
const std::string& name, const IInterface* parent);
/// Retrieve the necessary services in initialize
StatusCode initialize();
// Apply skimming
bool passed();
/// Apply thinning
StatusCode removeBranch() {return StatusCode::SUCCESS;}
/// Apply augmentation
StatusCode addBranch() {return StatusCode::SUCCESS;}
private:
/** Trigger condition to apply */
Gaudi::Property<int> m_mask {this, "TriggerMask", 0x8, "Trigger mask to apply"};
/// StoreGate key
SG::ReadHandleKey<xAOD::FaserTriggerData> m_triggerDataKey
{ this, "FaserTriggerDataKey", "FaserTriggerData", "ReadHandleKey for xAOD::FaserTriggerData"};
};
#endif // WAVEDIGITOOLS_WAVEFORMDIGITISATIONTOOL_H
#include "../TriggerStreamTool.h"
#include "../ExampleDerivationTool.h"
DECLARE_COMPONENT( TriggerStreamTool )
DECLARE_COMPONENT( ExampleDerivationTool )
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment