Commit 01da9671 authored by Mihai Patrascoiu

Rework parsing of job submission parameters and unify the code branches for actual and dry-run submissions.

The rework follows a 3-tier approach:
  - Apply the default parameters
  - Read and apply parameters from bulk-file (if present)
  - Apply parameters passed via CLI
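
For orientation, the layering above can be sketched roughly as follows. This is illustrative only, not the committed code: the standalone helper, its bulk_file/cli_args arguments and the trimmed DEFAULT_PARAMS are assumptions made for the example; the patch itself implements this logic inside JobSubmitter._build_params.

    import json

    # tier 1: defaults (subset shown for illustration)
    DEFAULT_PARAMS = {'checksum': 'ADLER32', 'overwrite': False, 'retry': 0}

    def build_params(bulk_file=None, **cli_args):
        params = dict(DEFAULT_PARAMS)                  # start from the defaults
        if bulk_file:                                  # tier 2: 'params' section of the bulk file, if present
            with open(bulk_file) as f:
                params.update(json.load(f).get('params', {}))
        for key, value in cli_args.items():            # tier 3: CLI options override everything else
            if value is not None:
                params[key] = value
        return params

Because the CLI tier only overrides keys whose value is not None, command-line options that were not set fall through to the bulk-file or default values.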
parent a10e3713
@@ -27,8 +27,31 @@ import time
 from base import Base
 from fts3.rest.client import Submitter, Delegator, Inquirer
-DEFAULT_CHECKSUM = 'ADLER32'
+DEFAULT_PARAMS = {
+    'checksum': 'ADLER32',
+    'overwrite': False,
+    'reuse': False,
+    'job_metadata': None,
+    'file_metadata': None,
+    'filesize': None,
+    'gridftp': None,
+    'spacetoken': None,
+    'source_spacetoken': None,
+    'verify_checksum': 'n',
+    'copy_pin_lifetime': -1,
+    'bring_online': -1,
+    'archive_timeout': -1,
+    'timeout': None,
+    'fail_nearline': False,
+    'retry': 0,
+    'multihop': False,
+    'credential': None,
+    'nostreams': None,
+    's3alternate': False,
+    'target_qos': None,
+    'ipv4': False,
+    'ipv6': False
+}
 def _metadata(data):
     try:
@@ -95,9 +118,9 @@ class JobSubmitter(Base):
                                    help='expiration time of the delegation in minutes.')
         self.opt_parser.add_option('--delegate-when-lifetime-lt', type=int, default=120,
                                    help='delegate the proxy when the remote lifetime is less than this value (in minutes)')
-        self.opt_parser.add_option('-o', '--overwrite', dest='overwrite', default=False, action='store_true',
+        self.opt_parser.add_option('-o', '--overwrite', dest='overwrite', action='store_true',
                                    help='overwrite files.')
-        self.opt_parser.add_option('-r', '--reuse', dest='reuse', default=False, action='store_true',
+        self.opt_parser.add_option('-r', '--reuse', dest='reuse', action='store_true',
                                    help='enable session reuse for the transfer job.')
         self.opt_parser.add_option('--job-metadata', dest='job_metadata',
                                    help='transfer job metadata.')
@@ -113,38 +136,38 @@ class JobSubmitter(Base):
                                    help='the source space token or its description.')
         self.opt_parser.add_option('-K', '--compare-checksum', dest='compare_checksum', default=False, action='store_true',
                                    help='deprecated: compare checksums between source and destination.')
-        self.opt_parser.add_option('-C', '--checksum-mode', dest='checksum_mode', type='string', default='none',
+        self.opt_parser.add_option('-C', '--checksum-mode', dest='checksum_mode', type='string',
                                    help='compare checksums in source, target, both or none.')
-        self.opt_parser.add_option('--copy-pin-lifetime', dest='pin_lifetime', type='long', default=-1,
+        self.opt_parser.add_option('--copy-pin-lifetime', dest='pin_lifetime', type='long',
                                    help='pin lifetime of the copy in seconds.')
-        self.opt_parser.add_option('--bring-online', dest='bring_online', type='long', default=None,
+        self.opt_parser.add_option('--bring-online', dest='bring_online', type='long',
                                    help='bring online timeout in seconds.')
-        self.opt_parser.add_option('--archive-timeout', dest='archive_timeout', type='long', default=None,
+        self.opt_parser.add_option('--archive-timeout', dest='archive_timeout', type='long',
                                    help='archive timeout in seconds.')
-        self.opt_parser.add_option('--timeout', dest='timeout', type='long', default=None,
+        self.opt_parser.add_option('--timeout', dest='timeout', type='long',
                                    help='transfer timeout in seconds.')
-        self.opt_parser.add_option('--fail-nearline', dest='fail_nearline', default=False, action='store_true',
+        self.opt_parser.add_option('--fail-nearline', dest='fail_nearline', action='store_true',
                                    help='fail the transfer if the file is nearline.')
         self.opt_parser.add_option('--dry-run', dest='dry_run', default=False, action='store_true',
                                    help='do not send anything, just print the JSON message.')
         self.opt_parser.add_option('-f', '--file', dest='bulk_file', type='string',
                                    help='Name of configuration file')
-        self.opt_parser.add_option('--retry', dest='retry', type='int', default=0,
+        self.opt_parser.add_option('--retry', dest='retry', type='int',
                                    help='Number of retries. If 0, the server default will be used.'
                                         'If negative, there will be no retries.')
-        self.opt_parser.add_option('-m', '--multi-hop', dest='multihop', default=False, action='store_true',
+        self.opt_parser.add_option('-m', '--multi-hop', dest='multihop', action='store_true',
                                    help='submit a multihop transfer.')
-        self.opt_parser.add_option('--cloud-credentials', dest='cloud_cred', default=None,
+        self.opt_parser.add_option('--cloud-credentials', dest='cloud_cred',
                                    help='use cloud credentials for the job (i.e. dropbox).')
-        self.opt_parser.add_option('--nostreams', dest='nostreams', default=None,
+        self.opt_parser.add_option('--nostreams', dest='nostreams',
                                    help='number of streams')
-        self.opt_parser.add_option('--ipv4', dest='ipv4', default=False, action='store_true',
+        self.opt_parser.add_option('--ipv4', dest='ipv4', action='store_true',
                                    help='force ipv4')
-        self.opt_parser.add_option('--ipv6', dest='ipv6', default=False, action='store_true',
+        self.opt_parser.add_option('--ipv6', dest='ipv6', action='store_true',
                                    help='force ipv6')
-        self.opt_parser.add_option('--s3alternate', dest='s3alternate', default=False, action='store_true',
+        self.opt_parser.add_option('--s3alternate', dest='s3alternate', action='store_true',
                                    help='use S3 alternate URL')
-        self.opt_parser.add_option('--target-qos', dest='target_qos', type='string', default=None,
+        self.opt_parser.add_option('--target-qos', dest='target_qos', type='string',
                                    help='define the target QoS for this transfer for CDMI endpoints')

     def validate(self):
@@ -155,14 +178,11 @@ class JobSubmitter(Base):
                 sys.exit(1)
             elif len(self.args) == 2:
                 (self.source, self.destination) = self.args
-                self.checksum = None
             elif len(self.args) == 3:
                 (self.source, self.destination, self.checksum) = self.args
             else:
                 self.logger.critical("Too many parameters")
                 sys.exit(1)
-        else:
-            self.checksum = None

         if self.options.ipv4 and self.options.ipv6:
             self.opt_parser.error('ipv4 and ipv6 can not be used at the same time')
@@ -185,46 +205,39 @@ class JobSubmitter(Base):
         else:
             return [{"sources": [self.source], "destinations": [self.destination]}]

-    def _build_params(self):
-        with open(self.options.bulk_file, 'r') as file:
-            filecontent = file.read()
-            bulk = json.loads(filecontent)
-            params = None
-            if 'params' in bulk:
-                params = bulk['params']
-            return params
-
-    def _do_submit(self, context):
-        #Backwards compatibility: compare_checksum parameter
-        if self.options.compare_checksum:
-            checksum_mode = 'both'
-        else:
-            if self.checksum:
-                checksum_mode = 'target'
-            else:
-                checksum = 'none'
-
-        #Compare checksum has major priority than checksum_mode
-        if not self.options.compare_checksum:
-            if len(self.options.checksum_mode) > 0:
-                checksum_mode = self.options.checksum_mode
-            else:
-                checksum_mode = 'none'
-
-        if not self.checksum:
-            self.checksum = DEFAULT_CHECKSUM
-
-        if not self.options.access_token:
-            delegator = Delegator(context)
-            delegator.delegate(
-                timedelta(minutes=self.options.proxy_lifetime),
-                delegate_when_lifetime_lt=timedelta(minutes=self.options.delegate_when_lifetime_lt)
-            )
-
-        submitter = Submitter(context)
-        job_id = submitter.submit(
-            transfers=self._build_transfers(),
-            params=self._build_params(),
+    def _build_params(self, **kwargs):
+        params = dict()
+        params.update(DEFAULT_PARAMS)
+
+        if self.options.bulk_file:
+            with open(self.options.bulk_file, 'r') as file:
+                filecontent = file.read()
+                bulk = json.loads(filecontent)
+                if 'params' in bulk:
+                    params.update(bulk['params'])
+
+        # Apply command-line parameters
+        for k, v in kwargs.iteritems():
+            if v is not None:
+                params[k] = v
+
+        # JSONify metadata
+        params['job_metadata'] = _metadata(params['job_metadata'])
+        params['file_metadata'] = _metadata(params['file_metadata'])
+
+        return params
+
+    def _prepare_options(self):
+        # Backwards compatibility: compare_checksum parameter
+        # Note: compare_checksum has higher priority than checksum_mode
+        if self.options.compare_checksum:
+            checksum_mode = 'both'
+        elif self.options.checksum_mode:
+            checksum_mode = self.options.checksum_mode
+        else:
+            checksum_mode = 'none'
+
+        self.transfers = self._build_transfers()
+        self.params = self._build_params(
             checksum=self.checksum,
             bring_online=self.options.bring_online,
             archive_timeout=self.options.archive_timeout,
@@ -233,10 +246,10 @@ class JobSubmitter(Base):
             spacetoken=self.options.destination_token,
             source_spacetoken=self.options.source_token,
             fail_nearline=self.options.fail_nearline,
-            file_metadata=_metadata(self.options.file_metadata),
+            file_metadata=self.options.file_metadata,
             filesize=self.options.file_size,
             gridftp=self.options.gridftp_params,
-            job_metadata=_metadata(self.options.job_metadata),
+            job_metadata=self.options.job_metadata,
             overwrite=self.options.overwrite,
             copy_pin_lifetime=self.options.pin_lifetime,
             reuse=self.options.reuse,
@@ -250,6 +263,20 @@ class JobSubmitter(Base):
             target_qos=self.options.target_qos
         )

+    def _do_submit(self, context):
+        if not self.options.access_token:
+            delegator = Delegator(context)
+            delegator.delegate(
+                timedelta(minutes=self.options.proxy_lifetime),
+                delegate_when_lifetime_lt=timedelta(minutes=self.options.delegate_when_lifetime_lt)
+            )
+
+        submitter = Submitter(context)
+        job_id = submitter.submit(
+            transfers=self.transfers,
+            params=self.params
+        )
+
         if self.options.json:
             self.logger.info(json.dumps(job_id))
         else:
@@ -270,55 +297,16 @@ class JobSubmitter(Base):
         return job_id

     def _do_dry_run(self, context):
-        #Backwards compatibility: compare_checksum parameter
-        if self.options.compare_checksum:
-            checksum_mode = 'both'
-        else:
-            if self.checksum:
-                checksum_mode = 'target'
-            else:
-                checksum = 'none'
-
-        #Compare checksum has major priority than checksum_mode
-        if not self.options.compare_checksum:
-            if len(self.options.checksum_mode) > 0:
-                checksum_mode = self.options.checksum_mode
-            else:
-                checksum_mode = 'none'
-
-        if not self.checksum:
-            self.checksum = DEFAULT_CHECKSUM
-
         submitter = Submitter(context)
         print submitter.build_submission(
-            self._build_transfers(),
-            checksum=self.checksum,
-            bring_online=self.options.bring_online,
-            archive_timeout = self.options.archive_timeout,
-            timeout = self.options.timeout,
-            verify_checksum=checksum_mode,
-            spacetoken=self.options.destination_token,
-            source_spacetoken=self.options.source_token,
-            fail_nearline=self.options.fail_nearline,
-            file_metadata=_metadata(self.options.file_metadata),
-            filesize=self.options.file_size,
-            gridftp=self.options.gridftp_params,
-            job_metadata=_metadata(self.options.job_metadata),
-            overwrite=self.options.overwrite,
-            copy_pin_lifetime=self.options.pin_lifetime,
-            reuse=self.options.reuse,
-            retry=self.options.retry,
-            multihop=self.options.multihop,
-            credential=self.options.cloud_cred,
-            nostreams=self.options.nostreams,
-            ipv4=self.options.ipv4,
-            ipv6=self.options.ipv6,
-            s3alternate=self.options.s3alternate,
-            target_qos=self.options.target_qos
+            transfers=self.transfers,
+            params=self.params
         )
         return None

     def run(self):
         context = self._create_context()
+        self._prepare_options()
         if not self.options.dry_run:
             return self._do_submit(context)
         else:
...