Commit 0fec1f4a authored by Andrea Manzi

Merge branch 'develop'

parents ae7bf9eb 12de8442
Pipeline #1241282 passed with stage in 1 minute and 57 seconds
@@ -4,7 +4,7 @@
 %{!?nosetest_path: %global nosetest_path "/tmp"}
 Name: fts-rest
-Version: 3.9.2
+Version: 3.9.3
 Release: 1%{?dist}
 BuildArch: noarch
 Summary: FTS3 Rest Interface
......
@@ -64,7 +64,7 @@ base_dir = os.path.dirname(__file__)
 setup(
     name='fts3-rest',
-    version='3.9.2',
+    version='3.9.3',
     description='FTS3 Python Libraries',
     long_description='FTS3 Python Libraries',
     author='FTS3 Developers',
......
@@ -28,7 +28,7 @@ from fts3rest.lib.base import BaseController, Session
 from fts3rest.lib.helpers import jsonify
 from fts3rest.lib import api

-API_VERSION = dict(major=3, minor=9, patch=2)
+API_VERSION = dict(major=3, minor=9, patch=3)

 def _get_fts_core_version():
......
@@ -33,8 +33,8 @@ class AutocompleteController(BaseController):
     @jsonify
     def autocomplete_dn(self):
         """
-        Autocomplete for users' dn
-        """
+        Autocomplete for users' dn
+        """
         term = request.params.get('term', '/DC=cern.ch')
         matches = Session.query(Credential.dn).filter(Credential.dn.startswith(term)).distinct().all()
         return map(lambda r: r[0], matches)
......
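For reference, the prefix match above behaves like this minimal standalone sketch (plain Python, no SQLAlchemy; the DN values are hypothetical examples, and Credential.dn.startswith(term) compiles to a SQL LIKE 'term%' filter):

def autocomplete_dn(dns, term='/DC=cern.ch'):
    # startswith(term) mirrors the LIKE 'term%' filter in the query above
    return sorted(dn for dn in set(dns) if dn.startswith(term))

dns = ['/DC=cern.ch/CN=alice', '/DC=cern.ch/CN=bob', '/DC=example.org/CN=eve']
print(autocomplete_dn(dns))  # ['/DC=cern.ch/CN=alice', '/DC=cern.ch/CN=bob']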
@@ -190,7 +190,7 @@ def _set_to_wait_helper(storage, vo_name, from_state, to_state):
     file_ids = Session.query(File.file_id).filter(
         and_(
             File.file_state == from_state),
-        (File.source_se == storage) | (File.dest_se == storage)
+        (File.source_se == storage) | (File.dest_se == storage)
     )
     if vo_name and vo_name != '*':
         file_ids = file_ids.filter(File.vo_name == vo_name)
@@ -283,7 +283,7 @@ class BanningController(BaseController):
             raise HTTPBadRequest('Missing storage parameter')
         user = request.environ['fts3.User.Credentials']
-        vo_name = user.vos[0]
+        vo_name = user.vos[0]
         allow_submit = bool(input_dict.get('allow_submit', False))
         status = input_dict.get('status', 'cancel').upper()
......
@@ -57,12 +57,12 @@ class LinkConfigController(BaseController):
         source = input_dict.get('source', '*')
         destination = input_dict.get('destination', '*')
         symbolicname = input_dict.get('symbolicname', None)
         if not symbolicname:
             raise HTTPBadRequest('Missing symbolicname')
         link_cfg = Session.query(LinkConfig).filter(LinkConfig.symbolicname == symbolicname).first()
         try:
             min_active = int(input_dict.get('min_active', 2))
             max_active = int(input_dict.get('max_active', 2))
@@ -78,8 +78,8 @@ class LinkConfigController(BaseController):
             raise HTTPBadRequest('Missing max_active')
         if min_active > max_active:
             raise HTTPBadRequest('max_active is lower than min_active')
         if not link_cfg:
             link_cfg = LinkConfig(
                 source=source,
@@ -88,10 +88,10 @@ class LinkConfigController(BaseController):
                 min_active=min_active,
                 max_active=max_active
             )
         for key, value in input_dict.iteritems():
             #value = validate_type(LinkConfig, key, value)
             setattr(link_cfg, key, value)
......
@@ -54,36 +54,38 @@ class SeConfigurationController(BaseController):
         input_dict = get_input_as_dict(request)
         try:
             for storage, cfg in input_dict.iteritems():
-                se_info = None
-                se_info_new = cfg.get('se_info', None)
-                if se_info_new:
-                    se_info = Session.query(Se).get(storage)
-                    if not se_info:
-                        se_info = Se(storage=storage)
-                    for key, value in se_info_new.iteritems():
-                        #value = validate_type(Se, key, value)
-                        setattr(se_info, key, value)
-                    audit_configuration('set-se-config', 'Set config %s: %s' % (storage, json.dumps(cfg)))
-                    Session.merge(se_info)
-                # Operation limits
-                operations = cfg.get('operations', None)
-                if operations:
-                    for vo, limits in operations.iteritems():
-                        for op, limit in limits.iteritems():
-                            limit = int(limit)
-                            new_limit = Session.query(OperationConfig).get((vo, storage, op))
-                            if limit > 0:
-                                if not new_limit:
-                                    new_limit = OperationConfig(
-                                        vo_name=vo, host=storage, operation=op
-                                    )
-                                new_limit.concurrent_ops = limit
-                                Session.merge(new_limit)
-                            elif new_limit:
-                                Session.delete(new_limit)
-                    audit_configuration('set-se-limits', 'Set limits for %s: %s' % (storage, json.dumps(operations)))
+                if not storage or storage.isspace():
+                    raise ValueError
+                se_info = None
+                se_info_new = cfg.get('se_info', None)
+                if se_info_new:
+                    se_info = Session.query(Se).get(storage)
+                    if not se_info:
+                        se_info = Se(storage=storage)
+                    for key, value in se_info_new.iteritems():
+                        #value = validate_type(Se, key, value)
+                        setattr(se_info, key, value)
+                    audit_configuration('set-se-config', 'Set config %s: %s' % (storage, json.dumps(cfg)))
+                    Session.merge(se_info)
+                # Operation limits
+                operations = cfg.get('operations', None)
+                if operations:
+                    for vo, limits in operations.iteritems():
+                        for op, limit in limits.iteritems():
+                            limit = int(limit)
+                            new_limit = Session.query(OperationConfig).get((vo, storage, op))
+                            if limit > 0:
+                                if not new_limit:
+                                    new_limit = OperationConfig(
+                                        vo_name=vo, host=storage, operation=op
+                                    )
+                                new_limit.concurrent_ops = limit
+                                Session.merge(new_limit)
+                            elif new_limit:
+                                Session.delete(new_limit)
+                    audit_configuration('set-se-limits', 'Set limits for %s: %s' % (storage, json.dumps(operations)))
             Session.commit()
         except (AttributeError, ValueError):
             Session.rollback()
......
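The behavioural change in this hunk is the new guard that rejects empty or whitespace-only storage names before any database work is done. A minimal sketch of just that guard (the input dicts are hypothetical):

def check_storage_names(input_dict):
    for storage in input_dict:
        # New guard: empty or all-whitespace keys are rejected
        if not storage or storage.isspace():
            raise ValueError('invalid storage name: %r' % (storage,))

check_storage_names({'gsiftp://se.example.org': {}})  # passes
try:
    check_storage_names({'   ': {}})
except ValueError as e:
    print(e)  # invalid storage name: '   '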
@@ -405,4 +405,4 @@ class DatamanagementController(BaseController):
         except KeyError, e:
             raise HTTPBadRequest('Missing parameter: %s' % str(e))
         finally:
-            os.unlink(proxy.name)
\ No newline at end of file
+            os.unlink(proxy.name)
@@ -421,9 +421,9 @@ class JobsController(BaseController):
         Session.query(File).filter(File.job_id == job.job_id)\
             .filter(File.file_state.in_(FileActiveStates), File.pid == None) \
             .update({
-                'file_state': 'CANCELED', 'reason': 'Job canceled by the user', 'dest_surl_uuid': None,
-                'finish_time': now
-            }, synchronize_session=False)
+                'file_state': 'CANCELED', 'reason': 'Job canceled by the user', 'dest_surl_uuid': None,
+                'finish_time': now
+            }, synchronize_session=False)
         # However, for data management operations there is nothing to signal, so
         # set job_finished
         Session.query(DataManagement).filter(DataManagement.job_id == job.job_id)\
@@ -494,10 +494,10 @@ class JobsController(BaseController):
         try:
             for job in modifiable_jobs:
                 if priority:
-                    for file in job.files:
-                        file.priority = priority
-                        file = Session.merge(file)
-                        log.info("File from Job %s priority changed to %d" % (job.job_id, priority))
+                    for file in job.files:
+                        file.priority = priority
+                        file = Session.merge(file)
+                        log.info("File from Job %s priority changed to %d" % (job.job_id, priority))
                     job.priority = priority
                     job = Session.merge(job)
                     log.info("Job %s priority changed to %d" % (job.job_id, priority))
@@ -565,9 +565,9 @@ class JobsController(BaseController):
             Session.execute(DataManagement.__table__.insert(), populated.datamanagement)
             Session.flush()
             Session.commit()
-        except IntegrityError as err:
-            Session.rollback()
-            raise HTTPConflict('The submission is duplicated ' + str(err))
+        except IntegrityError as err:
+            Session.rollback()
+            raise HTTPConflict('The submission is duplicated ' + str(err))
         except:
             Session.rollback()
             raise
@@ -631,7 +631,7 @@ class JobsController(BaseController):
             .update({
                 'job_state': 'CANCELED', 'reason': 'Job canceled by the user',
                 'job_finished': now
-            }, synchronize_session=False)
+            }, synchronize_session=False)
         Session.commit()
         Session.expire_all()
         log.info("Active jobs for VO %s canceled" % vo_name)
......
@@ -341,8 +341,8 @@ class Oauth2Controller(BaseController):
         authorized = self.oauth2_provider.is_already_authorized(user.delegation_id, app.client_id, auth['state']['scope'])
         if authorized:
             response = self.oauth2_provider.get_authorization_code(
-                auth['response_type'], auth['client_id'],
-                auth['redirect_uri'], **auth['state']
+                auth['response_type'], auth['client_id'],
+                auth['redirect_uri'], **auth['state']
             )
             for k, v in response.headers.iteritems():
                 pylons.response.headers[str(k)] = str(v)
......
@@ -59,7 +59,7 @@ class OptimizerController(BaseController):
         evolution = evolution.order_by(OptimizerEvolution.datetime.desc())
         return evolution[:50]

     @doc.return_type(array_of=Optimizer)
     @jsonify
     def get_optimizer_values(self):
@@ -75,7 +75,7 @@ class OptimizerController(BaseController):
         optimizer = optimizer.order_by(Optimizer.datetime.desc())
         return optimizer

     @doc.response(400, 'Invalid values passed in the request')
     @jsonify
     def set_optimizer_values(self):
@@ -86,25 +86,25 @@ class OptimizerController(BaseController):
         source_se = input_dict.get('source_se', None)
         dest_se = input_dict.get('dest_se', None)
         rationale = input_dict.get('rationale', None)
         current_time = datetime.utcnow()
         if not source_se or not dest_se:
             raise HTTPBadRequest('Missing source and/or destination')
         try:
             active = int(input_dict.get('active', 2))
         except Exception, e:
             raise HTTPBadRequest('Active must be an integer (%s)' % str(e))
         if active < 0:
             raise HTTPBadRequest('Active must be positive (%s)' % str(active))
         try:
             nostreams = int(input_dict.get('nostreams', 1))
         except Exception, e:
             raise HTTPBadRequest('Nostreams must be an integer (%s)' % str(e))
         if nostreams < 0:
             raise HTTPBadRequest('Nostreams must be positive (%s)' % str(nostreams))
         try:
             diff = int(input_dict.get('diff', 1))
         except Exception, e:
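Each numeric field above goes through the same parse-then-range-check pattern. Condensed into one helper for illustration (the controller raises HTTPBadRequest where this sketch raises ValueError; field names are taken from the hunk):

def parse_non_negative_int(input_dict, key, default):
    try:
        value = int(input_dict.get(key, default))
    except Exception as e:
        raise ValueError('%s must be an integer (%s)' % (key, str(e)))
    if value < 0:
        raise ValueError('%s must be positive (%s)' % (key, str(value)))
    return value

print(parse_non_negative_int({'active': '4'}, 'active', 2))  # 4
print(parse_non_negative_int({}, 'nostreams', 1))            # 1 (default)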
@@ -153,7 +153,7 @@ class OptimizerController(BaseController):
             raise HTTPBadRequest('Filesize_stddev must be a float (%s)' % str(e))
         if filesize_stddev < 0:
             raise HTTPBadRequest('Filesize_stddev must be positive (%s)' % str(filesize_stddev))

         optimizer = Optimizer(
             source_se=source_se,
             dest_se=dest_se,
@@ -177,8 +177,8 @@ class OptimizerController(BaseController):
             filesize_avg=filesize_avg,
             filesize_stddev=filesize_stddev
         )

         for key, value in input_dict.iteritems():
             setattr(evolution, key, value)
@@ -194,4 +194,4 @@ class OptimizerController(BaseController):
             raise

         return (evolution, optimizer)
@@ -62,7 +62,7 @@ def get_base_id():
 def get_vo_id(vo_name):
     log.debug("VO name: " + vo_name)
     return uuid.uuid5(BASE_ID, vo_name.encode('utf-8'))

 def get_storage_element(uri):
     """
     Returns the storage element of the given uri, which is the scheme +
@@ -82,9 +82,9 @@ def _is_dest_surl_uuid_enabled(vo_name):
     """
     list_of_vos = pylons.config.get('fts3.CheckDuplicates', 'None')
     if not list_of_vos:
-        return False
+        return False
     if vo_name in list_of_vos or "*" in list_of_vos:
-        return True
+        return True
     return False

 def _validate_url(url):
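The toggle above checks the VO name against the raw fts3.CheckDuplicates config value, with '*' acting as a wildcard. A standalone sketch (the config strings are hypothetical examples; note that when the config value is a plain string, `in` is a substring test):

def is_dest_surl_uuid_enabled(vo_name, list_of_vos):
    if not list_of_vos:
        return False
    # '*' enables the duplicate check for every VO
    if vo_name in list_of_vos or '*' in list_of_vos:
        return True
    return False

print(is_dest_surl_uuid_enabled('atlas', 'atlas,cms'))  # True
print(is_dest_surl_uuid_enabled('lhcb', 'atlas,cms'))   # False
print(is_dest_surl_uuid_enabled('lhcb', '*'))           # True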
@@ -163,47 +163,47 @@ def _select_best_replica(files, vo_name, entry_state, strategy):
     elif strategy == "queue" or strategy == "auto":
         sorted_ses = map(lambda x: x[0], s.rank_submitted(source_se_list,
-                                                          dst,
-                                                          vo_name))
+                                                          dst,
+                                                          vo_name))
     elif strategy == "success":
         sorted_ses = map(lambda x: x[0], s.rank_success_rate(source_se_list,
-                                                             dst))
+                                                             dst))
     elif strategy == "throughput":
         sorted_ses = map(lambda x: x[0], s.rank_throughput(source_se_list,
-                                                           dst))
+                                                           dst))
     elif strategy == "file-throughput":
         sorted_ses = map(lambda x: x[0], s.rank_per_file_throughput(
-            source_se_list,
-            dst))
+            source_se_list,
+            dst))
     elif strategy == "pending-data":
         sorted_ses = map(lambda x: x[0], s.rank_pending_data(source_se_list,
-                                                             dst,
-                                                             vo_name,
-                                                             activity))
+                                                             dst,
+                                                             vo_name,
+                                                             activity))
     elif strategy == "waiting-time":
         sorted_ses = map(lambda x: x[0], s.rank_waiting_time(source_se_list,
-                                                             dst,
-                                                             vo_name,
-                                                             activity))
+                                                             dst,
+                                                             vo_name,
+                                                             activity))
     elif strategy == "waiting-time-with-error":
         sorted_ses = map(lambda x: x[0], s.rank_waiting_time_with_error(
-            source_se_list,
-            dst,
-            vo_name,
-            activity))
+            source_se_list,
+            dst,
+            vo_name,
+            activity))
     elif strategy == "duration":
         sorted_ses = map(lambda x: x[0], s.rank_finish_time(source_se_list,
-                                                            dst,
-                                                            vo_name,
-                                                            activity,
-                                                            user_filesize))
+                                                            dst,
+                                                            vo_name,
+                                                            activity,
+                                                            user_filesize))
     else:
         raise HTTPBadRequest(strategy + " algorithm is not supported by Scheduler")
@@ -216,10 +216,10 @@ def _select_best_replica(files, vo_name, entry_state, strategy):
         if transfer['source_se'] == best_se:
             best_index = index
             break

     files[best_index]['file_state'] = entry_state
     if _is_dest_surl_uuid_enabled(vo_name):
-        files[best_index]['dest_surl_uuid'] = str(uuid.uuid5(BASE_ID, files[best_index]['dest_surl'].encode('utf-8')))
+        files[best_index]['dest_surl_uuid'] = str(uuid.uuid5(BASE_ID, files[best_index]['dest_surl'].encode('utf-8')))

 def _apply_banning(files):
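For context, uuid.uuid5 is deterministic: the same BASE_ID and destination SURL always yield the same dest_surl_uuid, which is what lets a database unique constraint catch duplicate submissions. A sketch (the namespace below is a stand-in; the real BASE_ID comes from get_base_id in this module, and the SURL is hypothetical):

import uuid

BASE_ID = uuid.uuid5(uuid.NAMESPACE_DNS, 'fts3')  # placeholder namespace
surl = 'gsiftp://se.example.org/path/file'        # hypothetical SURL
a = uuid.uuid5(BASE_ID, surl)
b = uuid.uuid5(BASE_ID, surl)
print(str(a) == str(b))  # True: same input, same UUID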
@@ -352,6 +352,20 @@ class JobBuilder(object):
             if elem['dest_se'] != self.job['dest_se']:
                 self.job['dest_se'] = None

+    def _set_activity_query_string(self, url, file_dict):
+        """
+        Set the activity query string in the given url
+        """
+        query_p = parse_qsl(url.query)
+        query_p.append(('activity', file_dict.get('activity', 'default')))
+        query_str = urlencode(query_p)
+        return ParseResult(scheme=url.scheme,
+                           netloc=url.netloc,
+                           path=url.path,
+                           params=url.params,
+                           query=query_str,
+                           fragment=url.fragment)
     def _populate_files(self, file_dict, f_index, shared_hashed_id):
         """
         From the dictionary file_dict, generate a list of transfers for a job
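The new _set_activity_query_string helper above can be exercised on its own with the same stdlib pieces the module already imports (Python 2 module names shown; in Python 3 they all live in urllib.parse; the URL is a made-up example):

from urlparse import urlparse, parse_qsl, ParseResult
from urllib import urlencode

def set_activity_query_string(url, file_dict):
    # Same logic as the helper added above, outside the class
    query_p = parse_qsl(url.query)
    query_p.append(('activity', file_dict.get('activity', 'default')))
    return ParseResult(scheme=url.scheme, netloc=url.netloc, path=url.path,
                       params=url.params, query=urlencode(query_p),
                       fragment=url.fragment)

src = urlparse('root://eos.example.org//eos/file?svcClass=tape')
print(set_activity_query_string(src, {}).geturl())
# root://eos.example.org//eos/file?svcClass=tape&activity=default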
@@ -383,36 +397,31 @@ class JobBuilder(object):
             # Multiple replicas, all must share the hashed-id
             if shared_hashed_id is None:
                 shared_hashed_id = _generate_hashed_id()
-            vo_name = self.user.vos[0]
+            vo_name = self.user.vos[0]
             for source, destination in pairs:
-                if len(file_dict['sources']) > 1 or not _is_dest_surl_uuid_enabled(vo_name):
-                    dest_uuid = None
-                else:
-                    dest_uuid = str(uuid.uuid5(BASE_ID, destination.geturl().encode('utf-8')))
+                if len(file_dict['sources']) > 1 or not _is_dest_surl_uuid_enabled(vo_name):
+                    dest_uuid = None
+                else:
+                    dest_uuid = str(uuid.uuid5(BASE_ID, destination.geturl().encode('utf-8')))
                 if self.is_bringonline:
                     # add the new query parameter only for root -> EOS-CTA for now
                     if source.scheme == "root":
-                        query_p = parse_qsl(source.query)
-                        query_p.append(('activity', file_dict.get('activity', 'default')))
-                        query_str = urlencode(query_p)
-                        source = ParseResult(scheme=source.scheme,
-                                             netloc=source.netloc,
-                                             path=source.path,
-                                             params=source.params,
-                                             query=query_str,
-                                             fragment=source.fragment)
+                        if source == destination:
+                            destination = self._set_activity_query_string(destination, file_dict)
+                        source = self._set_activity_query_string(source, file_dict)
                 f = dict(
                     job_id=self.job_id,
                     file_index=f_index,
-                    dest_surl_uuid=dest_uuid,
+                    dest_surl_uuid=dest_uuid,
                     file_state=initial_file_state,
                     source_surl=source.geturl(),
                     dest_surl=destination.geturl(),
                     source_se=get_storage_element(source),
                     dest_se=get_storage_element(destination),
                     vo_name=None,
-                    priority=self.job['priority'],
+                    priority=self.job['priority'],
                     user_filesize=_safe_filesize(file_dict.get('filesize', 0)),
                     selection_strategy=file_dict.get('selection_strategy', 'auto'),
                     checksum=file_dict.get('checksum', None),
@@ -434,10 +443,10 @@ class JobBuilder(object):
         """
         Initializes the list of transfers
         """
         job_type = None
         log.debug("job type is " + str(job_type) + " reuse" + str(self.params['reuse']))
         if self.params['multihop']:
             job_type = 'H'
         elif self.params['reuse'] is not None:
@@ -508,7 +517,7 @@ class JobBuilder(object):
             has_checksum = len(file_dict['checksum']) > 0
         else:
             file_dict['checksum'] = 'ADLER32'

         if type(self.job['checksum_method']) == bool:
             if not self.job['checksum_method'] and has_checksum:
                 self.job['checksum_method'] = 'target'
@@ -517,7 +526,7 @@ class JobBuilder(object):
                 self.job['checksum_method'] = 'none'
             else:
                 self.job['checksum_method'] = 'both'
         self.job['checksum_method'] = self.job['checksum_method'][0]

         # Validate that if this is a multiple replica job, that there is one single unique file
         self.is_multiple, unique_files = _has_multiple_options(self.files)
@@ -539,19 +548,19 @@ class JobBuilder(object):
             self.files[0]['file_state'] = 'SUBMITTED'
         self._set_job_source_and_destination(self.files)

         # If reuse is enabled, source and destination SE must be the same for all entries
         # Ignore for multiple replica jobs!
         min_reuse_files = int(pylons.config.get('fts3.SessionReuseMinFiles', 5))
         if job_type == 'Y' and (not self.job['source_se'] or not self.job['dest_se']):
             raise HTTPBadRequest('Reuse jobs can only contain transfers for the same source and destination storage')
         if job_type == 'Y' and (self.job['source_se'] and self.job['dest_se']) and len(self.files) > min_reuse_files:
             self.job['job_type'] == 'Y'
         if job_type == 'N' and not self.is_multiple:
             self.job['job_type'] = 'N'

         auto_session_reuse = pylons.config.get('fts3.AutoSessionReuse', 'false')
         log.debug("AutoSessionReuse is " + str(auto_session_reuse) + " job_type is " + str(job_type))
         max_reuse_files = int(pylons.config.get('fts3.AutoSessionReuseMaxFiles', 1000))
@@ -582,7 +591,7 @@ class JobBuilder(object):
             shared_hashed_id = _generate_hashed_id()
             for file in self.files:
                 file['hashed_id'] = shared_hashed_id

         if self.job['job_type'] is None:
             self.job['job_type'] = 'N'
......
@@ -31,7 +31,7 @@ fileSchema = {
         'type': ['integer', 'null'],
         'title': 'Job priority'
     },
-    'metadata': {'type': ['object', 'null']},
+    'metadata': {'type': ['object', 'null']},
     'filesize': {'type': ['integer', 'null'], 'minimum': 0},
     'checksum': {
         'type': ['string', 'null'],
......
@@ -34,12 +34,12 @@ def submit_state_change(job, transfer, transfer_state):
     msg_enabled = pylons.config.get('fts3.MonitoringMessaging', False)
     if not msg_enabled or msg_enabled.lower() == 'false':
         return

     publish_dn = pylons.config.get('fts3.MonitoringPublishDN', False)
     msg_dir = pylons.config.get('fts3.MessagingDirectory', '/var/lib/fts3')
     mon_dir = os.path.join(msg_dir, 'monitoring')

     _user_dn = job['user_dn'] if publish_dn else ''

     msg = dict(
......
@@ -58,12 +58,12 @@ def _get_proxy_fqans(proxy_path):
         raise VomsException('Failed to get the FQANs of a proxy: ' + out)

     fqans = []
     try:
-        for fqan in out.split('\n'):
-            fqans.append(fqan)
+        for fqan in out.split('\n'):
+            fqans.append(fqan)
         return fqans
     except Exception, e:
         raise VomsException('Failed to get the FQANs of a proxy: ' + str(e))

 def _get_proxy_termination_time(proxy_path):
def _get_proxy_termination_time(proxy_path):
@@ -151,14 +151,14 @@ class VomsClient(object):
         """
         new_proxy = NamedTemporaryFile(mode='w', suffix='.pem', delete=False).name

         args = ['voms-proxy-init',
                 '--cert', self.proxy_path,
                 '--key', self.proxy_path,
                 '--out', new_proxy,
                 '--noregen', '--ignorewarn']