Commit c4600296 authored by Mihai Patrascoiu

Merge branch 'FTS-1644' into 'develop'

Add parsing of params in bulk file

Closes FTS-1644

See merge request !45
parents 05e4498a abdd0199
Pipeline #2023066 passed in 7 minutes and 36 seconds
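
For context: the bulk submission file passed with -f/--file is a JSON document carrying a "files" list, and with this change an optional top-level "params" object is parsed as well. Entries in "params" override the built-in DEFAULT_PARAMS and are in turn overridden by options given explicitly on the command line. A minimal sketch of such a file (hosts, paths and values are illustrative only, not taken from the repository):

{
    "files": [
        {
            "sources": ["gsiftp://source.example.com/path/to/file"],
            "destinations": ["gsiftp://destination.example.com/path/to/file"]
        }
    ],
    "params": {
        "overwrite": true,
        "retry": 2
    }
}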
@@ -27,8 +27,31 @@ import time
 from base import Base
 from fts3.rest.client import Submitter, Delegator, Inquirer

-DEFAULT_CHECKSUM = 'ADLER32'
+DEFAULT_PARAMS = {
+    'checksum': 'ADLER32',
+    'overwrite': False,
+    'reuse': False,
+    'job_metadata': None,
+    'file_metadata': None,
+    'filesize': None,
+    'gridftp': None,
+    'spacetoken': None,
+    'source_spacetoken': None,
+    'verify_checksum': 'n',
+    'copy_pin_lifetime': -1,
+    'bring_online': -1,
+    'archive_timeout': -1,
+    'timeout': None,
+    'fail_nearline': False,
+    'retry': 0,
+    'multihop': False,
+    'credential': None,
+    'nostreams': None,
+    's3alternate': False,
+    'target_qos': None,
+    'ipv4': False,
+    'ipv6': False
+}

 def _metadata(data):
     try:
@@ -95,9 +118,9 @@ class JobSubmitter(Base):
                                    help='expiration time of the delegation in minutes.')
         self.opt_parser.add_option('--delegate-when-lifetime-lt', type=int, default=120,
                                    help='delegate the proxy when the remote lifetime is less than this value (in minutes)')
-        self.opt_parser.add_option('-o', '--overwrite', dest='overwrite', default=False, action='store_true',
+        self.opt_parser.add_option('-o', '--overwrite', dest='overwrite', action='store_true',
                                    help='overwrite files.')
-        self.opt_parser.add_option('-r', '--reuse', dest='reuse', default=False, action='store_true',
+        self.opt_parser.add_option('-r', '--reuse', dest='reuse', action='store_true',
                                    help='enable session reuse for the transfer job.')
         self.opt_parser.add_option('--job-metadata', dest='job_metadata',
                                    help='transfer job metadata.')
@@ -113,38 +136,38 @@ class JobSubmitter(Base):
                                    help='the source space token or its description.')
         self.opt_parser.add_option('-K', '--compare-checksum', dest='compare_checksum', default=False, action='store_true',
                                    help='deprecated: compare checksums between source and destination.')
-        self.opt_parser.add_option('-C', '--checksum-mode', dest='checksum_mode', type='string', default='none',
+        self.opt_parser.add_option('-C', '--checksum-mode', dest='checksum_mode', type='string',
                                    help='compare checksums in source, target, both or none.')
-        self.opt_parser.add_option('--copy-pin-lifetime', dest='pin_lifetime', type='long', default=-1,
+        self.opt_parser.add_option('--copy-pin-lifetime', dest='pin_lifetime', type='long',
                                    help='pin lifetime of the copy in seconds.')
-        self.opt_parser.add_option('--bring-online', dest='bring_online', type='long', default=None,
+        self.opt_parser.add_option('--bring-online', dest='bring_online', type='long',
                                    help='bring online timeout in seconds.')
-        self.opt_parser.add_option('--archive-timeout', dest='archive_timeout', type='long', default=None,
+        self.opt_parser.add_option('--archive-timeout', dest='archive_timeout', type='long',
                                    help='archive timeout in seconds.')
-        self.opt_parser.add_option('--timeout', dest='timeout', type='long', default=None,
+        self.opt_parser.add_option('--timeout', dest='timeout', type='long',
                                    help='transfer timeout in seconds.')
-        self.opt_parser.add_option('--fail-nearline', dest='fail_nearline', default=False, action='store_true',
+        self.opt_parser.add_option('--fail-nearline', dest='fail_nearline', action='store_true',
                                    help='fail the transfer if the file is nearline.')
         self.opt_parser.add_option('--dry-run', dest='dry_run', default=False, action='store_true',
                                    help='do not send anything, just print the JSON message.')
         self.opt_parser.add_option('-f', '--file', dest='bulk_file', type='string',
                                    help='Name of configuration file')
-        self.opt_parser.add_option('--retry', dest='retry', type='int', default=0,
+        self.opt_parser.add_option('--retry', dest='retry', type='int',
                                    help='Number of retries. If 0, the server default will be used.'
                                         'If negative, there will be no retries.')
-        self.opt_parser.add_option('-m', '--multi-hop', dest='multihop', default=False, action='store_true',
+        self.opt_parser.add_option('-m', '--multi-hop', dest='multihop', action='store_true',
                                    help='submit a multihop transfer.')
-        self.opt_parser.add_option('--cloud-credentials', dest='cloud_cred', default=None,
+        self.opt_parser.add_option('--cloud-credentials', dest='cloud_cred',
                                    help='use cloud credentials for the job (i.e. dropbox).')
-        self.opt_parser.add_option('--nostreams', dest='nostreams', default=None,
+        self.opt_parser.add_option('--nostreams', dest='nostreams',
                                    help='number of streams')
-        self.opt_parser.add_option('--ipv4', dest='ipv4', default=False, action='store_true',
+        self.opt_parser.add_option('--ipv4', dest='ipv4', action='store_true',
                                    help='force ipv4')
-        self.opt_parser.add_option('--ipv6', dest='ipv6', default=False, action='store_true',
+        self.opt_parser.add_option('--ipv6', dest='ipv6', action='store_true',
                                    help='force ipv6')
-        self.opt_parser.add_option('--s3alternate', dest='s3alternate', default=False, action='store_true',
+        self.opt_parser.add_option('--s3alternate', dest='s3alternate', action='store_true',
                                    help='use S3 alternate URL')
-        self.opt_parser.add_option('--target-qos', dest='target_qos', type='string', default=None,
+        self.opt_parser.add_option('--target-qos', dest='target_qos', type='string',
                                    help='define the target QoS for this transfer for CDMI endpoints')

     def validate(self):
@@ -155,24 +178,20 @@ class JobSubmitter(Base):
                 sys.exit(1)
             elif len(self.args) == 2:
                 (self.source, self.destination) = self.args
-                self.checksum = None
             elif len(self.args) == 3:
                 (self.source, self.destination, self.checksum) = self.args
             else:
                 self.logger.critical("Too many parameters")
                 sys.exit(1)
-        else:
-            self.checksum = None
-        if self.options.ipv4 and self.options.ipv6:
+        self._prepare_options()
+
+        if self.params['ipv4'] and self.params['ipv6']:
             self.opt_parser.error('ipv4 and ipv6 can not be used at the same time')

         if self.options.verbose:
             self.logger.setLevel(logging.DEBUG)

     def _build_transfers(self):
         if self.options.bulk_file:
-            filecontent = open(self.options.bulk_file).read()
+            with open(self.options.bulk_file, 'r') as file:
+                filecontent = file.read()
             bulk = json.loads(filecontent)
             if "files" in bulk:
                 return bulk["files"]
@@ -184,36 +203,39 @@ class JobSubmitter(Base):
         else:
             return [{"sources": [self.source], "destinations": [self.destination]}]

-    def _do_submit(self, context):
-        #Backwards compatibility: compare_checksum parameter
+    def _build_params(self, **kwargs):
+        params = dict()
+        params.update(DEFAULT_PARAMS)
+
+        if self.options.bulk_file:
+            with open(self.options.bulk_file, 'r') as file:
+                filecontent = file.read()
+            bulk = json.loads(filecontent)
+            if 'params' in bulk:
+                params.update(bulk['params'])
+
+        # Apply command-line parameters
+        for k, v in kwargs.iteritems():
+            if v is not None:
+                params[k] = v
+
+        # JSONify metadata
+        params['job_metadata'] = _metadata(params['job_metadata'])
+        params['file_metadata'] = _metadata(params['file_metadata'])
+
+        return params
+
+    def _prepare_options(self):
+        # Backwards compatibility: compare_checksum parameter
+        # Note: compare_checksum has higher priority than checksum_mode
         if self.options.compare_checksum:
             checksum_mode = 'both'
-        else:
-            if self.checksum:
-                checksum_mode = 'target'
-            else:
-                checksum = 'none'
-        #Compare checksum has major priority than checksum_mode
-        if not self.options.compare_checksum:
-            if len(self.options.checksum_mode) > 0:
-                checksum_mode ='both'
+        elif self.options.checksum_mode:
+            checksum_mode = self.options.checksum_mode
         else:
             checksum_mode = 'none'
-        if not self.checksum:
-            self.checksum = DEFAULT_CHECKSUM
-        if not self.options.access_token:
-            delegator = Delegator(context)
-            delegator.delegate(
-                timedelta(minutes=self.options.proxy_lifetime),
-                delegate_when_lifetime_lt=timedelta(minutes=self.options.delegate_when_lifetime_lt)
-            )
-        submitter = Submitter(context)
-        job_id = submitter.submit(
-            self._build_transfers(),
+
+        self.transfers = self._build_transfers()
+        self.params = self._build_params(
             checksum=self.checksum,
             bring_online=self.options.bring_online,
             archive_timeout=self.options.archive_timeout,
@@ -222,10 +244,10 @@ class JobSubmitter(Base):
             spacetoken=self.options.destination_token,
             source_spacetoken=self.options.source_token,
             fail_nearline=self.options.fail_nearline,
-            file_metadata=_metadata(self.options.file_metadata),
+            file_metadata=self.options.file_metadata,
             filesize=self.options.file_size,
             gridftp=self.options.gridftp_params,
-            job_metadata=_metadata(self.options.job_metadata),
+            job_metadata=self.options.job_metadata,
             overwrite=self.options.overwrite,
             copy_pin_lifetime=self.options.pin_lifetime,
             reuse=self.options.reuse,
@@ -239,6 +261,20 @@ class JobSubmitter(Base):
             target_qos=self.options.target_qos
         )

+    def _do_submit(self, context):
+        if not self.options.access_token:
+            delegator = Delegator(context)
+            delegator.delegate(
+                timedelta(minutes=self.options.proxy_lifetime),
+                delegate_when_lifetime_lt=timedelta(minutes=self.options.delegate_when_lifetime_lt)
+            )
+
+        submitter = Submitter(context)
+        job_id = submitter.submit(
+            transfers=self.transfers,
+            params=self.params
+        )
+
         if self.options.json:
             self.logger.info(json.dumps(job_id))
         else:
@@ -259,50 +295,10 @@ class JobSubmitter(Base):
         return job_id

     def _do_dry_run(self, context):
-        #Backwards compatibility: compare_checksum parameter
-        if self.options.compare_checksum:
-            checksum_mode = 'both'
-        else:
-            if self.checksum:
-                checksum_mode = 'target'
-            else:
-                checksum = 'none'
-        #Compare checksum has major priority than checksum_mode
-        if not self.options.compare_checksum:
-            if len(self.options.checksum_mode) > 0:
-                checksum_mode = self.options.checksum_mode
-            else:
-                checksum_mode = 'none'
-        if not self.checksum:
-            self.checksum = DEFAULT_CHECKSUM
         submitter = Submitter(context)
         print submitter.build_submission(
-            self._build_transfers(),
-            checksum=self.checksum,
-            bring_online=self.options.bring_online,
-            archive_timeout = self.options.archive_timeout,
-            timeout = self.options.timeout,
-            verify_checksum=checksum_mode,
-            spacetoken=self.options.destination_token,
-            source_spacetoken=self.options.source_token,
-            fail_nearline=self.options.fail_nearline,
-            file_metadata=_metadata(self.options.file_metadata),
-            filesize=self.options.file_size,
-            gridftp=self.options.gridftp_params,
-            job_metadata=_metadata(self.options.job_metadata),
-            overwrite=self.options.overwrite,
-            copy_pin_lifetime=self.options.pin_lifetime,
-            reuse=self.options.reuse,
-            retry=self.options.retry,
-            multihop=self.options.multihop,
-            credential=self.options.cloud_cred,
-            nostreams=self.options.nostreams,
-            ipv4=self.options.ipv4,
-            ipv6=self.options.ipv6,
-            s3alternate=self.options.s3alternate,
-            target_qos=self.options.target_qos
+            transfers=self.transfers,
+            params=self.params
         )
         return None
@@ -27,10 +27,11 @@ class Submitter(object):
         self.context = context

     @staticmethod
-    def build_submission(transfers=None, delete=None, staging=None, **kwargs):
+    def build_submission(transfers=None, delete=None, staging=None, params=None, **kwargs):
         job = dict()
         job['params'] = dict()
+        if params:
+            job['params'].update(params)
         job['params'].update(kwargs)

         if delete:
@@ -39,24 +40,27 @@ class Submitter(object):
             job['staging'] = staging
         if transfers:
             job['files'] = transfers
-            if 'checksum' in kwargs:
+            if 'checksum' in job['params']:
                 for f in job['files']:
                     if 'checksum' not in f:
-                        f['checksum'] = kwargs['checksum']
+                        f['checksum'] = job['params']['checksum']
+                del job['params']['checksum']
-            if 'filesize' in kwargs:
+            if 'filesize' in job['params']:
                 for f in job['files']:
-                    f['filesize'] = kwargs['filesize']
+                    f['filesize'] = job['params']['filesize']
+                del job['params']['filesize']
-            if 'file_metadata' in kwargs:
+            if 'file_metadata' in job['params']:
                 for f in job['files']:
-                    f['metadata'] = kwargs['file_metadata']
+                    f['metadata'] = job['params']['file_metadata']
+                del job['params']['file_metadata']

         return json.dumps(job, indent=2)

-    def submit(self, transfers=None, delete=None, **kwargs):
-        job = Submitter.build_submission(transfers, delete, **kwargs)
+    def submit(self, transfers=None, delete=None, params=None, **kwargs):
+        job = Submitter.build_submission(transfers=transfers,
+                                         delete=delete,
+                                         params=params,
+                                         **kwargs)
         r = json.loads(self.context.post_json('/jobs', job))
         return r['job_id']
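
Taken together, _build_params applies a three-level precedence: DEFAULT_PARAMS first, then the bulk file's "params" object, then any command-line option the user actually set. Options now default to None and None values are skipped, which is why the explicit default= arguments were dropped from the add_option() calls above. A condensed, self-contained sketch of that precedence in the repository's Python 2 idiom (dictionaries trimmed to hypothetical example values, not the code above):

DEFAULT_PARAMS = {'overwrite': False, 'retry': 0}    # trimmed to two keys for the example

bulk = {'params': {'overwrite': True}}               # as parsed from the bulk file
cli_kwargs = {'overwrite': None, 'retry': 3}         # None means the option was not given

params = dict(DEFAULT_PARAMS)                        # 1. built-in defaults
params.update(bulk.get('params', {}))                # 2. bulk-file 'params' overrides them
for k, v in cli_kwargs.iteritems():                  # 3. explicit command-line values win
    if v is not None:
        params[k] = v

print params                                         # params == {'overwrite': True, 'retry': 3}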