diff --git a/ProductionTools/oracle_db_disaster_recovery/oracle_check_closed_ondisk.py b/ProductionTools/oracle_db_disaster_recovery/oracle_check_closed_ondisk.py old mode 100644 new mode 100755 index c24eb7f653458c3415897775e5fcbb76a9d871c8..436ce8b681486aec18051e31c443879c06e69058 --- a/ProductionTools/oracle_db_disaster_recovery/oracle_check_closed_ondisk.py +++ b/ProductionTools/oracle_db_disaster_recovery/oracle_check_closed_ondisk.py @@ -30,7 +30,9 @@ def oracle_check_closed_ondisk(oradb_url, run_number, outliers_file, oradb = Oracle.Database(conf, dblogger, parser) updated = 0 - if report: reportf = open(report, 'a') + if report: + reportf = open(report, 'a') + reportf.write('---- step B\n') ora_state_3 = oradb.db.curs.execute("""select SFOID, SFOPFN from SFO_TZ_FILE where RUNNR=:runnr and FILESTATE='CLOSED' and TRANSFERSTATE='ONDISK' @@ -89,7 +91,9 @@ def oracle_check_closed_ondisk(oradb_url, run_number, outliers_file, outf.write(f'(closed,ondisk), no file: {sfoid} {sfopfn}\n') oradb.db.orcl.close() - if report: reportf.close() + if report: + reportf.write('---- end of step B\n') + reportf.close() if verbose: if dryrun: print(f'would have marked {updated} as transferred') diff --git a/ProductionTools/oracle_db_disaster_recovery/oracle_check_closed_transferred.py b/ProductionTools/oracle_db_disaster_recovery/oracle_check_closed_transferred.py old mode 100644 new mode 100755 index 44d5f4cf4f73b48d69e5bc699ed9f30507c2dc60..8aa54f67c5195ad0985eb7b1e19b373c3dc2099a --- a/ProductionTools/oracle_db_disaster_recovery/oracle_check_closed_transferred.py +++ b/ProductionTools/oracle_db_disaster_recovery/oracle_check_closed_transferred.py @@ -29,7 +29,9 @@ def oracle_check_closed_transferred(oradb_url, run_number, outliers_file, oradb = Oracle.Database(conf, dblogger, parser) updated = 0 - if report: reportf = open(report, 'a') + if report: + reportf = open(report, 'a') + reportf.write('---- step C\n') ora_state_4 = oradb.db.curs.execute("""select SFOID, SFOPFN from SFO_TZ_FILE where RUNNR=:runnr and FILESTATE='CLOSED' and SFOID like 'SFO%' @@ -59,7 +61,9 @@ def oracle_check_closed_transferred(oradb_url, run_number, outliers_file, updated += 1 oradb.db.orcl.close() - if report: reportf.close() + if report: + reportf.write('---- end of step C\n') + reportf.close() if verbose: if dryrun: print(f'would have marked {updated} as deleted') diff --git a/ProductionTools/oracle_db_disaster_recovery/oracle_create_missing_and_close_opened.py b/ProductionTools/oracle_db_disaster_recovery/oracle_create_missing_and_close_opened.py old mode 100644 new mode 100755 index cdc23813653d5d35a9640e8ac6386f442aaa18bb..5c7d8627bfc74410dff1739f2531b3f9c00cf4e3 --- a/ProductionTools/oracle_db_disaster_recovery/oracle_create_missing_and_close_opened.py +++ b/ProductionTools/oracle_db_disaster_recovery/oracle_create_missing_and_close_opened.py @@ -137,7 +137,8 @@ def sync_lbs(sqc, orac, verbose, dryrun, reportf): f'{lbdict["runnr"]},{lbdict["lumiblocknr"]},' f'{lbdict["streamtype"]},{lbdict["stream"]}\n') inserted += 1 - elif oralb[0] != lbdict['state']: + elif oralb[0] != 'TRANSFERRED' and oralb[0] != lbdict['state']: + # note: do not overwrite TRANSFERRED objects; sqlite db never contain transferred objects if not dryrun: lbdict['modification_time'] = datetime.datetime.now() orac.execute("""update SFO_TZ_LUMIBLOCK set STATE=:state, @@ -203,7 +204,8 @@ def sync_streams(sqc, orac, verbose, dryrun, reportf): f'{streamdict["runnr"]},' f'{streamdict["streamtype"]},{streamdict["stream"]}\n') inserted += 1 - elif orastream[0] != streamdict['state']: + elif orastream[0] != 'TRANSFERRED' and orastream[0] != streamdict['state']: + # note: do not overwrite TRANSFERRED objects; sqlite db never contain transferred objects if not dryrun: streamdict['modification_time'] = datetime.datetime.now() orac.execute("""update SFO_TZ_RUN set STATE=:state, @@ -243,11 +245,18 @@ def oracle_create_missing_and_close_opened(oradb_url, sqlite_file, sqdb = sqlite3.connect(sqlite_file) sqc = sqdb.cursor() + reportf = None + if report: + reportf = open(report, 'a') + reportf.write('---- step A\n') + sync_files(sqc, orac, verbose, dryrun, reportf) sync_lbs(sqc, orac, verbose, dryrun, reportf) sync_streams(sqc, orac, verbose, dryrun, reportf) - if report: reportf.close() + if report: + reportf.write('---- end of step A\n') + reportf.close() sqdb.close() oradb.close() diff --git a/ProductionTools/oracle_db_disaster_recovery/oracle_db_disaster_recovery.py b/ProductionTools/oracle_db_disaster_recovery/oracle_db_disaster_recovery.py old mode 100644 new mode 100755 index d9ed9f060705c17e868185835e9addf6c6854f5e..601a606e0b56fd97a0d4d5588c26c50c4cc800b5 --- a/ProductionTools/oracle_db_disaster_recovery/oracle_db_disaster_recovery.py +++ b/ProductionTools/oracle_db_disaster_recovery/oracle_db_disaster_recovery.py @@ -36,7 +36,7 @@ from oracle_check_closed_transferred import oracle_check_closed_transferred # A. State 1 and 2 are in sqlite DB in state 3: create them directly in Oracle # from sqlite data # B. Do step 3 first. Step 2 creates (ondisk, transferred) files (state -# 4) that cannot be deleted (we juste checked that .COPIED exists). So, to +# 4) that cannot be deleted (we just checked that .COPIED exists). So, to # to avoid, checking these files for absence, we first do step 3, then step 2. # Check (ondisk,transferred): if file is absent, update as in # Database.Deletion @@ -79,8 +79,6 @@ if args.verbose: print(f' verbose: {args.verbose}') print(f' report: {args.report}') -oracle_run_status(args.oradb_url, args.run_number, args.sfos, args.report) - working_dir = str(args.run_number) try: os.mkdir(working_dir) @@ -89,6 +87,7 @@ except FileExistsError: if args.verbose: print(f'using existing directory: {working_dir}') os.chdir(working_dir) +oracle_run_status(args.oradb_url, args.run_number, args.sfos, args.report) def castorscript_is_running(computer, config_file): ps_cmd = ['ssh', '-x', computer, diff --git a/ProductionTools/oracle_db_disaster_recovery/oracle_run_status.py b/ProductionTools/oracle_db_disaster_recovery/oracle_run_status.py old mode 100644 new mode 100755 index 185f1f8784212f0da5794845dc2f31b8fa243fc4..9e03b7edd19b5ec50563b926efbd01450b868214 --- a/ProductionTools/oracle_db_disaster_recovery/oracle_run_status.py +++ b/ProductionTools/oracle_db_disaster_recovery/oracle_run_status.py @@ -4,16 +4,102 @@ import coral_auth import argparse -def oracle_run_status(oradb_url, run_number, sfos, report=None): +def _stream_status_fast(orac, run_number, sfoids): + """ + orac: oracle cursor of an open connection to a oracle db + run_number: ID of the run to filter objects + sfoids: list of SFO application name to filter objects, cannot be empty + """ + if len(sfoids) == 0: raise RuntimeError('empty SFO app list') + + req = ("select count(*) from SFO_TZ_RUN where RUNNR=:runnr" + f" and STATE=:state and (SFOID='{sfoids[0]}'") + for sfo in sfoids[1:]: + req += f" or SFOID='{sfo}'" + req += ")" + + opened = orac.execute(req, runnr=run_number, state='OPENED').fetchone()[0] + closed = orac.execute(req, runnr=run_number, state='CLOSED').fetchone()[0] + transferred = orac.execute(req, runnr=run_number, state='TRANSFERRED').fetchone()[0] + print(f' streams: {opened} opened, {closed} closed, {transferred} transferred') + + +def _lb_status_fast(orac, run_number, sfoids): + """ + orac: oracle cursor of an open connection to a oracle db + run_number: ID of the run to filter objects + sfoids: list of SFO application name to filter objects, cannot be empty + """ + if len(sfoids) == 0: raise RuntimeError('empty SFO app list') + + req = ("select count(*) from SFO_TZ_LUMIBLOCK where RUNNR=:runnr" + f" and STATE=:state and (SFOID='{sfoids[0]}'") + for sfo in sfoids[1:]: + req += f" or SFOID='{sfo}'" + req += ")" + + opened = orac.execute(req, runnr=run_number, state='OPENED').fetchone()[0] + closed = orac.execute(req, runnr=run_number, state='CLOSED').fetchone()[0] + transferred = orac.execute(req, runnr=run_number, state='TRANSFERRED').fetchone()[0] + print(f' lumiblocks: {opened} opened, {closed} closed, {transferred} transferred') + + +def _file_status_fast(orac, run_number, sfoids): + """ + orac: oracle cursor of an open connection to a oracle db + run_number: ID of the run to filter objects + sfoids: list of SFO application name to filter objects, cannot be empty + """ + if len(sfoids) == 0: raise RuntimeError('empty SFO app list') + + req = ("select count(*) from SFO_TZ_FILE where RUNNR=:runnr" + " and FILESTATE=:fstate and TRANSFERSTATE=:tstate" + f" and (SFOID='{sfoids[0]}'") + for sfo in sfoids[1:]: + req += f" or SFOID='{sfo}'" + req += ")" + + opened_ondisk = orac.execute(req, runnr=run_number, fstate='OPENED', + tstate='ONDISK').fetchone()[0] + closed_ondisk = orac.execute(req, runnr=run_number, fstate='CLOSED', + tstate='ONDISK').fetchone()[0] + closed_transferred = orac.execute(req, runnr=run_number, fstate='CLOSED', + tstate='TRANSFERRED').fetchone()[0] + deleted_transferred = orac.execute(req, runnr=run_number, fstate='DELETED', + tstate='TRANSFERRED').fetchone()[0] + print(f' files: {opened_ondisk} opened_ondisk, {closed_ondisk} closed_ondisk' + f', {closed_transferred} closed_transferred' + f', {deleted_transferred} deleted_transferred') + + +def oracle_run_status_fast(oradb_url, run_number, sfos): + print('run status:') + user, pwd, dbn = coral_auth.get_connection_parameters_from_connection_string( + oradb_url) + oradb = cx_Oracle.connect(user, pwd, dbn) + orac = oradb.cursor() + + sfoids = [f'SFO-{i}' for i in sfos] + + _stream_status_fast(orac, run_number, sfoids) + _lb_status_fast(orac, run_number, sfoids) + _file_status_fast(orac, run_number, sfoids) + + oradb.close() + + +def oracle_run_status_report(oradb_url, run_number, sfos, report): print('run status:') user, pwd, dbn = coral_auth.get_connection_parameters_from_connection_string( oradb_url) oradb = cx_Oracle.connect(user, pwd, dbn) orac = oradb.cursor() - sfoids = [f'sfo-{i}' for i in sfos] + sfoids = [f'SFO-{i}' for i in sfos] - if report: reportf = open(report, 'a') + if report: + reportf = open(report, 'a') + reportf.write('---- status\n') opened = 0 closed = 0 @@ -22,7 +108,7 @@ def oracle_run_status(oradb_url, run_number, sfos, report=None): from SFO_TZ_RUN where RUNNR=:runnr and SFOID like 'SFO%'""", runnr=run_number).fetchall() for [sfoid, streamtype, stream, state] in stream_state: - if sfoid.lower() in sfoids: + if sfoid in sfoids: if report: reportf.write(f'{sfoid}, {streamtype}_{stream}: {state}\n') if state == 'OPENED': opened += 1 elif state == 'CLOSED': closed += 1 @@ -37,7 +123,7 @@ def oracle_run_status(oradb_url, run_number, sfos, report=None): from SFO_TZ_LUMIBLOCK where RUNNR=:runnr and SFOID like 'SFO%'""", runnr=run_number).fetchall() for [sfoid, streamtype, stream, lb, state] in lb_state: - if sfoid.lower() in sfoids: + if sfoid in sfoids: if report: reportf.write(f'{sfoid}, {streamtype}_{stream}, {lb}: {state}\n') if state == 'OPENED': opened += 1 elif state == 'CLOSED': closed += 1 @@ -53,7 +139,7 @@ def oracle_run_status(oradb_url, run_number, sfos, report=None): from SFO_TZ_FILE where RUNNR=:runnr and SFOID like 'SFO%'""", runnr=run_number).fetchall() for [sfoid, lfn, filestate, transferstate] in file_state: - if sfoid.lower() in sfoids: + if sfoid in sfoids: if report: reportf.write(f'{lfn}: {filestate}, {transferstate}\n') if filestate == 'OPENED' and transferstate == 'ONDISK': opened_ondisk += 1 @@ -70,7 +156,16 @@ def oracle_run_status(oradb_url, run_number, sfos, report=None): f', {deleted_transferred} deleted_transferred') oradb.close() - if report: reportf.close() + if report: + reportf.write('---- end of status\n') + reportf.close() + + +def oracle_run_status(oradb_url, run_number, sfos, report=None): + if report: + oracle_run_status_report(oradb_url, run_number, sfos, report) + else: + oracle_run_status_fast(oradb_url, run_number, sfos) if __name__ == '__main__': diff --git a/ProductionTools/oracle_db_disaster_recovery/oracle_sync_transferred_files_to_lbs_and_streams.py b/ProductionTools/oracle_db_disaster_recovery/oracle_sync_transferred_files_to_lbs_and_streams.py new file mode 100755 index 0000000000000000000000000000000000000000..aeca8d01273c52921111fc3b0cf74113b25b68b7 --- /dev/null +++ b/ProductionTools/oracle_db_disaster_recovery/oracle_sync_transferred_files_to_lbs_and_streams.py @@ -0,0 +1,141 @@ +#!/usr/bin/env python +import sys +import subprocess +import logging +import argparse +import re + +sys.path.append('/sw/castor/Script') +import cs.Tools.Libraries.Database as Oracle +import cs.Tools.FilenameParsers.SFOFileNameParser as SFOFileNameParser + + +# Config for cs...Database: only needs a few paramaters of the CS config +class MockConfig: + def __init__(self, url): + self.DBURL = url + self.DBFileTable = 'SFO_TZ_FILE' + self.DBLBTable = 'SFO_TZ_LUMIBLOCK' + self.DBRunTable = 'SFO_TZ_RUN' + + +# Mocking a SFOFileNameParser object for cs...Database.notTransFiles/notTransLBs +class MockParsed: + def __init__(self, runnr, sfoid, lbnr, streamtype, streamname): + self.runnr = runnr + self.sfoid = sfoid + self.lbnr = lbnr + self.streamtype = streamtype + self.streamname = streamname + + def AppId(self): return self.sfoid + def RunNr(self): return self.runnr + def LBNr(self): return self.lbnr + def StreamType(self): return self.streamtype + def StreamName(self): return self.streamname + + +def oracle_sync_transferred_files_to_lbs_and_streams(oradb_url, + run_number, outliers_file, verbose=False, dryrun=False, report=None): + del outliers_file + conf = MockConfig(oradb_url) + parser = SFOFileNameParser.SFOFileNameParser + dblogger = logging.getLogger() + dblogger.setLevel(logging.DEBUG) + dblogger.addHandler(logging.StreamHandler(sys.stdout)) + oradb = Oracle.Database(conf, dblogger, parser) + + reportf = None + if report: + reportf = open(report, 'a') + reportf.write('---- marking LBs and streams transferred if possible\n') + + updated_lbs = 0 + non_transferred_lbs = oradb.db.curs.execute("""select SFOID, LUMIBLOCKNR, + STREAMTYPE, STREAM, STATE from SFO_TZ_LUMIBLOCK + where RUNNR=:runnr and STATE!='TRANSFERRED' and SFOID like 'SFO%' + """, runnr=run_number).fetchall() + if verbose: print(f'Found {len(non_transferred_lbs)} non-transferred LBs') + + ilb = 0 + for [sfoid, lbnr, streamtype, streamname, state] in non_transferred_lbs: + ilb += 1 + if verbose: print(f'checking LB {ilb}/{len(non_transferred_lbs)}') + if state != 'CLOSED': + msg = (f'cannot update LB ({run_number},{sfoid},{lbnr},' + f'{streamtype},{streamname}): state is not CLOSED') + if verbose: print(msg) + if reportf: reportf.write('{msg}\n') + continue + parsed = MockParsed(run_number, sfoid, lbnr, streamtype, streamname) + if oradb.notTransFiles(parsed) == 0: + updated_lbs += 1 + if not dryrun: + oradb.lbtransfer(parsed) + if reportf: reportf.write(f'marked LB({run_number},{sfoid},' + f'{lbnr},{streamtype},{streamname}) transferred\n') + else: + if reportf: reportf.write(f'would have marked LB({run_number},' + f'{sfoid},{lbnr},{streamtype},{streamname}) transferred\n') + + updated_streams = 0 + non_transferred_streams = oradb.db.curs.execute("""select SFOID, STREAMTYPE, + STREAM, STATE from SFO_TZ_RUN + where RUNNR=:runnr and STATE!='TRANSFERRED' and SFOID like 'SFO%' + """, runnr=run_number).fetchall() + if verbose: print(f'Found {len(non_transferred_streams)} non-transferred streams') + + istream = 0 + for [sfoid, streamtype, streamname, state] in non_transferred_streams: + istream += 1 + if verbose: print(f'checking stream {istream}/{len(non_transferred_streams)}') + if state != 'CLOSED': + msg = (f'cannot update stream ({run_number},{sfoid},' + f'{streamtype},{streamname}): state is not CLOSED') + if verbose: print(msg) + if reportf: reportf.write('{msg}\n') + continue + parsed = MockParsed(run_number, sfoid, None, streamtype, streamname) + if oradb.notTransLBs(parsed) == 0: + updated_streams += 1 + if not dryrun: + oradb.runtransfer(parsed) + if reportf: reportf.write(f'marked stream({run_number},{sfoid},' + f'{streamtype},{streamname}) transferred\n') + else: + if reportf: reportf.write(f'would have marked stream({run_number},' + f'{sfoid},{streamtype},{streamname}) transferred\n') + + + oradb.db.orcl.close() + if report: + reportf.write('---- end of marking LBs and streams transferred\n') + reportf.close() + if verbose: + if dryrun: + print(f'would have marked {updated_lbs} LBs as transferred') + print(f'would have marked {updated_streams} streams as transferred') + else: + print(f'marked {updated_lbs} LBs as transferred') + print(f'marked {updated_streams} streams as transferred') + + +if __name__ == '__main__': + ap = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) + ap.add_argument("oradb_url", + help="URL of the oracle DB to fix, example: oracle://int8r/ATLAS_SFO_T0") + ap.add_argument("run_number", type=int, + help="number of the run to check") + ap.add_argument("outliers_file", + help="file to print out outliers") + ap.add_argument("-v", "--verbose", action="store_true", + help="enable verbosity") + ap.add_argument("-d", "--dryrun", action="store_true", + help="do not modify oracle DB") + ap.add_argument("-r", "--report", + help="file-by-file report") + args = ap.parse_args() + + oracle_sync_transferred_files_to_lbs_and_streams(args.oradb_url, + args.run_number, args.outliers_file, verbose=args.verbose, + dryrun=args.dryrun, report=args.report)