Commit 7c37ef2d authored by Mykhailo Dalchenko's avatar Mykhailo Dalchenko

Fix remaining bugs and rework the logging setup

parent 3b505f91
Pipeline #2389872 failed after 60 minutes and 3 seconds
......@@ -5,6 +5,7 @@
tool = "slurm" # Either local, sge or slurm are supported
[environment.dl1]
conda_environment = 'lst'
temporary_local_configuration_path = '/tmp/lstchain-config'
configuration_repository = 'https://gitlab.cern.ch/cta-unige/lstchain-config'
tag = "lst-standard-config"
......
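Note on the new config key: `temporary_local_configuration_path` gives the runner a scratch location for the lstchain configuration fetched from `configuration_repository` at `tag`. A minimal sketch of how such a checkout could work; the `fetch_lstchain_config` helper and the use of a shallow `git clone` are assumptions, not part of this commit:

```python
import subprocess

def fetch_lstchain_config(repo_url, tag, local_path):
    # Hypothetical helper: a shallow clone of one tag keeps the scratch checkout small.
    subprocess.check_call(['git', 'clone', '--depth', '1',
                           '--branch', tag, repo_url, local_path])

fetch_lstchain_config('https://gitlab.cern.ch/cta-unige/lstchain-config',
                      'lst-standard-config', '/tmp/lstchain-config')
```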
......@@ -52,7 +52,7 @@ PRODUCTION_STAGES = ('corsika', 'sim_telarray', 'dl1', 'dl2')
PRODUCTION_TYPES = ('electron', 'gamma', 'gamma_diffuse', 'pedestal', 'proton', 'parameter_scan')
SUBMISSION_TOOLS = ('local', 'sge', 'slurm')
LOGGING_LEVELS = {0: logging.ERROR, 1: logging.INFO, 2: logging.DEBUG}
LOGGING_LEVELS = {0: logging.CRITICAL, 1: logging.ERROR, 2: logging.WARNING, 3: logging.INFO, 4: logging.DEBUG}
# ***********************************************************
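The verbosity scale grows from three steps to five, which shifts the meaning of existing values: `-v 1` now selects `logging.ERROR` instead of `logging.INFO`. A quick sanity check of the new mapping:

```python
import logging

LOGGING_LEVELS = {0: logging.CRITICAL, 1: logging.ERROR, 2: logging.WARNING,
                  3: logging.INFO, 4: logging.DEBUG}

assert LOGGING_LEVELS[1] == logging.ERROR  # old -v 1 (INFO) is now much quieter
assert LOGGING_LEVELS[3] == logging.INFO   # INFO output now requires -v 3
```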
......@@ -80,7 +80,8 @@ def setup_logging(verbosity=1):
# tell the handler to use this format
console.setFormatter(formatter)
# add the handler to the root logger
logging.getLogger('').addHandler(console)
LOGGER.addHandler(console)
def validate_config(config):
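Attaching the console handler to the module-level `LOGGER` rather than the root logger keeps records from third-party libraries out of the runner's console output. A small illustration of the difference (logger names here are illustrative):

```python
import logging

LOGGER = logging.getLogger('sim_runner')  # the package logger (name assumed)
LOGGER.setLevel(logging.INFO)
LOGGER.addHandler(logging.StreamHandler())

LOGGER.info('reaches the console handler')
# A dependency's logger no longer shares the handler; this record is dropped
# (no handler of its own, and the root default level is WARNING):
logging.getLogger('some.library').info('not shown')
```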
......@@ -309,12 +310,14 @@ def create_commands(config):
elif config['environment']['submission']['tool'] == 'slurm':
n_files_per_job = config['dl1']['n_files_per_job']
input_dir = config['environment'][stage]['input_path']
log_level = logging.getLevelName(max([h.level for h in LOGGER.handlers]))
job_commands.append(f"convert_r0_to_dl1 "
f"-i {input_dir} "
f"-o {output_path} "
f"-c {lstchain_config_file} "
f"-n {n_files_per_job} "
f"-a $SLURM_ARRAY_TASK_ID")
f"-a $SLURM_ARRAY_TASK_ID "
f"-l {log_level}")
elif config['environment']['submission']['tool'] == 'sge':
pass # TODO Implement sge batch submission
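`logging.getLevelName` converts the numeric level of the most restrictive handler back into a name such as `'INFO'`, which is the form the new `-l` option of `convert_r0_to_dl1` accepts. For example:

```python
import logging

logger = logging.getLogger('demo')
handler = logging.StreamHandler()
handler.setLevel(logging.INFO)
logger.addHandler(handler)

# max() picks the highest (most restrictive) handler level; 20 maps back to
# 'INFO', matching the worker's choices=['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG'].
print(logging.getLevelName(max(h.level for h in logger.handlers)))  # INFO
```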
......@@ -402,6 +405,8 @@ def submit_jobs(config, job_commands):
if stage == 'dl1':
log_file_path = run_directory + '/log/' + job_name[:-4] + '-%A-%a' + '.log'
error_file_path = run_directory + '/error/' + job_name[:-4] + '-%A-%a' + '.err'
conda_environment = config['environment']['dl1']['conda_environment']
template = template.replace('__condaenv__', conda_environment)
else:
log_file_path = run_directory + '/log/' + job_name[:-3] + 'log'
error_file_path = run_directory + '/error/' + job_name[:-3] + 'err'
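For array jobs, SLURM expands `%A` to the master job ID and `%a` to the array task index, so every task gets its own log and error file. A sketch of what the pattern produces (job name and IDs invented for illustration):

```python
job_name = 'dl1-gamma.job'
run_directory = 'run'

# job_name[:-4] strips the '.job' suffix before the array placeholders are appended.
log_file_path = run_directory + '/log/' + job_name[:-4] + '-%A-%a' + '.log'
print(log_file_path)  # run/log/dl1-gamma-%A-%a.log
# With array job 12345, task 7, SLURM writes run/log/dl1-gamma-12345-7.log
```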
......@@ -434,10 +439,10 @@ def submit_jobs(config, job_commands):
n_files_to_analyze = len([name for name in os.listdir(input_dir)
if os.path.isfile(os.path.join(input_dir, name))])
n_files_per_job = config['dl1']['n_files_per_job']
array_n = n_files_to_analyze // n_files_per_job
array_n = n_files_to_analyze // n_files_per_job - 1
LOGGER.info("Submitting job:\n%s",
f"sbatch --array=0-{array_n} {job_file_path}")
job_id = subprocess.check_output(['sbatch', job_file_path],
job_id = subprocess.check_output(['sbatch', f'--array=0-{array_n}', job_file_path],
stderr=subprocess.STDOUT)
else:
......
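`--array=0-{array_n}` is inclusive on both ends, so with N files and k files per job the last valid task index is N // k - 1; the old value launched one task too many, whose file slice came up empty. A worked check of the corrected arithmetic (this assumes k divides N evenly; a remainder would call for ceiling division instead):

```python
n_files_to_analyze, n_files_per_job = 100, 10
array_n = n_files_to_analyze // n_files_per_job - 1  # 9

# sbatch --array=0-9 runs tasks 0..9: ten tasks covering all 100 files exactly once.
covered = sum(n_files_per_job for _ in range(array_n + 1))
assert covered == n_files_to_analyze
```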
......@@ -3,10 +3,15 @@
Run R0 to DL1 conversion job with N input files
"""
import argparse
import logging
import os
import sys
from pathlib import Path
from lstchain.reco import r0_to_dl1
from lstchain.paths import r0_to_dl1_filename
from lstchain.io.config import read_configuration_file
def get_parser():
......@@ -18,18 +23,31 @@ def get_parser():
"""
parser = argparse.ArgumentParser()
required_named = parser.add_argument_group('required named arguments')
required_named.add_argument('-i', '--input-dir', help='R0 input directory path',
required_named.add_argument('-i', '--input-dir', help='R0 input directory path', type=Path,
required=True)
required_named.add_argument('-o', '--output-dir', help='DL1 output directory path',
required_named.add_argument('-o', '--output-dir', help='DL1 output directory path', type=Path,
required=True)
required_named.add_argument('-c', '--config', help='LSTchain JSON configuration file name',
type=Path,
required=True)
required_named.add_argument('-a', '--array-job-id', help='Batch array job ID',
required_named.add_argument('-a', '--array-job-id', help='Batch array job ID', type=int,
required=True)
parser.add_argument('-n', '--files-per-job', help='Number of files per job', type=int, default=1)
parser.add_argument('-l', '--log-level', help='Log level', type=str, default='INFO',
choices=['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG'])
return parser
class LessThanFilter(logging.Filter):
def __init__(self, exclusive_maximum, name=""):
super().__init__(name)
self.max_level = exclusive_maximum
def filter(self, record):
# non-zero return means we log this message
return 1 if record.levelno < self.max_level else 0
def main():
"""
Parse command line input arguments and launch the sim_runner.
......@@ -37,6 +55,26 @@ def main():
parser = get_parser()
args = parser.parse_args()
# Get the root logger
root_logger = logging.getLogger()
# The root logger level must be set explicitly; it defaults to logging.WARNING
root_logger.setLevel(logging.NOTSET)
# Set the log message format
formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s')
logging_handler_out = logging.StreamHandler(sys.stdout)
logging_handler_out.setLevel(level=args.log_level)
logging_handler_out.setFormatter(formatter)
logging_handler_out.addFilter(LessThanFilter(logging.WARNING))
root_logger.addHandler(logging_handler_out)
logging_handler_err = logging.StreamHandler(sys.stderr)
logging_handler_err.setLevel(logging.WARNING)
logging_handler_err.setFormatter(formatter)
root_logger.addHandler(logging_handler_err)
logger = logging.getLogger(__name__)
# Sort so every array task slices the same, deterministic file order
input_files = sorted(f'{args.input_dir}/{filename}' for filename in os.listdir(args.input_dir))
input_files_sliced = input_files[args.array_job_id * args.files_per_job:
(args.array_job_id + 1) * args.files_per_job]
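Together, the two handlers and `LessThanFilter` split the streams: records below WARNING go to stdout, WARNING and above to stderr, so the batch system's `.log` and `.err` files stay cleanly separated. A condensed, runnable version of the same pattern:

```python
import logging
import sys

class LessThanFilter(logging.Filter):
    """Pass only records strictly below the given level."""
    def __init__(self, exclusive_maximum, name=""):
        super().__init__(name)
        self.max_level = exclusive_maximum

    def filter(self, record):
        return record.levelno < self.max_level

root = logging.getLogger()
root.setLevel(logging.NOTSET)  # let the handlers do the filtering

out = logging.StreamHandler(sys.stdout)
out.setLevel(logging.INFO)
out.addFilter(LessThanFilter(logging.WARNING))  # INFO passes, WARNING+ is blocked
root.addHandler(out)

err = logging.StreamHandler(sys.stderr)
err.setLevel(logging.WARNING)
root.addHandler(err)

logging.getLogger(__name__).info('written to stdout')
logging.getLogger(__name__).error('written to stderr')
```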
......@@ -44,11 +82,19 @@ def main():
output_dir = args.output_dir.absolute()
output_dir.mkdir(exist_ok=True)
try:
config = read_configuration_file(args.config.absolute())
except Exception as e:
logger.error('Config file %s could not be read:\n%s', args.config, e)
sys.exit(1)
for i_file in input_files_sliced:
output_file = output_dir / r0_to_dl1_filename(i_file)
r0_to_dl1_fname = r0_to_dl1_filename(os.path.basename(i_file))
output_file = output_dir / r0_to_dl1_fname
logger.info('\nProcessing input file: %s\nOutput file: %s', i_file, output_file)
r0_to_dl1.r0_to_dl1(i_file,
output_filename=output_file,
custom_config=args.config,
custom_config=config,
)
......
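Each array task now works on its own contiguous slice of the sorted input list: task `a` with `n` files per job takes elements `a*n` up to but not including `(a+1)*n`. For example:

```python
input_files = [f'run{i:03d}.fits.fz' for i in range(6)]  # invented file names
files_per_job = 2

for array_job_id in range(3):
    chunk = input_files[array_job_id * files_per_job:
                        (array_job_id + 1) * files_per_job]
    print(array_job_id, chunk)
# 0 ['run000.fits.fz', 'run001.fits.fz']
# 1 ['run002.fits.fz', 'run003.fits.fz']
# 2 ['run004.fits.fz', 'run005.fits.fz']
```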
......@@ -18,7 +18,7 @@ def get_parser():
required_named.add_argument('-c', '--config', help='Simulation configuration file name',
required=True)
parser.add_argument('-v', '--verbosity', help='Logging verbosity level', type=int,
choices=[0, 1, 2], default=1)
choices=[0, 1, 2, 3, 4], default=1)
return parser
......
......@@ -7,4 +7,9 @@
#SBATCH --output=__stdoutput__
#SBATCH --error=__stderror__
echo "$(conda env list)"
echo "conda activate __condaenv__"
conda activate __condaenv__
echo "Launching"
echo "__command__"
__command__
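The new `__condaenv__` placeholder is filled like the existing markers, by plain string replacement on the template before the job file is written (see the `template.replace('__condaenv__', ...)` hunk above). A minimal sketch with an abbreviated template; the helper name is invented:

```python
TEMPLATE = """#!/bin/bash
conda activate __condaenv__
__command__
"""

def render_job_script(command, conda_environment):
    # Same str.replace approach the runner uses for the other __...__ markers.
    script = TEMPLATE.replace('__condaenv__', conda_environment)
    return script.replace('__command__', command)

print(render_job_script('convert_r0_to_dl1 -i r0/ -o dl1/', 'lst'))
```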