Commit ef714792 authored by Mihai Patrascoiu

Merge branch 'develop' for release 3.11.0

parents 5a8dad01 9c8d0fc4
Pipeline #3043354 passed with stage in 2 minutes and 36 seconds
......@@ -102,7 +102,7 @@ job.
{
"files": [],
"params": {
"max_time_in_queue": 36000,
"max_time_in_queue": "36000s",
"timeout": 3600,
"nostreams": 0,
"buffer_size": 1024,
......@@ -132,14 +132,14 @@ job.
Of course, all these parameters are optional. You can set only a subset of them,
or none at all.
* **max_time_in_queue** After this number of seconds on the queue, the transfer will be
canceled.
* **max_time_in_queue** After this number of hours on the queue, the transfer will be canceled.
A suffix such as 's', 'm' or 'h' is accepted.
* **timeout** After this number of seconds running, the transfer will be aborted.
* **nostreams** Number of streams to use during the transfer.
Not all protocols support this.
* **buffer_size** TCP buffer size.
* **strict_copy** If true, only a transfer will be done. No checksum, no size validation, no
parent directory creation... useful for endpoints that do not support any, of some, of this
parent directory creation... useful for endpoints that do not support any, or some, of these
operations, like S3.
* **ipv4** Enable/disable IPv4 support. Not all protocols support this.
* **ipv6** Enable/disable IPv6 support. Not all protocols support this.
......@@ -148,7 +148,7 @@ If any step fails, the remaining ones will be canceled.
* **reuse** Transfer all the files within the job reusing the same connection. Not all protocols
support this. Useful for small files.
* **copy_pin_lifetime** If greater than -1, a Bring Online operation will be done prior to the
stransfer. The value of this field will be sent to the remote storage for the lifetime
transfer. The value of this field will be sent to the remote storage for the lifetime
of the replica on disk.
* **bring_online** Bring online timeout. After this number of seconds, the staging operation will
be canceled, and the transfer will not take place. If greater than -1, a Bring Online operation will
......@@ -157,7 +157,7 @@ be done.
* **source_spacetoken** Source space token.
* **retry** Let FTS3 retry the file on error. Not all errors are retried. For instance,
a "File not found" will *not* be retried.
* **retry_dely** Wait this many seconds between retries.
* **retry_delay** Wait this many seconds between retries.
* **priority** Job priority. Applied after VO Shares, and Activity Shares. *No* fair share.
If you keep submitting jobs with high priorities, jobs with lower priority will not go
through.
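For illustration, a minimal submission body combining a few of the parameters above could look like the sketch below (the endpoints and file pair are placeholders, not part of the original documentation):

```python
# Hedged sketch: a job submission body using a handful of the optional parameters.
import json

job = {
    "files": [
        {
            "sources": ["gsiftp://source.example.org/path/file"],            # placeholder
            "destinations": ["gsiftp://destination.example.org/path/file"]   # placeholder
        }
    ],
    "params": {
        "max_time_in_queue": "4h",   # cancel if still queued after 4 hours
        "timeout": 3600,             # abort a running transfer after 1 hour
        "retry": 2,                  # retry retriable errors up to 2 times
        "retry_delay": 120,          # wait 2 minutes between retries
        "priority": 3                # 1 (lowest) to 5 (highest), no fair share
    }
}
print(json.dumps(job, indent=2))
```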
......
......@@ -43,6 +43,7 @@ Note that _all_ parameters are optional.
* **verify_checksum** Enable checksum verification or indicate source, target, both or none checksum mode.
* **reuse** Enable reuse (all transfers are handled by the same process)
* **overwrite** Overwrite the destination files if they exist
* **overwrite_on_retry** Overwrite the destination files only during FTS retries
* **multihop** Treat the transfer as a multihop transfer
* **source_spacetoken** Source space token
* **spacetoken** Destination space token
......@@ -55,6 +56,7 @@ Note that _all_ parameters are optional.
* **sid** Specific id given by the user to be used with the deterministic job id generator algorithm
* **max_time_in_queue** Max time the job can be on the queue. Accepts an integer without suffix (interpreted as hours), or with a suffix s (seconds), m (minutes) or h (hours), e.g. `60s`
* **priority** Job priority. It should be a number between 1 and 5. The higher the number, the higher the priority.
* **buffer_size** TCP buffer size (in bytes) that will be used for the given transfer job
### Returns:
A dictionary representing a new job submission
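For illustration, a hedged sketch of building such a submission through the Python bindings is shown below. The `fts3.rest.client.easy` import path and the `new_transfer` helper are assumptions taken from the client library's usual layout, and the endpoints are placeholders:

```python
# Hedged sketch: building a job dictionary with a few of the parameters above.
import fts3.rest.client.easy as fts3

transfer = fts3.new_transfer(
    'gsiftp://source.example.org/path/file',        # placeholder source
    'gsiftp://destination.example.org/path/file'    # placeholder destination
)
job = fts3.new_job(
    transfers=[transfer],
    verify_checksum='both',
    overwrite_on_retry=True,     # overwrite only when FTS retries the file
    max_time_in_queue='2h',      # integer = hours, or use an s/m/h suffix
    priority=4,
    buffer_size=4194304          # TCP buffer size in bytes (4 MiB)
)
```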
......
......@@ -105,6 +105,11 @@ Overwrite files.
.RS
.RE
.TP
.B -o/--overwrite-on-retry
Overwrite the destination files only during FTS retries.
.RS
.RE
.TP
.B -r/--reuse
Enable session reuse for the transfer job.
.RS
......@@ -188,6 +193,15 @@ E.
Dropbox).
.RS
.RE
.TP
.B --dst_file_report
Report on the destination tape file if archiving to tape, overwrite is not
switched on and the tape file already exists. The report will include both the
size and checksum of the file. This integrity check can be used by the end user
or system to decide whether or not to stop trying to archive the file to tape
because it already exists.
.RS
.RE
.SH EXAMPLE
.IP
.nf
......
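For context, the `--dst_file_report` option described above maps to the `dst_file_report` job parameter. A hedged sketch of requesting it through the Python bindings follows; the import path and endpoints are assumptions, not taken from the man page:

```python
# Hedged sketch: ask FTS to report on an existing destination tape file
# instead of overwriting it. Endpoints are placeholders.
import fts3.rest.client.easy as fts3

transfer = fts3.new_transfer(
    'gsiftp://source.example.org/path/file',
    'root://tape-endpoint.example.org//path/file'
)
job = fts3.new_job(
    transfers=[transfer],
    archive_timeout=86400,    # archiving to tape
    overwrite=False,          # overwrite must stay off for the report to apply
    dst_file_report=True      # report size and checksum if the tape file already exists
)
```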
......@@ -4,21 +4,21 @@
%{!?nosetest_path: %global nosetest_path "/tmp"}
Name: fts-rest
Version: 3.10.1
Version: 3.11.0
Release: 1%{?dist}
BuildArch: noarch
Summary: FTS3 Rest Interface
Group: Applications/Internet
License: ASL 2.0
URL: http://fts3-service.web.cern.ch/
URL: https://fts.web.cern.ch/
# The source for this package was pulled from upstream VCS.
# Use the following commands to generate the tarball:
# git clone https://gitlab.cern.ch/fts/fts-rest.git -b master --depth 1 fts-rest-3.10.0
# cd fts-rest-3.10.0
# git checkout v3.10.0
# git clone https://gitlab.cern.ch/fts/fts-rest.git -b master --depth 1 fts-rest-3.11.0
# cd fts-rest-3.11.0
# git checkout v3.11.0
# git submodule init && git submodule update
# cd ..
# tar -vczf fts-rest-3.10.0.tar.gz --exclude-vcs fts-rest-3.10.0
# tar -vczf fts-rest-3.11.0.tar.gz --exclude-vcs fts-rest-3.11.0
Source0: %{name}-%{version}.tar.gz
BuildRequires: gfal2-python
......@@ -350,6 +350,10 @@ EOF
%changelog
* Wed Sep 22 2021 Mihai Patrascoiu <mihai.patrascoiu@cern.ch> - 3.11.0-1
- New Minor release
- Destination file report feature
* Fri Feb 12 2021 Mihai Patrascoiu <mihai.patrascoiu@cern.ch> - 3.10.1-1
- New bugfix release
......
......@@ -64,7 +64,7 @@ base_dir = os.path.dirname(__file__)
setup(
name='fts3-rest',
version='3.10.1',
version='3.11.0',
description='FTS3 Python Libraries',
long_description='FTS3 Python Libraries',
author='FTS3 Developers',
......
......@@ -30,6 +30,7 @@ from fts3.rest.client import Submitter, Delegator, Inquirer
DEFAULT_PARAMS = {
'checksum': 'ADLER32',
'overwrite': False,
'overwrite_on_retry': False,
'reuse': False,
'job_metadata': None,
'file_metadata': None,
......@@ -41,6 +42,7 @@ DEFAULT_PARAMS = {
'copy_pin_lifetime': -1,
'bring_online': -1,
'archive_timeout': -1,
'dst_file_report': False,
'timeout': None,
'fail_nearline': False,
'retry': 0,
......@@ -50,7 +52,8 @@ DEFAULT_PARAMS = {
's3alternate': False,
'target_qos': None,
'ipv4': False,
'ipv6': False
'ipv6': False,
'buffer_size': None
}
def _metadata(data):
......@@ -120,6 +123,8 @@ class JobSubmitter(Base):
help='delegate the proxy when the remote lifetime is less than this value (in minutes)')
self.opt_parser.add_option('-o', '--overwrite', dest='overwrite', action='store_true',
help='overwrite files.')
self.opt_parser.add_option('--overwrite-on-retry', dest='overwrite_on_retry', action='store_true',
help='enable overwrite files only during FTS retries.')
self.opt_parser.add_option('-r', '--reuse', dest='reuse', action='store_true',
help='enable session reuse for the transfer job.')
self.opt_parser.add_option('--job-metadata', dest='job_metadata',
......@@ -144,6 +149,8 @@ class JobSubmitter(Base):
help='bring online timeout in seconds.')
self.opt_parser.add_option('--archive-timeout', dest='archive_timeout', type='long',
help='archive timeout in seconds.')
self.opt_parser.add_option('--dst-file-report', dest='dst_file_report', default=False, action='store_true',
help='report on the destination tape file if it already exists and overwrite is off.')
self.opt_parser.add_option('--timeout', dest='timeout', type='long',
help='transfer timeout in seconds.')
self.opt_parser.add_option('--fail-nearline', dest='fail_nearline', action='store_true',
......@@ -169,6 +176,8 @@ class JobSubmitter(Base):
help='use S3 alternate URL')
self.opt_parser.add_option('--target-qos', dest='target_qos', type='string',
help='define the target QoS for this transfer for CDMI endpoints')
self.opt_parser.add_option('--buffer-size', '--buff-size', dest='buffer_size', type=int,
help="TCP buffer size (expressed in bytes) that will be used for the given transfer job")
def validate(self):
self.checksum = None
......@@ -187,6 +196,8 @@ class JobSubmitter(Base):
self._prepare_options()
if self.params['ipv4'] and self.params['ipv6']:
self.opt_parser.error('ipv4 and ipv6 can not be used at the same time')
if self.params['overwrite'] and self.params['overwrite_on_retry']:
self.opt_parser.error('overwrite and overwrite-on-retry can not be used at the same time')
def _build_transfers(self):
if self.options.bulk_file:
......@@ -241,6 +252,7 @@ class JobSubmitter(Base):
checksum=self.checksum,
bring_online=self.options.bring_online,
archive_timeout=self.options.archive_timeout,
dst_file_report=self.options.dst_file_report,
timeout = self.options.timeout,
verify_checksum=checksum_mode[0],
spacetoken=self.options.destination_token,
......@@ -251,6 +263,7 @@ class JobSubmitter(Base):
gridftp=self.options.gridftp_params,
job_metadata=self.options.job_metadata,
overwrite=self.options.overwrite,
overwrite_on_retry=self.options.overwrite_on_retry,
copy_pin_lifetime=self.options.pin_lifetime,
reuse=self.options.reuse,
retry=self.options.retry,
......@@ -260,7 +273,8 @@ class JobSubmitter(Base):
ipv4=self.options.ipv4,
ipv6=self.options.ipv6,
s3alternate=self.options.s3alternate,
target_qos=self.options.target_qos
target_qos=self.options.target_qos,
buffer_size=self.options.buffer_size
)
def _do_submit(self, context):
......
......@@ -45,7 +45,8 @@ class Job(Base):
max_time_in_queue = Column(Integer)
space_token = Column(String(255))
internal_job_params = Column(String(255))
overwrite_flag = Column(Flag)
dst_file_report = Column(Flag)
overwrite_flag = Column(TernaryFlag)
job_finished = Column(DateTime)
source_space_token = Column(String(255))
copy_pin_lifetime = Column(Integer)
......@@ -91,7 +92,8 @@ class ArchivedJob(Base):
max_time_in_queue = Column(Integer)
space_token = Column(String(255))
internal_job_params = Column(String(255))
overwrite_flag = Column(Flag)
dst_file_report = Column(Flag)
overwrite_flag = Column(TernaryFlag)
job_finished = Column(DateTime)
source_space_token = Column(String(255))
copy_pin_lifetime = Column(Integer)
......
......@@ -103,12 +103,13 @@ def add_alternative_source(transfer, alt_source):
return transfer
def new_job(transfers=None, deletion=None, verify_checksum=False, reuse=None, overwrite=False, multihop=False,
def new_job(transfers=None, deletion=None, verify_checksum=False, reuse=None, overwrite=False, overwrite_on_retry=False, multihop=False,
source_spacetoken=None, spacetoken=None,
bring_online=None, archive_timeout=None, copy_pin_lifetime=None,
bring_online=None, archive_timeout=None, dst_file_report=False, copy_pin_lifetime=None,
retry=-1, retry_delay=0, metadata=None, priority=None, strict_copy=False,
max_time_in_queue=None, timeout=None,
id_generator=JobIdGenerator.standard, sid=None, s3alternate=False, nostreams=1):
id_generator=JobIdGenerator.standard, sid=None,
s3alternate=False, nostreams=1, buffer_size=None):
"""
Creates a new dictionary representing a job
......@@ -118,11 +119,13 @@ def new_job(transfers=None, deletion=None, verify_checksum=False, reuse=None, ov
verify_checksum: Enable checksum verification: source, destination, both or none
reuse: Enable reuse (all transfers are handled by the same process)
overwrite: Overwrite the destinations if exist
overwrite_on_retry: Enable overwrite files only during FTS retries
multihop: Treat the transfer as a multihop transfer
source_spacetoken: Source space token
spacetoken: Destination space token
bring_online: Bring online timeout
archive_timeout: Archive timeout
dst_file_report: Report on the destination tape file if it already exists and overwrite is off
copy_pin_lifetime: Pin lifetime
retry: Number of retries: <0 is no retries, 0 is server default, >0 is whatever value is passed
metadata: Metadata to bind to the job
......@@ -132,6 +135,7 @@ def new_job(transfers=None, deletion=None, verify_checksum=False, reuse=None, ov
sid: Specific id given by the client
s3alternate: Use S3 alternate url schema
nostreams: Number of streams
buffer_size: TCP buffer size (in bytes) that will be used for the given transfer job
Returns:
An initialized dictionary representing a job
......@@ -144,16 +148,22 @@ def new_job(transfers=None, deletion=None, verify_checksum=False, reuse=None, ov
if isinstance(verify_checksum, basestring):
if not verify_checksum in ('source','target','both', 'none'):
raise ClientError('Bad request: verify_checksum does not contain a valid value')
if overwrite != False and overwrite_on_retry != False:
raise ClientError('Bad request: overwrite and overwrite-on-retry can not be used at the same time')
params = dict(
verify_checksum=verify_checksum,
reuse=reuse,
spacetoken=spacetoken,
bring_online=bring_online,
archive_timeout=archive_timeout,
dst_file_report=dst_file_report,
copy_pin_lifetime=copy_pin_lifetime,
job_metadata=metadata,
source_spacetoken=source_spacetoken,
overwrite=overwrite,
overwrite_on_retry=overwrite_on_retry,
multihop=multihop,
retry=retry,
retry_delay=retry_delay,
......@@ -164,7 +174,8 @@ def new_job(transfers=None, deletion=None, verify_checksum=False, reuse=None, ov
id_generator=id_generator,
sid=sid,
s3alternate=s3alternate,
nostreams=nostreams
nostreams=nostreams,
buffer_size=buffer_size
)
job = dict(
files=transfers,
......
......@@ -47,11 +47,13 @@ class Submitter(object):
del job['params']['checksum']
if 'filesize' in job['params']:
for f in job['files']:
f['filesize'] = job['params']['filesize']
if 'filesize' not in f:
f['filesize'] = job['params']['filesize']
del job['params']['filesize']
if 'file_metadata' in job['params']:
for f in job['files']:
f['metadata'] = job['params']['file_metadata']
if 'metadata' not in f:
f['metadata'] = job['params']['file_metadata']
del job['params']['file_metadata']
return json.dumps(job, indent=2)
......
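The Submitter change above makes the job-level `filesize` and `file_metadata` values act as defaults rather than overrides; a hedged standalone sketch of that fill-in behaviour (the helper name and single-key form are illustrative only):

```python
# Hedged sketch: a job-level default is applied only to files that do not
# already carry their own value, mirroring the fill-in semantics above.
def apply_file_default(files, params, key):
    if key in params:
        for f in files:
            if key not in f:
                f[key] = params[key]
        del params[key]

files = [{'sources': ['a'], 'destinations': ['b'], 'filesize': 10},
         {'sources': ['c'], 'destinations': ['d']}]
params = {'filesize': 42}
apply_file_default(files, params, 'filesize')
# The first file keeps 10; the second receives the job-level default 42.
```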
......@@ -28,7 +28,7 @@ from fts3rest.lib.base import BaseController, Session
from fts3rest.lib.helpers import jsonify
from fts3rest.lib import api
API_VERSION = dict(major=3, minor=10, patch=1)
API_VERSION = dict(major=3, minor=11, patch=0)
def _get_fts_core_version():
......
......@@ -65,14 +65,17 @@ def _populated_x509_name(components):
return x509_name
def _generate_proxy_request():
def _generate_proxy_request(key_len=2048):
"""
Generates a X509 proxy request.
Args:
key_len: Length of the RSA key in bits
Returns:
A tuple (X509 request, generated private key)
"""
key_pair = RSA.gen_key(1024, 65537, callback=_mute_callback)
key_pair = RSA.gen_key(key_len, 65537, callback=_mute_callback)
pkey = EVP.PKey()
pkey.assign_rsa(key_pair)
x509_request = X509.Request()
......@@ -282,8 +285,22 @@ class DelegationController(BaseController):
credential_cache = Session.query(CredentialCache)\
.get((user.delegation_id, user.user_dn))
if credential_cache is None or credential_cache.cert_request is None:
(x509_request, private_key) = _generate_proxy_request()
user_cert = self.certificate()
request_key_len = 2048
if user_cert:
user_key = X509.load_cert_string(user_cert)
request_key_len = user_key.get_pubkey().size() * 8
cached = credential_cache is not None and credential_cache.cert_request is not None
if cached:
cached_key_len = X509.load_request_string(credential_cache.cert_request).get_pubkey().size() * 8
if cached_key_len != request_key_len:
cached = False
log.debug("Invalidating cache due to key length missmatch between client and cached certificates")
if not cached:
(x509_request, private_key) = _generate_proxy_request(request_key_len)
credential_cache = CredentialCache(dlg_id=user.delegation_id, dn=user.user_dn,
cert_request=x509_request.as_pem(),
priv_key=private_key.as_pem(cipher=None),
......
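The delegation change above compares the RSA modulus size of the client certificate with that of the cached proxy request and regenerates the request on mismatch; a minimal sketch of that comparison, assuming M2Crypto and PEM-encoded inputs:

```python
# Hedged sketch: compare key lengths (in bits) of the client certificate and a
# cached proxy request, mirroring the M2Crypto calls used above.
from M2Crypto import X509

def key_lengths_match(user_cert_pem, cached_request_pem):
    # get_pubkey().size() returns the modulus size in bytes, hence the * 8
    cert_bits = X509.load_cert_string(user_cert_pem).get_pubkey().size() * 8
    request_bits = X509.load_request_string(cached_request_pem).get_pubkey().size() * 8
    return cert_bits == request_bits
```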
......@@ -47,6 +47,8 @@ DEFAULT_PARAMS = {
'gridftp': '',
'job_metadata': None,
'overwrite': False,
'dst_file_report': False,
'overwrite_on_retry': False,
'reuse': None,
'multihop': False,
'source_spacetoken': '',
......@@ -292,6 +294,8 @@ def _seconds_from_value(value):
return int(value) * 60
elif suffix == 'h':
return int(value) * 3600
elif suffix == 'd':
return int(value) * 3600 * 24
else:
return None
except:
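# Hedged illustration (not part of the original file): a self-contained sketch of
# the suffix handling above, including the newly added 'd' (days) case. Bare
# integers are treated as hours, following the max_time_in_queue documentation.
def seconds_from_suffixed_value(value):
    multipliers = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}
    try:
        value = str(value)
        if value[-1].lower() in multipliers:
            return int(value[:-1]) * multipliers[value[-1].lower()]
        return int(value) * 3600
    except (ValueError, IndexError):
        return None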
......@@ -446,6 +450,8 @@ class JobBuilder(object):
activity=file_dict.get('activity', 'default'),
hashed_id=shared_hashed_id if shared_hashed_id else _generate_hashed_id()
)
if f['file_metadata'] != None:
f['file_metadata'] = _metadata(f['file_metadata'])
self.files.append(f)
def _apply_selection_strategy(self):
......@@ -462,7 +468,7 @@ class JobBuilder(object):
"""
job_type = None
log.debug("job type is " + str(job_type)+ " reuse"+ str(self.params['reuse']))
log.debug("job_type=" + str(job_type) + " reuse=" + str(self.params['reuse']))
if self.params['multihop']:
job_type = 'H'
......@@ -471,7 +477,7 @@ class JobBuilder(object):
job_type = 'Y'
else:
job_type = 'N'
log.debug("job type is " + str(job_type))
log.debug("job_type=" + str(job_type))
self.is_bringonline = self.params['copy_pin_lifetime'] > 0 or self.params['bring_online'] > 0
self.is_qos_cdmi_transfer = (self.params['target_qos'] if 'target_qos' in self.params.keys() else None) is not None
......@@ -488,6 +494,22 @@ class JobBuilder(object):
if max_time_in_queue is not None:
expiration_time = time.time() + max_time_in_queue
if max_time_in_queue is not None and self.params['bring_online'] > 0:
# Ensure that the bringonline and expiration delta is respected
timeout_delta = _seconds_from_value(
pylons.config.get('fts3.BringOnlineAndExpirationDelta', None))
if timeout_delta is not None:
log.debug("Will enforce BringOnlineAndExpirationDelta=" + str(timeout_delta) + "s")
if max_time_in_queue - self.params['bring_online'] < timeout_delta:
raise HTTPBadRequest('Bringonline and Expiration timeout must be at least ' + str(timeout_delta) + ' seconds apart')
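# Map the overwrite options onto the database flag:
# 'Y' = always overwrite, 'R' = overwrite only on FTS retries, False = never overwrite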
if self.params['overwrite']:
overwrite_flag = 'Y'
elif self.params['overwrite_on_retry']:
overwrite_flag = 'R'
else:
overwrite_flag = False
self.job = dict(
job_id=self.job_id,
job_state=job_initial_state,
......@@ -503,7 +525,8 @@ class JobBuilder(object):
submit_time=datetime.utcnow(),
priority=max(min(int(self.params['priority']), 5), 1),
space_token=self.params['spacetoken'],
overwrite_flag=_safe_flag(self.params['overwrite']),
dst_file_report=_safe_flag(self.params['dst_file_report']),
overwrite_flag=overwrite_flag,
source_space_token=self.params['source_spacetoken'],
copy_pin_lifetime=int(self.params['copy_pin_lifetime']),
checksum_method=self.params['verify_checksum'],
......@@ -582,13 +605,13 @@ class JobBuilder(object):
raise HTTPBadRequest('Reuse jobs can only contain transfers for the same source and destination storage')
if job_type == 'Y' and (self.job['source_se'] and self.job['dest_se']) and len(self.files) > min_reuse_files:
self.job['job_type'] == 'Y'
self.job['job_type'] = 'Y'
if job_type == 'N' and not self.is_multiple:
self.job['job_type'] = 'N'
auto_session_reuse= pylons.config.get('fts3.AutoSessionReuse', 'false')
log.debug("AutoSessionReuse is "+ str(auto_session_reuse) + " job_type is" + str(job_type))
log.debug("AutoSessionReuse=" + str(auto_session_reuse) + " job_type=" + str(job_type))
max_reuse_files = int(pylons.config.get('fts3.AutoSessionReuseMaxFiles', 1000))
max_size_small_file = int(pylons.config.get('fts3.AutoSessionReuseMaxSmallFileSize', 104857600)) #100MB
max_size_big_file = int(pylons.config.get('fts3.AutoSessionReuseMaxBigFileSize', 1073741824)) #1GB
......@@ -597,8 +620,8 @@ class JobBuilder(object):
if auto_session_reuse == 'true' and not self.is_multiple and not self.is_bringonline and len(self.files) > min_reuse_files:
if ((self.job['source_se']) and (self.job['dest_se']) and (job_type is None) and (len(self.files) > 1)):
if len(self.files) > max_reuse_files:
self.job['job_type'] == 'N'
log.debug("The number of files "+str(len(self.files))+"is bigger than the auto maximum reuse files "+str(max_reuse_files))
self.job['job_type'] = 'N'
log.debug("The number of files " + str(len(self.files)) + " is bigger than the auto maximum reuse files " + str(max_reuse_files))
else:
small_files = 0
big_files = 0
......@@ -612,7 +635,7 @@ class JobBuilder(object):
big_files +=1
if small_files > min_small_files and big_files <= max_big_files:
self.job['job_type'] = 'Y'
log.debug("Reuse jobs with "+str(small_files)+" small files up to "+str(len(self.files))+" total files")
log.debug("Reuse jobs with " + str(small_files) + " small files up to " + str(len(self.files)) + " total files")
# Need to reset their hashed_id so they land on the same machine
shared_hashed_id = _generate_hashed_id()
for file in self.files:
......@@ -641,6 +664,7 @@ class JobBuilder(object):
priority=3,
space_token=self.params['spacetoken'],
overwrite_flag='N',
dst_file_report='N',
source_space_token=self.params['source_spacetoken'],
copy_pin_lifetime=-1,
checksum_method=None,
......
......@@ -77,6 +77,9 @@ paramSchema = {
'overwrite': {
'type': ['boolean', 'null']
},
'dst_file_report': {
'type': ['boolean', 'null']
},
'gridftp': {
'type': ['string', 'null'],
'title': 'Reserved for future usage'
......
......@@ -520,21 +520,23 @@ class TestJobListing(TestController):
self.push_delegation()
job1 = self._submit(file_metadata='a')
job2 = self._submit(file_metadata='5')
job2 = self._submit(file_metadata={'key': 'value'})
job3 = self._submit(file_metadata='?')
job4 = self._submit(file_metadata={'key': 5})
jobs = self.app.get(
url="/jobs/%s?files=job_id,file_metadata" % ','.join([job1, job2, job3]),
url="/jobs/%s?files=job_id,file_metadata" % ','.join([job1, job2, job3, job4]),
status=200).json
self.assertEqual(3, len(jobs))
self.assertEqual(4, len(jobs))
self.assertEqual(jobs[0]['job_id'], jobs[0]['files'][0]['job_id'])
self.assertEqual('a', jobs[0]['files'][0]['file_metadata'])
self.assertEqual({'label': 'a'}, jobs[0]['files'][0]['file_metadata'])
self.assertEqual(jobs[1]['job_id'], jobs[1]['files'][0]['job_id'])
self.assertEqual('5', jobs[1]['files'][0]['file_metadata'])
self.assertEqual({"key": "value"}, jobs[1]['files'][0]['file_metadata'])
self.assertEqual(jobs[2]['job_id'], jobs[2]['files'][0]['job_id'])
self.assertEqual('?', jobs[2]['files'][0]['file_metadata'])
self.assertEqual({'label': '?'}, jobs[2]['files'][0]['file_metadata'])
self.assertEqual(jobs[3]['job_id'], jobs[3]['files'][0]['job_id'])
self.assertEqual({'key': 5}, jobs[3]['files'][0]['file_metadata'])
def test_query_something_running(self):
"""
Query if there are any active or submitted files for a given destination surl
......
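The updated assertions above rely on plain-string metadata being wrapped into a labelled dictionary while dictionaries pass through unchanged; a hedged sketch of that coercion (the real `_metadata` helper may differ, for example by attempting JSON decoding first):

```python
# Hedged sketch of the coercion the test expects: strings become {'label': value},
# dictionaries are passed through unchanged.
def coerce_metadata(data):
    if isinstance(data, dict):
        return data
    return {'label': str(data)}

assert coerce_metadata('a') == {'label': 'a'}
assert coerce_metadata({'key': 5}) == {'key': 5}
```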