Commit b9ea6f9c authored by Joao Pedro Lopes's avatar Joao Pedro Lopes
Browse files

FTS-1710 Add overwrite-on-retry option to client cli and python bindings

parent e287784c
...@@ -43,6 +43,7 @@ Note that _all_ parameters are optional. ...@@ -43,6 +43,7 @@ Note that _all_ parameters are optional.
* **verify_checksum** Enable checksum verification or indicate source, target, both or none checksum mode. * **verify_checksum** Enable checksum verification or indicate source, target, both or none checksum mode.
* **reuse** Enable reuse (all transfers are handled by the same process) * **reuse** Enable reuse (all transfers are handled by the same process)
* **overwrite** Overwrite the destinations if exist * **overwrite** Overwrite the destinations if exist
* **overwrite_on_retry** Enable overwrite files only during FTS retries
* **multihop** Treat the transfer as a multihop transfer * **multihop** Treat the transfer as a multihop transfer
* **source_spacetoken** Source space token * **source_spacetoken** Source space token
* **spacetoken** Destination space token * **spacetoken** Destination space token
......
...@@ -105,6 +105,11 @@ Overwrite files. ...@@ -105,6 +105,11 @@ Overwrite files.
.RS .RS
.RE .RE
.TP .TP
.B -o/--overwrite-on-retry
Enable overwrite files only during FTS retries.
.RS
.RE
.TP
.B -r/--reuse .B -r/--reuse
Enable session reuse for the transfer job. Enable session reuse for the transfer job.
.RS .RS
......
...@@ -30,6 +30,7 @@ from fts3.rest.client import Submitter, Delegator, Inquirer ...@@ -30,6 +30,7 @@ from fts3.rest.client import Submitter, Delegator, Inquirer
DEFAULT_PARAMS = { DEFAULT_PARAMS = {
'checksum': 'ADLER32', 'checksum': 'ADLER32',
'overwrite': False, 'overwrite': False,
'overwrite_on_retry': False,
'reuse': False, 'reuse': False,
'job_metadata': None, 'job_metadata': None,
'file_metadata': None, 'file_metadata': None,
...@@ -122,6 +123,8 @@ class JobSubmitter(Base): ...@@ -122,6 +123,8 @@ class JobSubmitter(Base):
help='delegate the proxy when the remote lifetime is less than this value (in minutes)') help='delegate the proxy when the remote lifetime is less than this value (in minutes)')
self.opt_parser.add_option('-o', '--overwrite', dest='overwrite', action='store_true', self.opt_parser.add_option('-o', '--overwrite', dest='overwrite', action='store_true',
help='overwrite files.') help='overwrite files.')
self.opt_parser.add_option('--overwrite-on-retry', dest='overwrite_on_retry', action='store_true',
help='enable overwrite files only during FTS retries.')
self.opt_parser.add_option('-r', '--reuse', dest='reuse', action='store_true', self.opt_parser.add_option('-r', '--reuse', dest='reuse', action='store_true',
help='enable session reuse for the transfer job.') help='enable session reuse for the transfer job.')
self.opt_parser.add_option('--job-metadata', dest='job_metadata', self.opt_parser.add_option('--job-metadata', dest='job_metadata',
...@@ -193,6 +196,8 @@ class JobSubmitter(Base): ...@@ -193,6 +196,8 @@ class JobSubmitter(Base):
self._prepare_options() self._prepare_options()
if self.params['ipv4'] and self.params['ipv6']: if self.params['ipv4'] and self.params['ipv6']:
self.opt_parser.error('ipv4 and ipv6 can not be used at the same time') self.opt_parser.error('ipv4 and ipv6 can not be used at the same time')
if self.params['overwrite'] and self.params['overwrite_on_retry']:
self.opt_parser.error('overwrite and overwrite-on-retry can not be used at the same time')
def _build_transfers(self): def _build_transfers(self):
if self.options.bulk_file: if self.options.bulk_file:
...@@ -258,6 +263,7 @@ class JobSubmitter(Base): ...@@ -258,6 +263,7 @@ class JobSubmitter(Base):
gridftp=self.options.gridftp_params, gridftp=self.options.gridftp_params,
job_metadata=self.options.job_metadata, job_metadata=self.options.job_metadata,
overwrite=self.options.overwrite, overwrite=self.options.overwrite,
overwrite_on_retry=self.options.overwrite_on_retry,
copy_pin_lifetime=self.options.pin_lifetime, copy_pin_lifetime=self.options.pin_lifetime,
reuse=self.options.reuse, reuse=self.options.reuse,
retry=self.options.retry, retry=self.options.retry,
......
...@@ -103,7 +103,7 @@ def add_alternative_source(transfer, alt_source): ...@@ -103,7 +103,7 @@ def add_alternative_source(transfer, alt_source):
return transfer return transfer
def new_job(transfers=None, deletion=None, verify_checksum=False, reuse=None, overwrite=False, multihop=False, def new_job(transfers=None, deletion=None, verify_checksum=False, reuse=None, overwrite=False, overwrite_on_retry=False, multihop=False,
source_spacetoken=None, spacetoken=None, source_spacetoken=None, spacetoken=None,
bring_online=None, archive_timeout=None, dst_file_report=False, copy_pin_lifetime=None, bring_online=None, archive_timeout=None, dst_file_report=False, copy_pin_lifetime=None,
retry=-1, retry_delay=0, metadata=None, priority=None, strict_copy=False, retry=-1, retry_delay=0, metadata=None, priority=None, strict_copy=False,
...@@ -119,6 +119,7 @@ def new_job(transfers=None, deletion=None, verify_checksum=False, reuse=None, ov ...@@ -119,6 +119,7 @@ def new_job(transfers=None, deletion=None, verify_checksum=False, reuse=None, ov
verify_checksum: Enable checksum verification: source, destination, both or none verify_checksum: Enable checksum verification: source, destination, both or none
reuse: Enable reuse (all transfers are handled by the same process) reuse: Enable reuse (all transfers are handled by the same process)
overwrite: Overwrite the destinations if exist overwrite: Overwrite the destinations if exist
overwrite_on_retry: Enable overwrite files only during FTS retries
multihop: Treat the transfer as a multihop transfer multihop: Treat the transfer as a multihop transfer
source_spacetoken: Source space token source_spacetoken: Source space token
spacetoken: Destination space token spacetoken: Destination space token
...@@ -147,6 +148,10 @@ def new_job(transfers=None, deletion=None, verify_checksum=False, reuse=None, ov ...@@ -147,6 +148,10 @@ def new_job(transfers=None, deletion=None, verify_checksum=False, reuse=None, ov
if isinstance(verify_checksum, basestring): if isinstance(verify_checksum, basestring):
if not verify_checksum in ('source','target','both', 'none'): if not verify_checksum in ('source','target','both', 'none'):
raise ClientError('Bad request: verify_checksum does not contain a valid value') raise ClientError('Bad request: verify_checksum does not contain a valid value')
if overwrite != False and overwrite_on_retry != False:
raise ClientError('Bad request: overwrite and overwrite-on-retry can not be used at the same time')
params = dict( params = dict(
verify_checksum=verify_checksum, verify_checksum=verify_checksum,
reuse=reuse, reuse=reuse,
...@@ -158,6 +163,7 @@ def new_job(transfers=None, deletion=None, verify_checksum=False, reuse=None, ov ...@@ -158,6 +163,7 @@ def new_job(transfers=None, deletion=None, verify_checksum=False, reuse=None, ov
job_metadata=metadata, job_metadata=metadata,
source_spacetoken=source_spacetoken, source_spacetoken=source_spacetoken,
overwrite=overwrite, overwrite=overwrite,
overwrite_on_retry=overwrite_on_retry,
multihop=multihop, multihop=multihop,
retry=retry, retry=retry,
retry_delay=retry_delay, retry_delay=retry_delay,
......
...@@ -48,6 +48,7 @@ DEFAULT_PARAMS = { ...@@ -48,6 +48,7 @@ DEFAULT_PARAMS = {
'job_metadata': None, 'job_metadata': None,
'overwrite': False, 'overwrite': False,
'dst_file_report': False, 'dst_file_report': False,
'overwrite_on_retry': False,
'reuse': None, 'reuse': None,
'multihop': False, 'multihop': False,
'source_spacetoken': '', 'source_spacetoken': '',
...@@ -491,6 +492,13 @@ class JobBuilder(object): ...@@ -491,6 +492,13 @@ class JobBuilder(object):
if max_time_in_queue is not None: if max_time_in_queue is not None:
expiration_time = time.time() + max_time_in_queue expiration_time = time.time() + max_time_in_queue
if self.params['overwrite']:
overwrite_flag = 'Y'
elif self.params['overwrite_on_retry']:
overwrite_flag = 'R'
else:
overwrite_flag = None
self.job = dict( self.job = dict(
job_id=self.job_id, job_id=self.job_id,
job_state=job_initial_state, job_state=job_initial_state,
...@@ -506,8 +514,8 @@ class JobBuilder(object): ...@@ -506,8 +514,8 @@ class JobBuilder(object):
submit_time=datetime.utcnow(), submit_time=datetime.utcnow(),
priority=max(min(int(self.params['priority']), 5), 1), priority=max(min(int(self.params['priority']), 5), 1),
space_token=self.params['spacetoken'], space_token=self.params['spacetoken'],
overwrite_flag=_safe_flag(self.params['overwrite']),
dst_file_report=_safe_flag(self.params['dst_file_report']), dst_file_report=_safe_flag(self.params['dst_file_report']),
overwrite_flag=overwrite_flag,
source_space_token=self.params['source_spacetoken'], source_space_token=self.params['source_spacetoken'],
copy_pin_lifetime=int(self.params['copy_pin_lifetime']), copy_pin_lifetime=int(self.params['copy_pin_lifetime']),
checksum_method=self.params['verify_checksum'], checksum_method=self.params['verify_checksum'],
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment