Skip to content
Snippets Groups Projects
Commit a35a1ef1 authored by Emmanuel Le Guirriec
Browse files

Merge branch '21.2-deletemergedfiles' into '21.2'

Implementation of new feature for Archive_tf (ATLASJT-383)

See merge request atlas/athena!14050

Former-commit-id: b16ae22201f8f59d0f0a591f734a8a4769695f91
parents 7e7dcf66 e43bec35
No related branches found
No related tags found
No related merge requests found
......@@ -65,17 +65,13 @@ def addDAODArguments(parser, mergerTrf=True):
if mergerTrf:
parser.defineArgGroup('Input DAOD', 'Input DAOD files to be merged')
parser.defineArgGroup('Output DAOD', 'Output merged DAOD files')
parser.defineArgGroup('Input Logs', 'Input Log files to be merged')
parser.defineArgGroup('Output Archive', 'Output Archive file')
parser.defineArgGroup('Archiver', 'Options')
parser.add_argument('--inputDataFile','--inputLogFile', nargs='+',
type=trfArgClasses.argFactory(trfArgClasses.argFile, io='input', type='misc'),
help='Input log file(s)', group='Input Logs')
help='Input file(s)', group='Archiver')
parser.add_argument('--outputArchFile',
type=trfArgClasses.argFactory(trfArgClasses.argFile, io='output', type='misc'),
help='Output archive file', group='Output Archive')
parser.add_argument('--compressionType', group='Output Archive',
help='Underlying compression type', choices=['gzip', 'bzip2', 'none'],
default='none')
help='Output archive file', group='Archiver')
for DAOD in DAODTypes:
parser.add_argument("--input" + DAOD + "File", nargs="+",
type=trfArgClasses.argFactory(trfArgClasses.argPOOLFile, io="input", type="AOD", subtype=DAOD),
......@@ -97,7 +93,7 @@ def addDAODMergerSubsteps(executorSet):
executorSet.add(hybridPOOLMergeExecutor(name = DAOD.lstrip("DAOD_") + 'Merge', skeletonFile = 'RecJobTransforms/skeleton.MergePool_tf.py',
inData = [DAOD], outData = [DAOD+'_MRG'])
)
executorSet.add(archiveExecutor(name = 'Archiver',inData = ['Data'], outData = ['Arch'], exe='tar'))
executorSet.add(archiveExecutor(name = 'Archiver',inData = ['Data'], outData = ['Arch'], exe='zip'))
def knownDAODTypes():
DAODTypes = []
......
......@@ -1865,18 +1865,12 @@ class tagMergeExecutor(scriptExecutor):
## @brief Archive transform
class archiveExecutor(scriptExecutor):
def __init__(self, name = 'Archiver', exe = 'zip', inData = set(), outData = set()):
    """Set up the archive executor.

    The name, exe, inData and outData arguments are handed unchanged to
    the parent class constructor; memory monitoring is always requested
    off (memMonitor=False) for this executor.
    """
    # Collect the parent constructor arguments in one place before delegating.
    parentArgs = dict(name=name, exe=exe, memMonitor=False, inData=inData, outData=outData)
    super(archiveExecutor, self).__init__(**parentArgs)
def preExecute(self, input = set(), output = set()):
self.setPreExeStart()
self._memMonitor = False
if 'exe' in self.conf.argdict:
self._exe = self.conf.argdict['exe']
#unpack archived inputs
import tarfile, zipfile
import zipfile
if 'inputDataFile' in self.conf.argdict:
for f in self.conf.argdict['inputDataFile'].value:
if zipfile.is_zipfile(f):
......@@ -1884,57 +1878,9 @@ class archiveExecutor(scriptExecutor):
print 'Extracting input zip file {0} to temporary directory {1}'.format(f,'tmp')
archive.extractall('tmp')
archive.close()
elif tarfile.is_tarfile(f):
archive = tarfile.open(f, 'r:*')
print 'Extracting input tar file {0} to temporary directory {1}'.format(f,'tmp')
archive.extractall('tmp')
archive.close()
#proceed to archive
if self._exe == 'tar':
#this is needed to keep the transform from scheduling two sub-steps
if 'outputArchFile' not in self.conf.argdict:
raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_ARG_MISSING'), 'Missing output file name')
self._cmd = ['python']
try:
with open('tar_wrapper.py', 'w') as tar_wrapper:
print >> tar_wrapper, "import zipfile, tarfile, os, shutil"
if os.path.exists(self.conf.argdict['outputArchFile'].value[0]):
#appending input file(s) to existing archive. Compressed writing in append mode is not possible
print >> tar_wrapper, "tar = tarfile.open('{}', 'a:')".format(self.conf.argdict['outputArchFile'].value[0])
else:
#creating new archive
if 'compressionType' in self.conf.argdict:
if self.conf.argdict['compressionType'] == 'gzip':
print >> tar_wrapper, "tar = tarfile.open('{}', 'w:gz')".format(self.conf.argdict['outputArchFile'].value[0])
elif self.conf.argdict['compressionType'] == 'bzip2':
print >> tar_wrapper, "tar = tarfile.open('{}', 'w:bz2')".format(self.conf.argdict['outputArchFile'].value[0])
elif self.conf.argdict['compressionType'] == 'none':
print >> tar_wrapper, "tar = tarfile.open('{}', 'w:')".format(self.conf.argdict['outputArchFile'].value[0])
print >> tar_wrapper, "for f in {}:".format(self.conf.argdict['inputDataFile'].value)
print >> tar_wrapper, " if not zipfile.is_zipfile(f) and not tarfile.is_tarfile(f):"
print >> tar_wrapper, " print 'Tarring {}'.format(os.path.basename(f))"
print >> tar_wrapper, " tar.add(f)"
print >> tar_wrapper, "if os.path.isdir('tmp'):"
print >> tar_wrapper, " for root, dirs, files in os.walk('tmp'):"
print >> tar_wrapper, " for name in files:"
print >> tar_wrapper, " print 'Tarring {}'.format(name)"
print >> tar_wrapper, " tar.add(os.path.join(root, name),name)"
print >> tar_wrapper, " shutil.rmtree('tmp')"
print >> tar_wrapper, "tar.close()"
os.chmod('tar_wrapper.py', 0755)
except (IOError, OSError) as e:
errMsg = 'error writing tar wrapper {fileName}: {error}'.format(fileName = 'tar_wrapper.py',
error = e
)
msg.error(errMsg)
raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_WRAPPER'),
errMsg
)
self._cmd.append('tar_wrapper.py')
elif self._exe == 'zip':
#archiving
if self._exe == 'zip':
if 'outputArchFile' not in self.conf.argdict:
raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_ARG_MISSING'), 'Missing output file name')
......@@ -1949,9 +1895,12 @@ class archiveExecutor(scriptExecutor):
#creating new archive
print >> zip_wrapper, "zf = zipfile.ZipFile('{}', mode='w', allowZip64=True)".format(self.conf.argdict['outputArchFile'].value[0])
print >> zip_wrapper, "for f in {}:".format(self.conf.argdict['inputDataFile'].value)
print >> zip_wrapper, " if not zipfile.is_zipfile(f) and not tarfile.is_tarfile(f):"
print >> zip_wrapper, " if not zipfile.is_zipfile(f):"
print >> zip_wrapper, " print 'Zipping {}'.format(os.path.basename(f))"
print >> zip_wrapper, " zf.write(f, arcname=os.path.basename(f), compress_type=zipfile.ZIP_STORED)"
print >> zip_wrapper, " if os.access(f, os.F_OK):"
print >> zip_wrapper, " print 'Removing input file {}'.format(f)"
print >> zip_wrapper, " os.unlink(f)"
print >> zip_wrapper, "if os.path.isdir('tmp'):"
print >> zip_wrapper, " for root, dirs, files in os.walk('tmp'):"
print >> zip_wrapper, " for name in files:"
......@@ -1970,19 +1919,18 @@ class archiveExecutor(scriptExecutor):
)
self._cmd.append('zip_wrapper.py')
#unarchiving
elif self._exe == 'unarchive':
if not zipfile.is_zipfile(self.conf.argdict['inputArchFile'].value[0]) and not tarfile.is_tarfile(self.conf.argdict['inputArchFile'].value[0]):
raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_INPUT_FILE_ERROR'),
'The input file is not a zip or tar archive - aborting unpacking')
for infile in self.conf.argdict['inputArchFile'].value:
if not zipfile.is_zipfile(infile):
raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_INPUT_FILE_ERROR'),
'An input file is not a zip archive - aborting unpacking')
self._cmd = ['python']
try:
with open('unarchive_wrapper.py', 'w') as unarchive_wrapper:
print >> unarchive_wrapper, "import zipfile, tarfile"
print >> unarchive_wrapper, "import zipfile"
print >> unarchive_wrapper, "for f in {}:".format(self.conf.argdict['inputArchFile'].value)
print >> unarchive_wrapper, " if zipfile.is_zipfile(f):"
print >> unarchive_wrapper, " archive = zipfile.ZipFile(f, mode='r')"
print >> unarchive_wrapper, " elif tarfile.is_tarfile(f):"
print >> unarchive_wrapper, " archive = tarfile.open(f, 'r:*')"
print >> unarchive_wrapper, " archive = zipfile.ZipFile(f, mode='r')"
print >> unarchive_wrapper, " path = '{}'".format(self.conf.argdict['path'])
print >> unarchive_wrapper, " print 'Extracting archive {0} to {1}'.format(f,path)"
print >> unarchive_wrapper, " archive.extractall(path)"
......@@ -1998,3 +1946,4 @@ class archiveExecutor(scriptExecutor):
)
self._cmd.append('unarchive_wrapper.py')
super(archiveExecutor, self).preExecute(input=input, output=output)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment