Commit 48e07b42 authored by Alejandro Alvarez Ayllon's avatar Alejandro Alvarez Ayllon Committed by Maria Arsuaga Rios
Browse files

Improve queries for blacklisting

parent ee6257ad
......@@ -21,7 +21,7 @@ except:
import logging
from datetime import datetime
from pylons import request
from sqlalchemy import distinct, func
from sqlalchemy import distinct, func, and_
from fts3.model import BannedDN, BannedSE, Job, File, JobActiveStates, FileActiveStates
from fts3rest.lib.api import doc
......@@ -82,29 +82,43 @@ def _cancel_transfers(storage=None, vo_name=None):
Returns the list of affected jobs ids.
"""
affected_job_ids = set()
files = Session.query(File)\
.filter((File.source_se == storage) | (File.dest_se == storage))\
.filter(File.file_state.in_(FileActiveStates + ['NOT_USED']))
files = Session.query(File.file_id).filter(
and_(
(File.source_se == storage) | (File.dest_se == storage),
File.file_state.in_(FileActiveStates + ['NOT_USED'])
)
)
if vo_name and vo_name != '*':
files = files.filter(File.vo_name == vo_name)
now = datetime.utcnow()
try:
for file in files:
affected_job_ids.add(file.job_id)
for row in files:
file_id = row[0]
job_id, file_index = Session.query(File.job_id, File.file_index).filter(File.file_id == file_id).one()
affected_job_ids.add(job_id)
# Cancel the affected file
file.file_state = 'CANCELED'
file.reason = 'Storage banned'
file.finish_time = now
Session.merge(file)
Session.query(File).filter(File.file_id == file_id)\
.update({
'file_state': 'CANCELED', 'reason': 'Storage banned',
'finish_time': now
}, synchronize_session=False)
# If there are alternatives, enable them
Session.query(File).filter(File.job_id == file.job_id)\
.filter(File.file_index == file.file_index)\
.filter(File.file_state == 'NOT_USED').update({'file_state': 'SUBMITTED'})
if Session.bind.dialect.name == 'mysql':
limit = " LIMIT 1"
else:
limit = ''
Session.execute(
"UPDATE t_file SET"
" file_state = 'SUBMITTED' "
"WHERE"
" job_id = :job_id AND file_index = :file_index AND file_state = 'NOT_USED' " + limit,
dict(job_id=job_id, file_index=file_index)
)
# Or next queries will not see the changes!
Session.commit()
Session.expire_all()
except Exception:
Session.rollback()
raise
......@@ -167,31 +181,39 @@ def _cancel_jobs(dn):
raise
def _set_to_wait_helper(storage, vo_name, from_state, to_state):
"""
Helper for _set_to_wait
"""
file_ids = Session.query(File.file_id).filter(
and_(
File.file_state == from_state),
(File.source_se == storage) | (File.dest_se == storage)
)
if vo_name and vo_name != '*':
file_ids = file_ids.filter(File.vo_name == vo_name)
file_ids = map(lambda j: j[0], file_ids.all())
job_ids = set()
for file_id in file_ids:
Session.query(File).filter(File.file_id == file_id).update({'file_state': to_state}, synchronize_session=False)
job_ids.add(Session.query(File).get(file_id).job_id)
return job_ids
def _set_to_wait(storage, vo_name):
"""
Updates the transfers that have the given storage either in source or destination,
and belong to the given VO.
Returns the list of affected jobs ids.
"""
job_ids = Session.query(distinct(File.job_id))\
.filter((File.source_se == storage) | (File.dest_se == storage)).filter(File.file_state.in_(FileActiveStates))
if vo_name and vo_name != '*':
job_ids = job_ids.filter(File.vo_name == vo_name)
job_ids = map(lambda j: j[0], job_ids.all())
try:
for job_id in job_ids:
Session.query(File).filter(File.job_id == job_id, File.file_state == 'STAGING')\
.update({'file_state': 'ON_HOLD_STAGING'}, synchronize_session=False)
Session.query(File).filter(File.job_id == job_id, File.file_state == 'SUBMITTED')\
.update({'file_state': 'ON_HOLD'}, synchronize_session=False)
job_ids = _set_to_wait_helper(storage, vo_name, 'SUBMITTED', 'ON_HOLD')
job_ids.update(_set_to_wait_helper(storage, vo_name, 'STAGING', 'ON_HOLD_STAGING'))
Session.commit()
Session.expire_all()
except Exception:
Session.rollback()
raise
return job_ids
def _reenter_queue(storage, vo_name):
......
......@@ -312,9 +312,9 @@ class TestBanning(TestController):
status=200
).json
self.assertEqual(2, len(waiting_ids))
self.assertEqual(1, len(waiting_ids))
self.assertIn(jobs[0], waiting_ids)
self.assertIn(jobs[1], waiting_ids)
self.assertNotIn(jobs[1], waiting_ids)
self.assertNotIn(jobs[2], waiting_ids)
for job_id in jobs[0:2]:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment