Skip to content
Snippets Groups Projects
Commit c7a6ab99 authored by Diego Davila Foyo's avatar Diego Davila Foyo
Browse files

Merge branch 'qa' into 'master'

JobCleanup {9-11} && WMArchive {7}

See merge request !23
parents 80149115 5da566b0
No related branches found
No related tags found
2 merge requests!62Update JobCleanup.py to fix https://github.com/dmwm/CRABServer/issues/7546,!23JobCleanup {9-11} && WMArchive {7} - second run
import pprint
import warnings import warnings
from JobCleanupAlarms import * from JobCleanupAlarms import *
...@@ -28,6 +29,10 @@ class CrabDag: ...@@ -28,6 +29,10 @@ class CrabDag:
self.isReadyToClear = False self.isReadyToClear = False
self.isCleared = False self.isCleared = False
self.dagLifeTime = 0 self.dagLifeTime = 0
self.dagExtendLifeTime = 0
def addJob(self, htcDag): def addJob(self, htcDag):
self.jobs.append(htcDag) self.jobs.append(htcDag)
def __repr__(self):
return ("CrabDag: %s\n" % pprint.pformat(self.task))
...@@ -42,7 +42,8 @@ import time ...@@ -42,7 +42,8 @@ import time
LOG_FILE = '/var/log/condor/JobCleanup.log' LOG_FILE = '/var/log/condor/JobCleanup.log'
TASK_LIFETIME = 39*24*60*60 #assuming 39 days just in case TASK_LIFETIME = 38*24*60*60 #assuming 38 days just in case
TASK_EXTEND_LIFETIME = 8*24*60*60 #assuming 8 days after CRAB_TaskEndTime
HOSTNAME = os.uname()[1] HOSTNAME = os.uname()[1]
def getUserName(): def getUserName():
return pwd.getpwuid( os.getuid())[0] return pwd.getpwuid( os.getuid())[0]
...@@ -97,7 +98,21 @@ def prepareDagList(schedd): ...@@ -97,7 +98,21 @@ def prepareDagList(schedd):
# 1. Make a condor_q # 1. Make a condor_q
scheddAlarm = None scheddAlarm = None
try: try:
results = schedd.query('( (JobUniverse =?= 7) && (TaskType=="ROOT") )', [ "ClusterId", "ProcId", "CRAB_UserHN", "CRAB_ReqName", "CRAB_TaskSubmitTime", "QDate", "DAGManJobId", "DAG_NodesQueued", "JobStatus", "JobUniverse", "Owner", "Iwd"]) constraint = '( (JobUniverse =?= 7) && (TaskType=="ROOT") )'
projection = ["ClusterId",
"ProcId",
"CRAB_UserHN",
"CRAB_ReqName",
"CRAB_TaskSubmitTime",
"CRAB_TaskEndTime",
"QDate",
"DAGManJobId",
"DAG_NodesQueued",
"JobStatus",
"JobUniverse",
"Owner",
"Iwd"]
results = schedd.query(constraint, projection)
except: except:
scheddAlarm = ScheddAlarm("") scheddAlarm = ScheddAlarm("")
warnings.warn(scheddAlarm, stacklevel=5) warnings.warn(scheddAlarm, stacklevel=5)
...@@ -113,21 +128,44 @@ def prepareDagList(schedd): ...@@ -113,21 +128,44 @@ def prepareDagList(schedd):
else: else:
dags.append(crabDag) dags.append(crabDag)
# pprint.pprint(dags) # logging.debug("List of dags: %s \n" % (pprint.pformat(dags)))
# explicitly free the memory consumed from results and avoid later refferences # explicitly free the memory consumed from results and avoid later refferences
del results del results
# 2. Find all the dags ... - using Qdate here because some very old tasks do miss the classad CRAB_TaskSubmitTime # 2. Find all the dags ... - using Qdate here because some very old tasks do miss the classad CRAB_TaskSubmitTime
for dag in dags: # for dag in dags:
dag.dagLifeTime = (dag.task["ServerTime"] - dag.task["Qdate"]) # dag.dagLifeTime = (dag.task["ServerTime"] - dag.task["Qdate"])
logging.debug("currentTaskLifeTime = %d" % dag.dagLifeTime) # logging.debug("currentTaskLifeTime = %d" % dag.dagLifeTime)
if dag.dagLifeTime > TASK_LIFETIME: # if dag.dagLifeTime > TASK_LIFETIME:
# done - to check if the dag status is in hold(5)||completed(4)||removed(3) - before take any action # # done - to check if the dag status is in hold(5)||completed(4)||removed(3) - before take any action
# expectedDagStatus = [5, 4, 3]
# if dag.task["JobStatus"] in expectedDagStatus:
# dag.isReadyToClear = True
# else:
# staleTaskAlarm = StaleTaskAlarm(dag)
# warnings.warn(staleTaskAlarm, stacklevel=5)
# return dags
# 2. Find all the dags ... - using Qdate here because some very old tasks do miss the classad CRAB_TaskSubmitTime
for dag in dags:
if 'CRAB_TaskEndTime' in dag.task.keys():
dag.dagExtendLifeTime = (dag.task["ServerTime"] - dag.task["CRAB_TaskEndTime"])
else:
dag.dagExtendLifeTime = (dag.task["ServerTime"] - (dag.task["Qdate"] + (TASK_LIFETIME - TASK_EXTEND_LIFETIME)))
logging.info("ClusterId: %s: Missing kew 'CRAB_TaskEndTime': Approximate 'currentTaskExtendLifeTime'; Crab_ReqName: %s; Qdate: %s." % (dag.task["ClusterId"], dag.name, time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(dag.task["Qdate"])) ))
logging.debug("ClusterId: %s: currentTaskExtendLifeTime = %d days; Qdate: %s." % (dag.clusterId, (dag.dagExtendLifeTime//60//60//24), time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(dag.task["Qdate"]))))
if dag.dagExtendLifeTime >= TASK_EXTEND_LIFETIME:
# done - to check if the dag status is in hold(5)||completed(4)||removed(3) - before take any action
expectedDagStatus = [5, 4, 3] expectedDagStatus = [5, 4, 3]
if dag.task["JobStatus"] in expectedDagStatus: if dag.task["JobStatus"] in expectedDagStatus:
dag.isReadyToClear = True dag.isReadyToClear = True
logging.debug("ClusterId: %s: Added for deletion." % dag.clusterId)
else: else:
staleTaskAlarm = StaleTaskAlarm(dag) staleTaskAlarm = StaleTaskAlarm(dag)
logging.debug("ClusterId: %s: Not Added for deletion." % dag.clusterId)
warnings.warn(staleTaskAlarm, stacklevel=5) warnings.warn(staleTaskAlarm, stacklevel=5)
return dags return dags
...@@ -168,7 +206,7 @@ if __name__ == "__main__": ...@@ -168,7 +206,7 @@ if __name__ == "__main__":
logFormatter = logging.Formatter('%(asctime)s:%(levelname)s:%(module)s,%(lineno)d:%(message)s') logFormatter = logging.Formatter('%(asctime)s:%(levelname)s:%(module)s,%(lineno)d:%(message)s')
rootLogger = logging.getLogger() rootLogger = logging.getLogger()
rootLogger.setLevel(logging.INFO) rootLogger.setLevel(logging.DEBUG)
# Setting different loglevels for file logging and consoleoutput # Setting different loglevels for file logging and consoleoutput
fileHandler = logging.FileHandler(LOG_FILE) fileHandler = logging.FileHandler(LOG_FILE)
......
...@@ -21,7 +21,7 @@ class StaleTaskAlarm(Alarm): ...@@ -21,7 +21,7 @@ class StaleTaskAlarm(Alarm):
""" """
def __init__(self, dag): def __init__(self, dag):
self.dag = dag self.dag = dag
self.alarmMessage = ("StaleTaskAlarm: A Task in non final state found. Task clusterId: %s Task name: %s." % (self.dag.clusterId, self.dag.name)) self.alarmMessage = ("StaleTaskAlarm: A Task in non final state found. Task clusterId: %s Task name: %s. Task['JobStatus']: %s" % (self.dag.clusterId, self.dag.name, self.dag.task["JobStatus"]))
logging.info(self.alarmMessage) logging.info(self.alarmMessage)
# print("StaleTaskAlarm: %s" % self.alarmMessage) # print("StaleTaskAlarm: %s" % self.alarmMessage)
......
...@@ -13,9 +13,10 @@ import logging ...@@ -13,9 +13,10 @@ import logging
from httplib import HTTPException from httplib import HTTPException
from logging.handlers import TimedRotatingFileHandler from logging.handlers import TimedRotatingFileHandler
from WMCore.WMException import WMException
from WMCore.Services.WMArchive.WMArchive import WMArchive from WMCore.Services.WMArchive.WMArchive import WMArchive
#sudo -u condor nohup sh -c 'export LD_LIBRARY_PATH=/data/srv/SubmissionInfrastructureScripts/WMArchiveWD/; export PYTHONPATH=/data/srv/SubmissionInfrastructureScripts/WMArchiveWD/; python /data/srv/SubmissionInfrastructureScripts/WMArchiveUploaderNew.py' > /dev/null 2> /dev/null #sudo -u condor sh -c 'export LD_LIBRARY_PATH=/data/srv/SubmissionInfrastructureScripts/; export PYTHONPATH=/data/srv/SubmissionInfrastructureScripts/WMCore/src/python; python /data/srv/SubmissionInfrastructureScripts/WMArchiveUploaderNew.py'
class Daemon(object): class Daemon(object):
""" """
...@@ -241,10 +242,10 @@ class WMArchiveUploader(Daemon): ...@@ -241,10 +242,10 @@ class WMArchiveUploader(Daemon):
for error in step["errors"]: for error in step["errors"]:
error.update({"details" : str(error["details"])}) error.update({"details" : str(error["details"])})
docs.append(tmpdoc) docs.append(tmpdoc)
try: try:
response = wmarchiver.archiveData(docs) response = wmarchiver.archiveData(docs)
except (pycurl.error, HTTPException) as e: except (pycurl.error, HTTPException, WMException) as e:
logger.error("Error uploading docs: %s" % e) logger.error("Error uploading docs: %s" % e)
time.sleep(60) time.sleep(60)
continue continue
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment