Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • lhcb/MooreOnline
1 result
Show changes
Commits on Source (11)
Showing
with 225 additions and 72 deletions
......@@ -10,6 +10,7 @@
# or submit itself to any jurisdiction. #
###############################################################################
import os
import sys
import socket
from itertools import chain
from pathlib import Path
......@@ -211,21 +212,33 @@ if run_online:
# which is set by the SMI controller to the total number of Allen instances in the architecture,
# *minus one*.
n_instances = int(os.getenv("NBOFSLAVES")) + 1
# The UTGID is set to {PARTITION}_{HOSTNAME}_Allen_{INSTANCE}, where {INSTANCE} is the instance
# number, starting from zero.
instance = int(os.getenv("UTGID").split("_")[3])
# Unsupported configuration
if n_instances > 3:
print(
f"ERROR: Unsupported HLT1 DAQ configuration with {n_instances} instances"
)
sys.exit(1)
# The MBM options have a variable named Allen_Input{N} for each input buffer.
input_buffers = [
getattr(mbm_setup, a) for a in sorted(dir(mbm_setup))
if a.startswith('Allen_Input')
]
# Only one Allen instance --> connect it to all BU buffers
# On sodin01 there is always just one BU (and one Allen) which only writes to Events_0,
# so make sure Allen does not subscribe to Events_1 and does not hang waiting for events
if n_instances == 1 and socket.gethostname() != 'sodin01':
# Special case for sodin01, where there is a single BU and all Allen instances must read only
# from that BU's output buffer
if socket.gethostname() == 'sodin01':
mep_provider.Connections = [input_buffers[0]]
# Two cases to connect to all BU output buffers:
# - There is only one Allen instance
# - This is the third Allen instance
elif (n_instances == 1 or instance == 2):
mep_provider.Connections = input_buffers
# Multiple Allen instances --> connect this instance to only one BU buffer
# There are multiple Allen instances and this the first or second instance
else:
# The UTGID is set to {PARTITION}_{HOSTNAME}_Allen_{INSTANCE}, where {INSTANCE} is the
# instance number, starting from zero.
instance = int(os.getenv("UTGID").split("_")[3])
mep_provider.Connections = [input_buffers[instance]]
mep_provider.Requests = [
'EvType=1;TriggerMask=0xFFFFFFFF,0xFFFFFFFF,0xFFFFFFFF,0xFFFFFFFF;VetoMask=0,0,0,0;MaskType=ANY;UserType=ONE;Frequency=PERC;Perc=100.0'
......
......@@ -23,7 +23,7 @@ Run lumi decoding and encoding on Allen MDF output created from MEP input
<argument name="unsupported_platforms"><set>
<text>detdesc</text>
</set></argument>
<argument name="reference"><text>${ALLENONLINEROOT}/tests/refs/test_lumi.ref</text></argument>
<argument name="reference"><text>../refs/test_lumi.ref</text></argument>
<argument name="use_temp_dir"><enumeral>true</enumeral></argument>
<argument name="validator"><text>
......
......@@ -24,7 +24,7 @@
<argument name="prerequisites"><set>
<tuple><text>mep_passthrough_no_ut</text><enumeral>PASS</enumeral></tuple>
</set></argument>
<argument name="reference"><text>${ALLENONLINEROOT}/tests/refs/test_mep_banks.ref</text></argument>
<argument name="reference"><text>../refs/test_mep_banks.ref</text></argument>
<argument name="timeout"><integer>600</integer></argument>
<argument name="validator"><text>
......
......@@ -25,7 +25,7 @@
<argument name="prerequisites"><set>
<tuple><text>mep_passthrough_no_ut</text><enumeral>PASS</enumeral></tuple>
</set></argument>
<argument name="reference"><text>${ALLENONLINEROOT}/tests/refs/test_mep_banks_transpose-SYMLINK-DO_NOT_UPDATE_WITH_NEW.ref</text></argument>
<argument name="reference"><text>../refs/test_mep_banks_transpose-SYMLINK-DO_NOT_UPDATE_WITH_NEW.ref</text></argument>
<argument name="timeout"><integer>600</integer></argument>
<argument name="validator"><text>
......
###############################################################################
# (c) Copyright 2021 CERN for the benefit of the LHCb Collaboration #
# (c) Copyright 2000-2024 CERN for the benefit of the LHCb Collaboration #
# #
# This software is distributed under the terms of the GNU General Public #
# Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING". #
......@@ -7,46 +7,94 @@
# In applying this licence, CERN does not waive the privileges and immunities #
# granted to it by virtue of its status as an Intergovernmental Organization #
# or submit itself to any jurisdiction. #
#############################################################################
###############################################################################
from PyConf.application import (
configure_input,
configure,
default_raw_banks,
make_odin,
)
from PyConf.Algorithms import (
FTRawBankDecoder,
FTLiteClusterMonitor,
FTLiteClusterTAEMonitor,
)
from MooreOnlineConf.utils import (
common_monitors_node,
passes_rb,
RoutingBit,
decode_tae,
if_then,
run_all,
)
from Moore import options
import os
from PyConf.application import default_raw_event, default_raw_banks, make_odin
from Moore import options, run_reconstruction
from PyConf.Algorithms import FTDigitMonitor, FTNZSRawBankDecoder
from PyConf.Algorithms import FTClusterMonitor, FTRawBankDecoder
from Moore.config import Reconstruction
from RecoConf.standalone import reco_prefilters
try:
import OnlineEnvBase as OnlineEnv
TAE_HALF_WINDOW = OnlineEnv.TAE
except ImportError:
TAE_HALF_WINDOW = 3
options.dddb_tag = 'upgrade/master'
options.conddb_tag = 'upgrade/master'
partition = os.environ.get("PARTITION", "LHCb")
isLocalFlag = bool(partition == "LHCb")
def ft_mon():
raw_event = default_raw_event(["FTNZS"])
raw_banks = default_raw_banks("FTGeneric")
def main():
odin = make_odin()
algs = []
if options.input_type.lower() == 'online':
from MooreOnlineConf.utils import update_and_reset
algs.append(update_and_reset())
def make_ft_clusters(name=""):
raw_banks = default_raw_banks("FTCluster")
ZS_decoder = FTRawBankDecoder(
name=f"FTRawBankDecoder{name}", Odin=odin, RawBanks=raw_banks)
return ZS_decoder.OutputLocation
# the standard monitor
zs_monitor_lumi = FTLiteClusterMonitor(
name="FTLiteClusterMonitorLumi",
allow_duplicate_instances_with_distinct_names=True,
InputODIN=odin,
ClusterLocation=make_ft_clusters())
zs_monitor_physics = FTLiteClusterMonitor(
name="FTLiteClusterMonitor",
allow_duplicate_instances_with_distinct_names=True,
InputODIN=odin,
ClusterLocation=make_ft_clusters())
ZS_decoder = FTRawBankDecoder(RawBanks=raw_banks)
ZS_monitor = FTClusterMonitor(
InputODIN=odin, ClusterLocation=ZS_decoder.OutputLocation)
algs += [
ZS_decoder,
ZS_monitor,
]
# the TAE monitor
is_tae, tae_decoding, tae_odins, tae_data = decode_tae(
make_ft_clusters, TAE_HALF_WINDOW)
tae_monitor = FTLiteClusterTAEMonitor(
name="FTLiteClusterTAEMonitor",
ODINVector=list(tae_odins.values()),
InputVector=list(tae_data.values()),
SuperTAEHalfWindow=TAE_HALF_WINDOW)
NZS_decoder = FTNZSRawBankDecoder(RawEventLocations=raw_event)
NZS_monitor = FTDigitMonitor(
InputODIN=odin, DigitLocation=NZS_decoder.OutputLocation)
algs += [
NZS_decoder,
NZS_monitor,
]
# assemble the control flow
if isLocalFlag:
top_node = run_all(
"top",
[
common_monitors_node(), # common monitoring to all tasks
if_then("IfPHYSICS", passes_rb(RoutingBit.PHYSICS),
zs_monitor_physics),
if_then("IfLUMI", passes_rb(RoutingBit.LUMI), zs_monitor_lumi),
if_then("IfTAE", is_tae,
run_all("TAE", [tae_decoding, tae_monitor])),
])
else:
top_node = run_all(
"top",
[
common_monitors_node(), # common monitoring to all tasks
zs_monitor_physics,
if_then("IfTAE", is_tae,
run_all("TAE", [tae_decoding, tae_monitor])),
])
return Reconstruction('ft_mon', algs, reco_prefilters(gec=False))
return top_node
run_reconstruction(options, ft_mon)
configure_input(options)
configure(options, main())
......@@ -15,7 +15,10 @@ source "$DIR/setupTask.sh"
setup_options_path
MBM_SETUP_OPTIONS=${MOORESCRIPTSROOT}/tests/options/HLT1/MBM_setup.opts
unset CUDA_VISIBLE_DEVICES
if test -n "${BIND_NUMA-}"; then
export BIND_NUMA_GPU=1
fi
application=AllenApplication
exec_gaudirun ${ALLENONLINEROOT}/options/AllenConfig.py
......@@ -153,7 +153,12 @@ exec_gaudirun() {
fi
dump_environment
cmd=()
if test -n "${BIND_NUMA-}"; then
if test -n "${BIND_NUMA_GPU-}"; then
gpu_num=$(echo $UTGID | grep -oP '[0-9]+$' )
numa_domain=$(nvidia-smi topo -i ${gpu_num} -C | grep -oP "[0-9]+$")
cmd+=(numactl -N $numa_domain -m $numa_domain)
export CUDA_VISIBLE_DEVICES=${gpu_num}
elif test -n "${BIND_NUMA-}"; then
numa_domains_num=$(lscpu -p=NODE | grep -oP '^[0-9]+$' | sort | uniq | wc -l)
numa_domain=$(( $(echo $UTGID | grep -oP '[0-9]+$') % $numa_domains_num ))
cmd+=(numactl -N $numa_domain -m $numa_domain)
......
......@@ -14,7 +14,7 @@ Manager.Runable = "Wrap";
Wrap.Callable = "Reader";
Task.HavePause = true;
Reader.Buffer = "Events";
Reader.Buffer = "$EVENTS_INSTANCE_BUFFER";
Reader.BrokenHosts = "";
Reader.Directories = @OnlineEnv.Reader_Directories;
Reader.FilePrefix = @OnlineEnv.Reader_FilePrefix;
......@@ -34,4 +34,4 @@ Reader.ReuseFile = @OnlineEnv.Reader_Preload;
MEPManager.PartitionBuffers = true;
MEPManager.PartitionName = @OnlineEnv.PartitionName;
MEPManager.PartitionID = @OnlineEnv.PartitionID;
MEPManager.Buffers = {"Events"};
MEPManager.Buffers = {"$EVENTS_INSTANCE_BUFFER"};
......@@ -9,6 +9,7 @@
# or submit itself to any jurisdiction. #
###############################################################################
import os
import re
import platform
import xml.etree.ElementTree as ET
from string import Template
......@@ -39,14 +40,28 @@ def _rinterp(obj, mapping):
return obj
def parse_task_tree(task):
def parse_task_tree(task, instances):
"""Parse task tree into name, n_instances, subprocess.Popen arguments."""
name = task.attrib["name"]
try:
n_instances = int(task.attrib.get("instances", 1))
except ValueError: # instances = "NUMBER_OF_INSTANCES"
n_instances = 1
instance_match = re.match(R"(\d+|(NUMBER_OF_INSTANCES)(?::(\d+))?)",
task.attrib.get("instances", "1"))
if instance_match.group(2) is None:
# Default number of instances or specified number
n_instances = int(instance_match.group(1))
elif instance_match.groups()[1:] == ('NUMBER_OF_INSTANCES', None):
# NUMBER_OF_INSTANCES
n_instances = 1 if instances is None else instances
elif instances is not None and instances < int(instance_match.group(3)):
# NUMBER_OF_INSTANCES:N with instances != None
raise ValueError(
f"This architecture requires at least {int(instance_match.groups(3))} instances to run"
)
else:
# NUMBER_OF_INSTANCES:N
n_instances = int(
instance_match.group(3)) if instances is None else instances
params = {"args": [], "env": {}, "cwd": "/"}
# The controller populates `-instances` as the instances argument
......@@ -73,11 +88,11 @@ def parse_task_tree(task):
return name, n_instances, params
def read_xml(path):
def read_xml(path, main_instances):
""""Parse architecture file into a list of task specs."""
tree = ET.parse(path)
tasks_inventory = tree.getroot()
return [parse_task_tree(task) for task in tasks_inventory]
return [parse_task_tree(task, main_instances) for task in tasks_inventory]
def instance_args(tasks, replacements):
......
......@@ -49,9 +49,10 @@ async def run(tasks: List[emulator.Task], args, extra_argv):
t for t in tasks if re.match(r".*(HLT|Mon).*",
t.utgid.split("_")[2])
]
if len(prod_tasks) != 1:
raise ValueError("There must be exactly one *Prod task")
prod_task = prod_tasks[0]
if args.measure_throughput == 0:
if len(prod_tasks) != 1:
raise ValueError("There must be exactly one *Prod task")
prod_task = prod_tasks[0]
if extra_args.use_perf or extra_args.use_perf_control:
for t in main_tasks:
......@@ -95,25 +96,25 @@ async def run(tasks: List[emulator.Task], args, extra_argv):
else:
# wait for the reader task to get to a PAUSED state (no more input)
await tasks_wait_for_status(prod_tasks, "PAUSED")
dim_prod_out = asyncdim.DimService(prod_task.utgid + "/Events/OUT", "X")
dim_prod_out = asyncdim.DimService(prod_task.utgid + "/Events/OUT",
"X")
# stop producing new data
await tasks_send_command(prod_tasks, "stop")
await tasks_wait_for_status(prod_tasks, "READY")
# Get last published value
# TODO can we make it such that the number of events put in the buffer keeps being published after stop?
# Markus says MDFReader may need to be Class 1 but are there side effects?
n_events_produced = next(
v for ts, v in reversed(await dim_prod_out.get_all()) if v is not None)
if not args.measure_throughput > 0:
if "HLT1" in main_tasks[0].utgid:
log.info(f"Waiting until all events have been processed")
await hlt1_wait_for_output(main_tasks, "Events/" + prod_counter,
"MBMOutput/NProcessed")
elif "HLT2" in main_tasks[0].utgid:
# Get last published value
# TODO can we make it such that the number of events put in the buffer keeps being published after stop?
# Markus says MDFReader may need to be Class 1 but are there side effects?
n_events_produced = next(
v for ts, v in reversed(await dim_prod_out.get_all())
if v is not None)
log.info(
f"Waiting until all {n_events_produced} events have been processed"
)
......
......@@ -28,10 +28,33 @@ setup_options_path
cd_working_dir
dump_environment
# For HLT1 tests there is a separate Events buffer per NUMA
# domain. Setting this here allows a single .opts file to be used for both
if [[ "$NBOFSLAVES" == "0" ]]; then
export EVENTS_INSTANCE_BUFFER="Events"
else
instance=$(echo $UTGID | grep -oP '[0-9]+$' )
export EVENTS_INSTANCE_BUFFER="Events_${instance}"
fi
cmd=()
if test -n "${BIND_NUMA-}"; then
numa_domains_num=$(lscpu -p=NODE | grep -oP '^[0-9]+$' | sort | uniq | wc -l)
numa_domain=$(( $(echo $UTGID | grep -oP '[0-9]+$') % $numa_domains_num ))
cmd+=(numactl -N $numa_domain -m $numa_domain)
fi
cmd+=(
setarch x86_64 --addr-no-randomize bash -c 'exec -a "$0" "$@"'
${UTGID}
genRunner.exe libDataflow.so dataflow_run_task
-msg=Dataflow_OutputLogger -mon=Dataflow_DIMMonitoring -class=${CLASS}
-opts=${OPTIONS}
)
if test -f "${OPTIONS}" -a -n "`echo ${OPTIONS} | grep .opts`"; then
exec -a ${UTGID} genRunner.exe libDataflow.so dataflow_run_task \
-msg=Dataflow_OutputLogger -mon=Dataflow_DIMMonitoring -class=${CLASS} \
-opts=${OPTIONS}
echo "${cmd[@]}"
exec "${cmd[@]}"
else
echo "'${OPTIONS}' does not exist does not end with .opts"
exit 123
......
......@@ -135,6 +135,13 @@ parser.add_argument(
help=
"Enables writing of the encoding keys by setting env WRITE_ENCODING_KEYS=1.",
)
parser.add_argument(
"-n",
"--instances",
type=int,
default=None,
help="Number of instances of the main task to run",
)
args, unknown_argv = parser.parse_known_args()
args.data_dir = args.working_dir / args.data_dir
......@@ -149,7 +156,13 @@ args.working_dir = args.working_dir.resolve()
# Ideally, the nightlies would properly isolate test jobs.
# Another approach would be to convince Online to be able to control
# where the shared resources go (/tmp/... and /dev/shm/...).
args.partition = args.partition + os.getenv("BUILD_ID", "")
PARTITION_MAX_LEN = 16
build_id = os.getenv("BUILD_ID", "")
args.partition = args.partition + build_id[-(
PARTITION_MAX_LEN - len(args.partition)):]
if len(args.partition) > PARTITION_MAX_LEN:
raise RuntimeError(f"Partition name is too long (len({args.partition!r})="
+ f"{len(args.partition)} > {PARTITION_MAX_LEN})")
emulator.setup_logging(
args.working_dir / args.log_file, console_level=args.log_level)
......@@ -168,7 +181,7 @@ replacements = {
"WORKING_DIR": args.working_dir.resolve(),
}
arch = architecture.read_xml(args.architecture)
arch = architecture.read_xml(args.architecture, args.instances)
if args.write_encoding_keys:
arch = architecture.overwrite_dict_value(
arch,
......
......@@ -24,4 +24,7 @@
</set></argument>
<argument name="timeout"><integer>200</integer></argument>
<argument name="use_temp_dir"><enumeral>true</enumeral></argument>
<argument name="unsupported_platforms"><set>
<text>detdesc</text>
</set></argument>
</extension>
......@@ -12,6 +12,9 @@
#######################################################
-->
<extension class="GaudiTest.GaudiExeTest" kind="test">
<argument name="prerequisites"><set>
<tuple><text>create_hlt1_tck_prepare</text><enumeral>PASS</enumeral></tuple>
</set></argument>
<argument name="program"><text>python</text></argument>
<argument name="args"><set>
<text>$ALLENROOT/scripts/create_hlt1_tck.py</text>
......
<?xml version="1.0" ?><!DOCTYPE extension PUBLIC '-//QM/2.3/Extension//EN' 'http://www.codesourcery.com/qm/dtds/2.3/-//qm/2.3/extension//en.dtd'>
<!--
(c) Copyright 2024 CERN for the benefit of the LHCb Collaboration
This software is distributed under the terms of the GNU General Public
Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING".
In applying this licence, CERN does not waive the privileges and immunities
granted to it by virtue of its status as an Intergovernmental Organization
or submit itself to any jurisdiction.
-->
<extension class="GaudiTest.GaudiExeTest" kind="test">
<argument name="program"><text>rm</text></argument>
<argument name="args"><set>
<text>-rf</text>
<text>config.git</text>
</set></argument>
<argument name="timeout"><integer>100</integer></argument>
<argument name="use_temp_dir"><enumeral>true</enumeral></argument>
</extension>
......@@ -17,7 +17,7 @@ Run an HLT1 job in the Online testbench
<argument name="args"><set>
<text>$MOORESCRIPTSROOT/tests/options/HLT1Slim/Arch.xml</text>
<text>--working-dir=hlt1runchange</text>
<text>--partition=TESTHLT1RUNCHANGE</text>
<text>--partition=TESTHLT1RC</text>
<text>--test-file-db-key=2024_mep_292860_run_change_test</text>
<text>--hlt-type=config.git:0x10000001</text>
<text>--tck-from-odin</text>
......@@ -38,4 +38,7 @@ for fn in glob.glob(workdir + "/hlt1runchange/*.*"):
result[os.path.basename(fn)] = open(fn).read()
</text></argument>
<argument name="unsupported_platforms"><set>
<text>detdesc</text>
</set></argument>
</extension>
......@@ -22,6 +22,7 @@ Run an HLT2 job in the Online testbench
<text>--test-file-db-key=2022_raw_hlt1_253597</text>
</set></argument>
<argument name="use_temp_dir"><enumeral>true</enumeral></argument>
<argument name="unsupported_platforms"><set><text>detdesc</text></set></argument>
<argument name="validator"><text>
# No validator for now: only check the exit code
......
......@@ -21,6 +21,7 @@ Run an ODINMon job in the Online testbench
<text>--test-file-db-key=2024_raw_hlt1_288877_tae</text>
</set></argument>
<argument name="use_temp_dir"><enumeral>true</enumeral></argument>
<argument name="unsupported_platforms"><set><text>detdesc</text></set></argument>
<argument name="validator"><text>
# No validator for now: only check the exit code
......
......@@ -19,6 +19,7 @@ Run an ODINMon job in an offline way
<text>$MOOREONLINECONFROOT/options/odin.py</text>
</set></argument>
<argument name="use_temp_dir"><enumeral>per-test</enumeral></argument>
<argument name="unsupported_platforms"><set><text>detdesc</text></set></argument>
<argument name="validator"><text>
# No validator for now: only check the exit code
......