Commit d631d31e authored by Alejandro Alvarez Ayllon's avatar Alejandro Alvarez Ayllon
Browse files

FTS-859: Allow storage blacklisting, no timeout

Make blacklisting per storage+vo.
Introduce states ON_HOLD and ON_HOLD_STAGING
parent c3c3fbaa
Pipeline #71354 failed with stage
in 20 seconds
......@@ -28,7 +28,6 @@ class BannedDN(Base):
addition_time = Column(DateTime)
admin_dn = Column(String(1024))
status = Column(String(10))
wait_timeout = Column(Integer(), default=0)
class BannedSE(Base):
......@@ -38,6 +37,6 @@ class BannedSE(Base):
message = Column(String(256))
addition_time = Column(DateTime)
admin_dn = Column(String(1024))
vo = Column(String(100))
vo = Column(String(100), primary_key=True, nullable=True)
status = Column(String(10))
wait_timeout = Column(Integer(), default=0)
......@@ -25,7 +25,7 @@ from base import Base, Json
FileActiveStates = ['STAGING', 'STARTED', 'SUBMITTED', 'READY', 'ACTIVE']
FileTerminalStates = ['FINISHED', 'FAILED', 'CANCELED']
# NOT_USED is not terminal, nor not-terminal
FileOnHoldStates = ['NOT_USED']
FileOnHoldStates = ['NOT_USED', 'ON_HOLD', 'ON_HOLD_STAGING']
# sqlite doesn't like auto increment with BIGINT, so we need to use a variant
# on that case
......@@ -67,8 +67,6 @@ class File(Base):
log_file = Column(String(2048))
log_debug = Column('log_file_debug', Integer)
activity = Column(String(255), default = 'default')
wait_timestamp = Column(DateTime)
wait_timeout = Column(Integer)
retries = relation("FileRetryLog", uselist=True, lazy=True,
backref=backref("file", lazy=False))
......
......@@ -33,7 +33,7 @@ from fts3rest.lib.middleware.fts3auth.constants import *
log = logging.getLogger(__name__)
def _ban_se(storage, vo_name, allow_submit, status, timeout, message):
def _ban_se(storage, vo_name, allow_submit, status, message):
"""
Mark in the db the given storage as banned
"""
......@@ -48,7 +48,6 @@ def _ban_se(storage, vo_name, allow_submit, status, timeout, message):
banned.status = 'WAIT_AS'
else:
banned.status = status
banned.wait_timeout = timeout
try:
Session.merge(banned)
Session.commit()
......@@ -85,7 +84,7 @@ def _cancel_transfers(storage=None, vo_name=None):
files = Session.query(File)\
.filter((File.source_se == storage) | (File.dest_se == storage))\
.filter(File.file_state.in_(FileActiveStates + ['NOT_USED']))
if vo_name:
if vo_name and vo_name != '*':
files = files.filter(File.vo_name == vo_name)
now = datetime.utcnow()
......@@ -167,7 +166,7 @@ def _cancel_jobs(dn):
raise
def _set_to_wait(storage=None, vo_name=None, timeout=0):
def _set_to_wait(storage, vo_name):
"""
Updates the transfers that have the given storage either in source or destination,
and belong to the given VO.
......@@ -175,22 +174,48 @@ def _set_to_wait(storage=None, vo_name=None, timeout=0):
"""
job_ids = Session.query(distinct(File.job_id))\
.filter((File.source_se == storage) | (File.dest_se == storage)).filter(File.file_state.in_(FileActiveStates))
if vo_name:
if vo_name and vo_name != '*':
job_ids = job_ids.filter(File.vo_name == vo_name)
job_ids = map(lambda j: j[0], job_ids.all())
try:
for job_id in job_ids:
Session.query(File).filter(File.job_id == job_id).filter(File.file_state.in_(FileActiveStates))\
.update({'wait_timestamp': datetime.utcnow(), 'wait_timeout': timeout}, synchronize_session=False)
Session.query(File).filter(File.job_id == job_id, File.file_state == 'STAGING')\
.update({'file_state': 'ON_HOLD_STAGING'}, synchronize_session=False)
Session.query(File).filter(File.job_id == job_id, File.file_state == 'SUBMITTED')\
.update({'file_state': 'ON_HOLD'}, synchronize_session=False)
Session.commit()
Session.expire_all()
return job_ids
except Exception:
Session.rollback()
raise
return job_ids
def _reenter_queue(storage, vo_name):
"""
Resets to SUBMITTED or STAGING those transfers that were set ON_HOLD with a previous banning
Returns the list of affects job ids.
"""
job_ids = Session.query(distinct(File.job_id))\
.filter((File.source_se == storage) | (File.dest_se == storage)).filter(File.file_state.in_(['ON_HOLD', 'ON_HOLD_STAGING']))
if vo_name and vo_name != '*':
job_ids = job_ids.filter(File.vo_name == vo_name)
job_ids = map(lambda j: j[0], job_ids.all())
try:
for job_id in job_ids:
Session.query(File).filter(File.job_id == job_id, File.file_state == 'ON_HOLD_STAGING')\
.update({'file_state': 'STAGING'}, synchronize_session=False)
Session.query(File).filter(File.job_id == job_id, File.file_state == 'ON_HOLD')\
.update({'file_state': 'SUBMITTED'}, synchronize_session=False)
except Exception:
Session.rollback()
raise
return job_ids
class BanningController(BaseController):
"""
......@@ -232,7 +257,9 @@ class BanningController(BaseController):
if not storage:
raise HTTPBadRequest('Missing storage parameter')
vo_name = input_dict.get('vo_name', None)
vo_name = input_dict.get('vo_name', '*')
if vo_name is None:
raise HTTPBadRequest('vo_name can not be null')
allow_submit = bool(input_dict.get('allow_submit', False))
status = input_dict.get('status', 'cancel').upper()
......@@ -242,19 +269,12 @@ class BanningController(BaseController):
if allow_submit and status == 'CANCEL':
raise HTTPBadRequest('allow_submit and status = CANCEL can not be combined')
try:
timeout = int(input_dict.get('timeout', 0))
if timeout < 0:
raise ValueError()
except ValueError:
raise HTTPBadRequest('timeout expects an integer equal or greater than zero')
_ban_se(storage, vo_name, allow_submit, status, timeout, input_dict.get('message', ''))
_ban_se(storage, vo_name, allow_submit, status, input_dict.get('message', ''))
if status == 'CANCEL':
affected = _cancel_transfers(storage=storage, vo_name=vo_name)
else:
affected = _set_to_wait(storage=storage, vo_name=vo_name, timeout=timeout)
affected = _set_to_wait(storage=storage, vo_name=vo_name)
log.warn("Storage %s banned (%s), %d jobs affected" % (storage, status, len(affected)))
return affected
......@@ -298,6 +318,7 @@ class BanningController(BaseController):
@doc.response(400, 'storage is empty or missing')
@doc.response(403, 'The user is not allowed to perform configuration actions')
@authorize(CONFIG)
@jsonify
def unban_se(self, start_response):
"""
Unban a storage element
......@@ -305,18 +326,16 @@ class BanningController(BaseController):
storage = request.params.get('storage', None)
if not storage:
raise HTTPBadRequest('Missing storage parameter')
vo_name = request.params.get('vo_name', '*')
if vo_name is None:
raise HTTPBadRequest('vo_name can not be null')
banned = Session.query(BannedSE).get(storage)
banned = Session.query(BannedSE).get((storage, vo_name))
job_ids = []
if banned:
try:
Session.delete(banned)
Session.query(File)\
.filter(File.file_state == 'SUBMITTED')\
.filter((File.source_se == storage) | (File.dest_se == storage))\
.update({
'wait_timestamp': None,
'wait_timeout': None
}, synchronize_session=False)
job_ids= _reenter_queue(storage, vo_name)
Session.commit()
except Exception:
Session.rollback()
......@@ -326,7 +345,7 @@ class BanningController(BaseController):
log.warn("Unban of storage %s without effect" % storage)
start_response('204 No Content', [])
return ['']
return job_ids
@doc.query_arg('user_dn', 'User DN to unban', required=True)
@doc.response(204, 'Success')
......
......@@ -216,33 +216,32 @@ def _apply_banning(files):
# We then build a dictionary to make look up easy
banned_ses = dict()
for b in Session.query(BannedSE):
banned_ses[str(b.se)] = (b.vo, b.status, b.wait_timeout)
banned_ses[str(b.se)] = (b.vo, b.status)
now = datetime.utcnow()
for f in files:
source_banned = banned_ses.get(str(f['source_se']), None)
dest_banned = banned_ses.get(str(f['dest_se']), None)
timeout = None
banned = False
if source_banned and (source_banned[0] == f['vo_name'] or source_banned[0] is None):
if source_banned and (source_banned[0] == f['vo_name'] or source_banned[0] == '*'):
if source_banned[1] != 'WAIT_AS':
raise HTTPForbidden("%s is banned" % f['source_se'])
timeout = source_banned[2]
banned = True
if dest_banned and (dest_banned[0] == f['vo_name'] or dest_banned[0] is None):
if dest_banned and (dest_banned[0] == f['vo_name'] or dest_banned[0] == '*'):
if dest_banned[1] != 'WAIT_AS':
raise HTTPForbidden("%s is banned" % f['dest_se'])
if not timeout:
timeout = dest_banned[2]
banned = True
if banned:
if f['file_state'] == 'SUBMITTED':
f['file_state'] = 'ON_HOLD'
elif f['file_state'] == 'STAGING':
f['file_state'] = 'ON_HOLD_STAGING'
elif f['file_state'] == 'DELETE':
continue
else:
timeout = max(timeout, dest_banned[2])
if timeout is not None:
f['wait_timestamp'] = now
f['wait_timeout'] = timeout
else:
f['wait_timestamp'] = None
f['wait_timeout'] = None
HTTPInternalServerError('Unexpected initial state: %s' % f['file_state'])
def _seconds_from_value(value):
......
......@@ -149,15 +149,15 @@ class TestBanning(TestController):
).json
self.assertEqual(0, len(canceled))
banned = Session.query(BannedSE).get('gsiftp://nowhere')
banned = Session.query(BannedSE).get(('gsiftp://nowhere', '*'))
self.assertNotEqual(None, banned)
self.assertEqual(self.get_user_credentials().user_dn, banned.admin_dn)
self.assertEqual('CANCEL', banned.status)
self.assertEqual(None, banned.vo)
self.assertEqual('*', banned.vo)
self.assertEqual('TEST BAN 42', banned.message)
self.app.delete(url="/ban/se?storage=%s" % urllib.quote('gsiftp://nowhere'), status=204)
banned = Session.query(BannedSE).get('gsiftp://nowhere')
banned = Session.query(BannedSE).get(('gsiftp://nowhere', '*'))
self.assertEqual(None, banned)
def test_list_banned_ses(self):
......@@ -190,14 +190,14 @@ class TestBanning(TestController):
).json
self.assertEqual(0, len(canceled))
banned = Session.query(BannedSE).get('gsiftp://nowhere')
banned = Session.query(BannedSE).get(('gsiftp://nowhere', 'dteam'))
self.assertNotEqual(None, banned)
self.assertEqual(self.get_user_credentials().user_dn, banned.admin_dn)
self.assertEqual('CANCEL', banned.status)
self.assertEqual('dteam', banned.vo)
self.app.delete(url="/ban/se?storage=%s" % urllib.quote('gsiftp://nowhere'), status=204)
banned = Session.query(BannedSE).get('gsiftp://nowhere')
self.app.delete(url="/ban/se?storage=%s&vo_name=dteam" % urllib.quote('gsiftp://nowhere'), status=204)
banned = Session.query(BannedSE).get(('gsiftp://nowhere', 'dteam'))
self.assertEqual(None, banned)
def test_ban_se_cancel(self):
......@@ -323,10 +323,8 @@ class TestBanning(TestController):
self.assertIn(job.job_state, ['ACTIVE', 'SUBMITTED'])
self.assertEqual(None, job.job_finished)
for f in files:
self.assertIn(f.file_state, ['ACTIVE', 'SUBMITTED'])
self.assertIn(f.file_state, ['ACTIVE', 'ON_HOLD'])
self.assertEqual(None, f.finish_time)
self.assertEqual(1234, f.wait_timeout)
self.assertGreater(f.wait_timestamp, datetime.utcnow() - timedelta(minutes=1))
job = Session.query(Job).get(jobs[2])
self.assertEqual(job.job_state, 'FAILED')
......@@ -334,9 +332,8 @@ class TestBanning(TestController):
for f in files:
self.assertEqual('FAILED', f.file_state)
banned = Session.query(BannedSE).get('gsiftp://source')
banned = Session.query(BannedSE).get(('gsiftp://source', '*'))
self.assertEqual('WAIT', banned.status)
self.assertEqual(1234, banned.wait_timeout)
def test_ban_se_wait_vo(self):
"""
......@@ -362,11 +359,10 @@ class TestBanning(TestController):
self.assertEqual('SUBMITTED', job.job_state)
for f in files:
self.assertEqual('SUBMITTED', f.file_state)
if job_id in waiting_ids:
self.assertEqual(33, f.wait_timeout)
self.assertEqual('ON_HOLD', f.file_state)
else:
self.assertEqual(None, f.wait_timeout)
self.assertEqual('SUBMITTED', f.file_state)
def test_ban_se_no_submit(self):
"""
......@@ -419,8 +415,7 @@ class TestBanning(TestController):
files = Session.query(File).filter(File.job_id == job_id)
for f in files:
self.assertEqual(0, f.wait_timeout)
self.assertGreater(f.wait_timestamp, datetime.utcnow() - timedelta(minutes=1))
self.assertEqual('ON_HOLD', f.file_state)
# The other way around
job = {
......@@ -438,8 +433,7 @@ class TestBanning(TestController):
files = Session.query(File).filter(File.job_id == job_id)
for f in files:
self.assertEqual(0, f.wait_timeout)
self.assertGreater(f.wait_timestamp, datetime.utcnow() - timedelta(minutes=1))
self.assertEqual('ON_HOLD', f.file_state)
def test_unban_wait(self):
"""
......@@ -454,15 +448,13 @@ class TestBanning(TestController):
files = Session.query(File).filter(File.job_id == job_id)
for f in files:
self.assertIsNotNone(f.wait_timestamp)
self.assertIsNotNone(f.wait_timeout)
self.assertEqual('ON_HOLD', f.file_state)
self.app.delete(url="/ban/se?storage=%s" % urllib.quote('gsiftp://source'), status=204)
files = Session.query(File).filter(File.job_id == job_id)
for f in files:
self.assertIsNone(f.wait_timestamp)
self.assertIsNone(f.wait_timeout)
self.assertEqual('SUBMITTED', f.file_state)
# Some requests that must be rejected
def test_ban_dn_empty(self):
......@@ -489,19 +481,6 @@ class TestBanning(TestController):
"""
self.app.delete(url="/ban/se", status=400)
def test_ban_se_bad_timeout(self):
"""
Ban SE with invalid timeout values
"""
self.app.post(
url="/ban/se", params={'storage': 'gsiftp://source', 'status': 'wait', 'timeout': 'xxx'},
status=400
)
self.app.post(
url="/ban/se", params={'storage': 'gsiftp://source', 'status': 'wait', 'timeout': -1},
status=400
)
def test_ban_se_cancel_and_submit(self):
"""
Setting status = cancel and ask for allow_submit must fail
......@@ -519,3 +498,46 @@ class TestBanning(TestController):
url="/ban/se", params={'storage': 'gsiftp://source', 'status': 'blahblah'},
status=400
)
def test_ban_se_staging(self):
"""
Ban a storage with transfers queued as STAGING, submit a new STAGING, unban.
Final state must be STAGING
"""
self.push_delegation()
pre_job_id = insert_job('dteam', 'srm://source', 'srm://destination', 'STAGING', user_dn='/DC=cern/CN=someone')
self.app.post(
url="/ban/se", params={'storage': 'srm://source', 'status': 'wait', 'allow_submit': True},
status=200
)
files = Session.query(File).filter(File.job_id == pre_job_id)
for f in files:
self.assertEqual('ON_HOLD_STAGING', f.file_state)
job = {
'files': [{
'sources': ['srm://source/file'],
'destinations': ['gsiftp://destination2/path/']
}],
'params': {
'copy_pin_lifetime': 1234
}
}
post_job_id = self.app.post(
url="/jobs",
content_type='application/json',
params=json.dumps(job),
status=200
).json['job_id']
files = Session.query(File).filter(File.job_id == post_job_id)
for f in files:
self.assertEqual('ON_HOLD_STAGING', f.file_state)
self.app.delete(url="/ban/se?storage=%s" % urllib.quote('srm://source'), status=204)
files = Session.query(File).filter(File.job_id.in_((pre_job_id, post_job_id)))
for f in files:
self.assertEqual('STAGING', f.file_state)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment