Skip to content
Snippets Groups Projects
Commit 772b8aa9 authored by root's avatar root
Browse files

JobCleanup {9} - Estimate the moment of cleanup based on the new ClassAd CRAB_TaskEndTime.

parent 6b306862
No related branches found
No related tags found
1 merge request: !23 JobCleanup {9-11} && WMArchive {7} - second run
import pprint
import warnings
from JobCleanupAlarms import *
......@@ -23,11 +24,16 @@ class CrabDag:
raise KeyAlarm(key='CRAB_ReqName', clusterId=self.clusterId)
else:
self.name = htcDag["CRAB_ReqName"]
if not 'CRAB_TaskEndTime' in self.task.keys():
raise KeyAlarm(key='CRAB_TaskEndTime', clusterId=self.clusterId)
self.isEmpty = False
self.isComplete = False
self.isReadyToClear = False
self.isCleared = False
self.dagLifeTime = 0
self.dagExtendLifeTime = 0
def addJob(self, htcDag):
    """Append ``htcDag`` to this object's ``jobs`` list.

    Args:
        htcDag: the job entry to record; it is stored as given, without
            copying or validation.
    """
    self.jobs.append(htcDag)
......@@ -42,7 +42,8 @@ import time
# Path of the log file for this script.
LOG_FILE = '/var/log/condor/JobCleanup.log'
# Task lifetime, in seconds.
# NOTE(review): TASK_LIFETIME is assigned twice in a row; the second value
# (38 days) is the one in effect. The first line looks like the pre-change
# side of the diff - confirm before removing it.
TASK_LIFETIME = 39*24*60*60 #assuming 39 days just in case
TASK_LIFETIME = 38*24*60*60 #assuming 38 days just in case
# Extra time, in seconds, counted from CRAB_TaskEndTime.
TASK_EXTEND_LIFETIME = 8*24*60*60 #assuming 8 days after CRAB_TaskEndTime
# Node name of the machine running this script (second field of os.uname()).
HOSTNAME = os.uname()[1]
def getUserName():
    """Return the login name of the user that owns the current process.

    The name is looked up in the password database using the real user id
    of the process.

    Raises:
        KeyError: if the password database has no entry for that user id.
    """
    # pw_name is the named form of field 0 of the passwd entry.
    return pwd.getpwuid(os.getuid()).pw_name
......@@ -97,7 +98,21 @@ def prepareDagList(schedd):
# 1. Make a condor_q
scheddAlarm = None
try:
results = schedd.query('( (JobUniverse =?= 7) && (TaskType=="ROOT") )', [ "ClusterId", "ProcId", "CRAB_UserHN", "CRAB_ReqName", "CRAB_TaskSubmitTime", "QDate", "DAGManJobId", "DAG_NodesQueued", "JobStatus", "JobUniverse", "Owner", "Iwd"])
constraint = '( (JobUniverse =?= 7) && (TaskType=="ROOT") )'
projection = ["ClusterId",
"ProcId",
"CRAB_UserHN",
"CRAB_ReqName",
"CRAB_TaskSubmitTime",
"CRAB_TaskEndTime",
"QDate",
"DAGManJobId",
"DAG_NodesQueued",
"JobStatus",
"JobUniverse",
"Owner",
"Iwd"]
results = schedd.query(constraint, projection)
except:
scheddAlarm = ScheddAlarm("")
warnings.warn(scheddAlarm, stacklevel=5)
......@@ -118,11 +133,30 @@ def prepareDagList(schedd):
del results
# 2. Find all the dags ... - using Qdate here because some very old tasks do miss the classad CRAB_TaskSubmitTime
for dag in dags:
dag.dagLifeTime = (dag.task["ServerTime"] - dag.task["Qdate"])
logging.debug("currentTaskLifeTime = %d" % dag.dagLifeTime)
if dag.dagLifeTime > TASK_LIFETIME:
# done - to check if the dag status is in hold(5)||completed(4)||removed(3) - before take any action
# for dag in dags:
# dag.dagLifeTime = (dag.task["ServerTime"] - dag.task["Qdate"])
# logging.debug("currentTaskLifeTime = %d" % dag.dagLifeTime)
# if dag.dagLifeTime > TASK_LIFETIME:
# # done - to check if the dag status is in hold(5)||completed(4)||removed(3) - before take any action
# expectedDagStatus = [5, 4, 3]
# if dag.task["JobStatus"] in expectedDagStatus:
# dag.isReadyToClear = True
# else:
# staleTaskAlarm = StaleTaskAlarm(dag)
# warnings.warn(staleTaskAlarm, stacklevel=5)
# return dags
# 2. Find all the dags ... - using CRAB_TaskEndTime when present, otherwise estimating it from Qdate (older tasks may lack the CRAB_TaskEndTime classad)
for dag in dags:
if 'CRAB_TaskEndTime' in dag.task.keys():
dag.dagExtendLifeTime = (dag.task["ServerTime"] - dag.task["CRAB_TaskEndTime"])
else:
dag.dagExtendLifeTime = (dag.task["ServerTime"] - (dag.task["Qdate"] + (TASK_LIFETIME - TASK_EXTEND_LIFETIME)))
logging.debug("currentTaskExtendLifeTime = %d" % dag.dagExtendLifeTime)
if dag.dagExtendLifeTime >= TASK_EXTEND_LIFETIME:
# before taking any action, check that the dag status is hold(5), completed(4) or removed(3)
expectedDagStatus = [5, 4, 3]
if dag.task["JobStatus"] in expectedDagStatus:
dag.isReadyToClear = True
......@@ -182,7 +216,7 @@ if __name__ == "__main__":
consoleHandler.setLevel(logging.WARNING)
rootLogger.addHandler(consoleHandler)
# Temporary redirect stderr
sys.stderr = open('/dev/null', 'w')
# sys.stderr = open('/dev/null', 'w')
logging.basicConfig(filename=LOG_FILE,level=logging.INFO,format='%(asctime)s:%(levelname)s:%(module)s,%(lineno)d:%(message)s')
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment