From 890d9fa6925c23f5f0daf41cb92063d3f62c7d09 Mon Sep 17 00:00:00 2001 From: Andreas Battaglia <Andreas.Battaglia@cern.ch> Date: Wed, 2 Jul 2008 17:07:14 +0000 Subject: [PATCH] Added logger to the database Connection string and table names are now configurable --- Script/Conf.py | 24 +++++++- Script/CopyThread.py | 38 ++++++------ Script/Database.py | 125 ++++++++++++++++++++++++++++++---------- Script/DeleteThread.py | 3 + Script/ManagerThread.py | 6 +- Script/main.py | 8 ++- 6 files changed, 146 insertions(+), 58 deletions(-) diff --git a/Script/Conf.py b/Script/Conf.py index eb0cd39..b992935 100755 --- a/Script/Conf.py +++ b/Script/Conf.py @@ -6,7 +6,7 @@ class Conf: - ########## Common parameters ##### + ########## Common parameters ########## # Log directory self.LogDir = '/afs/cern.ch/user/b/battagli/public/Python/DAQ/DataFlow/CastorScript/Script' @@ -16,6 +16,26 @@ class Conf: + ########## METADATA DATABASE ########## + + # Oracle connection string + self.connection = 'ATLAS_SFO_T0_W/changemenow65234@atonr_conf' + #self.connection = 'pippo' + + # File table name + self.file_table = "TEST_SCRIPT_FILE" + #self.file_table="SFO_TZ_FILE" + + # Lumiblock table name + self.lb_table = "TEST_SCRIPT_LUMI" + #self.lb_table="SFO_TZ_LUMIBLOCK" + + # Run table name + self.run_table = "TEST_SCRIPT_RUN" + #self.run_table="SFO_TZ_RUN" + + + ########## MAIN THREAD ########## # If Oracle database is down, retry to connect to it every DBTimeout (s) @@ -66,7 +86,7 @@ class Conf: self.drivenPool = [ #['Physics','Electron','default','/castor/cern.ch/user/b/battagli/RemoteFarm1','castoratlas'], #['Calibration','Test','default','/castor/cern.ch/user/b/battagli/RemoteFarm2','castoratlas'] - ['physics','express','t0atlas','/castor/cern.ch/grid/atlas/DAQ/test/streaming','castoratlas'], + ['physics','express','t0atlas','/castor/cern.ch/grid/atlas/DAQ/test/streaming','castoratlas'] ] diff --git a/Script/CopyThread.py b/Script/CopyThread.py index c55fc9b..0265316 100755 --- a/Script/CopyThread.py +++ b/Script/CopyThread.py @@ -47,15 +47,10 @@ class CopyThread(threading.Thread): def run(self): - - ##### Set Logger Level ##### self.setLogLevel() - ##### Open log file for copy process ##### - self.LogFile = open(self.logFile,'a') - - + while(1): ##### When time to exit, do not take new files to be copied, but finish ongoing copy processes ##### @@ -106,13 +101,13 @@ class CopyThread(threading.Thread): copyDir = copyfile[2] self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]} self.logger.debug('Create ' + copyDir + ' in Castor if it does not exits yet',extra = self.logInfo) - nsmkdir = Popen(['nsmkdir', '-p', copyDir], stdout = self.LogFile, stderr = STDOUT, env = castorEnv) + nsmkdir = Popen(['nsmkdir', '-p', copyDir], stdout = PIPE, stderr = STDOUT, env = castorEnv) nsmkdir.wait() + self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]} + self.logger.info(nsmkdir.stdout.read(),extra = self.logInfo) ##### Copy the file in Castor ##### - self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]} - self.logger.debug('Copy (rfcp) file ' + copyfile[0] + ' to Castor',extra = self.logInfo) - rfcp = Popen(['rfcp', copyfile[0], copyDir], stdout = self.LogFile, stderr = STDOUT, env = castorEnv) + rfcp = Popen(['rfcp', copyfile[0], copyDir], stdout = PIPE, stderr = STDOUT, env = castorEnv) #### Rename .TOBECOPIED into .COPYING file ##### os.rename(copyfile[0]+'.TOBECOPIED',copyfile[0]+'.COPYING') @@ -130,9 +125,6 @@ class CopyThread(threading.Thread): # end while - ##### Close the Log file for copy process ##### - self.LogFile.close() - # end def run() @@ -164,9 +156,10 @@ class CopyThread(threading.Thread): ##### Status can not be None for ever: after a while kill and retry ##### if (time()-t_in)>self.NoneTimeout: + os.kill(element[0].pid,signal.SIGKILL) self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]} self.logger.warning('Copy process is taking to long: kill it and retry',extra = self.logInfo) - os.kill(element[0].pid,signal.SIGKILL) + self.logger.debug(element[0].stdout.read(),extra = self.logInfo) if self.exitFlag: self.CopyList.remove(element) else: self.handleUnsuccCopy(element) # end if @@ -174,15 +167,19 @@ class CopyThread(threading.Thread): ##### Status 0 = successfull copy ##### elif status == 0: + + self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]} + self.logger.debug(element[0].stdout.read(),extra = self.logInfo) ##### Compare size of original (SFO) and copied (Castor) files ##### SFO_filesize = os.stat(DataFile)[6] #SFO_filesize = os.path.getsize(DataFile) - nsls = Popen(['nsls', '-l',CastorFile], stdout = PIPE, stderr = self.LogFile, env= castorEnv) + nsls = Popen(['nsls', '-l',CastorFile], stdout = PIPE, stderr = STDOUT, env= castorEnv) Castor_filesize = 0 - if nsls.wait()==0: - Castor_filesize = long(filter(lambda x: x!='',nsls.communicate()[0].split(' '))[4]) + if nsls.wait()==0: Castor_filesize = long(filter(lambda x: x!='',nsls.stdout.read().split(' '))[4]) + self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]} + self.logger.debug(nsls.stdout.read(),extra = self.logInfo) # end if if SFO_filesize == Castor_filesize: @@ -234,6 +231,7 @@ class CopyThread(threading.Thread): # else: print 'Unsuccessful copy, unconventional error code:', status, type(type(status)) self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]} self.logger.warning('Copy of file ' + DataFile + ' was NOT successful',extra = self.logInfo) + self.logger.debug(element[0].stdout.read(),extra = self.logInfo) if self.exitFlag: self.CopyList.remove(element) else: self.handleUnsuccCopy(element) @@ -272,14 +270,16 @@ class CopyThread(threading.Thread): self.logger.debug('Remove file in Castor if it exists',extra = self.logInfo) ##### Remove unsuccessful copy file from Castor, if it's exist ##### - rfrm = Popen(['rfrm', CastorFile], stdout = self.LogFile, stderr = STDOUT, env= castorEnv) + rfrm = Popen(['rfrm', CastorFile], stdout = PIPE, stderr = STDOUT, env= castorEnv) rfrm.wait() + self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]} + self.logger.debug(rfrm.stdout.read(),extra = self.logInfo) ##### Retry to copy the file to Castor ##### if self.exitFlag: return self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]} self.logger.info('Retry to copy file on the spot again',extra = self.logInfo) - rfcp = Popen(['rfcp', DataFile, copyDir], stdout = self.LogFile, stderr = STDOUT, env = castorEnv) + rfcp = Popen(['rfcp', DataFile, copyDir], stdout = PIPE, stderr = STDOUT, env = castorEnv) idx = self.CopyList.index(element) self.CopyList[idx][0] = rfcp self.CopyList[idx][2] += 1 diff --git a/Script/Database.py b/Script/Database.py index b093901..cf4ce51 100755 --- a/Script/Database.py +++ b/Script/Database.py @@ -34,6 +34,7 @@ class OraDB: args.insert(0,self.curs) apply(func,args,keys) self.orcl.commit() + return self.curs.rowcount def getNextRow(self): return self.curs.fetchone() @@ -44,56 +45,69 @@ class OraDB: def lastOp(self): return self.curs.statement - def lastKeys(self): + def getLastKeys(self): return self.lastKeys class Database: - def __init__(self): - connection= "ATLAS_SFO_T0_W/changemenow65234@atonr_conf" - #connection= "PIPPO" - self.db = OraDB(connection) - self.file_table = "TEST_SCRIPT_FILE" - self.lb_table = "TEST_SCRIPT_LUMI" - self.run_table = "TEST_SCRIPT_RUN" - #self.file_table="SFO_TZ_FILE" - #self.lb_table="SFO_TZ_LUMIBLOCK" - #self.run_table="SFO_TZ_RUN" + def __init__(self,conf): + self.conf = conf + + ##### Set Database parameters ##### + connection = self.conf.connection + self.db = OraDB(connection) + self.file_table = self.conf.file_table + self.lb_table = self.conf.lb_table + self.run_table = self.conf.run_table + + #### Set Logger for Database ##### + self.LogDir = self.conf.LogDir + self.logInfo = {} + self.logFile = self.LogDir + '/DatabaseLog.out' + self.dbLog = logging.FileHandler(self.logFile,'w') + formatter = logging.Formatter('%(asctime)s %(levelname)-8s [%(file)s:%(line)s] %(message)s') + self.dbLog.setFormatter(formatter) + self.logger = logging.getLogger('Database') + self.logger.addHandler(self.dbLog) def Deletion(self,sfofile): + self.setLogLevel(self.conf.LogLevel) try: - self.filedeletion(sfofile) + self.filedeletion(sfofile) return True except: - print sys.exc_info() - print sys.exc_type - print sys.exc_value - print self.db.lastOp() - print self.db.lastKeys() + self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]} + self.logger.error(str(sys.exc_info()),extra = self.logInfo) + self.logger.error(str(sys.exc_type),extra = self.logInfo) + self.logger.error(str(sys.exc_value),extra = self.logInfo) + self.logger.error(str(self.db.lastOp()),extra = self.logInfo) + self.logger.error(str(self.db.getLastKeys()),extra = self.logInfo) return False def Transfer(self,sfofile,castorfile): + self.setLogLevel(self.conf.LogLevel) try: self.filetransfer(sfofile,castorfile) parsed=FileName(sfofile) - + if self.notTransFiles(parsed)==0: self.lbtransfer(parsed) - + if self.notTransLBs(parsed)==0: self.runtransfer(parsed) return True except: - print sys.exc_info() - print sys.exc_type - print sys.exc_value - print self.db.lastOp() - print self.db.lastKeys() + self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]} + self.logger.error(str(sys.exc_info()),extra = self.logInfo) + self.logger.error(str(sys.exc_type),extra = self.logInfo) + self.logger.error(str(sys.exc_value),extra = self.logInfo) + self.logger.error(str(self.db.lastOp()),extra = self.logInfo) + self.logger.error(str(self.db.getLastKeys()),extra = self.logInfo) return False @@ -105,9 +119,15 @@ class Database: keys={} keys['newdate']=datetime.datetime.fromtimestamp(time.time()) keys['ssfopfn']=sfofile - #keys['ssfopfn']='/home-users/vandelli/workdir'+sfofile[sfofile.rindex('/'):] args.insert(0,sql) - self.db.execute(args,keys) + nrow = self.db.execute(args,keys) + if nrow != 1: + self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]} + self.logger.error('Deletion of file: ' + sfofile + ' .Not exactly one entry updated in file table',extra = self.logInfo) + else: + self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]} + self.logger.debug('File table. File state updated to DELETED for file: ' + sfofile,extra = self.logInfo) + # end if,else def filetransfer(self,sfofile,castorfile): sql="update "+ self.file_table + " " @@ -118,9 +138,15 @@ class Database: keys['newdate']=datetime.datetime.fromtimestamp(time.time()) keys['newpfn']=castorfile keys['ssfopfn']=sfofile - #keys['ssfopfn']='/home-users/vandelli/workdir'+sfofile[sfofile.rindex('/'):] args.insert(0,sql) - self.db.execute(args,keys) + nrow = self.db.execute(args,keys) + if nrow != 1: + self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]} + self.logger.error('Transfer of file: ' + sfofile + ' in ' + castorfile + ' .Not exactly one entry updated in file table',extra = self.logInfo) + else: + self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]} + self.logger.debug('File table. File state updated to TRANSFERRED for file: ' + sfofile,extra = self.logInfo) + # end if,else def lbtransfer(self,parsed): sql="update "+ self.lb_table + " " @@ -137,7 +163,18 @@ class Database: keys['sstreamt']=parsed.streamt keys['sstreamn']=parsed.streamn args.insert(0,sql) - self.db.execute(args,keys) + nrow = self.db.execute(args,keys) + if nrow != 1: + self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]} + self.logger.error('Not exactly one entry updated in lumiblock table for SFO ' + str(parsed.sfoid) + + ', run ' + str(parsed.runnr) + ', lumiblock ' + str(parsed.lbnr) + + ', stream type ' + str(parsed.streamt) + ', stream name ' + str(parsed.streamn),extra = self.logInfo) + else: + self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]} + self.logger.debug('Lumiblock table. Lumiblock state updated to TRANSFERRED for SFO ' + str(parsed.sfoid) + + ', run ' + str(parsed.runnr) + ', lumiblock ' + str(parsed.lbnr) + + ', stream type ' + str(parsed.streamt) + ', stream name ' + str(parsed.streamn),extra = self.logInfo) + # end if,else def runtransfer(self,parsed): sql="update "+ self.run_table + " " @@ -153,7 +190,17 @@ class Database: keys['sstreamt']=parsed.streamt keys['sstreamn']=parsed.streamn args.insert(0,sql) - self.db.execute(args,keys) + nrow = self.db.execute(args,keys) + if nrow != 1: + self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]} + self.logger.error('Not exactly one entry updated in run table for SFO ' + str(parsed.sfoid) + ', run ' + str(parsed.runnr) + + ', stream type ' + str(parsed.streamt) + ', stream name ' + str(parsed.streamn),extra = self.logInfo) + else: + self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]} + self.logger.debug('Run table. Run state updated to TRANSFERRED for SFO ' + str(parsed.sfoid) + ', run ' + str(parsed.runnr) + + ', stream type ' + str(parsed.streamt) + ', stream name ' + str(parsed.streamn),extra = self.logInfo) + # end if,else + def notTransFiles(self,parsed): sql="select count(lfn) from " + self.file_table + " " @@ -169,8 +216,12 @@ class Database: keys['sstreamt']=parsed.streamt keys['sstreamn']=parsed.streamn self.db.execute(args,keys) + self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]} + self.logger.debug('Count number of files ONDISK for SFO ' + str(parsed.sfoid) + ', run ' + str(parsed.runnr) + + ', lumiblock ' + str(parsed.lbnr) + ', stream type ' + str(parsed.streamt) + + ', stream name ' + str(parsed.streamn),extra = self.logInfo) return self.db.getNextRow()[0] - + def notTransLBs(self,parsed): sql="select count(state) from " + self.lb_table + " " sql+="where sfoid = :ssfo and runnr = :srunnr " @@ -184,4 +235,16 @@ class Database: keys['sstreamt']=parsed.streamt keys['sstreamn']=parsed.streamn self.db.execute(args,keys) + self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]} + self.logger.debug('Count number of NOT TRANSFERRED lumiblocks for SFO ' + str(parsed.sfoid) + + ', run ' + str(parsed.runnr) + ', stream type ' + str(parsed.streamt) + + ', stream name ' + str(parsed.streamn),extra = self.logInfo) return self.db.getNextRow()[0] + + def setLogLevel(self,LogLevel): + level = LogLevel.upper() + if level == 'DEBUG': self.dbLog.setLevel(logging.DEBUG) + elif level == 'INFO': self.dbLog.setLevel(logging.INFO) + elif level == 'WARNING': self.dbLog.setLevel(logging.WARNING) + elif level == 'ERROR': self.dbLog.setLevel(logging.ERROR) + elif level == 'CRITICAL': self.dbLog.setLevel(logging.CRITICAL) diff --git a/Script/DeleteThread.py b/Script/DeleteThread.py index 31070a4..f5ca5d4 100755 --- a/Script/DeleteThread.py +++ b/Script/DeleteThread.py @@ -218,6 +218,9 @@ class DeleteThread(threading.Thread): self.logger.debug('Time to check if file has been migrated',extra = self.logInfo) Castor_file = copyDir + deletefile[0][deletefile[0].rindex('/'):] nsls = Popen(['nsls', '-l',Castor_file], stdout = PIPE, stderr = self.logFile, env= castorEnv) + nsls.wait() + self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]} + self.logger.debug(nsls.stdout.read(),extra = self.logInfo) isMig = 'm' in nsls.communicate()[0].split(' ')[0] #isMig = nsls.communicate()[0].split(' ')[0].startswith('m') diff --git a/Script/ManagerThread.py b/Script/ManagerThread.py index c91b72e..830cbe0 100755 --- a/Script/ManagerThread.py +++ b/Script/ManagerThread.py @@ -405,14 +405,14 @@ class ManagerThread(threading.Thread): ##### Delete .data.COPYING file in Castor, if it exists ##### def clearCastor(self,filename,Pool,CastorDir,StageHost): - LogFile = open(self.logFile,'a') self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]} self.logger.debug('Delete ' + filename + ' in Castor, it it exists',extra = self.logInfo) CastorFile = CastorDir + filename[filename.rindex('/'):] castorEnv = {'RFIO_USE_CASTOR_V2':'YES','STAGE_HOST':StageHost,'STAGE_SVCCLASS':Pool} - rfrm = Popen(['rfrm', CastorFile],stdout = LogFile, stderr = STDOUT,env = castorEnv) + rfrm = Popen(['rfrm', CastorFile],stdout = PIPE, stderr = STDOUT,env = castorEnv) rfrm.wait() - LogFile.close() + self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]} + self.logger.debug(rfrm.stdout.read(),extra = self.logInfo) # end def clearCastor() diff --git a/Script/main.py b/Script/main.py index a3352b6..4b492d1 100755 --- a/Script/main.py +++ b/Script/main.py @@ -137,13 +137,15 @@ def checkDB(db): logInfo = {'file':logger.findCaller()[0],'line':logger.findCaller()[1]} logger.info('No connection to Metadata Database at the moment: try to create one',extra = logInfo) try: - db = Database.Database() + db = Database.Database(conf) logInfo = {'file':logger.findCaller()[0],'line':logger.findCaller()[1]} logger.info('New connection to Metadata Database created',extra = logInfo) except: logInfo = {'file':logger.findCaller()[0],'line':logger.findCaller()[1]} - logger.warning('Could not connect to Metadata Database:\n\t\t\t\t\t\t' - +str(sys.exc_info())+'\n\t\t\t\t\t\t'+str(sys.exc_type)+'\n\t\t\t\t\t\t'+str(sys.exc_value),extra = logInfo) + logger.warning('Could not connect to Metadata Database:',extra = logInfo) + logger.warning(str(sys.exc_info()),extra = logInfo) + logger.warning(str(sys.exc_type),extra = logInfo) + logger.warning(str(sys.exc_value),extra = logInfo) # end try, except copy.setDB(db) delete.setDB(db) -- GitLab