Commit b5e133d1 authored by Diego Davila Foyo

Revert "Merge branch 'qa' into 'master'"

This reverts merge request !23
parent c7a6ab99
import pprint
import warnings
from JobCleanupAlarms import *
@@ -29,10 +28,6 @@ class CrabDag:
        self.isReadyToClear = False
        self.isCleared = False
        self.dagLifeTime = 0
        self.dagExtendLifeTime = 0

    def addJob(self, htcDag):
        self.jobs.append(htcDag)

    def __repr__(self):
        return ("CrabDag: %s\n" % pprint.pformat(self.task))
@@ -42,8 +42,7 @@ import time
LOG_FILE = '/var/log/condor/JobCleanup.log'
TASK_LIFETIME = 38*24*60*60 #assuming 38 days just in case
TASK_EXTEND_LIFETIME = 8*24*60*60 #assuming 8 days after CRAB_TaskEndTime
TASK_LIFETIME = 39*24*60*60 #assuming 39 days just in case
HOSTNAME = os.uname()[1]
def getUserName():
    return pwd.getpwuid(os.getuid())[0]
@@ -98,21 +97,7 @@ def prepareDagList(schedd):
    # 1. Make a condor_q
    scheddAlarm = None
    try:
        constraint = '( (JobUniverse =?= 7) && (TaskType=="ROOT") )'
        projection = ["ClusterId",
                      "ProcId",
                      "CRAB_UserHN",
                      "CRAB_ReqName",
                      "CRAB_TaskSubmitTime",
                      "CRAB_TaskEndTime",
                      "QDate",
                      "DAGManJobId",
                      "DAG_NodesQueued",
                      "JobStatus",
                      "JobUniverse",
                      "Owner",
                      "Iwd"]
        results = schedd.query(constraint, projection)
        results = schedd.query('( (JobUniverse =?= 7) && (TaskType=="ROOT") )', [ "ClusterId", "ProcId", "CRAB_UserHN", "CRAB_ReqName", "CRAB_TaskSubmitTime", "QDate", "DAGManJobId", "DAG_NodesQueued", "JobStatus", "JobUniverse", "Owner", "Iwd"])
    except:
        scheddAlarm = ScheddAlarm("")
        warnings.warn(scheddAlarm, stacklevel=5)
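Aside: the two query lines above are the before/after of this revert; both issue the same constrained, projected condor_q. A minimal sketch of that kind of query with the htcondor Python bindings, where the constraint and attribute names are taken from the hunk and the surrounding scaffolding is illustrative only:

    import htcondor

    # Talk to the local schedd; a remote one would be located through a Collector.
    schedd = htcondor.Schedd()

    # Only DAGMan jobs (JobUniverse 7) that are CRAB task ROOT jobs.
    constraint = '( (JobUniverse =?= 7) && (TaskType=="ROOT") )'

    # Project only the classads the cleanup logic needs, keeping the query cheap.
    projection = ["ClusterId", "ProcId", "CRAB_UserHN", "CRAB_ReqName",
                  "CRAB_TaskSubmitTime", "CRAB_TaskEndTime", "QDate",
                  "DAGManJobId", "DAG_NodesQueued", "JobStatus",
                  "JobUniverse", "Owner", "Iwd"]

    results = schedd.query(constraint, projection)
    for ad in results:
        print(ad.get("ClusterId"), ad.get("JobStatus"))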
@@ -128,44 +113,21 @@ def prepareDagList(schedd):
        else:
            dags.append(crabDag)
    # logging.debug("List of dags: %s \n" % (pprint.pformat(dags)))
    # pprint.pprint(dags)
    # explicitly free the memory consumed by results and avoid later references
    del results

    # 2. Find all the dags ... - using Qdate here because some very old tasks are missing the classad CRAB_TaskSubmitTime
    # for dag in dags:
    #     dag.dagLifeTime = (dag.task["ServerTime"] - dag.task["Qdate"])
    #     logging.debug("currentTaskLifeTime = %d" % dag.dagLifeTime)
    #     if dag.dagLifeTime > TASK_LIFETIME:
    #         # done - check that the dag status is in hold(5)||completed(4)||removed(3) before taking any action
    #         expectedDagStatus = [5, 4, 3]
    #         if dag.task["JobStatus"] in expectedDagStatus:
    #             dag.isReadyToClear = True
    #         else:
    #             staleTaskAlarm = StaleTaskAlarm(dag)
    #             warnings.warn(staleTaskAlarm, stacklevel=5)
    # return dags

    # 2. Find all the dags ... - using Qdate here because some very old tasks are missing the classad CRAB_TaskSubmitTime
    for dag in dags:
        if 'CRAB_TaskEndTime' in dag.task.keys():
            dag.dagExtendLifeTime = (dag.task["ServerTime"] - dag.task["CRAB_TaskEndTime"])
        else:
            dag.dagExtendLifeTime = (dag.task["ServerTime"] - (dag.task["Qdate"] + (TASK_LIFETIME - TASK_EXTEND_LIFETIME)))
            logging.info("ClusterId: %s: Missing key 'CRAB_TaskEndTime': approximating 'currentTaskExtendLifeTime'; Crab_ReqName: %s; Qdate: %s." % (dag.task["ClusterId"], dag.name, time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(dag.task["Qdate"]))))
        logging.debug("ClusterId: %s: currentTaskExtendLifeTime = %d days; Qdate: %s." % (dag.clusterId, (dag.dagExtendLifeTime//60//60//24), time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(dag.task["Qdate"]))))
        if dag.dagExtendLifeTime >= TASK_EXTEND_LIFETIME:
            # done - check that the dag status is in hold(5)||completed(4)||removed(3) before taking any action
    for dag in dags:
        dag.dagLifeTime = (dag.task["ServerTime"] - dag.task["Qdate"])
        logging.debug("currentTaskLifeTime = %d" % dag.dagLifeTime)
        if dag.dagLifeTime > TASK_LIFETIME:
            # done - check that the dag status is in hold(5)||completed(4)||removed(3) before taking any action
            expectedDagStatus = [5, 4, 3]
            if dag.task["JobStatus"] in expectedDagStatus:
                dag.isReadyToClear = True
                logging.debug("ClusterId: %s: Added for deletion." % dag.clusterId)
            else:
                staleTaskAlarm = StaleTaskAlarm(dag)
                logging.debug("ClusterId: %s: Not added for deletion." % dag.clusterId)
                warnings.warn(staleTaskAlarm, stacklevel=5)
    return dags
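Aside: the restored loop ages a task purely by ServerTime - QDate against TASK_LIFETIME, while the removed (qa) loop ages it past CRAB_TaskEndTime and, when that classad is absent, approximates the end time as QDate + (TASK_LIFETIME - TASK_EXTEND_LIFETIME). A small sketch of the arithmetic, using the constants from the hunks above and made-up timestamps:

    # Constants as on the qa side of this revert (38 days total, 8 days of grace).
    TASK_LIFETIME = 38 * 24 * 60 * 60
    TASK_EXTEND_LIFETIME = 8 * 24 * 60 * 60

    SERVER_TIME = 1600000000                  # illustrative "now"
    QDATE = SERVER_TIME - 35 * 24 * 60 * 60   # task submitted 35 days ago (made up)

    # qa rule: age measured past CRAB_TaskEndTime, approximated from QDate when missing.
    approx_task_end = QDATE + (TASK_LIFETIME - TASK_EXTEND_LIFETIME)   # QDate + 30 days
    extend_lifetime = SERVER_TIME - approx_task_end                    # 5 days here
    ready_qa = extend_lifetime >= TASK_EXTEND_LIFETIME                 # False, 5 < 8

    # restored rule: plain age check against the 39-day TASK_LIFETIME on the master side.
    ready_master = (SERVER_TIME - QDATE) > 39 * 24 * 60 * 60           # False, 35 < 39

    print(ready_qa, ready_master)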
@@ -206,7 +168,7 @@ if __name__ == "__main__":
    logFormatter = logging.Formatter('%(asctime)s:%(levelname)s:%(module)s,%(lineno)d:%(message)s')
    rootLogger = logging.getLogger()
    rootLogger.setLevel(logging.DEBUG)
    rootLogger.setLevel(logging.INFO)

    # Setting different log levels for file logging and console output
    fileHandler = logging.FileHandler(LOG_FILE)
......
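Aside: the handler setup that the "different log levels" comment refers to is collapsed in this view. A generic sketch of that pattern, not the file's elided code, with handler names and levels assumed:

    import logging
    import sys

    logFormatter = logging.Formatter('%(asctime)s:%(levelname)s:%(module)s,%(lineno)d:%(message)s')
    rootLogger = logging.getLogger()
    rootLogger.setLevel(logging.INFO)        # the level this revert restores; handlers can only filter further

    fileHandler = logging.FileHandler('/var/log/condor/JobCleanup.log')
    fileHandler.setFormatter(logFormatter)
    fileHandler.setLevel(logging.DEBUG)      # the file gets everything the root level lets through

    consoleHandler = logging.StreamHandler(sys.stdout)
    consoleHandler.setFormatter(logFormatter)
    consoleHandler.setLevel(logging.WARNING) # console output stays quieter

    rootLogger.addHandler(fileHandler)
    rootLogger.addHandler(consoleHandler)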
@@ -21,7 +21,7 @@ class StaleTaskAlarm(Alarm):
"""
def __init__(self, dag):
self.dag = dag
self.alarmMessage = ("StaleTaskAlarm: A Task in non final state found. Task clusterId: %s Task name: %s. Task['JobStatus']: %s" % (self.dag.clusterId, self.dag.name, self.dag.task["JobStatus"]))
self.alarmMessage = ("StaleTaskAlarm: A Task in non final state found. Task clusterId: %s Task name: %s." % (self.dag.clusterId, self.dag.name))
logging.info(self.alarmMessage)
# print("StaleTaskAlarm: %s" % self.alarmMessage)
......
@@ -13,10 +13,9 @@ import logging
from httplib import HTTPException
from logging.handlers import TimedRotatingFileHandler
from WMCore.WMException import WMException
from WMCore.Services.WMArchive.WMArchive import WMArchive
#sudo -u condor sh -c 'export LD_LIBRARY_PATH=/data/srv/SubmissionInfrastructureScripts/; export PYTHONPATH=/data/srv/SubmissionInfrastructureScripts/WMCore/src/python; python /data/srv/SubmissionInfrastructureScripts/WMArchiveUploaderNew.py'
#sudo -u condor nohup sh -c 'export LD_LIBRARY_PATH=/data/srv/SubmissionInfrastructureScripts/WMArchiveWD/; export PYTHONPATH=/data/srv/SubmissionInfrastructureScripts/WMArchiveWD/; python /data/srv/SubmissionInfrastructureScripts/WMArchiveUploaderNew.py' > /dev/null 2> /dev/null
class Daemon(object):
"""
@@ -242,10 +241,10 @@ class WMArchiveUploader(Daemon):
                    for error in step["errors"]:
                        error.update({"details" : str(error["details"])})
                docs.append(tmpdoc)

            try:
                response = wmarchiver.archiveData(docs)
            except (pycurl.error, HTTPException, WMException) as e:
            except (pycurl.error, HTTPException) as e:
                logger.error("Error uploading docs: %s" % e)
                time.sleep(60)
                continue
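Aside: the change above only narrows the except tuple (the qa side also caught WMException); the retry shape stays the same. A self-contained sketch of that pattern, where wmarchiver and archiveData() come from the surrounding file and the wrapper function and build_docs helper are assumed:

    import time
    import pycurl
    from httplib import HTTPException        # Python 2, as in the imports above

    def upload_loop(wmarchiver, build_docs, logger):
        """Push document batches to WMArchive; log, sleep and retry on upload errors."""
        while True:
            docs = build_docs()              # assumed helper that assembles the next batch
            try:
                wmarchiver.archiveData(docs)
            except (pycurl.error, HTTPException) as e:
                logger.error("Error uploading docs: %s" % e)
                time.sleep(60)
                continue
            # on success the real daemon goes on to clean up the uploaded files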
......