diff --git a/PhysicsAnalysis/PATJobTransforms/python/PATTransformUtils.py b/PhysicsAnalysis/PATJobTransforms/python/PATTransformUtils.py index 71fd61add00bfadc3e6b1e717d33d6fa22079c2a..0f780d9f0adb1b8b1fd4b4385cd0b66f27a7105f 100644 --- a/PhysicsAnalysis/PATJobTransforms/python/PATTransformUtils.py +++ b/PhysicsAnalysis/PATJobTransforms/python/PATTransformUtils.py @@ -65,17 +65,13 @@ def addDAODArguments(parser, mergerTrf=True): if mergerTrf: parser.defineArgGroup('Input DAOD', 'Input DAOD files to be merged') parser.defineArgGroup('Output DAOD', 'Output merged DAOD files') - parser.defineArgGroup('Input Logs', 'Input Log files to be merged') - parser.defineArgGroup('Output Archive', 'Output Archive file') + parser.defineArgGroup('Archiver', 'Options') parser.add_argument('--inputDataFile','--inputLogFile', nargs='+', type=trfArgClasses.argFactory(trfArgClasses.argFile, io='input', type='misc'), - help='Input log file(s)', group='Input Logs') + help='Input file(s)', group='Archiver') parser.add_argument('--outputArchFile', type=trfArgClasses.argFactory(trfArgClasses.argFile, io='output', type='misc'), - help='Output archive file', group='Output Archive') - parser.add_argument('--compressionType', group='Output Archive', - help='Underlying compression type', choices=['gzip', 'bzip2', 'none'], - default='none') + help='Output archive file', group='Archiver') for DAOD in DAODTypes: parser.add_argument("--input" + DAOD + "File", nargs="+", type=trfArgClasses.argFactory(trfArgClasses.argPOOLFile, io="input", type="AOD", subtype=DAOD), @@ -97,7 +93,7 @@ def addDAODMergerSubsteps(executorSet): executorSet.add(hybridPOOLMergeExecutor(name = DAOD.lstrip("DAOD_") + 'Merge', skeletonFile = 'RecJobTransforms/skeleton.MergePool_tf.py', inData = [DAOD], outData = [DAOD+'_MRG']) ) - executorSet.add(archiveExecutor(name = 'Archiver',inData = ['Data'], outData = ['Arch'], exe='tar')) + executorSet.add(archiveExecutor(name = 'Archiver',inData = ['Data'], outData = ['Arch'], exe='zip')) def knownDAODTypes(): DAODTypes = [] diff --git a/Tools/PyJobTransforms/python/trfExe.py b/Tools/PyJobTransforms/python/trfExe.py index 7bbfb03c0222647f1b8eee409d94dcc9785cb029..a7422f774e59d844192b6634fba2b1a29ba401ec 100755 --- a/Tools/PyJobTransforms/python/trfExe.py +++ b/Tools/PyJobTransforms/python/trfExe.py @@ -1865,18 +1865,12 @@ class tagMergeExecutor(scriptExecutor): ## @brief Archive transform class archiveExecutor(scriptExecutor): - def __init__(self, name = 'Archiver', exe = 'zip', inData = set(), outData = set()): - super(archiveExecutor, self).__init__(name=name, exe=exe, memMonitor=False, inData=inData, outData=outData) - def preExecute(self, input = set(), output = set()): self.setPreExeStart() self._memMonitor = False - if 'exe' in self.conf.argdict: - self._exe = self.conf.argdict['exe'] - #unpack archived inputs - import tarfile, zipfile + import zipfile if 'inputDataFile' in self.conf.argdict: for f in self.conf.argdict['inputDataFile'].value: if zipfile.is_zipfile(f): @@ -1884,57 +1878,9 @@ class archiveExecutor(scriptExecutor): print 'Extracting input zip file {0} to temporary directory {1}'.format(f,'tmp') archive.extractall('tmp') archive.close() - elif tarfile.is_tarfile(f): - archive = tarfile.open(f, 'r:*') - print 'Extracting input tar file {0} to temporary directory {1}'.format(f,'tmp') - archive.extractall('tmp') - archive.close() - #proceed to archive - if self._exe == 'tar': - #this is needed to keep the transform from scheduling two sub-steps - if 'outputArchFile' not in self.conf.argdict: - raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_ARG_MISSING'), 'Missing output file name') - - self._cmd = ['python'] - try: - with open('tar_wrapper.py', 'w') as tar_wrapper: - print >> tar_wrapper, "import zipfile, tarfile, os, shutil" - if os.path.exists(self.conf.argdict['outputArchFile'].value[0]): - #appending input file(s) to existing archive. Compressed writing in append mode is not possible - print >> tar_wrapper, "tar = tarfile.open('{}', 'a:')".format(self.conf.argdict['outputArchFile'].value[0]) - else: - #creating new archive - if 'compressionType' in self.conf.argdict: - if self.conf.argdict['compressionType'] == 'gzip': - print >> tar_wrapper, "tar = tarfile.open('{}', 'w:gz')".format(self.conf.argdict['outputArchFile'].value[0]) - elif self.conf.argdict['compressionType'] == 'bzip2': - print >> tar_wrapper, "tar = tarfile.open('{}', 'w:bz2')".format(self.conf.argdict['outputArchFile'].value[0]) - elif self.conf.argdict['compressionType'] == 'none': - print >> tar_wrapper, "tar = tarfile.open('{}', 'w:')".format(self.conf.argdict['outputArchFile'].value[0]) - print >> tar_wrapper, "for f in {}:".format(self.conf.argdict['inputDataFile'].value) - print >> tar_wrapper, " if not zipfile.is_zipfile(f) and not tarfile.is_tarfile(f):" - print >> tar_wrapper, " print 'Tarring {}'.format(os.path.basename(f))" - print >> tar_wrapper, " tar.add(f)" - print >> tar_wrapper, "if os.path.isdir('tmp'):" - print >> tar_wrapper, " for root, dirs, files in os.walk('tmp'):" - print >> tar_wrapper, " for name in files:" - print >> tar_wrapper, " print 'Tarring {}'.format(name)" - print >> tar_wrapper, " tar.add(os.path.join(root, name),name)" - print >> tar_wrapper, " shutil.rmtree('tmp')" - print >> tar_wrapper, "tar.close()" - os.chmod('tar_wrapper.py', 0755) - except (IOError, OSError) as e: - errMsg = 'error writing tar wrapper {fileName}: {error}'.format(fileName = 'tar_wrapper.py', - error = e - ) - msg.error(errMsg) - raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_WRAPPER'), - errMsg - ) - self._cmd.append('tar_wrapper.py') - - elif self._exe == 'zip': + #archiving + if self._exe == 'zip': if 'outputArchFile' not in self.conf.argdict: raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_ARG_MISSING'), 'Missing output file name') @@ -1949,9 +1895,12 @@ class archiveExecutor(scriptExecutor): #creating new archive print >> zip_wrapper, "zf = zipfile.ZipFile('{}', mode='w', allowZip64=True)".format(self.conf.argdict['outputArchFile'].value[0]) print >> zip_wrapper, "for f in {}:".format(self.conf.argdict['inputDataFile'].value) - print >> zip_wrapper, " if not zipfile.is_zipfile(f) and not tarfile.is_tarfile(f):" + print >> zip_wrapper, " if not zipfile.is_zipfile(f):" print >> zip_wrapper, " print 'Zipping {}'.format(os.path.basename(f))" print >> zip_wrapper, " zf.write(f, arcname=os.path.basename(f), compress_type=zipfile.ZIP_STORED)" + print >> zip_wrapper, " if os.access(f, os.F_OK):" + print >> zip_wrapper, " print 'Removing input file {}'.format(f)" + print >> zip_wrapper, " os.unlink(f)" print >> zip_wrapper, "if os.path.isdir('tmp'):" print >> zip_wrapper, " for root, dirs, files in os.walk('tmp'):" print >> zip_wrapper, " for name in files:" @@ -1970,19 +1919,18 @@ class archiveExecutor(scriptExecutor): ) self._cmd.append('zip_wrapper.py') + #unarchiving elif self._exe == 'unarchive': - if not zipfile.is_zipfile(self.conf.argdict['inputArchFile'].value[0]) and not tarfile.is_tarfile(self.conf.argdict['inputArchFile'].value[0]): - raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_INPUT_FILE_ERROR'), - 'The input file is not a zip or tar archive - aborting unpacking') + for infile in self.conf.argdict['inputArchFile'].value: + if not zipfile.is_zipfile(infile): + raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_INPUT_FILE_ERROR'), + 'An input file is not a zip archive - aborting unpacking') self._cmd = ['python'] try: with open('unarchive_wrapper.py', 'w') as unarchive_wrapper: - print >> unarchive_wrapper, "import zipfile, tarfile" + print >> unarchive_wrapper, "import zipfile" print >> unarchive_wrapper, "for f in {}:".format(self.conf.argdict['inputArchFile'].value) - print >> unarchive_wrapper, " if zipfile.is_zipfile(f):" - print >> unarchive_wrapper, " archive = zipfile.ZipFile(f, mode='r')" - print >> unarchive_wrapper, " elif tarfile.is_tarfile(f):" - print >> unarchive_wrapper, " archive = tarfile.open(f, 'r:*')" + print >> unarchive_wrapper, " archive = zipfile.ZipFile(f, mode='r')" print >> unarchive_wrapper, " path = '{}'".format(self.conf.argdict['path']) print >> unarchive_wrapper, " print 'Extracting archive {0} to {1}'.format(f,path)" print >> unarchive_wrapper, " archive.extractall(path)" @@ -1998,3 +1946,4 @@ class archiveExecutor(scriptExecutor): ) self._cmd.append('unarchive_wrapper.py') super(archiveExecutor, self).preExecute(input=input, output=output) +