diff --git a/CrabDag.py b/CrabDag.py index 591d48c0a23c36b4077ed71ccc873c6d8634f7f0..74d6fbcc6c9259b6773b616c7f101c187a4ea340 100644 --- a/CrabDag.py +++ b/CrabDag.py @@ -1,3 +1,4 @@ +import pprint import warnings from JobCleanupAlarms import * @@ -28,6 +29,10 @@ class CrabDag: self.isReadyToClear = False self.isCleared = False self.dagLifeTime = 0 + self.dagExtendLifeTime = 0 def addJob(self, htcDag): self.jobs.append(htcDag) + + def __repr__(self): + return ("CrabDag: %s\n" % pprint.pformat(self.task)) diff --git a/JobCleanup.py b/JobCleanup.py index a475f59faef2c31a42788daa54fc24dcd662f546..9b4285febf6f45055eaadb2d21a4367947991431 100755 --- a/JobCleanup.py +++ b/JobCleanup.py @@ -42,7 +42,8 @@ import time LOG_FILE = '/var/log/condor/JobCleanup.log' -TASK_LIFETIME = 39*24*60*60 #assuming 39 days just in case +TASK_LIFETIME = 38*24*60*60 #assuming 38 days just in case +TASK_EXTEND_LIFETIME = 8*24*60*60 #assuming 8 days after CRAB_TaskEndTime HOSTNAME = os.uname()[1] def getUserName(): return pwd.getpwuid( os.getuid())[0] @@ -97,7 +98,21 @@ def prepareDagList(schedd): # 1. 
Make a condor_q scheddAlarm = None try: - results = schedd.query('( (JobUniverse =?= 7) && (TaskType=="ROOT") )', [ "ClusterId", "ProcId", "CRAB_UserHN", "CRAB_ReqName", "CRAB_TaskSubmitTime", "QDate", "DAGManJobId", "DAG_NodesQueued", "JobStatus", "JobUniverse", "Owner", "Iwd"]) + constraint = '( (JobUniverse =?= 7) && (TaskType=="ROOT") )' + projection = ["ClusterId", + "ProcId", + "CRAB_UserHN", + "CRAB_ReqName", + "CRAB_TaskSubmitTime", + "CRAB_TaskEndTime", + "QDate", + "DAGManJobId", + "DAG_NodesQueued", + "JobStatus", + "JobUniverse", + "Owner", + "Iwd"] + results = schedd.query(constraint, projection) except: scheddAlarm = ScheddAlarm("") warnings.warn(scheddAlarm, stacklevel=5) @@ -113,21 +128,44 @@ def prepareDagList(schedd): else: dags.append(crabDag) - # pprint.pprint(dags) + # logging.debug("List of dags: %s \n" % (pprint.pformat(dags))) # explicitly free the memory consumed from results and avoid later refferences del results # 2. Find all the dags ... - using Qdate here because some very old tasks do miss the classad CRAB_TaskSubmitTime - for dag in dags: - dag.dagLifeTime = (dag.task["ServerTime"] - dag.task["Qdate"]) - logging.debug("currentTaskLifeTime = %d" % dag.dagLifeTime) - if dag.dagLifeTime > TASK_LIFETIME: - # done - to check if the dag status is in hold(5)||completed(4)||removed(3) - before take any action + # for dag in dags: + # dag.dagLifeTime = (dag.task["ServerTime"] - dag.task["Qdate"]) + # logging.debug("currentTaskLifeTime = %d" % dag.dagLifeTime) + # if dag.dagLifeTime > TASK_LIFETIME: + # # done - to check if the dag status is in hold(5)||completed(4)||removed(3) - before take any action + # expectedDagStatus = [5, 4, 3] + # if dag.task["JobStatus"] in expectedDagStatus: + # dag.isReadyToClear = True + # else: + # staleTaskAlarm = StaleTaskAlarm(dag) + # warnings.warn(staleTaskAlarm, stacklevel=5) + # return dags + + # 2. Find all the dags ... 
- using Qdate here because some very old tasks do miss the classad CRAB_TaskSubmitTime +    for dag in dags: +        if 'CRAB_TaskEndTime' in dag.task.keys(): +            dag.dagExtendLifeTime = (dag.task["ServerTime"] - dag.task["CRAB_TaskEndTime"]) + +        else: +            dag.dagExtendLifeTime = (dag.task["ServerTime"] - (dag.task["Qdate"] + (TASK_LIFETIME - TASK_EXTEND_LIFETIME))) +            logging.info("ClusterId: %s: Missing key 'CRAB_TaskEndTime': Approximate 'currentTaskExtendLifeTime'; CRAB_ReqName: %s; Qdate: %s." % (dag.task["ClusterId"], dag.name, time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(dag.task["Qdate"])) )) + +        logging.debug("ClusterId: %s: currentTaskExtendLifeTime = %d days; Qdate: %s." % (dag.clusterId, (dag.dagExtendLifeTime//60//60//24), time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(dag.task["Qdate"])))) + +        if dag.dagExtendLifeTime >= TASK_EXTEND_LIFETIME: +            # done - to check if the dag status is in hold(5)||completed(4)||removed(3) - before take any action              expectedDagStatus = [5, 4, 3]              if dag.task["JobStatus"] in expectedDagStatus:                  dag.isReadyToClear = True +                logging.debug("ClusterId: %s: Added for deletion." % dag.clusterId)              else:                  staleTaskAlarm = StaleTaskAlarm(dag) +                logging.debug("ClusterId: %s: Not Added for deletion."
% dag.clusterId) warnings.warn(staleTaskAlarm, stacklevel=5) return dags @@ -168,7 +206,7 @@ if __name__ == "__main__": logFormatter = logging.Formatter('%(asctime)s:%(levelname)s:%(module)s,%(lineno)d:%(message)s') rootLogger = logging.getLogger() - rootLogger.setLevel(logging.INFO) + rootLogger.setLevel(logging.DEBUG) # Setting different loglevels for file logging and consoleoutput fileHandler = logging.FileHandler(LOG_FILE) diff --git a/JobCleanupAlarms.py b/JobCleanupAlarms.py index 5593953e127196635c10f80f7959fca6a7dd1d03..a1d46565f15e4cf3f0a4b47f12623a62e6f2f515 100644 --- a/JobCleanupAlarms.py +++ b/JobCleanupAlarms.py @@ -21,7 +21,7 @@ class StaleTaskAlarm(Alarm): """ def __init__(self, dag): self.dag = dag - self.alarmMessage = ("StaleTaskAlarm: A Task in non final state found. Task clusterId: %s Task name: %s." % (self.dag.clusterId, self.dag.name)) + self.alarmMessage = ("StaleTaskAlarm: A Task in non final state found. Task clusterId: %s Task name: %s. Task['JobStatus']: %s" % (self.dag.clusterId, self.dag.name, self.dag.task["JobStatus"])) logging.info(self.alarmMessage) # print("StaleTaskAlarm: %s" % self.alarmMessage) diff --git a/WMArchiveUploader.py b/WMArchiveUploader.py index cd9775f96563945db619c237e5dd5e574f08ba1a..de47f7837edd36873e02a6cb6434b5de8980fd8b 100755 --- a/WMArchiveUploader.py +++ b/WMArchiveUploader.py @@ -13,9 +13,10 @@ import logging from httplib import HTTPException from logging.handlers import TimedRotatingFileHandler +from WMCore.WMException import WMException from WMCore.Services.WMArchive.WMArchive import WMArchive -#sudo -u condor nohup sh -c 'export LD_LIBRARY_PATH=/data/srv/SubmissionInfrastructureScripts/WMArchiveWD/; export PYTHONPATH=/data/srv/SubmissionInfrastructureScripts/WMArchiveWD/; python /data/srv/SubmissionInfrastructureScripts/WMArchiveUploaderNew.py' > /dev/null 2> /dev/null +#sudo -u condor sh -c 'export LD_LIBRARY_PATH=/data/srv/SubmissionInfrastructureScripts/; export 
PYTHONPATH=/data/srv/SubmissionInfrastructureScripts/WMCore/src/python; python /data/srv/SubmissionInfrastructureScripts/WMArchiveUploaderNew.py' class Daemon(object): """ @@ -241,10 +242,10 @@ class WMArchiveUploader(Daemon): for error in step["errors"]: error.update({"details" : str(error["details"])}) docs.append(tmpdoc) - + try: response = wmarchiver.archiveData(docs) - except (pycurl.error, HTTPException) as e: + except (pycurl.error, HTTPException, WMException) as e: logger.error("Error uploading docs: %s" % e) time.sleep(60) continue