diff --git a/ProductionTools/oracle_db_disaster_recovery/oracle_check_closed_ondisk.py b/ProductionTools/oracle_db_disaster_recovery/oracle_check_closed_ondisk.py index 49bf20f7c189e4a1b69b319d894aae2e2a5e1a83..d4093ad289c45d419c2313780a1db4a89d7cb99a 100644 --- a/ProductionTools/oracle_db_disaster_recovery/oracle_check_closed_ondisk.py +++ b/ProductionTools/oracle_db_disaster_recovery/oracle_check_closed_ondisk.py @@ -4,6 +4,7 @@ import os import subprocess import logging import argparse +import re sys.path.append('/sw/castor/Script') import cs.Tools.Libraries.Database as Oracle @@ -13,10 +14,11 @@ import cs.Tools.FilenameParsers.SFOFileNameParser as SFOFileNameParser # Config for cs...Database: only needs a few paramaters of the CS config class MockConfig: def __init__(self, url): - self.connection = url - self.file_table = 'SFO_TZ_FILE' - self.lb_table = 'SFO_TZ_LUMIBLOCK' - self.run_table = 'SFO_TZ_RUN' + self.DBURL = url + self.DBFileTable = 'SFO_TZ_FILE' + self.DBLBTable = 'SFO_TZ_LUMIBLOCK' + self.DBRunTable = 'SFO_TZ_RUN' + def oracle_check_closed_ondisk(oradb_url, run_number, outliers_file, verbose=False, dryrun=False, report=None): @@ -37,31 +39,54 @@ def oracle_check_closed_ondisk(oradb_url, run_number, outliers_file, with open(outliers_file, 'w') as outf: for [sfoid, sfopfn] in ora_state_3: - cmd = ['ssh', '-x', f'pc-tdq-{sfoid.lower()}', 'ls', f'{sfopfn}.COPIED'] - lsp = subprocess.run(cmd, check=True, stdout=subprocess.PIPE, - stderr=subprocess.STDOUT) + m = re.match('sfo-([0-9]+)', sfoid, re.IGNORECASE) + if not m: + outf.write(f'sfoid does not match expected format: {sfoid}, file: {sfopfn}\n') + continue + sfonum = int(m.group(1)) + if sfonum < 10: + sfoid = f'sfo-0{sfonum}' + else: + sfoid = f'sfo-{sfonum}' + + cmd = ['ssh', '-x', f'pc-tdq-{sfoid}', 'sh', '-c', f'"ls {sfopfn}*"'] + lsp = subprocess.run(cmd, stdout=subprocess.PIPE) + file_exist = False if lsp.returncode == 0: - # .COPIED file exists: mark file as transferred in DB + for file in 
lsp.stdout.decode().split('\n'): + ext = os.path.splitext(file)[1] + if ext == '.COPIED': + # .COPIED file exists: mark file as transferred in DB - # Get remote dir from .COPIED file - cmd = ['ssh', '-x', f'pc-tdq-{sfoid.lower()}', 'grep', 'RemoteDir', - f'{sfopfn}.COPIED'] - try: - output = subprocess.check_output(cmd) - remote_dir = output.decode().split()[2] - except subprocess.CalledProcessError: - outf.write(f'(closed,ondisk), error reading RemoteDir from' - ' COPIED file: {sfoid.lower()} {sfopfn}\n') - continue + # Get remote dir from .COPIED file + cmd = ['ssh', '-x', f'pc-tdq-{sfoid}', 'grep', 'RemoteDir', + f'{sfopfn}.COPIED'] + try: + output = subprocess.check_output(cmd) + remote_dir = output.decode().split()[2] + except subprocess.CalledProcessError: + outf.write(f'(closed,ondisk), error reading RemoteDir from' + f' COPIED file: {sfoid} {sfopfn}\n') + continue - remote_filename = os.path.join(remote_dir, os.path.basename(sfopfn)) - if report: reportf.write(f'marking {sfopfn} as transferred in DB\n') - if not dryrun: - oradb.Transfer(sfopfn, remote_filename) - updated += 1 - else: - # No .COPIED file: why couldn't CS transfer it? - outf.write(f'(closed,ondisk), no COPIED file: {sfoid.lower()} {sfopfn}\n') + remote_filename = os.path.join(remote_dir, os.path.basename(sfopfn)) + if report: reportf.write(f'marking {sfopfn} as transferred in DB\n') + if not dryrun: + oradb.Transfer(sfopfn, remote_filename) + updated += 1 + elif ext == '.TOBECOPIED' or ext == '.COPYING': + pass + # CastorScript was taking care of the file: nothing to do + # processing will continue once restarted + elif ext == '.data': + file_exist = True + + if not file_exist: + # no file: even if data file is absent + # this should not be possible: since in the database the file is + # not 'TRANSFERRED' it cannot be processed by T0, so it cannot + # be migrated to tape, and cannot be deleted. 
+ outf.write(f'(closed,ondisk), no file: {sfoid} {sfopfn}\n') oradb.db.orcl.close() if report: reportf.close() diff --git a/ProductionTools/oracle_db_disaster_recovery/oracle_check_closed_transferred.py b/ProductionTools/oracle_db_disaster_recovery/oracle_check_closed_transferred.py index f86b1bd350d6a8ab2f8800b9cfbc34c5f18bb2c3..83ac108b1cc8bbbbac3dc0f47970a5449cd1bab3 100644 --- a/ProductionTools/oracle_db_disaster_recovery/oracle_check_closed_transferred.py +++ b/ProductionTools/oracle_db_disaster_recovery/oracle_check_closed_transferred.py @@ -3,6 +3,7 @@ import sys import subprocess import logging import argparse +import re sys.path.append('/sw/castor/Script') import cs.Tools.Libraries.Database as Oracle @@ -12,16 +13,14 @@ import cs.Tools.FilenameParsers.SFOFileNameParser as SFOFileNameParser # Config for cs...Database: only needs a few paramaters of the CS config class MockConfig: def __init__(self, url): - self.connection = url - self.file_table = 'SFO_TZ_FILE' - self.lb_table = 'SFO_TZ_LUMIBLOCK' - self.run_table = 'SFO_TZ_RUN' + self.DBURL = url + self.DBFileTable = 'SFO_TZ_FILE' + self.DBLBTable = 'SFO_TZ_LUMIBLOCK' + self.DBRunTable = 'SFO_TZ_RUN' def oracle_check_closed_transferred(oradb_url, run_number, outliers_file, verbose=False, dryrun=False, report=None): - del outliers_file - conf = MockConfig(oradb_url) parser = SFOFileNameParser.SFOFileNameParser dblogger = logging.getLogger() @@ -37,16 +36,27 @@ def oracle_check_closed_transferred(oradb_url, run_number, outliers_file, and TRANSFERSTATE='TRANSFERRED'""", runnr=run_number).fetchall() if verbose: print(f'Found {len(ora_state_4)} (closed,transferred)') - for [sfoid, sfopfn] in ora_state_4: - cmd = ['ssh', '-x ', f'pc-tdq-{sfoid.lower()}', 'ls', sfopfn] - lsp = subprocess.run(cmd, stderr=subprocess.PIPE) - if lsp.returncode == 2 \ - and 'No such file or directory' in lsp.stderr.decode(): - # File does not exist: mark it as deleted - if report: reportf.write(f'marking {sfopfn} as deleted in DB\n') 
- if not dryrun: - oradb.Deletion(sfopfn) - updated += 1 + with open(outliers_file, 'w') as outf: + for [sfoid, sfopfn] in ora_state_4: + m = re.match('sfo-([0-9]+)', sfoid, re.IGNORECASE) + if not m: + outf.write(f'sfoid does not match expected format: {sfoid}, file: {sfopfn}\n') + continue + sfonum = int(m.group(1)) + if sfonum < 10: + sfoid = f'sfo-0{sfonum}' + else: + sfoid = f'sfo-{sfonum}' + + cmd = ['ssh', '-x', f'pc-tdq-{sfoid}', 'ls', sfopfn] + lsp = subprocess.run(cmd, stderr=subprocess.PIPE) + if lsp.returncode == 2 \ + and 'No such file or directory' in lsp.stderr.decode(): + # File does not exist: mark it as deleted + if report: reportf.write(f'marking {sfopfn} as deleted in DB\n') + if not dryrun: + oradb.Deletion(sfopfn) + updated += 1 oradb.db.orcl.close() if report: reportf.close() diff --git a/ProductionTools/oracle_db_disaster_recovery/oracle_create_missing_and_close_opened.py b/ProductionTools/oracle_db_disaster_recovery/oracle_create_missing_and_close_opened.py index 7196836072e67201edf3cbce7cc51e843c546517..cdc23813653d5d35a9640e8ac6386f442aaa18bb 100644 --- a/ProductionTools/oracle_db_disaster_recovery/oracle_create_missing_and_close_opened.py +++ b/ProductionTools/oracle_db_disaster_recovery/oracle_create_missing_and_close_opened.py @@ -83,7 +83,7 @@ def sync_files(sqc, orac, verbose, dryrun, reportf): where LFN=:lfn""", filedict) orac.connection.commit() if reportf: reportf.write(f'[file] updated: {file[0]}\n') - updated += 0 + updated += 1 if verbose: if dryrun: @@ -149,7 +149,7 @@ def sync_lbs(sqc, orac, verbose, dryrun, reportf): if reportf: reportf.write(f'[lb] updated: {lbdict["sfoid"]},' f'{lbdict["runnr"]},{lbdict["lumiblocknr"]},' f'{lbdict["streamtype"]},{lbdict["stream"]}\n') - updated += 0 + updated += 1 if verbose: if dryrun: @@ -216,7 +216,7 @@ def sync_streams(sqc, orac, verbose, dryrun, reportf): if reportf: reportf.write(f'[stream] updated: {streamdict["sfoid"]},' f'{streamdict["runnr"]},{streamdict["streamtype"]},' 
f'{streamdict["stream"]}\n') - updated += 0 + updated += 1 if verbose: if dryrun: diff --git a/ProductionTools/oracle_db_disaster_recovery/oracle_db_disaster_recovery.py b/ProductionTools/oracle_db_disaster_recovery/oracle_db_disaster_recovery.py index 7878d3cf5de17b3ca13ba61e88833b4481f386a9..48145bb90a47c3d39f1ca9f7c1dc1d29cc90eb74 100644 --- a/ProductionTools/oracle_db_disaster_recovery/oracle_db_disaster_recovery.py +++ b/ProductionTools/oracle_db_disaster_recovery/oracle_db_disaster_recovery.py @@ -1,12 +1,13 @@ #!/usr/bin/env python # run as atlascdr on pc-tdq-grape -import argparse import sys +import argparse import os import subprocess import time from socket import gethostname from getpass import getuser +from xml.sax.handler import DTDHandler from oracle_run_status import oracle_run_status from oracle_create_missing_and_close_opened import oracle_create_missing_and_close_opened from oracle_check_closed_ondisk import oracle_check_closed_ondisk @@ -44,9 +45,11 @@ from oracle_check_closed_transferred import oracle_check_closed_transferred # transfer: check presence of .COPIED and update as in Database.Transfer # D. 
Anything that does not match is logged in a file for manual processing + if not gethostname().startswith('pc-atlas-cr-30'): print('error: must be executed on pc-atlas-cr-30') sys.exit(1) + if getuser() != 'atlascdr': print('error: must be executed as atlascdr') sys.exit(1) @@ -69,13 +72,13 @@ ap.add_argument('-r', '--report', args = ap.parse_args() if args.verbose: - print(f'sfos: {args.sfos}') - print(f'run#: {args.run_number}') - print(f'oracle URL: {args.oradb_url}') - print(f'dryrun: {args.dryrun}') - print(f'verbose: {args.verbose}') - print(f'report: {args.report}') - print('----') + print('parameters:') + print(f' sfos: {args.sfos}') + print(f' run#: {args.run_number}') + print(f' oracle URL: {args.oradb_url}') + print(f' dryrun: {args.dryrun}') + print(f' verbose: {args.verbose}') + print(f' report: {args.report}') oracle_run_status(args.oradb_url, args.run_number, args.sfos, args.report) @@ -94,10 +97,12 @@ def castorscript_is_running(computer, config_file): psp = subprocess.run(ps_cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) return psp.stdout.decode() != '' - -if args.verbose: print('stopping CastorScript instances and collecting sqlite DBs') -if not args.dryrun: +if args.dryrun: + if args.verbose: print('would be stopping CastorScript instances') +else: + if args.verbose: print('stopping CastorScript instances') for i in args.sfos: + if i < 10: i = f'0{i}' if os.path.exists(f'/mnt/daq_area_rw/castor/pc-tdq-sfo-{i}/atlascdr/prod.cfg'): os.rename(f'/mnt/daq_area_rw/castor/pc-tdq-sfo-{i}/atlascdr/prod.cfg', f'/mnt/daq_area_rw/castor/pc-tdq-sfo-{i}/atlascdr/prod.stopped') @@ -112,7 +117,11 @@ if not args.dryrun: if args.verbose: print(f'CS stopped on sfo-{i}') for i in args.sfos: - if not os.path.exists(f'sfo-{i}.{args.run_number}.sqlite'): + if i < 10: i = f'0{i}' + if os.path.exists(f'sfo-{i}.{args.run_number}.sqlite'): + if args.verbose: + print(f'using existing sqlite db: sfo-{i}.{args.run_number}.sqlite') + else: scp_cmd = ['scp', 
f'pc-tdq-sfo-{i}:/dsk1/sqlite/sfo-sqlite-{args.run_number}.db', f'sfo-{i}.{args.run_number}.sqlite'] subprocess.run(scp_cmd, check=True, stdout=subprocess.PIPE) @@ -120,8 +129,11 @@ for i in args.sfos: if args.verbose: print('recovery step A: bring state 1 (missing) and 2 (opened) to 3 (closed)') for i in args.sfos: + if i < 10: i = f'0{i}' + sqlite_filename = f'sfo-{i}.{args.run_number}.sqlite' + if args.verbose: print(f'processing file: {sqlite_filename}') oracle_create_missing_and_close_opened(args.oradb_url, - f'sfo-{i}.{args.run_number}.sqlite', verbose=args.verbose, + sqlite_filename, verbose=args.verbose, dryrun=args.dryrun, report=args.report) if args.verbose: print('recovery step B: bring state 4 (closed,transferred) to' @@ -140,9 +152,12 @@ if os.path.exists(args.outliers_file) and os.path.getsize(args.outliers_file) != ' taken care of outliers') else: if args.verbose: print('no outliers') - if args.verbose: print('starting CastorScript instances') - if not args.dryrun: + if args.dryrun: + if args.verbose: print('would be starting CastorScript instances') + else: + if args.verbose: print('starting CastorScript instances') for i in args.sfos: + if i < 10: i = f'0{i}' os.rename(f'/mnt/daq_area_rw/castor/pc-tdq-sfo-{i}/atlascdr/prod.stopped', f'/mnt/daq_area_rw/castor/pc-tdq-sfo-{i}/atlascdr/prod.cfg') if args.verbose: print(f're-enabled CS on sfo-{i}') diff --git a/ProductionTools/oracle_db_disaster_recovery/oracle_run_status.py b/ProductionTools/oracle_db_disaster_recovery/oracle_run_status.py index 736dea876be3d57e2ff61867a25d5ab9a4a9c152..185f1f8784212f0da5794845dc2f31b8fa243fc4 100644 --- a/ProductionTools/oracle_db_disaster_recovery/oracle_run_status.py +++ b/ProductionTools/oracle_db_disaster_recovery/oracle_run_status.py @@ -5,6 +5,7 @@ import argparse def oracle_run_status(oradb_url, run_number, sfos, report=None): + print('run status:') user, pwd, dbn = coral_auth.get_connection_parameters_from_connection_string( oradb_url) oradb = 
cx_Oracle.connect(user, pwd, dbn) @@ -27,7 +28,7 @@ def oracle_run_status(oradb_url, run_number, sfos, report=None): elif state == 'CLOSED': closed += 1 elif state == 'TRANSFERRED': transferred += 1 else: raise RuntimeError(f'unknown stream state: {state}') - print(f'streams: {opened} opened, {closed} closed, {transferred} transferred') + print(f' streams: {opened} opened, {closed} closed, {transferred} transferred') opened = 0 closed = 0 @@ -42,7 +43,7 @@ def oracle_run_status(oradb_url, run_number, sfos, report=None): elif state == 'CLOSED': closed += 1 elif state == 'TRANSFERRED': transferred += 1 else: raise RuntimeError(f'unknown lb state: {state}') - print(f'lumiblocks: {opened} opened, {closed} closed, {transferred} transferred') + print(f' lumiblocks: {opened} opened, {closed} closed, {transferred} transferred') opened_ondisk = 0 closed_ondisk = 0 @@ -64,7 +65,7 @@ def oracle_run_status(oradb_url, run_number, sfos, report=None): deleted_transferred +=1 else: raise RuntimeError(f'unknown file state: {filestate}, {transferstate}') - print(f'files: {opened_ondisk} opened_ondisk, {closed_ondisk} closed_ondisk' + print(f' files: {opened_ondisk} opened_ondisk, {closed_ondisk} closed_ondisk' f', {closed_transferred} closed_transferred' f', {deleted_transferred} deleted_transferred')