Commit ae0595c5 authored by Andrea Manzi's avatar Andrea Manzi
Browse files

Merge branch 'develop'

parents 753ff6ad 42b6c789
Pipeline #654144 passed with stage
in 1 minute and 18 seconds
......@@ -72,7 +72,10 @@ format is as follows:
: Do not validate the server certificate
--access-token
: Oauth2 access token (supported only by some endpoints, takes precedence)
: IAM-XDC access token (supported only by some endpoints, takes precedence)
--target-qos
: the target QoS that the destination has to have when the operation is finished
-b/--blocking
: Blocking mode. Wait until the operation completes.
......
......@@ -187,7 +187,9 @@ Where `config.json` is a json with the following parameters
"destination": "gsiftp://test2.cern.ch",
"nostreams": 16,
"tcp_buffer_size": 4096,
"optimizer_mode": 5 // integer in [1, 3]
"optimizer_mode": 3, // integer in [1, 3]
"min_active": 20,
"max_active": 100
}
```
......
......@@ -13,6 +13,12 @@ To connect to a MySQL database, you will need to install MySQL-python
yum install MySQL-python
```
To authenticate via OpenID Connect you will need to install fts-rest-oauth2
```
yum install fts-rest-oauth2
```
If you are installing in a different host, the steps are the same, but you will need to copy the FTS3 configuration file to the new host (since it is used by the rest front-end) and make sure you have installed the proper certificates under /etc/grid-security/certificates.
If you have enabled SELinux, for convenience you can install fts-rest-selinux, which contains the rules needed to have REST working (i.e. allow Apache to connect to the database, allow Apache to bind to 8446)
......
......@@ -2,7 +2,7 @@ FROM centos:7
MAINTAINER CERN
#add EPEL repos
ADD http://mirror.switch.ch/ftp/mirror/epel/epel-release-latest-7.noarch.rpm /tmp/epel-release-7.noarch.rpm
ADD http://linuxsoft.cern.ch/epel/epel-release-latest-7.noarch.rpm /tmp/epel-release-7.noarch.rpm
RUN yum localinstall /tmp/epel-release-7.noarch.rpm -y \
&& /usr/bin/yum --enablerepo=*-testing clean all \
&& rm /tmp/epel-release-7.noarch.rpm \
......
......@@ -2,7 +2,7 @@ FROM centos:7
MAINTAINER CERN
#add EPEL repos
ADD http://mirror.switch.ch/ftp/mirror/epel/epel-release-latest-7.noarch.rpm /tmp/epel-release-7.noarch.rpm
ADD http://linuxsoft.cern.ch/epel/epel-release-latest-7.noarch.rpm /tmp/epel-release-7.noarch.rpm
RUN yum localinstall /tmp/epel-release-7.noarch.rpm -y \
&& /usr/bin/yum --enablerepo=*-testing clean all \
&& rm /tmp/epel-release-7.noarch.rpm \
......
......@@ -4,7 +4,7 @@
%{!?nosetest_path: %global nosetest_path "/tmp"}
Name: fts-rest
Version: 3.8.1
Version: 3.8.2
Release: 1%{?dist}
BuildArch: noarch
Summary: FTS3 Rest Interface
......
......@@ -97,7 +97,7 @@ class Context(object):
self.x509 = self.x509_list[0]
not_after = self.x509.get_not_after()
try:
not_after = not_after.get_time()
not_after = not_after.get_datetime()
except:
# Ugly hack for Python 2.4
import time
......
......@@ -48,3 +48,18 @@ def get_job_status(context, job_id, list_files=False):
"""
inquirer = Inquirer(context)
return inquirer.get_job_status(job_id, list_files)
def get_jobs_statuses(context, job_ids, list_files=False):
"""
Get status for a list of jobs
Args:
context: fts3.rest.client.context.Context instance
job_ids: The job list
list_files: If True, the status of each individual file will be queried
Returns:
Decoded JSON message returned by the server (job status plus, optionally, list of files)
"""
inquirer = Inquirer(context)
return inquirer.get_jobs_statuses(job_ids, list_files)
......@@ -55,16 +55,17 @@ def cancel_all(context, vo_name=None):
return submitter.cancel_all(vo_name)
def new_transfer(source, destination, checksum='ADLER32', filesize=None, metadata=None, activity=None):
def new_transfer(source, destination, checksum='ADLER32', filesize=None, metadata=None, activity=None,selection_strategy='auto'):
"""
Creates a new transfer pair
Args:
source: Source SURL
destination: Destination SURL
checksum: Checksum
filesize: File size
metadata: Metadata to bind to the transfer
source: Source SURL
destination: Destination SURL
checksum: Checksum
filesize: File size
metadata: Metadata to bind to the transfer
selection_strategy: selection Strategy to implement for multiple replica Jobs
Returns:
An initialized transfer
......@@ -81,6 +82,9 @@ def new_transfer(source, destination, checksum='ADLER32', filesize=None, metadat
transfer['metadata'] = metadata
if activity:
transfer['activity'] = activity
if selection_strategy:
transfer['selection_strategy'] = selection_strategy
return transfer
......@@ -104,7 +108,7 @@ def new_job(transfers=None, deletion=None, verify_checksum=False, reuse=None, ov
bring_online=None, copy_pin_lifetime=None,
retry=-1, retry_delay=0, metadata=None, priority=None, strict_copy=False,
max_time_in_queue=None, timeout=None,
id_generator=JobIdGenerator.standard, sid=None):
id_generator=JobIdGenerator.standard, sid=None, s3alternate=False):
"""
Creates a new dictionary representing a job
......@@ -125,6 +129,7 @@ def new_job(transfers=None, deletion=None, verify_checksum=False, reuse=None, ov
max_time_in_queue: Maximum number
id_generator: Job id generator algorithm
sid: Specific id given by the client
s3alternate: Use S3 alternate url schema
Returns:
An initialized dictionary representing a job
......@@ -154,7 +159,8 @@ def new_job(transfers=None, deletion=None, verify_checksum=False, reuse=None, ov
max_time_in_queue=max_time_in_queue,
timeout=timeout,
id_generator=id_generator,
sid=sid
sid=sid,
s3alternate=s3alternate
)
job = dict(
files=transfers,
......
......@@ -28,8 +28,14 @@ class BadEndpoint(FTS3ClientException):
class Unauthorized(FTS3ClientException):
def __init__(self, reason):
self.reason = reason
def __str__(self):
return "Unauthorized"
if self.reason:
return "Unauthorized: %s" % self.reason
else:
return "Unauthorized"
class ClientError(FTS3ClientException):
......
......@@ -30,11 +30,35 @@ class Inquirer(object):
self.context = context
def get_job_status(self, job_id, list_files=False):
if not isinstance(job_id, basestring):
raise Exception('The job_id provided is not a string!')
try:
job_info = json.loads(self.context.get("/jobs/%s" % job_id))
if list_files:
job_info['files'] = json.loads(self.context.get("/jobs/%s/files" % job_id))
job_info['dm'] = json.loads(self.context.get("/jobs/%s/dm" % job_id))
return job_info
except NotFound:
raise NotFound(job_id)
def get_jobs_statuses(self, job_ids, list_files=False):
if isinstance(job_ids, list):
xfer_ids = ','.join(job_ids)
else:
raise Exception('The input provided is not a list of ids!')
try:
if not list_files:
job_info = json.loads(self.context.get("/jobs/%s" % xfer_ids))
else:
job_info = json.loads(self.context.get("/jobs/%s?files=file_state,dest_surl,finish_time,start_time,reason,source_surl,file_metadata" % xfer_ids))
return job_info
except NotFound:
raise NotFound(job_id)
......
......@@ -65,7 +65,10 @@ class Request(object):
else:
raise ClientError('Bad request')
elif 401 <= code <= 403:
raise Unauthorized()
if message:
raise Unauthorized(message)
else:
raise Unauthorized()
elif code == 404:
raise NotFound(url, message)
elif code == 419:
......
......@@ -28,6 +28,10 @@ Listen 8446
SSLCACertificatePath /etc/grid-security/certificates
SSLCARevocationPath /etc/grid-security/certificates
<IfVersion >= 2.4>
SSLCARevocationCheck chain
</IfVersion>
# Require client certificate
SSLVerifyClient optional
SSLVerifyDepth 10
......
......@@ -28,7 +28,7 @@ from fts3rest.lib.base import BaseController, Session
from fts3rest.lib.helpers import jsonify
from fts3rest.lib import api
API_VERSION = dict(major=3, minor=8, patch=1)
API_VERSION = dict(major=3, minor=8, patch=2)
def _get_fts_core_version():
......
......@@ -282,9 +282,8 @@ class BanningController(BaseController):
if not storage:
raise HTTPBadRequest('Missing storage parameter')
vo_name = input_dict.get('vo_name', '*')
if vo_name is None or vo_name == '':
raise HTTPBadRequest('vo_name can not be null')
user = request.environ['fts3.User.Credentials']
vo_name = user.vos[0]
allow_submit = bool(input_dict.get('allow_submit', False))
status = input_dict.get('status', 'cancel').upper()
......@@ -355,8 +354,10 @@ class BanningController(BaseController):
raise HTTPBadRequest('Missing storage parameter')
job_ids = []
try:
Session.query(BannedSE).filter(BannedSE.se==storage).delete()
job_ids= _reenter_queue(storage, '*')
user = request.environ['fts3.User.Credentials']
vo_name = user.vos[0]
Session.query(BannedSE).filter(BannedSE.se==storage,BannedSE.vo==vo_name).delete()
job_ids= _reenter_queue(storage, vo_name)
Session.commit()
except Exception:
Session.rollback()
......
......@@ -53,22 +53,65 @@ function refreshLinks()
});
});
tr.append($("<td></td>").append(deleteBtn))
.append($("<td id='symname'></td>").text(link.symbolicname))
.append($("<td id='srcname'></td>").text(link.source))
.append($("<td></td>").text(link.destination))
.append($("<td></td>").text(link.nostreams))
.append($("<td></td>").text(link.min_active))
.append($("<td></td>").text(link.max_active))
.append($("<td></td>").text(link.optimizer_mode))
.append($("<td></td>").text(link.tcp_buffer_size))
var updateBtn = $("<button class='btn btn-link' type='button'></button>")
.append("<i class='glyphicon glyphicon-floppy-disk'></i>");
tbody.append(tr);
tr.append($("<td></td>").append(deleteBtn).append(updateBtn))
.append($("<td></td>")
.append($("<input type='text' name='symbolicname_"+link.symbolicname+"' class='form-control'/>").val(link.symbolicname)))
.append($("<td></td>")
.append($("<input type='text' name='source_"+link.symbolicname+"' class='form-control'/>").val(link.source)))
.append($("<td></td>")
.append($("<input type='text' name='destination_"+link.symbolicname+"' class='form-control'/>").val(link.destination)))
.append($("<td></td>")
.append($("<input type='number' name='nostreams_"+link.symbolicname+"' min='0' max='16' class='form-control'/>").val(link.nostreams)))
.append($("<td></td>")
.append($("<input type='number' name='min_active_"+link.symbolicname+"' class='form-control'/>").val(link.min_active)))
.append($("<td></td>")
.append($("<input type='number' name='max_active_"+link.symbolicname+"' class='form-control'/>").val(link.max_active)))
.append($("<td></td>")
.append($("<input type='number' name='optimizer_mode_"+link.symbolicname+"' min='0' max='3' class='form-control'/>").val(link.optimizer_mode)))
.append($("<td></td>")
.append($("<input type='number' name='tcp_buffer_size_"+link.symbolicname+"' class='form-control'/>").val(link.tcp_buffer_size)));
tbody.append(tr);
updateBtn.click(function() {
// tr.css("background", "#3CB371");
var saveload = {
symbolicname: tr.find("input[name='symbolicname_"+link.symbolicname+"']").val(),
source: tr.find("input[name='source_"+link.symbolicname+"']").val(),
destination: tr.find("input[name='destination_"+link.symbolicname+"']").val(),
nostreams: tr.find("input[name='nostreams_"+link.symbolicname+"']").val(),
min_active: tr.find("input[name='min_active_"+link.symbolicname+"']").val(),
max_active: tr.find("input[name='max_active_"+link.symbolicname+"']").val(),
optimizer_mode: tr.find("input[name='optimizer_mode_"+link.symbolicname+"']").val(),
tcp_buffer_size: tr.find("input[name='tcp_buffer_size_"+link.symbolicname+"']").val()
};
console.log(saveload);
$.ajax({
url: "/config/links",
type: "POST",
dataType: "json",
contentType: "application/json",
data: JSON.stringify(saveload)
})
.done(function() {
tr.find("input").prop("disabled", false)
tr.css("background", "#3CB371");
refreshLinks();
})
.fail(function(jqXHR) {
errorMessage(jqXHR);
tr.css("background", "#ffffff");
});
});
});
})
.fail(function(jqXHR) {
errorMessage(jqXHR);
});
}
......
......@@ -45,7 +45,7 @@
</td>
<td>
<input class="form-control" type="number" placeholder="Streams" name="nostreams"
min="0" max="100" value="1"/>
min="0" max="16" value="1"/>
</td>
<td>
<input class="form-control" type="number" placeholder="Min Actives" name="min_active"
......@@ -57,7 +57,7 @@
</td>
<td>
<input class="form-control" type="number" placeholder="Optimizer Mode" name="optimizer_mode"
min="0" max="5" value="0"/>
min="0" max="3" value="0"/>
</td>
<td>
<input class="form-control" type="number" placeholder="TCP buffer size" name="tcp_buffer_size"
......
......@@ -100,13 +100,13 @@ class TestBanning(TestController):
"""
jobs = list()
jobs.append(
insert_job('dteam', 'gsiftp://source', 'gsiftp://destination', 'SUBMITTED', user_dn='/DC=cern/CN=someone')
insert_job('testvo', 'gsiftp://source', 'gsiftp://destination', 'SUBMITTED', user_dn='/DC=cern/CN=someone')
)
jobs.append(
insert_job('dteam', 'gsiftp://source', 'gsiftp://destination2', 'ACTIVE', user_dn='/DC=cern/CN=someone')
insert_job('testvo', 'gsiftp://source', 'gsiftp://destination2', 'ACTIVE', user_dn='/DC=cern/CN=someone')
)
jobs.append(
insert_job('dteam', 'gsiftp://source', 'gsiftp://destination2', 'FAILED', duration=10, queued=20,
insert_job('testvo', 'gsiftp://source', 'gsiftp://destination2', 'FAILED', duration=10, queued=20,
user_dn='/DC=cern/CN=someone')
)
......@@ -148,16 +148,13 @@ class TestBanning(TestController):
status=200
).json
self.assertEqual(0, len(canceled))
banned = Session.query(BannedSE).get(('gsiftp://nowhere', '*'))
banned = Session.query(BannedSE).filter(BannedSE.se=='gsiftp://nowhere').first()
self.assertNotEqual(None, banned)
self.assertEqual(self.get_user_credentials().user_dn, banned.admin_dn)
self.assertEqual('CANCEL', banned.status)
self.assertEqual('*', banned.vo)
self.assertEqual('TEST BAN 42', banned.message)
self.app.delete(url="/ban/se?storage=%s" % urllib.quote('gsiftp://nowhere'), status=204)
banned = Session.query(BannedSE).get(('gsiftp://nowhere', '*'))
banned = Session.query(BannedSE).filter(BannedSE.se=='gsiftp://nowhere').first()
self.assertEqual(None, banned)
def test_list_banned_ses(self):
......@@ -185,19 +182,19 @@ class TestBanning(TestController):
"""
canceled = self.app.post(
url="/ban/se",
params={'storage': 'gsiftp://nowhere', 'vo_name': 'dteam'},
params={'storage': 'gsiftp://nowhere', 'vo_name': 'testvo'},
status=200
).json
self.assertEqual(0, len(canceled))
banned = Session.query(BannedSE).get(('gsiftp://nowhere', 'dteam'))
banned = Session.query(BannedSE).get(('gsiftp://nowhere', 'testvo'))
self.assertNotEqual(None, banned)
self.assertEqual(self.get_user_credentials().user_dn, banned.admin_dn)
self.assertEqual('CANCEL', banned.status)
self.assertEqual('dteam', banned.vo)
self.assertEqual('testvo', banned.vo)
self.app.delete(url="/ban/se?storage=%s&vo_name=dteam" % urllib.quote('gsiftp://nowhere'), status=204)
banned = Session.query(BannedSE).get(('gsiftp://nowhere', 'dteam'))
self.app.delete(url="/ban/se?storage=%s&vo_name=testvo" % urllib.quote('gsiftp://nowhere'), status=204)
banned = Session.query(BannedSE).get(('gsiftp://nowhere', 'someone'))
self.assertEqual(None, banned)
def test_ban_se_cancel(self):
......@@ -205,9 +202,9 @@ class TestBanning(TestController):
Ban a SE that has files queued, make sure they are canceled
"""
jobs = list()
jobs.append(insert_job('dteam', 'gsiftp://source', 'gsiftp://destination', 'SUBMITTED'))
jobs.append(insert_job('dteam', 'gsiftp://source', 'gsiftp://destination2', 'ACTIVE'))
jobs.append(insert_job('dteam', 'gsiftp://source', 'gsiftp://destination2', 'FAILED', duration=10, queued=20))
jobs.append(insert_job('testvo', 'gsiftp://source', 'gsiftp://destination', 'SUBMITTED'))
jobs.append(insert_job('testvo', 'gsiftp://source', 'gsiftp://destination2', 'ACTIVE'))
jobs.append(insert_job('testvo', 'gsiftp://source', 'gsiftp://destination2', 'FAILED', duration=10, queued=20))
canceled_ids = self.app.post(
url="/ban/se",
......@@ -241,7 +238,7 @@ class TestBanning(TestController):
Ban a SE that has files queued. If a job has other pairs, the job must remain!
"""
job_id = insert_job(
'dteam',
'testvo',
multiple=[('gsiftp://source', 'gsiftp://destination'), ('gsiftp://other', 'gsiftp://destination')]
)
canceled_ids = self.app.post(
......@@ -270,13 +267,13 @@ class TestBanning(TestController):
Cancel a SE that has files queued, make sure they are canceled (with VO)
"""
jobs = list()
jobs.append(insert_job('dteam', 'gsiftp://source', 'gsiftp://destination', 'SUBMITTED'))
jobs.append(insert_job('testvo', 'gsiftp://source', 'gsiftp://destination', 'SUBMITTED'))
jobs.append(insert_job('atlas', 'gsiftp://source', 'gsiftp://destination', 'SUBMITTED'))
jobs.append(insert_job('atlas', 'gsiftp://source', 'gsiftp://destination2', 'SUBMITTED'))
canceled_ids = self.app.post(
url="/ban/se",
params={'storage': 'gsiftp://source', 'status': 'cancel', 'vo_name': 'dteam'},
params={'storage': 'gsiftp://source', 'status': 'cancel', 'vo_name': 'testvo'},
status=200
).json
......@@ -302,9 +299,9 @@ class TestBanning(TestController):
Ban a SE, but instead of canceling, give jobs some time to finish
"""
jobs = list()
jobs.append(insert_job('dteam', 'gsiftp://source', 'gsiftp://destination', 'SUBMITTED'))
jobs.append(insert_job('dteam', 'gsiftp://source', 'gsiftp://destination2', 'ACTIVE'))
jobs.append(insert_job('dteam', 'gsiftp://source', 'gsiftp://destination2', 'FAILED', duration=10, queued=20))
jobs.append(insert_job('testvo', 'gsiftp://source', 'gsiftp://destination', 'SUBMITTED'))
jobs.append(insert_job('testvo', 'gsiftp://source', 'gsiftp://destination2', 'ACTIVE'))
jobs.append(insert_job('testvo', 'gsiftp://source', 'gsiftp://destination2', 'FAILED', duration=10, queued=20))
waiting_ids = self.app.post(
url="/ban/se",
......@@ -332,7 +329,7 @@ class TestBanning(TestController):
for f in files:
self.assertEqual('FAILED', f.file_state)
banned = Session.query(BannedSE).get(('gsiftp://source', '*'))
banned = Session.query(BannedSE).get(('gsiftp://source', 'testvo'))
self.assertEqual('WAIT', banned.status)
def test_ban_se_wait_vo(self):
......@@ -340,13 +337,13 @@ class TestBanning(TestController):
Ban a SE, but instead of canceling, give jobs some time to finish (with VO)
"""
jobs = list()
jobs.append(insert_job('dteam', 'gsiftp://source', 'gsiftp://destination', 'SUBMITTED'))
jobs.append(insert_job('testvo', 'gsiftp://source', 'gsiftp://destination', 'SUBMITTED'))
jobs.append(insert_job('atlas', 'gsiftp://source', 'gsiftp://destination', 'SUBMITTED'))
jobs.append(insert_job('atlas', 'gsiftp://source', 'gsiftp://destination2', 'SUBMITTED'))
waiting_ids = self.app.post(
url="/ban/se",
params={'storage': 'gsiftp://source', 'status': 'wait', 'vo_name': 'dteam', 'timeout': 33},
params={'storage': 'gsiftp://source', 'status': 'wait', 'vo_name': 'testvo', 'timeout': 33},
status=200
).json
......@@ -440,7 +437,7 @@ class TestBanning(TestController):
Regression for FTS-297
When unbanning a storage, if any file was left on wait, they must re-enter the queue
"""
job_id = insert_job('dteam', 'gsiftp://source', 'gsiftp://destination', 'SUBMITTED', user_dn='/DC=cern/CN=someone')
job_id = insert_job('testvo', 'gsiftp://source', 'gsiftp://destination', 'SUBMITTED', user_dn='/DC=cern/CN=someone')
self.app.post(
url="/ban/se", params={'storage': 'gsiftp://source', 'status': 'wait', 'allow_submit': True},
status=200
......@@ -506,7 +503,7 @@ class TestBanning(TestController):
"""
self.push_delegation()
pre_job_id = insert_job('dteam', 'srm://source', 'srm://destination', 'STAGING', user_dn='/DC=cern/CN=someone')
pre_job_id = insert_job('testvo', 'srm://source', 'srm://destination', 'STAGING', user_dn='/DC=cern/CN=someone')
self.app.post(
url="/ban/se", params={'storage': 'srm://source', 'status': 'wait', 'allow_submit': True},
status=200
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment