Commit b9b15061 authored by Maria Arsuaga Rios's avatar Maria Arsuaga Rios
Browse files

FTS-795, FTS-796: Generalize checksums in REST and Client

parent c3bc10c2
......@@ -27,6 +27,7 @@ import time
from base import Base
from fts3.rest.client import Submitter, Delegator, Inquirer
DEFAULT_CHECKSUM = 'ADLER32'
def _metadata(data):
try:
......@@ -110,7 +111,9 @@ class JobSubmitter(Base):
self.opt_parser.add_option('-S', '--source-token', dest='source_token',
help='the source space token or its description.')
self.opt_parser.add_option('-K', '--compare-checksum', dest='compare_checksum', default=False, action='store_true',
help='compare checksums between source and destination.')
help='deprecated: compare checksums between source and destination.')
self.opt_parser.add_option('-C', '--checksum-mode', dest='checksum_mode', type='string', default='none',
help='compare checksums in source, target, both or none.')
self.opt_parser.add_option('--copy-pin-lifetime', dest='pin_lifetime', type='long', default=-1,
help='pin lifetime of the copy in seconds.')
self.opt_parser.add_option('--bring-online', dest='bring_online', type='long', default=None,
......@@ -136,6 +139,7 @@ class JobSubmitter(Base):
help='force ipv6')
def validate(self):
self.checksum = None
if not self.options.bulk_file:
if len(self.args) < 2:
self.logger.critical("Need a source and a destination")
......@@ -156,11 +160,13 @@ class JobSubmitter(Base):
if self.options.verbose:
self.logger.setLevel(logging.DEBUG)
def _build_transfers(self):
if self.options.bulk_file:
filecontent = open(self.options.bulk_file).read()
bulk = json.loads(filecontent)
self.logger.info(bulk)
if "files" in bulk:
return bulk["files"]
elif "Files" in bulk:
......@@ -168,14 +174,32 @@ class JobSubmitter(Base):
else:
self.logger.critical("Could not find any transfers")
sys.exit(1)
else:
return [{"sources": [self.source], "destinations": [self.destination]}]
def _do_submit(self, context):
verify_checksum = None
checksum_mode = 'none'
#Backwards compatibility: compare_checksum parameter
if self.options.compare_checksum:
verify_checksum = True
checksum_mode = 'both'
if self.checksum and not self.options.compare_checksum:
checksum_mode = 'target'
if not self.options.compare_checksum:
#Checksum mode takes priority over the deprecated compare_checksum option
if len(self.options.checksum_mode) > 0:
checksum_mode = self.options.checksum_mode
#Backwards compatibility: default to target-side checksum when a checksum is provided but no verify option is set
if self.checksum:
checksum_mode = 'target'
if not self.checksum:
self.checksum = DEFAULT_CHECKSUM
delegator = Delegator(context)
delegator.delegate(
timedelta(minutes=self.options.proxy_lifetime),
......@@ -188,7 +212,7 @@ class JobSubmitter(Base):
self._build_transfers(),
checksum=self.checksum,
bring_online=self.options.bring_online,
verify_checksum=verify_checksum,
verify_checksum=checksum_mode[0],
spacetoken=self.options.destination_token,
source_spacetoken=self.options.source_token,
fail_nearline=self.options.fail_nearline,
......@@ -212,7 +236,6 @@ class JobSubmitter(Base):
else:
self.logger.info("Job successfully submitted.")
self.logger.info("Job id: %s" % job_id)
if job_id and self.options.blocking:
inquirer = Inquirer(context)
job = inquirer.get_job_status(job_id)
......@@ -228,16 +251,31 @@ class JobSubmitter(Base):
return job_id
def _do_dry_run(self, context):
verify_checksum = None
checksum_mode = 'none'
#Backwards compatibility: compare_checksum parameter
if self.options.compare_checksum:
verify_checksum = True
checksum_mode = 'both'
if self.checksum and not self.options.compare_checksum:
checksum_mode = 'target'
if not self.options.compare_checksum:
#Checksum mode takes priority over the deprecated compare_checksum option
if len(self.options.checksum_mode) > 0:
checksum_mode = self.options.checksum_mode
#Backwards compatibility: default to target-side checksum when a checksum is provided but no verify option is set
if self.checksum:
checksum_mode = 'target'
if not self.checksum:
self.checksum = DEFAULT_CHECKSUM
submitter = Submitter(context)
print submitter.build_submission(
self._build_transfers(),
checksum=self.checksum,
bring_online=self.options.bring_online,
verify_checksum=verify_checksum,
verify_checksum=checksum_mode[0],
spacetoken=self.options.destination_token,
source_spacetoken=self.options.source_token,
fail_nearline=self.options.fail_nearline,
......
......@@ -77,7 +77,7 @@ class Flag(TypeDecorator):
# This is used for flags that can be True, False, or some other thing
# i.e. verify_checksum flag, which can be True, False and 'Relaxed' (r)
# i.e. verify_checksum flag, which can be True, False, 'Relaxed' (r), 'Destination' (d), 'Source' (s), 'Both' (b), 'None' (n)
# reuse_job, which can be True, False and 'Multihop' (h)
class TernaryFlag(TypeDecorator):
impl = String
......
......@@ -49,7 +49,7 @@ class Job(Base):
job_finished = Column(DateTime)
source_space_token = Column(String(255))
copy_pin_lifetime = Column(Integer)
verify_checksum = Column(TernaryFlag(positive='c'),
verify_checksum = Column(String(1),
name='checksum_method')
bring_online = Column(Integer)
job_metadata = Column(Json(255))
......@@ -93,7 +93,7 @@ class ArchivedJob(Base):
job_finished = Column(DateTime)
source_space_token = Column(String(255))
copy_pin_lifetime = Column(Integer)
verify_checksum = Column(Flag(positive='c'),
verify_checksum = Column(String(1),
name='checksum_method')
bring_online = Column(Integer)
job_metadata = Column(Json(255))
......
......@@ -55,7 +55,7 @@ def cancel_all(context, vo_name=None):
return submitter.cancel_all(vo_name)
def new_transfer(source, destination, checksum=None, filesize=None, metadata=None, activity=None):
def new_transfer(source, destination, checksum='ADLER32', filesize=None, metadata=None, activity=None):
"""
Creates a new transfer pair
......@@ -99,7 +99,7 @@ def add_alternative_source(transfer, alt_source):
return transfer
def new_job(transfers=None, deletion=None, verify_checksum=True, reuse=False, overwrite=False, multihop=False,
def new_job(transfers=None, deletion=None, verify_checksum='target', reuse=False, overwrite=False, multihop=False,
source_spacetoken=None, spacetoken=None,
bring_online=None, copy_pin_lifetime=None,
retry=-1, retry_delay=0, metadata=None, priority=None, strict_copy=False,
......@@ -111,7 +111,7 @@ def new_job(transfers=None, deletion=None, verify_checksum=True, reuse=False, ov
Args:
transfers: Initial list of transfers
deletion: Delete files
verify_checksum: Enable checksum verification
verify_checksum: Enable checksum verification: source, destination, both or none
reuse: Enable reuse (all transfers are handled by the same process)
overwrite: Overwrite the destinations if exist
multihop: Treat the transfer as a multihop transfer
......@@ -133,6 +133,16 @@ def new_job(transfers=None, deletion=None, verify_checksum=True, reuse=False, ov
raise ClientError('Bad request: No transfers or deletion jobs are provided')
if transfers is None:
transfers = []
if isinstance(verify_checksum, bool):
if verify_checksum:
verify_checksum = 'both'
else:
verify_checksum = 'none'
else:
if isinstance(verify_checksum, basestring):
if not verify_checksum in ('source','target','both', 'none'):
raise ClientError('Bad request: verify_checksum does not contain a valid value')
params = dict(
verify_checksum=verify_checksum,
reuse=reuse,
......
......@@ -42,7 +42,7 @@ MAX_SIZE_SMALL_FILE = 104857600 #100MB
DEFAULT_PARAMS = {
'bring_online': -1,
'verify_checksum': False,
'verify_checksum': 'none',
'copy_pin_lifetime': -1,
'gridftp': '',
'job_metadata': None,
......@@ -463,16 +463,28 @@ class JobBuilder(object):
if len(self.files) == 0:
raise HTTPBadRequest('No valid pairs available')
# If a checksum is provided, but no checksum is available, 'relaxed' comparison
# (Not nice, but need to keep functionality!)
# If a checksum is provided, but no checksum is available, 'target' comparison
# (Not nice, but need to keep functionality!) Also by default all files will have ADLER32 checksum type
has_checksum = False
for file_dict in self.files:
if file_dict['checksum'] is not None:
has_checksum = len(file_dict['checksum']) > 0
break
if not self.job['checksum_method'] and has_checksum:
self.job['checksum_method'] = 'r'
if file_dict['checksum'] is None:
file_dict['checksum'] = 'ADLER32'
for file_dict in self.files:
if file_dict['checksum'] is None:
file_dict['checksum'] = 'ADLER32'
if type(self.job['checksum_method']) == bool:
if not self.job['checksum_method'] and has_checksum:
self.job['checksum_method'] = 'target'
else:
if not self.job['checksum_method'] and not has_checksum:
self.job['checksum_method'] = 'none'
else:
self.job['checksum_method'] = 'both'
self.job['checksum_method']=self.job['checksum_method'][0]
# Validate that if this is a multiple replica job, that there is one single unique file
self.is_multiple, unique_files = _has_multiple_options(self.files)
if self.is_multiple:
......@@ -544,7 +556,7 @@ class JobBuilder(object):
overwrite_flag='N',
source_space_token=self.params['source_spacetoken'],
copy_pin_lifetime=-1,
checksum_method=None,
checksum_method='n',
bring_online=None,
job_metadata=self.params['job_metadata'],
internal_job_params=None,
......
......@@ -238,7 +238,8 @@ class TestJobSubmission(TestController):
job = Session.query(Job).get(job_id)
self.assertEqual(job.job_type, 'Y')
def test_submit_post(self):
"""
Submit a valid job using POST instead of PUT
......@@ -424,6 +425,149 @@ class TestJobSubmission(TestController):
self.assertEqual(job.verify_checksum, 'r')
return job_id
def test_verify_checksum_target(self):
    """
    Submit a valid job with verify_checksum set to 'target'.

    The job must be committed to the DB with the per-file checksum kept
    as submitted and the checksum_method column stored as its one-letter
    abbreviation 't' (target).
    """
    self.setup_gridsite_environment()
    self.push_delegation()
    job = {
        'files': [{
            'sources': ['root://source.es/file'],
            'destinations': ['root://dest.ch/file'],
            'selection_strategy': 'orderly',
            'checksum': '1234F',
            'filesize': 1024,
            'metadata': {'mykey': 'myvalue'},
        }],
        'params': {'overwrite': True, 'verify_checksum': 'target'}
    }

    job_id = self.app.post(
        url="/jobs",
        content_type='application/json',
        params=json.dumps(job),
        status=200
    ).json['job_id']

    # Make sure it was committed to the DB
    self.assertGreater(len(job_id), 0)

    job = Session.query(Job).get(job_id)
    self.assertEqual(job.files[0].checksum, '1234F')
    # 'target' is persisted as its single-letter code
    self.assertEqual(job.verify_checksum, 't')

    return job_id
def test_verify_checksum_source(self):
    """
    Valid job, verify checksum in source.
    """
    self.setup_gridsite_environment()
    self.push_delegation()

    # One transfer pair with an explicit checksum value
    transfer = {
        'sources': ['root://source.es/file'],
        'destinations': ['root://dest.ch/file'],
        'selection_strategy': 'orderly',
        'checksum': '1234F',
        'filesize': 1024,
        'metadata': {'mykey': 'myvalue'},
    }
    submission = {
        'files': [transfer],
        'params': {'overwrite': True, 'verify_checksum': 'source'},
    }

    response = self.app.post(
        url="/jobs",
        content_type='application/json',
        params=json.dumps(submission),
        status=200
    )
    job_id = response.json['job_id']

    # A non-empty id means the job reached the database
    self.assertGreater(len(job_id), 0)

    stored = Session.query(Job).get(job_id)
    self.assertEqual(stored.files[0].checksum, '1234F')
    # 'source' is persisted as its single-letter code
    self.assertEqual(stored.verify_checksum, 's')
    return job_id
def test_verify_checksum_both(self):
    """
    Submit a valid job with verify_checksum set to 'both'
    (checksum compared on both source and destination).

    The checksum_method column must be stored as its one-letter
    abbreviation 'b'.
    """
    self.setup_gridsite_environment()
    self.push_delegation()
    job = {
        'files': [{
            'sources': ['root://source.es/file'],
            'destinations': ['root://dest.ch/file'],
            'selection_strategy': 'orderly',
            'checksum': '1234F',
            'filesize': 1024,
            'metadata': {'mykey': 'myvalue'},
        }],
        'params': {'overwrite': True, 'verify_checksum': 'both'}
    }

    job_id = self.app.post(
        url="/jobs",
        content_type='application/json',
        params=json.dumps(job),
        status=200
    ).json['job_id']

    # Make sure it was committed to the DB
    self.assertGreater(len(job_id), 0)

    job = Session.query(Job).get(job_id)
    self.assertEqual(job.files[0].checksum, '1234F')
    # 'both' is persisted as its single-letter code
    self.assertEqual(job.verify_checksum, 'b')

    return job_id
def test_verify_checksum_none(self):
    """
    Submit a valid job with verify_checksum set to 'none'.

    No per-file checksum is submitted, so the server is expected to fill
    in the default checksum type (ADLER32) and store the
    checksum_method column as 'n'.
    """
    self.setup_gridsite_environment()
    self.push_delegation()
    job = {
        'files': [{
            'sources': ['root://source.es/file'],
            'destinations': ['root://dest.ch/file'],
            'selection_strategy': 'orderly',
            'filesize': 1024,
            'metadata': {'mykey': 'myvalue'},
        }],
        'params': {'overwrite': True, 'verify_checksum': 'none'}
    }

    job_id = self.app.post(
        url="/jobs",
        content_type='application/json',
        params=json.dumps(job),
        status=200
    ).json['job_id']

    # Make sure it was committed to the DB
    self.assertGreater(len(job_id), 0)

    job = Session.query(Job).get(job_id)
    # Bug fix: the original asserted '1234F', but this job submits no
    # checksum at all; the server defaults missing checksums to ADLER32
    # (see JobBuilder: files with checksum None get 'ADLER32').
    self.assertEqual(job.files[0].checksum, 'ADLER32')
    self.assertEqual(job.verify_checksum, 'n')
    return job_id
def test_null_user_filesize(self):
"""
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment