diff --git a/Script/CopyThread.py b/Script/CopyThread.py index 807af78b3e59d6ae581f98124ed70d7d050c9d49..e7d01b77957f8efd2c6b41d8d6fb6e0d59c090a5 100755 --- a/Script/CopyThread.py +++ b/Script/CopyThread.py @@ -11,7 +11,8 @@ from subprocess import * import signal import logging from Constants import * -from utils import set_log_level,castorinfo +from utils import set_log_level + class CopyThread(threading.Thread): @@ -66,7 +67,7 @@ class CopyThread(threading.Thread): self.logger.debug('Size of CopyQueue: ' + str(self.CopyQueue.qsize()) + ', Size of DeleteQueue: ' + str(self.DeleteQueue.qsize()) + ', Size of ClearQueue: ' + str(self.ClearQueue.qsize()),extra = self.logInfo) - ##### Launch n = maxCopy of parallel rfcp processes ##### + ##### Launch n = maxCopy of parallel copy processes ##### self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]} self.logger.debug('Launch ' + str(self.maxCopy) + ' parallel copy processes',extra = self.logInfo) while ( len(self.CopyList) < self.maxCopy): @@ -101,16 +102,12 @@ class CopyThread(threading.Thread): ##### Create the copy directory in Castor, if it doesn't exist yet ##### pool = copyfile[1] stagehost = copyfile[3] - castorEnv = self.getCastorEnv(pool,stagehost) copyDir = copyfile[2] CastorFile = os.path.join(copyDir, os.path.basename(copyfile[0])) self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]} self.logger.debug('Create ' + copyDir + ' in Castor if it does not exits yet',extra = self.logInfo) - nsmkdir = Popen(['nsmkdir', '-p', copyDir], stdout = PIPE, stderr = STDOUT, env = castorEnv) - nsmkdir.wait() - self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]} - self.logger.info(nsmkdir.stdout.read(),extra = self.logInfo) + self.conf.backend.mkdir(copyDir, stagehost, pool, self.logger) #### Fetch information for consistency checks @@ -132,14 +129,16 @@ class CopyThread(threading.Thread): if valid: ##### Copy the file in Castor ##### - rfcp = Popen(['rfcp', copyfile[0], CastorFile], stdout = PIPE, stderr = STDOUT, env = castorEnv) + copyproc = self.conf.backend.backgroundcopy(\ + copyfile[0], CastorFile, \ + stagehost, pool, self.logger) #### Rename .TOBECOPIED into .COPYING file ##### os.rename(copyfile[0] + tobecopied_ext, copyfile[0]+ copying_ext) ##### Create a list of (copy process, .COPYING file name, n retry = 0, Pool, CopyDir, timestamp, Castor stage hostxem) ##### - self.CopyList.append([rfcp,copyfile[0],0,pool,copyDir,time(),stagehost, SFO_filesize, dbchecksum, dbfilesize, filehealth]) + self.CopyList.append([copyproc,copyfile[0],0,pool,copyDir,time(),stagehost, SFO_filesize, dbchecksum, dbfilesize, filehealth]) else: self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]} @@ -166,7 +165,7 @@ class CopyThread(threading.Thread): - ##### Check rfcp processes status ##### + ##### Check copy processes status ##### def statusCheck(self): ##### Check the status for all commands in CopyList ##### @@ -174,12 +173,12 @@ class CopyThread(threading.Thread): copyList = self.CopyList[:] for element in copyList: - status = element[0].poll() + status = self.conf.backend.copystate(element[0][0]) + pid = element[0][1] DataFile = element[1] nretry = element[2] pool = element[3] stagehost = element[6] - castorEnv = self.getCastorEnv(pool,stagehost) copyDir = element[4] CastorFile = os.path.join(copyDir, os.path.basename(DataFile)) @@ -194,10 +193,11 @@ class CopyThread(threading.Thread): ##### Status can not be None for ever: after a while kill and retry ##### if (time()-t_in)>self.NoneTimeout: - os.kill(element[0].pid,signal.SIGKILL) + os.kill(pid, signal.SIGKILL) self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]} self.logger.warning('Copy process is taking to long: kill it and retry',extra = self.logInfo) - self.logger.debug(element[0].stdout.read(),extra = self.logInfo) + self.logger.debug(self.conf.backend.copystdout(element[0][0]), + extra = self.logInfo) if self.exitFlag: self.CopyList.remove(element) else: self.handleUnsuccCopy(element) # end if @@ -207,14 +207,17 @@ class CopyThread(threading.Thread): elif status == 0: self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]} - self.logger.debug(element[0].stdout.read(),extra = self.logInfo) + self.logger.debug(self.conf.backend.copystdout(element[0][0]), + extra = self.logInfo) ##### Compare size of original (SFO) and copied (Castor) files ##### SFO_filesize = element[7] + sizechecksum = self.conf.backend.sizechecksum Castor_filesize, checksum = \ - castorinfo(CastorFile, castorEnv, \ - self.logger) + sizechecksum(CastorFile, \ + stagehost, pool, \ + self.logger) success = (SFO_filesize == Castor_filesize) @@ -264,19 +267,10 @@ class CopyThread(threading.Thread): else: self.handleUnsuccCopy(element) else: - # TO DO: different actions for different error code - # if status == 1: print 'Unsuccessful copy: bad parameter' - # elif status == 2: print 'Unsuccessful copy: system error' - # elif status == 3: print 'Unsuccessful copy: unknown error' - # elif status == 16: print 'Unsuccessful copy: device or resource busy' - # elif status == 28: print 'Unsuccessful copy: no space left on device' - # elif status == 196: print 'Unsuccessful copy: request killed' - # elif status == 198: print 'Unsuccessful copy: stager not active' - # elif status == 200: print 'Unsuccessful copy: bad checksum' - # else: print 'Unsuccessful copy, unconventional error code:', status, type(type(status)) self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]} - self.logger.warning('Copy of file ' + DataFile + ' was NOT successful. rfcp exit code is ' + str(status),extra = self.logInfo) - self.logger.debug(element[0].stdout.read(),extra = self.logInfo) + self.logger.warning('Copy of file ' + DataFile + ' was NOT successful. Exit code is ' + str(status),extra = self.logInfo) + self.logger.debug(self.conf.backend.copystdout(element[0][0]), + extra = self.logInfo) if self.exitFlag: self.CopyList.remove(element) else: self.handleUnsuccCopy(element) @@ -284,17 +278,9 @@ class CopyThread(threading.Thread): # end for # end def statusCheck() - - - ##### Get Castor environment ##### - def getCastorEnv(self,pool,stagehost): - return {'RFIO_USE_CASTOR_V2':'YES','STAGE_HOST':stagehost,'STAGE_SVCCLASS':pool,'USER':'atlascdr'} - # end def getCastorEnv() - - ##### Handle unsuccessful copies (status = 1, or status =0 but ##### ##### filesizes on SFO and on Castor disagree, or status = None for too much time) ##### - ##### element is a list of (rfcp process, .TOBECOPIED file, Castor pool, Castor copy directory, timestamp, Castor stage host) ##### + ##### element is a list of ( (process,pid), .TOBECOPIED file, Castor pool, Castor copy directory, timestamp, Castor stage host) ##### def handleUnsuccCopy(self,element): self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]} self.logger.info('Handle unsuccessful copy of file ' + element[1],extra = self.logInfo) @@ -302,7 +288,6 @@ class CopyThread(threading.Thread): nretry = element[2] pool = element[3] stagehost = element[6] - castorEnv = self.getCastorEnv(pool,stagehost) copyDir = element[4] CastorFile = os.path.join(copyDir, os.path.basename(DataFile)) @@ -315,19 +300,18 @@ class CopyThread(threading.Thread): self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]} self.logger.debug('Remove file in Castor if it exists',extra = self.logInfo) - ##### Remove unsuccessful copy file from Castor, if it's exist ##### - rfrm = Popen(['rfrm', CastorFile], stdout = PIPE, stderr = STDOUT, env= castorEnv) - rfrm.wait() - self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]} - self.logger.debug(rfrm.stdout.read(),extra = self.logInfo) + #### Remove unsuccessful copy file from Castor, if it's exist #### + self.conf.backend.remove(CastorFile, stagehost, pool, self.logger) ##### Retry to copy the file to Castor ##### if self.exitFlag: return self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]} self.logger.info('Retry to copy file on the spot again',extra = self.logInfo) - rfcp = Popen(['rfcp', DataFile, CastorFile], stdout = PIPE, stderr = STDOUT, env = castorEnv) + copyproc = self.conf.backend.backgroundcopy(\ + DataFile, CastorFile, \ + stagehost, pool, self.logger) idx = self.CopyList.index(element) - self.CopyList[idx][0] = rfcp + self.CopyList[idx][0] = copyproc self.CopyList[idx][2] += 1 self.CopyList[idx][5] = time() diff --git a/Script/DeleteThread.py b/Script/DeleteThread.py index 37930a4fd3818df5d0a52e7143ecc7910d0ff568..7359a5e1d9b1f6f5a655e99a527ba2ce23b14f2b 100755 --- a/Script/DeleteThread.py +++ b/Script/DeleteThread.py @@ -12,7 +12,7 @@ import logging from Constants import * from operator import itemgetter, add, contains from functools import partial -from utils import set_log_level, castorinfo +from utils import set_log_level class DeleteThread(threading.Thread): @@ -120,7 +120,6 @@ class DeleteThread(threading.Thread): ##### Get Castor environment and copy directory ##### pool = deletefile[1] stagehost = deletefile[3] - castorEnv = self.getCastorEnv(pool,stagehost) copyDir = deletefile[2] ##### At beginning or after n copied files, get filesystem size info ##### @@ -211,7 +210,7 @@ class DeleteThread(threading.Thread): os.path.basename(deletefile[0])) (isMigr, isMerged) = \ - self.checkMigration(Castor_file, castorEnv) + self.checkMigration(Castor_file, stagehost, pool) if isMigr : @@ -226,9 +225,12 @@ class DeleteThread(threading.Thread): success = True if not isMerged: + + sizechecksum = self.conf.backend.sizechecksum castorfilesize, checksum = \ - castorinfo(Castor_file, castorEnv, \ - self.logger) + sizechecksum (Castor_file,\ + stagehost, pool, \ + self.logger) sfofilesize = os.stat(deletefile[0])[6] @@ -277,29 +279,21 @@ class DeleteThread(threading.Thread): # end def run() - def checkMigration(self, castorfile, castorenv): + def checkMigration(self, castorfile, stagehost, pool): ##### Check if file has already been migrated ##### self.logInfo = {'file':self.logger.findCaller()[0], 'line':self.logger.findCaller()[1]} self.logger.debug('Time to check if file has been migrated: ' + castorfile, extra = self.logInfo) - nsls = Popen(['nsls', '-l',castorfile], stdout = PIPE, - stderr = STDOUT, env= castorenv) - ret = nsls.wait() - nslsOut = nsls.stdout.read() - self.logInfo = {'file':self.logger.findCaller()[0], - 'line':self.logger.findCaller()[1]} - self.logger.debug(nslsOut,extra = self.logInfo) + + isMig = self.conf.backend.migrated(castorfile, stagehost, + pool, self.logger) - isMig = False isMerged = False - if ret == 0 and 'm' in nslsOut.split(' ')[0]: - isMig = True - elif self.mergedChecker: + if (not isMig) and self.mergedChecker: - ## Here we have to add some code to handle file merging ## self.logInfo = {'file':self.logger.findCaller()[0], 'line':self.logger.findCaller()[1]} self.logger.debug('Time to check if merged file' + @@ -307,23 +301,15 @@ class DeleteThread(threading.Thread): % castorfile, extra = self.logInfo) - if self.mergedChecker(castorfile, castorenv): + if self.mergedChecker(castorfile, stagehost, pool, \ + self.conf.backend, self.logger): isMig = True isMerged = True return isMig,isMerged - - ##### Get Castor environment ##### - def getCastorEnv(self,pool,stagehost): - return {'RFIO_USE_CASTOR_V2':'YES','STAGE_HOST':stagehost, - 'STAGE_SVCCLASS':pool,'USER':'atlascdr'} - # end def getCastorEnv() - - - ##### Get total size of all filesystems and size of the single filesystem the file belongs to ##### def getFSsize(self,filename): diff --git a/Script/MergedMigrated.py b/Script/MergedMigrated.py index fa599df333778cd27066b23b6660c202643feafe..70e679e2a685d8c62a2b04f97e6c51e1f1a9138f 100644 --- a/Script/MergedMigrated.py +++ b/Script/MergedMigrated.py @@ -1,6 +1,6 @@ #!/bin/env python -__version__='$Revision:$' +__version__='$Revision$' # $Source$ from SFOFileNameParser import SFOFileNameParser @@ -10,86 +10,71 @@ from subprocess import * BASEDIR = '/castor/cern.ch/grid/atlas/tzero/prod1/perm/' -def nsls(target, castorenv, opts=[]): - command = ['nsls'] - command.extend(opts) - command.append(target) - - kwds = {} - - if castorenv: - kwds['env'] = castorenv - - nsls = Popen(command, stdout = PIPE, - stderr = STDOUT, **kwds) - - nslsOut, _ = nsls.communicate() - ret = nsls.wait() - - return ((ret == 0), nslsOut) +def MergedMigrated(castorfile, stager, pool, backend, logger=None, verbose = False): + """ +Destination directory: +/<basedir>/<projecttag>/<streamtype_streamname>/<runnr-7digit>/<dataset>/ +basedir = /castor/cern.ch/grid/atlas/tzero/prod1/perm/ +projecttag = data09_cos +dataset = data09_cos.00122050.physics_IDCosmic.merge.RAW -def MergedMigrated(castorfile, castorenv, verbose = False): - """ - Input file: /some_path/data09_cos.00122050.physics_IDCosmic.daq.RAW._lb0123._SFO-1._0001.data +Merged file name is: - Check fo migration of: - /<basedir>/<projecttag>/<streamtype_streamname>/<runnr-7digit>/<dataset>/<dataset>._lb<lbnr-4digit>._0001.<y> +1) if all SFO files of a LB fit into a single file + <project>.<runnr>.<stream>.daq.RAW._lbXXXX._SFO-N._<fileseqnr>.data, N=4,5,9,10 + --> <project>.<runnr>.<stream>.merge.RAW._lbXXXX._SFO-ALL._0001.<attemptnr> - basedir = /castor/cern.ch/grid/atlas/tzero/prod1/perm/ - projecttag = data09_cos - dataset = data09_cos.00122050.physics_IDCosmic.merge.RAW - """ +2) if all SFO files of a LB don't fit into a single file + <project>.<runnr>.<stream>.daq.RAW._lbXXXX._SFO-N._<fileseqnr>.data, N=4,5,9,10 + --> <project>.<runnr>.<stream>.merge.RAW._lbXXXX._SFO-N._<fileseqnr>.<attemptnr>, N=4,5,9,10 + """ + ### Build the target directory name parsed = SFOFileNameParser(os.path.basename(castorfile)) dataset = '%s.%s.%s_%s.merge.RAW' \ % (parsed.ProjectTag(), parsed.RunNr(), \ parsed.StreamType(), parsed.StreamName()) - ### List the content of the path + path = os.path.join(BASEDIR, parsed.ProjectTag(), \ '%s_%s' % (parsed.StreamType(), parsed.StreamName()), \ - '%07d' % int(parsed.RunNr()), \ + '%s' % parsed.RunNr(), \ '%s' % dataset) - if verbose: print path + ### Look for files into the dataset dir - success, all_files = nsls(path, castorenv) + success, all_files = backend.listdir(path, stager, pool, logger) - ## If fails, return false + ### If fails, return false if not success: return False - files = [] - target_lb = int(parsed.LBNr()) - for f in all_files.split('\n'): - if not f: continue + ### Create the two type of name we expect (neglecting attempt number) + notmerged = os.path.splitext(os.path.basename(castorfile))[0] + notmerged = notmerged.replace('.daq.','.merge.') + merged = '.'.join(notmerged.split('.',6)[:-1]+['_SFO-ALL._0001']) - lbstring = f.split('.',6)[5].partition('_lb')[2] - if '-' in lbstring: - #Multi LB file - minlb,maxlb = [int(lb) for lb in lbstring.split('-')] - if minlb <= target_lb and \ - maxlb >= target_lb: files.append(f) - else: - #Single LB file - if target_lb == int(lbstring): files.append(f) + ### Find all the files with the correct name + files = [f for f in all_files.split('\n') + if f and (notmerged in f or merged in f)] if not files: return False - - file = sorted([(int(f.rsplit('.',1)[1]),f) for f in files])[-1][1] - #Check the migration status for the file - success, out = nsls(os.path.join(path, file), \ - castorenv, opts = ['-l']) - - if success and 'm' in out.split(' ')[0]: return True - else: return False + ### Take the file with the highest attempt number + file = sorted([(int(f.rsplit('.',1)[1]),f) for f in files])[-1][1] + ### Check the migration status for the file + return backend.migrated(os.path.join(path, file), + stager, pool, logger) + if __name__ == '__main__': + + backend = __import__('castorstorage',globals(), locals()) - print MergedMigrated(sys.argv[1], '', verbose=True) + print MergedMigrated(sys.argv[1], sys.argv[2], sys.argv[3], + backend, verbose=True)