Commit 6d9c9935 authored by Andrea Manzi's avatar Andrea Manzi
Browse files

fix merge

parents 395b9c28 47f32bf6
Pipeline #847674 passed with stage
in 1 minute and 20 seconds
......@@ -4,8 +4,8 @@
%{!?nosetest_path: %global nosetest_path "/tmp"}
Name: fts-rest
Version: 3.8.4
Release: 2%{?dist}
Version: 3.9.0
Release: 1%{?dist}
BuildArch: noarch
Summary: FTS3 Rest Interface
Group: Applications/Internet
......@@ -278,6 +278,7 @@ EOF
%config(noreplace) %{_sysconfdir}/fts3/fts3rest.ini
%config(noreplace) %{_sysconfdir}/httpd/conf.d/fts3rest.conf
%config(noreplace) %{_sysconfdir}/logrotate.d/fts-rest
%config(noreplace) %{_sysconfdir}/cron.d/fts-rest-graceful.cron
%dir %attr(0755,fts3,fts3) %{_var}/cache/fts3rest
%dir %attr(0755,fts3,fts3) %{_var}/log/fts3rest
%doc docs/README.md
......
......@@ -16,7 +16,7 @@
# limitations under the License.
from sqlalchemy import BigInteger
from sqlalchemy import Boolean, Column, DateTime, Float
from sqlalchemy import ForeignKey, Integer, String, Enum
from sqlalchemy import ForeignKey, Integer, String, Enum, UniqueConstraint
from sqlalchemy.dialects import sqlite
from sqlalchemy.orm import relation, backref
......@@ -39,9 +39,11 @@ class File(Base):
hashed_id = Column(Integer)
file_index = Column(Integer)
job_id = Column(String(36), ForeignKey('t_job.job_id'))
dest_surl_uuid = Column(String(255), unique=True)
vo_name = Column(String(50))
source_se = Column(String(255))
dest_se = Column(String(255))
priority = Column(Integer)
file_state = Column(Enum(*(FileActiveStates + FileTerminalStates + FileOnHoldStates)))
transfer_host = Column(String(255))
source_surl = Column(String(1100))
......@@ -85,8 +87,10 @@ class ArchivedFile(Base):
file_index = Column(Integer)
job_id = Column(String(36),
ForeignKey('t_job_backup.job_id'))
dest_surl_uuid = Column(String(255), unique=True)
source_se = Column(String(255))
dest_se = Column(String(255))
priority = Column(Integer)
file_state = Column(String(32))
transferhost = Column(String(255))
source_surl = Column(String(1100))
......
......@@ -29,3 +29,7 @@ install (FILES fts3rest.conf
install (FILES fts3rest.xml
DESTINATION usr/lib/firewalld/services/
)
# cron
install (FILES fts-rest-graceful.cron
DESTINATION etc/cron.d
)
12 */6 * * * root /usr/sbin/apachectl graceful >& /dev/null
......@@ -7,6 +7,8 @@ Infosys=lcg-bdii.cern.ch:2170
AuthorizedVO=dteam;*
CleanRecordsHost=false
CheckDuplicates=dteam;*
MessagingDirectory=/tmp/rest-msgs
HeartBeatInterval=0
......
......@@ -28,7 +28,7 @@ from fts3rest.lib.base import BaseController, Session
from fts3rest.lib.helpers import jsonify
from fts3rest.lib import api
API_VERSION = dict(major=3, minor=8, patch=4)
API_VERSION = dict(major=3, minor=9, patch=0)
def _get_fts_core_version():
......
......@@ -102,7 +102,7 @@ def _cancel_transfers(storage=None, vo_name=None):
Session.query(File).filter(File.file_id == file_id)\
.update({
'file_state': 'CANCELED', 'reason': 'Storage banned',
'finish_time': now
'finish_time': now, 'dest_surl_uuid': None
}, synchronize_session=False)
# If there are alternatives, enable them
if Session.bind.dialect.name == 'mysql':
......
......@@ -315,6 +315,7 @@ class JobsController(BaseController):
file.file_state = 'CANCELED'
file.finish_time = datetime.utcnow()
file.dest_surl_uuid = None
changed_states.append('CANCELED')
Session.merge(file)
......@@ -414,13 +415,13 @@ class JobsController(BaseController):
Session.query(File).filter(File.job_id == job.job_id)\
.filter(File.file_state.in_(FileActiveStates), File.pid != None)\
.update({
'file_state': 'CANCELED', 'reason': 'Job canceled by the user',
'file_state': 'CANCELED', 'reason': 'Job canceled by the user', 'dest_surl_uuid':None,
'finish_time': None
}, synchronize_session=False)
Session.query(File).filter(File.job_id == job.job_id)\
.filter(File.file_state.in_(FileActiveStates), File.pid == None) \
.update({
'file_state': 'CANCELED', 'reason': 'Job canceled by the user',
'file_state': 'CANCELED', 'reason': 'Job canceled by the user', 'dest_surl_uuid':None,
'finish_time': now
}, synchronize_session=False)
# However, for data management operations there is nothing to signal, so
......@@ -493,6 +494,10 @@ class JobsController(BaseController):
try:
for job in modifiable_jobs:
if priority:
for file in job.files:
file.priority = priority
file = Session.merge(file)
log.info("File from Job %s priority changed to %d" % (job.job_id, priority))
job.priority = priority
job = Session.merge(job)
log.info("Job %s priority changed to %d" % (job.job_id, priority))
......@@ -560,6 +565,9 @@ class JobsController(BaseController):
Session.execute(DataManagement.__table__.insert(), populated.datamanagement)
Session.flush()
Session.commit()
except IntegrityError as err:
Session.rollback()
raise HTTPConflict('The submission is duplicated '+ str(err))
except:
Session.rollback()
raise
......@@ -605,7 +613,7 @@ class JobsController(BaseController):
file_count = Session.query(File).filter(File.vo_name == vo_name)\
.filter(File.file_state.in_(FileActiveStates))\
.update({
'file_state': 'CANCELED', 'reason': 'Job canceled by the user',
'file_state': 'CANCELED', 'reason': 'Job canceled by the user', 'dest_surl_uuid':None,
'finish_time': None
}, synchronize_session=False)
......@@ -657,7 +665,7 @@ class JobsController(BaseController):
# to fts_url_copy
file_count = Session.query(File).filter(File.file_state.in_(FileActiveStates))\
.update({
'file_state': 'CANCELED', 'reason': 'Job canceled by the user',
'file_state': 'CANCELED', 'reason': 'Job canceled by the user', 'dest_surl_uuid':None,
'finish_time': None
}, synchronize_session=False)
......@@ -672,7 +680,7 @@ class JobsController(BaseController):
job_count = Session.query(Job).filter(Job.job_state.in_(JobActiveStates))\
.update({
'job_state': 'CANCELED', 'reason': 'Job canceled by the user',
'job_state': 'CANCELED', 'reason': 'Job canceled by the user',
'job_finished': now
}, synchronize_session=False)
Session.commit()
......
......@@ -22,7 +22,8 @@ import uuid
import pylons
from datetime import datetime
from urlparse import urlparse
from urlparse import urlparse,parse_qsl, ParseResult
from urllib import urlencode
from fts3.model import File, BannedSE
from fts3rest.lib.base import Session
......@@ -72,6 +73,19 @@ def get_storage_element(uri):
"""
return "%s://%s" % (uri.scheme, uri.hostname)
def _is_dest_surl_uuid_enabled(vo_name):
"""
Returns True if the given vo_name allows dest_surl_uuid.
Args:
vo_name: Name of the vo
"""
list_of_vos = pylons.config.get('fts3.CheckDuplicates', 'None')
if not list_of_vos:
return False
if vo_name in list_of_vos or "*" in list_of_vos:
return True
return False
def _validate_url(url):
"""
......@@ -202,7 +216,10 @@ def _select_best_replica(files, vo_name, entry_state, strategy):
if transfer['source_se'] == best_se:
best_index = index
break
files[best_index]['file_state'] = entry_state
if _is_dest_surl_uuid_enabled(vo_name):
files[best_index]['dest_surl_uuid'] = str(uuid.uuid5(BASE_ID, files[best_index]['dest_surl'].encode('utf-8')))
def _apply_banning(files):
......@@ -364,17 +381,35 @@ class JobBuilder(object):
# Multiple replicas, all must share the hashed-id
if shared_hashed_id is None:
shared_hashed_id = _generate_hashed_id()
vo_name = self.user.vos[0]
for source, destination in pairs:
if len(file_dict['sources']) > 1 or not _is_dest_surl_uuid_enabled(vo_name):
dest_uuid = None
else:
dest_uuid = str(uuid.uuid5(BASE_ID, destination.geturl().encode('utf-8')))
if self.is_bringonline:
# add the new query parameter only for root -> EOS-CTA for now
if source.scheme == "root":
query_p = parse_qsl(source.query)
query_p.append(('activity', file_dict.get('activity', 'default')))
query_str = urlencode(query_p)
source = ParseResult(scheme=source.scheme,
netloc=source.netloc,
path=source.path,
params=source.params,
query= query_str,
fragment=source.fragment)
f = dict(
job_id=self.job_id,
file_index=f_index,
dest_surl_uuid=dest_uuid,
file_state=initial_file_state,
source_surl=source.geturl(),
dest_surl=destination.geturl(),
source_se=get_storage_element(source),
dest_se=get_storage_element(destination),
vo_name=None,
priority=self.job['priority'],
user_filesize=_safe_filesize(file_dict.get('filesize', 0)),
selection_strategy=file_dict.get('selection_strategy', 'auto'),
checksum=file_dict.get('checksum', None),
......
......@@ -27,7 +27,11 @@ fileSchema = {
'properties': {
'sources': {'type': 'array', 'items': urlSchema, 'minItems': 1},
'destinations': {'type': 'array', 'items': urlSchema, 'minItems': 1},
'metadata': {'type': ['object', 'null']},
'priority': {
'type': ['integer', 'null'],
'title': 'Job priority'
},
'metadata': {'type': ['object', 'null']},
'filesize': {'type': ['integer', 'null'], 'minimum': 0},
'checksum': {
'type': ['string', 'null'],
......
......@@ -37,11 +37,11 @@
</td>
<td>
<input class="form-control" type="text" placeholder="Source" name="source"
id="link-add-field-source" pattern="(\*|.*://?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-z]{2,6}\b([-a-zA-Z0-9@:%_\+.~#?&//=]*))" required title="i.e. http://mysite.ch or *" />
id="link-add-field-source" pattern="(\*|.*://?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-z]{2,6}\b([-a-zA-Z0-9@:%_\+.~#?&//=]*)[^\/])" required title="i.e. http://mysite.ch or *" />
</td>
<td>
<input class="form-control" type="text" placeholder="Destination" name="destination"
id="link-add-field-destination" pattern="(\*|.*://?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-z]{2,6}\b([-a-zA-Z0-9@:%_\+.~#?&//=]*))" required title="i.e. http://mysite.ch or *" />
id="link-add-field-destination" pattern="(\*|.*://?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-z]{2,6}\b([-a-zA-Z0-9@:%_\+.~#?&//=]*)[^\/])" required title="i.e. http://mysite.ch or *" />
</td>
<td>
<input class="form-control" type="number" placeholder="Streams" name="nostreams"
......
......@@ -137,7 +137,7 @@
<div class="panel-heading">
<h2 class="panel-title">
<input type="text" name="se" placeholder="Storage"
class="form-control" id="se-add-field-se" pattern="(\*|.*://?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-z]{2,6}\b([-a-zA-Z0-9@:%_\+.~#?&//=]*))" required title="i.e. http://mysite.ch or *"/>&nbsp;
class="form-control" id="se-add-field-se" pattern="(\*|.*://?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-z]{2,6}\b([-a-zA-Z0-9@:%_\+.~#?&//=]*)[^\/])" required title="i.e. http://mysite.ch or *"/>&nbsp;
</div>
<div class="panel-body">
<div class="row">
......
......@@ -22,6 +22,7 @@ from fts3rest.lib.base import Session
from fts3.model import Job, File, JobActiveStates, Credential, FileActiveStates, FileTerminalStates
from datetime import datetime, timedelta
import pylons
import random
class TestJobCancel(TestController):
"""
......@@ -43,9 +44,10 @@ class TestJobCancel(TestController):
files = []
for i in xrange(count):
u = random.randint(500, 50000)
files.append({
'sources': ['root://source.es/file%d' % i],
'destinations': ['root://dest.ch/file%d' % i],
'destinations': ['root://destcancel.ch/file'+str(i)+str(u)],
'selection_strategy': 'orderly',
'checksum': 'adler32:1234',
'filesize': 1024,
......@@ -74,18 +76,20 @@ class TestJobCancel(TestController):
files = []
for i in xrange(count):
u = random.randint(500001, 1000000)
files.append({
'sources': ['root://source.es/file%d' % i],
'destinations': ['root://dest.ch/file%d' % i],
'destinations': ['root://dest.ch/file'+str(i)+str(u)],
'selection_strategy': 'orderly',
'checksum': 'adler32:1234',
'filesize': 1024,
'metadata': {'mykey': 'myvalue'},
})
for j in xrange(big_files):
u = random.randint(100, 1000)
files.append({
'sources': ['root://source.es/file%d' % i],
'destinations': ['root://dest.ch/file%d' % i],
'destinations': ['root://dest.ch/file%d%d' % (i,u)],
'selection_strategy': 'orderly',
'checksum': 'adler32:1234',
'filesize': 104857600,
......
......@@ -22,14 +22,16 @@ from fts3.model import FileRetryLog, Job, File
from fts3rest.lib.base import Session
from fts3rest.lib.middleware.fts3auth import UserCredentials, constants
from fts3rest.tests import TestController
import random
class TestJobListing(TestController):
"""
Tests for the job controller, listing, stating, etc.
"""
def _submit(self, file_metadata=None, dest_surl='root://dest.ch/file'):
def _submit(self, file_metadata=None, dest_surl='root://dest.ch/file', random_url=True):
if random_url:
dest_surl = dest_surl+str(random.randint(200,1000))
job = {
'files': [{
'sources': ['root://source.es/file'],
......@@ -349,7 +351,6 @@ class TestJobListing(TestController):
self.assertEqual(1, len(files))
self.assertEqual("root://source.es/file", files[0]['source_surl'])
self.assertEqual("root://dest.ch/file", files[0]['dest_surl'])
self.assertEqual(1024, files[0]['user_filesize'])
def test_no_retries(self):
......@@ -410,7 +411,7 @@ class TestJobListing(TestController):
self.assertIn('start_time', f)
self.assertIn('source_surl', f)
self.assertNotIn('finish_time', f)
self.assertNotIn('dest_surl', f)
self.assertEqual('root://source.es/file', f['source_surl'])
......@@ -443,7 +444,6 @@ class TestJobListing(TestController):
self.assertIn('start_time', f)
self.assertIn('source_surl', f)
self.assertNotIn('finish_time', f)
self.assertNotIn('dest_surl', f)
self.assertIn('file_state', f)
self.assertEqual('SUBMITTED', f['file_state'])
......@@ -540,11 +540,11 @@ class TestJobListing(TestController):
self.setup_gridsite_environment()
self.push_delegation()
job1 = self._submit(dest_surl='gsiftp://test/path')
job2 = self._submit(dest_surl='gsiftp://test2/path')
job1 = self._submit(dest_surl='gsiftp://test-query1/path', random_url=False)
job2 = self._submit(dest_surl='gsiftp://test-query2/path', random_url=False)
files = self.app.get(
url="/files?dest_surl=gsiftp://test2/path",
url="/files?dest_surl=gsiftp://test-query2/path",
status=200
).json
......@@ -552,7 +552,7 @@ class TestJobListing(TestController):
self.assertIn(job2, map(lambda f: f['job_id'], files))
files = self.app.get(
url="/files?dest_surl=gsiftp://test/path",
url="/files?dest_surl=gsiftp://test-query1/path",
status=200
).json
......
......@@ -25,7 +25,7 @@ from sqlalchemy.orm import scoped_session, sessionmaker
from fts3rest.tests import TestController
from fts3rest.lib.base import Session
from fts3.model import File, Job
import random
class TestJobSubmission(TestController):
"""
......@@ -55,7 +55,6 @@ class TestJobSubmission(TestController):
self.assertEqual(len(files), 1)
self.assertEqual(files[0].file_state, 'SUBMITTED')
self.assertEqual(files[0].source_surl, 'root://source.es/file')
self.assertEqual(files[0].dest_surl, 'root://dest.ch/file')
self.assertEqual(files[0].source_se, 'root://source.es')
self.assertEqual(files[0].dest_se, 'root://dest.ch')
self.assertEqual(files[0].file_index, 0)
......@@ -79,11 +78,11 @@ class TestJobSubmission(TestController):
"""
self.setup_gridsite_environment()
self.push_delegation()
dest_surl = 'root://dest.ch/file' + str(random.randint(0, 100))
job = {
'files': [{
'sources': ['root://source.es/file'],
'destinations': ['root://dest.ch/file'],
'destinations': [dest_surl],
'selection_strategy': 'orderly',
'checksum': 'adler32:1234',
'filesize': 1024,
......@@ -112,11 +111,11 @@ class TestJobSubmission(TestController):
"""
self.setup_gridsite_environment()
self.push_delegation()
dest_surl = 'root://dest.ch/file' + str(random.randint(0, 100))
job = {
'files': [{
'sources': ['root://source.es/file'],
'destinations': ['root://dest.ch/file'],
'destinations': [dest_surl],
'selection_strategy': 'orderly',
'checksum': 'adler32:1234',
'filesize': 1024,
......@@ -145,11 +144,11 @@ class TestJobSubmission(TestController):
"""
self.setup_gridsite_environment()
self.push_delegation()
dest_surl = 'root://dest.ch/file' + str(random.randint(0, 100))
job = {
'files': [{
'sources': ['root://source.es/file'],
'destinations': ['root://dest.ch/file'],
'destinations': [dest_surl],
'selection_strategy': 'orderly',
'checksum': 'adler32:1234',
'filesize': 1024,
......@@ -179,11 +178,11 @@ class TestJobSubmission(TestController):
"""
self.setup_gridsite_environment()
self.push_delegation()
dest_surl = 'root://dest.ch/file' + str(random.randint(0, 100))
job = {
'files': [{
'sources': ['root://source.es/file'],
'destinations': ['root://dest.ch/file'],
'destinations': [dest_surl],
'selection_strategy': 'orderly',
'checksum': 'adler32:1234',
'filesize': 1024,
......@@ -213,11 +212,11 @@ class TestJobSubmission(TestController):
"""
self.setup_gridsite_environment()
self.push_delegation()
dest_surl = 'root://dest.ch/file' + str(random.randint(0, 100))
job = {
'files': [{
'sources': ['root://source.es/file'],
'destinations': ['root://dest.ch/file'],
'destinations': [dest_surl],
'selection_strategy': 'orderly',
'checksum': 'adler32:1234',
'filesize': 1024,
......@@ -246,11 +245,11 @@ class TestJobSubmission(TestController):
"""
self.setup_gridsite_environment()
self.push_delegation()
dest_surl = 'root://dest.ch/file' + str(random.randint(0, 100))
job = {
'files': [{
'sources': ['root://source.es/file'],
'destinations': ['root://dest.ch/file'],
'destinations': [dest_surl],
'selection_strategy': 'orderly',
'checksum': 'adler32:1234',
'filesize': 1024,
......@@ -280,11 +279,11 @@ class TestJobSubmission(TestController):
"""
self.setup_gridsite_environment()
self.push_delegation()
dest_surl = 'srm://dest.ch:8447/file' + str(random.randint(0, 100))
job = {
'files': [{
'sources': ['srm://source.es:8446/file'],
'destinations': ['srm://dest.ch:8447/file'],
'destinations': [dest_surl],
'selection_strategy': 'orderly',
'checksum': 'adler32:1234',
'filesize': 1024,
......@@ -320,11 +319,11 @@ class TestJobSubmission(TestController):
"""
self.setup_gridsite_environment()
self.push_delegation()
dest_surl = 'root://dest.ch/file' + str(random.randint(0, 100))
job = {
'files': [{
'sources': ['srm://source.es/?SFN=/path/'],
'destinations': ['http://dest.ch/file'],
'destinations': [dest_surl],
'selection_strategy': 'orderly',
'checksum': 'adler32:1234',
'filesize': 1024,
......@@ -362,11 +361,11 @@ class TestJobSubmission(TestController):
"""
self.setup_gridsite_environment()
self.push_delegation()
dest_surl = 'root://dest.ch/file' + str(random.randint(0, 100))
job = {
'files': [{
'sources': ['root://source.es/file'],
'destinations': ['root://dest.ch/file'],
'destinations': [dest_surl],
'selection_strategy': 'orderly',
'checksum': None,
'filesize': 1024,
......@@ -397,11 +396,11 @@ class TestJobSubmission(TestController):
"""
self.setup_gridsite_environment()
self.push_delegation()
dest_surl = 'root://dest.ch/file' + str(random.randint(0, 100))
job = {
'files': [{
'sources': ['root://source.es/file'],
'destinations': ['root://dest.ch/file'],
'destinations': [dest_surl],
'selection_strategy': 'orderly',
'checksum': '1234F',
'filesize': 1024,
......@@ -433,11 +432,11 @@ class TestJobSubmission(TestController):
"""
self.setup_gridsite_environment()
self.push_delegation()
dest_surl = 'root://dest.ch/file' + str(random.randint(0, 100))
job = {
'files': [{
'sources': ['root://source.es/file'],
'destinations': ['root://dest.ch/file'],
'destinations': [dest_surl],
'selection_strategy': 'orderly',
'checksum': '1234F',
'filesize': 1024,
......@@ -468,11 +467,11 @@ class TestJobSubmission(TestController):
"""
self.setup_gridsite_environment()
self.push_delegation()
dest_surl = 'root://dest.ch/file' + str(random.randint(0, 100))
job = {
'files': [{
'sources': ['root://source.es/file'],
'destinations': ['root://dest.ch/file'],
'destinations': [dest_surl],
'selection_strategy': 'orderly',
'checksum': '1234F',
'filesize': 1024,
......@@ -503,11 +502,11 @@ class TestJobSubmission(TestController):
"""
self.setup_gridsite_environment()
self.push_delegation()
dest_surl = 'root://dest.ch/file' + str(random.randint(0, 100))
job = {
'files': [{
'sources': ['root://source.es/file'],
'destinations': ['root://dest.ch/file'],
'destinations': [dest_surl],
'selection_strategy': 'orderly',
'checksum': '1234F',
'filesize': 1024,
......@@ -539,11 +538,11 @@ class TestJobSubmission(TestController):
"""
self.setup_gridsite_environment()
self.push_delegation()
dest_surl = 'root://dest.ch/file' + str(random.randint(0, 100))
job = {
'files': [{
'sources': ['root://source.es/file'],
'destinations': ['root://dest.ch/file'],
'destinations': [dest_surl],
'selection_strategy': 'orderly',
'filesize': 1024,
'metadata': {'mykey': 'myvalue'},
......@@ -574,11 +573,11 @@ class TestJobSubmission(TestController):