diff --git a/src/fts3rest/fts3rest/controllers/banning.py b/src/fts3rest/fts3rest/controllers/banning.py index a8b001c75f7e253651490eda7c5b5c0fdf56a70b..b0cab26c08c60d73df15dc8b9a8cdea68805e8ea 100644 --- a/src/fts3rest/fts3rest/controllers/banning.py +++ b/src/fts3rest/fts3rest/controllers/banning.py @@ -178,7 +178,7 @@ def _cancel_jobs(dn): jobs = Session.query(Job.job_id).filter( Job.job_state.in_(JobActiveStates), Job.user_dn == dn, Job.job_finished == None ) - job_ids = map(lambda j: j[0], jobs) + job_ids = [j[0] for j in jobs] try: now = datetime.utcnow() @@ -361,7 +361,7 @@ def ban_dn(): except Exception: raise BadRequest("Malformed input") else: - input_dict = request.params + input_dict = request.values user = request.environ["fts3.User.Credentials"] dn = input_dict.get("user_dn", None) @@ -385,7 +385,7 @@ def unban_dn(): """ Unban a user """ - dn = request.params.get("user_dn", None) + dn = request.values.get("user_dn", None) if not dn: raise BadRequest("Missing user_dn parameter") diff --git a/src/fts3rest/fts3rest/tests/functional/test_banning.py b/src/fts3rest/fts3rest/tests/functional/test_banning.py new file mode 100644 index 0000000000000000000000000000000000000000..fe1c6d0bda3709bda549f5b4c1793e059504a2db --- /dev/null +++ b/src/fts3rest/fts3rest/tests/functional/test_banning.py @@ -0,0 +1,691 @@ +import json + +from urllib.parse import quote +from fts3.model import BannedDN, BannedSE, Job, File +from fts3rest.model.meta import Session +from fts3rest.tests import TestController + +from datetime import datetime, timedelta +import uuid + + +def insert_job(vo, source=None, destination=None, state="SUBMITTED", **kwargs): + job = Job() + job.user_dn = kwargs.get("user_dn", "/DC=ch/DC=cern/CN=Test User") + job.vo_name = vo + job.source_se = source + job.dest_se = destination + job.job_state = state + job.submit_time = datetime.utcnow() + + duration = kwargs.get("duration", 0) + queued = kwargs.get("queued", 0) + if duration and queued: + job.finish_time = job.submit_time + timedelta(seconds=duration + queued) + elif duration: + job.finish_time = job.submit_time + timedelta(seconds=duration) + job.job_id = str(uuid.uuid4()) + + Session.merge(job) + + multiple = kwargs.get("multiple", [(source, destination)]) + + for source_se, dest_se in multiple: + transfer = File() + transfer.job_id = job.job_id + transfer.vo_name = vo + transfer.source_se = source_se + transfer.source_surl = source_se + "/path" + transfer.dest_se = dest_se + transfer.dest_surl = dest_se + "/path" + transfer.file_state = state + if queued: + transfer.start_time = job.submit_time + timedelta(seconds=queued) + if duration: + transfer.tx_duration = duration + transfer.reason = kwargs.get("reason", None) + transfer.throughput = kwargs.get("thr", None) + Session.merge(transfer) + Session.commit() + return job.job_id + + +class TestBanning(TestController): + """ + Tests for user and storage banning + """ + + def setUp(self): + super().setUp() + self.setup_gridsite_environment() + + def tearDown(self): + Session.query(BannedDN).delete() + Session.query(BannedSE).delete() + super().tearDown() + + def test_ban_dn(self): + """ + Just ban a DN and unban it, make sure changes go into the DB + """ + canceled = self.app.post( + url="/ban/dn", + params={"user_dn": "/DC=cern/CN=someone", "message": "TEST BAN"}, + status=200, + ).json + self.assertEqual(0, len(canceled)) + + banned = Session.query(BannedDN).get("/DC=cern/CN=someone") + self.assertNotEqual(None, banned) + self.assertEqual(self.get_user_credentials().user_dn, 
banned.admin_dn) + self.assertEqual("TEST BAN", banned.message) + + self.app.delete( + url="/ban/dn?user_dn=%s" % quote("/DC=cern/CN=someone"), status=204 + ) + banned = Session.query(BannedDN).get("/DC=cern/CN=someone") + self.assertEqual(None, banned) + + def test_list_banned_dns(self): + """ + Ban a DN and make sure it is in the list + """ + canceled = self.app.post( + url="/ban/dn", params={"user_dn": "/DC=cern/CN=someone"}, status=200 + ).json + self.assertEqual(0, len(canceled)) + + banned = self.app.get(url="/ban/dn", status=200).json + self.assertIn("/DC=cern/CN=someone", [b["dn"] for b in banned]) + + self.app.delete( + url="/ban/dn?user_dn=%s" % quote("/DC=cern/CN=someone"), status=204 + ) + + banned = self.app.get(url="/ban/dn", status=200).json + self.assertNotIn("/DC=cern/CN=someone", [b["dn"] for b in banned]) + + def test_ban_dn_submission(self): + """ + If a DN is banned, submissions from this user must not be accepted + """ + banned = BannedDN() + banned.dn = self.get_user_credentials().user_dn + Session.merge(banned) + Session.commit() + + self.push_delegation() + self.app.post( + url="/jobs", content_type="application/json", params="[]", status=403 + ) + + def test_ban_self(self): + """ + A user can not ban (him|her)self + """ + user_dn = self.get_user_credentials().user_dn + self.app.post(url="/ban/dn", params={"user_dn": user_dn}, status=409) + + def test_ban_dn_cancel(self): + """ + Ban a DN that has transfers running, make sure they are canceled + """ + jobs = list() + jobs.append( + insert_job( + "testvo", + "gsiftp://source", + "gsiftp://destination", + "SUBMITTED", + user_dn="/DC=cern/CN=someone", + ) + ) + jobs.append( + insert_job( + "testvo", + "gsiftp://source", + "gsiftp://destination2", + "ACTIVE", + user_dn="/DC=cern/CN=someone", + ) + ) + jobs.append( + insert_job( + "testvo", + "gsiftp://source", + "gsiftp://destination2", + "FAILED", + duration=10, + queued=20, + user_dn="/DC=cern/CN=someone", + ) + ) + + canceled_ids = self.app.post( + url="/ban/dn", params={"user_dn": "/DC=cern/CN=someone"}, status=200 + ).json + + self.assertEqual(2, len(canceled_ids)) + self.assertIn(jobs[0], canceled_ids) + self.assertIn(jobs[1], canceled_ids) + self.assertNotIn(jobs[2], canceled_ids) + + for job_id in jobs[0:2]: + job = Session.query(Job).get(job_id) + files = Session.query(File).filter(File.job_id == job_id) + self.assertEqual("CANCELED", job.job_state) + self.assertNotEqual(None, job.job_finished) + self.assertEqual("User banned", job.reason) + for f in files: + self.assertEqual("CANCELED", f.file_state) + self.assertNotEqual(None, f.finish_time) + self.assertEqual("User banned", f.reason) + + job = Session.query(Job).get(jobs[2]) + self.assertEqual(job.job_state, "FAILED") + files = Session.query(File).filter(File.job_id == job.job_id) + for f in files: + self.assertEqual("FAILED", f.file_state) + + def test_ban_se(self): + """ + Just ban a SE and unban it, make sure changes go into the DB + """ + canceled = self.app.post( + url="/ban/se", + params={"storage": "gsiftp://nowhere", "message": "TEST BAN 42"}, + status=200, + ).json + self.assertEqual(0, len(canceled)) + banned = ( + Session.query(BannedSE).filter(BannedSE.se == "gsiftp://nowhere").first() + ) + self.assertNotEqual(None, banned) + self.assertEqual(self.get_user_credentials().user_dn, banned.admin_dn) + self.assertEqual("CANCEL", banned.status) + self.assertEqual("TEST BAN 42", banned.message) + self.app.delete( + url="/ban/se?storage=%s" % quote("gsiftp://nowhere"), status=204 + ) + banned = ( + 
Session.query(BannedSE).filter(BannedSE.se == "gsiftp://nowhere").first() + ) + self.assertEqual(None, banned) + + def test_list_banned_ses(self): + """ + Ban a SE and make sure it is in the list + """ + canceled = self.app.post( + url="/ban/se", params={"storage": "gsiftp://nowhere"}, status=200 + ).json + self.assertEqual(0, len(canceled)) + + banned = self.app.get(url="/ban/se", status=200).json + self.assertIn("gsiftp://nowhere", [b["se"] for b in banned]) + + self.app.delete( + url="/ban/se?storage=%s" % quote("gsiftp://nowhere"), status=204 + ) + + banned = self.app.get(url="/ban/se", status=200).json + self.assertNotIn("gsiftp://nowhere", [b["se"] for b in banned]) + + def test_ban_se_vo(self): + """ + Just ban a SE and unban it, specifying a VO + """ + canceled = self.app.post( + url="/ban/se", + params={"storage": "gsiftp://nowhere", "vo_name": "testvo"}, + status=200, + ).json + self.assertEqual(0, len(canceled)) + + banned = Session.query(BannedSE).get(("gsiftp://nowhere", "testvo")) + self.assertNotEqual(None, banned) + self.assertEqual(self.get_user_credentials().user_dn, banned.admin_dn) + self.assertEqual("CANCEL", banned.status) + self.assertEqual("testvo", banned.vo) + + self.app.delete( + url="/ban/se?storage=%s&vo_name=testvo" % quote("gsiftp://nowhere"), + status=204, + ) + banned = Session.query(BannedSE).get(("gsiftp://nowhere", "someone")) + self.assertEqual(None, banned) + + def test_ban_se_cancel(self): + """ + Ban a SE that has files queued, make sure they are canceled + """ + jobs = list() + jobs.append( + insert_job("testvo", "gsiftp://source", "gsiftp://destination", "SUBMITTED") + ) + jobs.append( + insert_job("testvo", "gsiftp://source", "gsiftp://destination2", "ACTIVE") + ) + jobs.append( + insert_job( + "testvo", + "gsiftp://source", + "gsiftp://destination2", + "FAILED", + duration=10, + queued=20, + ) + ) + + canceled_ids = self.app.post( + url="/ban/se", params={"storage": "gsiftp://source"}, status=200 + ).json + + self.assertEqual(2, len(canceled_ids)) + self.assertIn(jobs[0], canceled_ids) + self.assertIn(jobs[1], canceled_ids) + self.assertNotIn(jobs[2], canceled_ids) + + for job_id in jobs[0:2]: + job = Session.query(Job).get(job_id) + files = Session.query(File).filter(File.job_id == job_id) + self.assertEqual("CANCELED", job.job_state) + self.assertNotEqual(None, job.job_finished) + for f in files: + self.assertEqual("CANCELED", f.file_state) + self.assertNotEqual(None, f.finish_time) + self.assertEqual("Storage banned", f.reason) + + job = Session.query(Job).get(jobs[2]) + self.assertEqual(job.job_state, "FAILED") + files = Session.query(File).filter(File.job_id == job.job_id) + for f in files: + self.assertEqual("FAILED", f.file_state) + + def test_ban_se_partial_job(self): + """ + Ban a SE that has files queued. If a job has other pairs, the job must remain! 
+ """ + job_id = insert_job( + "testvo", + multiple=[ + ("gsiftp://source", "gsiftp://destination"), + ("gsiftp://other", "gsiftp://destination"), + ], + ) + canceled_ids = self.app.post( + url="/ban/se", params={"storage": "gsiftp://source"}, status=200 + ).json + + self.assertEqual(1, len(canceled_ids)) + self.assertEqual(job_id, canceled_ids[0]) + + job = Session.query(Job).get(job_id) + self.assertEqual("SUBMITTED", job.job_state) + self.assertEqual(None, job.job_finished) + + files = Session.query(File).filter(File.job_id == job_id) + for f in files: + if f.source_se == "gsiftp://source": + self.assertEqual("CANCELED", f.file_state) + self.assertNotEqual(None, f.finish_time) + else: + self.assertEqual("SUBMITTED", f.file_state) + + def test_ban_se_cancel_vo(self): + """ + Cancel a SE that has files queued, make sure they are canceled (with VO) + """ + jobs = list() + jobs.append( + insert_job("testvo", "gsiftp://source", "gsiftp://destination", "SUBMITTED") + ) + jobs.append( + insert_job("atlas", "gsiftp://source", "gsiftp://destination", "SUBMITTED") + ) + jobs.append( + insert_job("atlas", "gsiftp://source", "gsiftp://destination2", "SUBMITTED") + ) + + canceled_ids = self.app.post( + url="/ban/se", + params={ + "storage": "gsiftp://source", + "status": "cancel", + "vo_name": "testvo", + }, + status=200, + ).json + + self.assertEqual(1, len(canceled_ids)) + self.assertIn(jobs[0], canceled_ids) + + for job_id in jobs: + job = Session.query(Job).get(job_id) + files = Session.query(File).filter(File.job_id == job_id) + + if job_id in canceled_ids: + self.assertEqual("CANCELED", job.job_state) + else: + self.assertEqual("SUBMITTED", job.job_state) + for f in files: + if job_id in canceled_ids: + self.assertEqual("CANCELED", f.file_state) + else: + self.assertEqual("SUBMITTED", f.file_state) + + def test_ban_se_wait(self): + """ + Ban a SE, but instead of canceling, give jobs some time to finish + """ + jobs = list() + jobs.append( + insert_job("testvo", "gsiftp://source", "gsiftp://destination", "SUBMITTED") + ) + jobs.append( + insert_job("testvo", "gsiftp://source", "gsiftp://destination2", "ACTIVE") + ) + jobs.append( + insert_job( + "testvo", + "gsiftp://source", + "gsiftp://destination2", + "FAILED", + duration=10, + queued=20, + ) + ) + + waiting_ids = self.app.post( + url="/ban/se", + params={"storage": "gsiftp://source", "status": "wait", "timeout": 1234}, + status=200, + ).json + + self.assertEqual(1, len(waiting_ids)) + self.assertIn(jobs[0], waiting_ids) + self.assertNotIn(jobs[1], waiting_ids) + self.assertNotIn(jobs[2], waiting_ids) + + for job_id in jobs[0:2]: + job = Session.query(Job).get(job_id) + files = Session.query(File).filter(File.job_id == job_id) + self.assertIn(job.job_state, ["ACTIVE", "SUBMITTED"]) + self.assertEqual(None, job.job_finished) + for f in files: + self.assertIn(f.file_state, ["ACTIVE", "ON_HOLD"]) + self.assertEqual(None, f.finish_time) + + job = Session.query(Job).get(jobs[2]) + self.assertEqual(job.job_state, "FAILED") + files = Session.query(File).filter(File.job_id == job.job_id) + for f in files: + self.assertEqual("FAILED", f.file_state) + + banned = Session.query(BannedSE).get(("gsiftp://source", "testvo")) + self.assertEqual("WAIT", banned.status) + + def test_ban_se_wait_vo(self): + """ + Ban a SE, but instead of canceling, give jobs some time to finish (with VO) + """ + jobs = list() + jobs.append( + insert_job("testvo", "gsiftp://source", "gsiftp://destination", "SUBMITTED") + ) + jobs.append( + insert_job("atlas", "gsiftp://source", 
"gsiftp://destination", "SUBMITTED") + ) + jobs.append( + insert_job("atlas", "gsiftp://source", "gsiftp://destination2", "SUBMITTED") + ) + + waiting_ids = self.app.post( + url="/ban/se", + params={ + "storage": "gsiftp://source", + "status": "wait", + "vo_name": "testvo", + "timeout": 33, + }, + status=200, + ).json + + self.assertEqual(1, len(waiting_ids)) + self.assertIn(jobs[0], waiting_ids) + + for job_id in jobs: + job = Session.query(Job).get(job_id) + files = Session.query(File).filter(File.job_id == job_id) + + self.assertEqual("SUBMITTED", job.job_state) + for f in files: + if job_id in waiting_ids: + self.assertEqual("ON_HOLD", f.file_state) + else: + self.assertEqual("SUBMITTED", f.file_state) + + def test_ban_se_no_submit(self): + """ + Ban a SE. Submissions to/from se must not be accepted + """ + self.push_delegation() + + self.app.post(url="/ban/se", params={"storage": "gsiftp://source"}, status=200) + + job = { + "files": [ + { + "sources": ["gsiftp://source/path/"], + "destinations": ["gsiftp://destination/file"], + } + ] + } + self.app.post( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=403, + ) + + # The other way around + job = { + "files": [ + { + "sources": ["gsiftp://destination/file"], + "destinations": ["gsiftp://source/path/"], + } + ] + } + self.app.post( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=403, + ) + + def test_ban_se_with_submission(self): + """ + Ban a SE but allowing submissions + """ + self.push_delegation() + + self.app.post( + url="/ban/se", + params={ + "storage": "gsiftp://source", + "status": "wait", + "allow_submit": True, + }, + status=200, + ) + + job = { + "files": [ + { + "sources": ["gsiftp://source/path/"], + "destinations": ["gsiftp://destination/file"], + } + ] + } + job_id = self.app.post( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + files = Session.query(File).filter(File.job_id == job_id) + for f in files: + self.assertEqual("ON_HOLD", f.file_state) + + # The other way around + job = { + "files": [ + { + "sources": ["gsiftp://destination/file"], + "destinations": ["gsiftp://source/path/"], + } + ] + } + job_id = self.app.post( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + files = Session.query(File).filter(File.job_id == job_id) + for f in files: + self.assertEqual("ON_HOLD", f.file_state) + + def test_unban_wait(self): + """ + Regression for FTS-297 + When unbanning a storage, if any file was left on wait, they must re-enter the queue + """ + job_id = insert_job( + "testvo", + "gsiftp://source", + "gsiftp://destination", + "SUBMITTED", + user_dn="/DC=cern/CN=someone", + ) + self.app.post( + url="/ban/se", + params={ + "storage": "gsiftp://source", + "status": "wait", + "allow_submit": True, + }, + status=200, + ) + + files = Session.query(File).filter(File.job_id == job_id) + for f in files: + self.assertEqual("ON_HOLD", f.file_state) + + self.app.delete(url="/ban/se?storage=%s" % quote("gsiftp://source"), status=204) + + files = Session.query(File).filter(File.job_id == job_id) + for f in files: + self.assertEqual("SUBMITTED", f.file_state) + + # Some requests that must be rejected + def test_ban_dn_empty(self): + """ + Banning with a missing dn must fail + """ + self.app.post_json(url="/ban/dn", params={}, status=400) + + def test_unban_dn_empty(self): + """ + Unbanning with a missing dn must fail + """ + 
self.app.delete(url="/ban/dn", status=400) + + def test_ban_se_empty(self): + """ + Ask for banning with a missing storage must fail + """ + self.app.post_json(url="/ban/se", params={}, status=400) + + def test_unban_se_empty(self): + """ + Unbanning with a missing se must fail + """ + self.app.delete(url="/ban/se", status=400) + + def test_ban_se_cancel_and_submit(self): + """ + Setting status = cancel and ask for allow_submit must fail + """ + self.app.post( + url="/ban/se", + params={ + "storage": "gsiftp://source", + "status": "cancel", + "allow_submit": True, + }, + status=400, + ) + + def test_ban_se_bad_status(self): + """ + Unbanning with something else than cancel or wait must fail + """ + self.app.post( + url="/ban/se", + params={"storage": "gsiftp://source", "status": "blahblah"}, + status=400, + ) + + def test_ban_se_staging(self): + """ + Ban a storage with transfers queued as STAGING, submit a new STAGING, unban. + Final state must be STAGING + """ + self.push_delegation() + + pre_job_id = insert_job( + "testvo", + "srm://source", + "srm://destination", + "STAGING", + user_dn="/DC=cern/CN=someone", + ) + self.app.post( + url="/ban/se", + params={"storage": "srm://source", "status": "wait", "allow_submit": True}, + status=200, + ) + + files = Session.query(File).filter(File.job_id == pre_job_id) + for f in files: + self.assertEqual("ON_HOLD_STAGING", f.file_state) + + job = { + "files": [ + { + "sources": ["srm://source/file"], + "destinations": ["gsiftp://destination2/path/"], + } + ], + "params": {"copy_pin_lifetime": 1234}, + } + post_job_id = self.app.post( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + files = Session.query(File).filter(File.job_id == post_job_id) + for f in files: + self.assertEqual("ON_HOLD_STAGING", f.file_state) + + self.app.delete(url="/ban/se?storage=%s" % quote("srm://source"), status=204) + + files = Session.query(File).filter(File.job_id.in_((pre_job_id, post_job_id))) + for f in files: + self.assertEqual("STAGING", f.file_state)