Forked from
faser / calypso
396 commits behind the upstream repository.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
faser_simulate.py 8.92 KiB
#!/usr/bin/env python
"""
Produce simulated hits from input 4-vectors
Derived from G4FaserAlgConfigNew
Usage:
faser_simulate.py filepath outfile
filepath - full path, including url if needed, to the input 4-vector file
outfile - output filename, parameters will be inferred from this name
Copyright (C) 2002-2021 CERN for the benefit of the ATLAS and FASER collaborations
"""
if __name__ == '__main__':
import sys
import time
a = time.time()
#
# Parse command-line options
#
import argparse
parser = argparse.ArgumentParser(description="Run FASER simulation")
parser.add_argument("file_path",
help="Fully qualified path of the raw input file")
parser.add_argument("output",
help="Output file name")
#parser.add_argument("--run", type=int, default=123456,
# help="Specify run number to use in simulated data")
parser.add_argument("--geom", default="TI12MC",
help="Specify geomtery to simulation (default: TI12MC, alt: TestBeamMC)")
parser.add_argument("--xangle", type=float, default=0.0,
help="Specify H crossing angle (in Radians)")
parser.add_argument("--yangle", type=float, default=0.0,
help="Specify V crossing angle (in Radians)")
parser.add_argument("--xshift", type=float, default=0.0,
help="Specify H shift of events wrt FASER (in mm)")
parser.add_argument("--yshift", type=float, default=0.0,
help="Specify V shift of events wrt FASER (in mm)")
parser.add_argument("-t", "--tag", default="",
help="Specify sim tag (to append to output filename)")
parser.add_argument("-s", "--skip", type=int, default=0,
help="Specify number of events to skip (default: none)")
parser.add_argument("-n", "--nevents", type=int, default=-1,
help="Specify number of events to process (default: all)")
parser.add_argument("-v", "--verbose", action='store_true',
help="Turn on DEBUG output")
args = parser.parse_args()
from pathlib import Path
filepath = Path(args.file_path)
#
# Parse input file
#
print(f"Starting digitization of {filepath.name}")
filestem = filepath.stem
if len(args.tag) > 0:
if args.tag in filestem:
print(f"Not adding tag {args.tag} to {filestem}")
else:
filestem += f"-{args.tag}"
if args.output:
# Just directly specify the output filename
outfile = args.output
spl = outfile.split('-')
if len(spl) < 4:
print(f"Can't get run number from {outfile}!")
sys.exit(1)
runnum = int(spl[2])
segnum = int(spl[3])
else:
outfile = f"{filestem}-HITS.root"
print(f"Output file name not specified")
sys.exit(1)
print(f"Outfile: {outfile}")
#
# Figure out events to run
#
if args.skip > 0:
print(f"Skipping {args.skip} events by command-line option")
if args.nevents > 0:
print(f"Reconstructing {args.nevents} events by command-line option")
#
# Set up logging and config behaviour
#
from AthenaCommon.Logging import log
from AthenaCommon.Constants import DEBUG, VERBOSE
from AthenaCommon.Configurable import Configurable
log.setLevel(DEBUG)
Configurable.configurableRun3Behavior = 1
#
# Import and set config flags
#
from CalypsoConfiguration.AllConfigFlags import ConfigFlags
from AthenaConfiguration.Enums import ProductionStep
ConfigFlags.Common.ProductionStep = ProductionStep.Simulation
#
# All these must be specified to avoid auto-configuration
#
ConfigFlags.Input.RunNumber = [ runnum ]
ConfigFlags.Input.OverrideRunNumber = True
ConfigFlags.Input.LumiBlockNumber = [ (segnum+1) ]
ConfigFlags.Input.isMC = True
#
# Input file name
#
# Path mangles // in url, so use direct file_path here
ConfigFlags.Input.Files = [ args.file_path ]
#
# Skip events
#
ConfigFlags.Exec.SkipEvents = args.skip
ConfigFlags.Exec.MaxEvents = args.nevents
#
# Output file name
#
ConfigFlags.Output.HITSFileName = outfile
#
# Sim ConfigFlags
#
ConfigFlags.Sim.Layout = "FASER"
ConfigFlags.Sim.PhysicsList = "FTFP_BERT"
ConfigFlags.Sim.ReleaseGeoModel = False
ConfigFlags.Sim.IncludeParentsInG4Event = True # Controls whether BeamTruthEvent is written to output HITS file
#
# Figure out configuration
#
if args.geom == "TI12MC":
# 2022 TI12 geometry
ConfigFlags.GeoModel.FaserVersion = "FASERNU-03" # Geometry set-up
ConfigFlags.IOVDb.GlobalTag = "OFLCOND-FASER-03" # Conditions set-up
# TI12 detectors
detectors = ['Veto', 'VetoNu', 'Preshower', 'FaserSCT', 'Ecal',
'Trigger', 'Dipole', 'Emulsion', 'Trench']
elif args.geom == "TestBeamMC":
# Define 2021 test beam geometry
ConfigFlags.GeoModel.FaserVersion = "FASER-TB00" # Geometry set-up
ConfigFlags.IOVDb.GlobalTag = "OFLCOND-FASER-TB00" # Conditions set-up
# Testbeam detectors (trigger layers are actually veto counters)
detectors = ['Veto', 'Preshower', 'FaserSCT', 'Ecal']
else:
print(f"Unknown geometry {args.geom}!")
sys.exit(1)
#
# Units are radians and mm
# Probably should only be allowed in TI12, but leave it here for now
ConfigFlags.addFlag("Sim.Beam.xangle", args.xangle) # Potential beam crossing angles
ConfigFlags.addFlag("Sim.Beam.yangle", args.yangle)
ConfigFlags.addFlag("Sim.Beam.xshift", args.xshift) # Potential beam shift
ConfigFlags.addFlag("Sim.Beam.yshift", args.yshift)
ConfigFlags.addFlag("Input.InitialTimeStamp", 0) # To avoid autoconfig
ConfigFlags.GeoModel.Align.Dynamic = False
# import sys
# ConfigFlags.fillFromArgs(sys.argv[1:])
doShiftLOS = (ConfigFlags.Sim.Beam.xangle or ConfigFlags.Sim.Beam.yangle or
ConfigFlags.Sim.Beam.xshift or ConfigFlags.Sim.Beam.yshift)
#
# Setup detector flags
#
from CalypsoConfiguration.DetectorConfigFlags import setupDetectorsFromList
setupDetectorsFromList(ConfigFlags, detectors, toggle_geometry=True)
#
# Finalize flags
#
ConfigFlags.lock()
#
# Initialize a new component accumulator
#
from CalypsoConfiguration.MainServicesConfig import MainServicesCfg
cfg = MainServicesCfg(ConfigFlags)
#
# Check whether a known input file was specified
#
if ConfigFlags.Input.Files[0].endswith(".events") or ConfigFlags.Input.Files[0].endswith(".hepmc"):
from HEPMCReader.HepMCReaderConfig import HepMCReaderCfg
if doShiftLOS:
cfg.merge(HepMCReaderCfg(ConfigFlags, McEventKey = "BeamTruthEvent_ATLASCoord"))
else:
cfg.merge(HepMCReaderCfg(ConfigFlags))
from McEventSelector.McEventSelectorConfig import McEventSelectorCfg
cfg.merge(McEventSelectorCfg(ConfigFlags))
#
# Else, set up to read it as a pool.root file
#
else:
from AthenaPoolCnvSvc.PoolReadConfig import PoolReadCfg
cfg.merge(PoolReadCfg(ConfigFlags))
if doShiftLOS:
from SGComps.AddressRemappingConfig import InputOverwriteCfg
# Rename old truth collection to add ATLAS coord to can still use BeamTruthEvent for the one in FASER Coords
cfg.merge(InputOverwriteCfg("McEventCollection", "BeamTruthEvent", "McEventCollection", "BeamTruthEvent_ATLASCoord"))
#
# Output file
#
from AthenaPoolCnvSvc.PoolWriteConfig import PoolWriteCfg
cfg.merge(PoolWriteCfg(ConfigFlags))
#
# Shift LOS
#
if doShiftLOS:
import McParticleEvent.Pythonizations
from GeneratorUtils.ShiftLOSConfig import ShiftLOSCfg
cfg.merge(ShiftLOSCfg(ConfigFlags,
xcross = ConfigFlags.Sim.Beam.xangle,
ycross = ConfigFlags.Sim.Beam.yangle,
xshift = ConfigFlags.Sim.Beam.xshift,
yshift = ConfigFlags.Sim.Beam.yshift))
#
# Add the G4FaserAlg
#
from G4FaserAlg.G4FaserAlgConfigNew import G4FaserAlgCfg
cfg.merge(G4FaserAlgCfg(ConfigFlags))
#
# Dump config
#
from AthenaConfiguration.ComponentFactory import CompFactory
cfg.addEventAlgo(CompFactory.JobOptsDumperAlg(FileName="G4FaserTestConfig.txt"))
cfg.getService("StoreGateSvc").Dump = True
cfg.getService("ConditionStore").Dump = True
cfg.printConfig(withDetails=True, summariseProps = False) # gags on ParticleGun if summariseProps = True?
ConfigFlags.dump()
#f = open("test.pkl","wb")
#cfg.store(f)
#f.close()
#
# Execute and finish
#
# This fails with ShiftLOSCfg...
#if args.verbose:
# cfg.foreach_component("*").OutputLevel = "DEBUG"
#else:
# cfg.foreach_component("*").OutputLevel = "INFO"
sc = cfg.run(maxEvents=args.nevents)
b = time.time()
log.info("Finish execution in " + str(b-a) + " seconds")
#
# Success should be 0
#
sys.exit(int(sc.isFailure()))