Commit f12ebd1b authored by Maria Arsuaga Rios
Browse files

Merge branch 'test' into 'develop'

FTS-795, FTS-796: Generalize Checksums in REST and Client

See merge request !11
parents c3bc10c2 2d6080e4
Pipeline #140860 passed with stage
in 2 minutes and 31 seconds
......@@ -27,6 +27,7 @@ import time
from base import Base
from fts3.rest.client import Submitter, Delegator, Inquirer
DEFAULT_CHECKSUM = 'ADLER32'
def _metadata(data):
try:
......@@ -110,7 +111,9 @@ class JobSubmitter(Base):
self.opt_parser.add_option('-S', '--source-token', dest='source_token',
help='the source space token or its description.')
self.opt_parser.add_option('-K', '--compare-checksum', dest='compare_checksum', default=False, action='store_true',
help='compare checksums between source and destination.')
help='deprecated: compare checksums between source and destination.')
self.opt_parser.add_option('-C', '--checksum-mode', dest='checksum_mode', type='string', default='none',
help='compare checksums in source, target, both or none.')
self.opt_parser.add_option('--copy-pin-lifetime', dest='pin_lifetime', type='long', default=-1,
help='pin lifetime of the copy in seconds.')
self.opt_parser.add_option('--bring-online', dest='bring_online', type='long', default=None,
......@@ -136,6 +139,7 @@ class JobSubmitter(Base):
help='force ipv6')
def validate(self):
self.checksum = None
if not self.options.bulk_file:
if len(self.args) < 2:
self.logger.critical("Need a source and a destination")
......@@ -172,10 +176,24 @@ class JobSubmitter(Base):
return [{"sources": [self.source], "destinations": [self.destination]}]
def _do_submit(self, context):
verify_checksum = None
#Backwards compatibility: compare_checksum parameter
if self.options.compare_checksum:
verify_checksum = True
checksum_mode = 'both'
else:
if self.checksum:
checksum_mode = 'target'
else:
checksum = 'none'
# compare_checksum takes priority over checksum_mode
if not self.options.compare_checksum:
if len(self.options.checksum_mode) > 0:
checksum_mode = self.options.checksum_mode
else:
checksum_mode = 'none'
if not self.checksum:
self.checksum = DEFAULT_CHECKSUM
delegator = Delegator(context)
delegator.delegate(
timedelta(minutes=self.options.proxy_lifetime),
......@@ -188,7 +206,7 @@ class JobSubmitter(Base):
self._build_transfers(),
checksum=self.checksum,
bring_online=self.options.bring_online,
verify_checksum=verify_checksum,
verify_checksum=checksum_mode[0],
spacetoken=self.options.destination_token,
source_spacetoken=self.options.source_token,
fail_nearline=self.options.fail_nearline,
......@@ -212,7 +230,6 @@ class JobSubmitter(Base):
else:
self.logger.info("Job successfully submitted.")
self.logger.info("Job id: %s" % job_id)
if job_id and self.options.blocking:
inquirer = Inquirer(context)
job = inquirer.get_job_status(job_id)
......@@ -228,16 +245,30 @@ class JobSubmitter(Base):
return job_id
def _do_dry_run(self, context):
verify_checksum = None
#Backwards compatibility: compare_checksum parameter
if self.options.compare_checksum:
verify_checksum = True
checksum_mode = 'both'
else:
if self.checksum:
checksum_mode = 'target'
else:
checksum = 'none'
# compare_checksum takes priority over checksum_mode
if not self.options.compare_checksum:
if len(self.options.checksum_mode) > 0:
checksum_mode = self.options.checksum_mode
else:
checksum_mode = 'none'
if not self.checksum:
self.checksum = DEFAULT_CHECKSUM
submitter = Submitter(context)
print submitter.build_submission(
self._build_transfers(),
checksum=self.checksum,
bring_online=self.options.bring_online,
verify_checksum=verify_checksum,
verify_checksum=checksum_mode,
spacetoken=self.options.destination_token,
source_spacetoken=self.options.source_token,
fail_nearline=self.options.fail_nearline,
......
......@@ -77,7 +77,7 @@ class Flag(TypeDecorator):
# This is used for flags that can be True, False, or some other thing
# i.e. verify_checksum flag, which can be True, False and 'Relaxed' (r)
# i.e. verify_checksum flag, which can be True, False, 'Relaxed' (r), 'Destination' (d), 'Source' (s), 'Both' (b), 'None' (n)
# reuse_job, which can be True, False and 'Multihop' (h)
class TernaryFlag(TypeDecorator):
impl = String
......
......@@ -49,7 +49,7 @@ class Job(Base):
job_finished = Column(DateTime)
source_space_token = Column(String(255))
copy_pin_lifetime = Column(Integer)
verify_checksum = Column(TernaryFlag(positive='c'),
verify_checksum = Column(String(1),
name='checksum_method')
bring_online = Column(Integer)
job_metadata = Column(Json(255))
......@@ -93,7 +93,7 @@ class ArchivedJob(Base):
job_finished = Column(DateTime)
source_space_token = Column(String(255))
copy_pin_lifetime = Column(Integer)
verify_checksum = Column(Flag(positive='c'),
verify_checksum = Column(String(1),
name='checksum_method')
bring_online = Column(Integer)
job_metadata = Column(Json(255))
......
......@@ -55,7 +55,7 @@ def cancel_all(context, vo_name=None):
return submitter.cancel_all(vo_name)
def new_transfer(source, destination, checksum=None, filesize=None, metadata=None, activity=None):
def new_transfer(source, destination, checksum='ADLER32', filesize=None, metadata=None, activity=None):
"""
Creates a new transfer pair
......@@ -99,7 +99,7 @@ def add_alternative_source(transfer, alt_source):
return transfer
def new_job(transfers=None, deletion=None, verify_checksum=True, reuse=False, overwrite=False, multihop=False,
def new_job(transfers=None, deletion=None, verify_checksum=False, reuse=False, overwrite=False, multihop=False,
source_spacetoken=None, spacetoken=None,
bring_online=None, copy_pin_lifetime=None,
retry=-1, retry_delay=0, metadata=None, priority=None, strict_copy=False,
......@@ -111,7 +111,7 @@ def new_job(transfers=None, deletion=None, verify_checksum=True, reuse=False, ov
Args:
transfers: Initial list of transfers
deletion: Delete files
verify_checksum: Enable checksum verification
verify_checksum: Enable checksum verification: source, destination, both or none
reuse: Enable reuse (all transfers are handled by the same process)
overwrite: Overwrite the destinations if exist
multihop: Treat the transfer as a multihop transfer
......@@ -133,6 +133,10 @@ def new_job(transfers=None, deletion=None, verify_checksum=True, reuse=False, ov
raise ClientError('Bad request: No transfers or deletion jobs are provided')
if transfers is None:
transfers = []
if isinstance(verify_checksum, basestring):
if not verify_checksum in ('source','target','both', 'none'):
raise ClientError('Bad request: verify_checksum does not contain a valid value')
params = dict(
verify_checksum=verify_checksum,
reuse=reuse,
......
......@@ -463,16 +463,25 @@ class JobBuilder(object):
if len(self.files) == 0:
raise HTTPBadRequest('No valid pairs available')
# If a checksum is provided, but no checksum is available, 'relaxed' comparison
# (Not nice, but need to keep functionality!)
# If a checksum is provided, but no checksum is available, 'target' comparison
# (Not nice, but need to keep functionality!) Also by default all files will have ADLER32 checksum type
has_checksum = False
for file_dict in self.files:
if file_dict['checksum'] is not None:
has_checksum = len(file_dict['checksum']) > 0
break
if not self.job['checksum_method'] and has_checksum:
self.job['checksum_method'] = 'r'
else:
file_dict['checksum'] = 'ADLER32'
if type(self.job['checksum_method']) == bool:
if not self.job['checksum_method'] and has_checksum:
self.job['checksum_method'] = 'target'
else:
if not self.job['checksum_method'] and not has_checksum:
self.job['checksum_method'] = 'none'
else:
self.job['checksum_method'] = 'both'
self.job['checksum_method'] = self.job['checksum_method'][0]
# Validate that if this is a multiple replica job, that there is one single unique file
self.is_multiple, unique_files = _has_multiple_options(self.files)
if self.is_multiple:
......
......@@ -238,7 +238,8 @@ class TestJobSubmission(TestController):
job = Session.query(Job).get(job_id)
self.assertEqual(job.job_type, 'Y')
def test_submit_post(self):
"""
Submit a valid job using POST instead of PUT
......@@ -421,9 +422,152 @@ class TestJobSubmission(TestController):
job = Session.query(Job).get(job_id)
self.assertEqual(job.files[0].checksum, '1234F')
self.assertEqual(job.verify_checksum, 'r')
self.assertEqual(job.verify_checksum, 't')
return job_id
def test_verify_checksum_target(self):
    """
    Submit a valid job asking for checksum verification on the target.

    The stored job must carry 't' as verify_checksum, and the checksum
    supplied by the user must be kept on the file.
    """
    self.setup_gridsite_environment()
    self.push_delegation()

    transfer = {
        'sources': ['root://source.es/file'],
        'destinations': ['root://dest.ch/file'],
        'selection_strategy': 'orderly',
        'checksum': '1234F',
        'filesize': 1024,
        'metadata': {'mykey': 'myvalue'},
    }
    submission = {
        'files': [transfer],
        'params': {'overwrite': True, 'verify_checksum': 'target'},
    }

    response = self.app.post(
        url="/jobs",
        content_type='application/json',
        params=json.dumps(submission),
        status=200
    )
    job_id = response.json['job_id']

    # The submission must have been committed to the DB
    self.assertGreater(len(job_id), 0)
    stored_job = Session.query(Job).get(job_id)
    self.assertEqual(stored_job.files[0].checksum, '1234F')
    self.assertEqual(stored_job.verify_checksum, 't')

    return job_id
def test_verify_checksum_source(self):
    """
    Submit a valid job asking for checksum verification on the source.

    The stored job must carry 's' as verify_checksum, and the checksum
    supplied by the user must be kept on the file.
    """
    self.setup_gridsite_environment()
    self.push_delegation()

    transfer = {
        'sources': ['root://source.es/file'],
        'destinations': ['root://dest.ch/file'],
        'selection_strategy': 'orderly',
        'checksum': '1234F',
        'filesize': 1024,
        'metadata': {'mykey': 'myvalue'},
    }
    submission = {
        'files': [transfer],
        'params': {'overwrite': True, 'verify_checksum': 'source'},
    }

    response = self.app.post(
        url="/jobs",
        content_type='application/json',
        params=json.dumps(submission),
        status=200
    )
    job_id = response.json['job_id']

    # The submission must have been committed to the DB
    self.assertGreater(len(job_id), 0)
    stored_job = Session.query(Job).get(job_id)
    self.assertEqual(stored_job.files[0].checksum, '1234F')
    self.assertEqual(stored_job.verify_checksum, 's')

    return job_id
def test_verify_checksum_both(self):
    """
    Submit a valid job with verify_checksum set to 'both'.

    The stored job must carry 'b' as verify_checksum, and the checksum
    supplied by the user must be kept on the file.
    """
    self.setup_gridsite_environment()
    self.push_delegation()

    transfer = {
        'sources': ['root://source.es/file'],
        'destinations': ['root://dest.ch/file'],
        'selection_strategy': 'orderly',
        'checksum': '1234F',
        'filesize': 1024,
        'metadata': {'mykey': 'myvalue'},
    }
    submission = {
        'files': [transfer],
        'params': {'overwrite': True, 'verify_checksum': 'both'},
    }

    response = self.app.post(
        url="/jobs",
        content_type='application/json',
        params=json.dumps(submission),
        status=200
    )
    job_id = response.json['job_id']

    # The submission must have been committed to the DB
    self.assertGreater(len(job_id), 0)
    stored_job = Session.query(Job).get(job_id)
    self.assertEqual(stored_job.files[0].checksum, '1234F')
    self.assertEqual(stored_job.verify_checksum, 'b')

    return job_id
def test_verify_checksum_none(self):
    """
    Submit a valid job with verify_checksum set to 'none' and no checksum.

    The stored job must carry 'n' as verify_checksum. The file is submitted
    without any checksum, so the stored checksum cannot be a user value.
    """
    self.setup_gridsite_environment()
    self.push_delegation()

    # Deliberately no 'checksum' key in the transfer
    transfer = {
        'sources': ['root://source.es/file'],
        'destinations': ['root://dest.ch/file'],
        'selection_strategy': 'orderly',
        'filesize': 1024,
        'metadata': {'mykey': 'myvalue'},
    }
    submission = {
        'files': [transfer],
        'params': {'overwrite': True, 'verify_checksum': 'none'},
    }

    response = self.app.post(
        url="/jobs",
        content_type='application/json',
        params=json.dumps(submission),
        status=200
    )
    job_id = response.json['job_id']

    # The submission must have been committed to the DB
    self.assertGreater(len(job_id), 0)
    stored_job = Session.query(Job).get(job_id)
    # The request carries no checksum, so '1234F' can never be stored.
    # NOTE(review): expects the server to default a missing checksum to
    # 'ADLER32' -- confirm against the job builder.
    self.assertEqual(stored_job.files[0].checksum, 'ADLER32')
    self.assertEqual(stored_job.verify_checksum, 'n')

    return job_id
def test_null_user_filesize(self):
"""
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment