diff --git a/README.md b/README.md index dc1a57e3cc03398ecbe0afe6d4749a580cdd1327..83eae16fa8f686034a00a263ae443a570ae8e67b 100644 --- a/README.md +++ b/README.md @@ -39,12 +39,24 @@ This project uses [pip-tools](https://github.com/jazzband/pip-tools) to manage d # How to run development server +``` export PYTHONPATH=/home/ftsflask/fts-rest-flask/src:/home/ftsflask/fts-rest-flask/src/fts3rest -export FLASK_APP=/home/ftsflask/fts-rest-flask/src/fts3rest/fts3rest.wsgi +export FLASK_APP=/home/ftsflask/fts-rest-flask/src/fts3rest/fts3restwsgi.py export FLASK_ENV=development flask run curl http://127.0.0.1:5000/hello - +``` +# Connect to local database +``` + +``` +# Run tests +``` +source venv/bin/activate +export PYTHONPATH=/home/ftsflask/fts-rest-flask/src:/home/ftsflask/fts-rest-flask/src/fts3rest +export FTS3TESTCONFIG=/home/ftsflask/fts-rest-flask/src/fts3rest/fts3rest/tests/fts3testconfig +python3 -m pytest -x src/fts3rest/fts3rest/tests/functional/test_job_submission.py +``` # Migration status Starting with the client, as it requires small changes only. Will not migrate pycurlrequest.py, as it is not used anymore. \ No newline at end of file diff --git a/dev-requirements.in b/dev-requirements.in index b411d46bddc56bb6f6bb5870d2a0272745006bf4..efb0a5fc1764216cec5a0564a8c247449429c33a 100644 --- a/dev-requirements.in +++ b/dev-requirements.in @@ -4,5 +4,5 @@ black pylint bandit radon -#pytest +pytest #coverage diff --git a/runtests.sh b/runtests.sh new file mode 100644 index 0000000000000000000000000000000000000000..aa8d32d0ab75a8901f121ba169091a57e3fc16ec --- /dev/null +++ b/runtests.sh @@ -0,0 +1,6 @@ +#!/bin/bash + +source venv/bin/activate +export PYTHONPATH=/home/ftsflask/fts-rest-flask/src:/home/ftsflask/fts-rest-flask/src/fts3rest +export FTS3TESTCONFIG=/home/ftsflask/fts-rest-flask/src/fts3rest/fts3rest/tests/fts3testconfig +python3 -m pytest -x src/fts3rest/fts3rest/tests/functional/test_job_submission.py diff --git a/src/fts3/util/config.py b/src/fts3/util/config.py index a647c4bc63d046041eb8809376ecaa8a303d64e3..fe2262badfd5f5be7d8ad1489c6bd7c25e65264b 100644 --- a/src/fts3/util/config.py +++ b/src/fts3/util/config.py @@ -28,19 +28,9 @@ def fts3_config_load(path="/etc/fts3/fts3config"): log.debug("entered fts3_config_load") fts3cfg = {} - # Dirty workaround: ConfigParser doesn't like files without - # headers, so fake one (since FTS3 config file doesn't have a - # default one) - try: - with open(path) as config_file: - content = "[fts3]\n" + config_file.read() - except IOError as ex: - log.exception("Failed to load configuration file") - raise ex - parser = ConfigParser() parser.optionxform = str - parser.read_string(content) + parser.read_file(path) # Map all options for name, value in parser.items("fts3"): diff --git a/src/fts3rest/fts3rest/config/middleware.py b/src/fts3rest/fts3rest/config/middleware.py index bfacd1bde6372341650f59184dcf9cd1985a0810..4845b4809ca8058894a5879d953a6b0af2244738 100644 --- a/src/fts3rest/fts3rest/config/middleware.py +++ b/src/fts3rest/fts3rest/config/middleware.py @@ -2,7 +2,9 @@ from flask import Flask, jsonify from werkzeug.exceptions import NotFound from sqlalchemy import engine_from_config, event import MySQLdb - +import os +from io import StringIO +import logging.config from fts3rest.config.routing import base from fts3.util.config import fts3_config_load from fts3rest.model import init_model @@ -10,19 +12,41 @@ from fts3rest.lib.helpers.connection_validator import ( connection_validator, connection_set_sqlmode, ) +from 
fts3rest.lib.middleware.fts3auth.fts3authmiddleware import FTS3AuthMiddleware +from fts3rest.lib.middleware.error_as_json import ErrorAsJson +from fts3rest.lib.middleware.timeout import TimeoutHandler from fts3rest.model.meta import Session +from werkzeug.exceptions import HTTPException +import json -def create_app(default_config_filename): - app = Flask(__name__) +def _load_configuration(config_file): + # ConfigParser doesn't handle files without headers. + # If the configuration file doesn't start with [fts3], + # add it for backwards compatibility, as before migrating to Flask + # the config file didn't have a header. + with open(config_file, "r") as config: + content = None + for line in config: + if not line.isspace() and not line.lstrip().startswith("#"): + config.seek(0) + if line.lstrip().startswith("[fts3]"): + content = StringIO(config.read()) + else: + content = StringIO("[fts3]\n" + config.read()) + break + if not content: + raise IOError("Empty configuration file") # Load configuration - fts3cfg = fts3_config_load(default_config_filename) - app.config.update(fts3cfg) + logging.config.fileConfig(content) + content.seek(0) + fts3cfg = fts3_config_load(content) + content.close() + return fts3cfg - # Add routes - base.do_connect(app) +def _load_db(app): # Setup the SQLAlchemy database engine kwargs = dict() if app.config["sqlalchemy.url"].startswith("mysql://"): @@ -46,10 +70,57 @@ def create_app(default_config_filename): def shutdown_session(exception=None): Session.remove() - @app.errorhandler(NotFound) - def handle_invalid_usage(error): - response = jsonify(error=error.code, name=error.name) - response.status_code = error.code + +def create_app(default_config_file=None, test=False): + """ + Create a new fts-rest Flask app + :param default_config_file: Config file to use if the environment variable + FTS3CONFIG is not set + :param test: True if testing. 
FTS3TESTCONFIG will be used instead of FTS3CONFIG + :return: the app + """ + app = Flask(__name__) + + if test: + config_file = os.environ.get("FTS3TESTCONFIG", default_config_file) + else: + config_file = os.environ.get("FTS3CONFIG", default_config_file) + + fts3cfg = _load_configuration(config_file) + + # Add configuration + app.config.update(fts3cfg) + + # Add routes + base.do_connect(app) + + # Add DB + _load_db(app) + + # FTS3 authentication/authorization middleware + app.wsgi_app = FTS3AuthMiddleware(app.wsgi_app, fts3cfg) + + # Catch DB Timeout + app.wsgi_app = TimeoutHandler(app.wsgi_app, fts3cfg) + + # Convert errors to JSON + # app.wsgi_app = ErrorAsJson(app.wsgi_app) + @app.errorhandler(HTTPException) + def handle_exception(e): + """Return JSON instead of HTML for HTTP errors.""" + # start with the correct headers and status code from the error + response = e.get_response() + # replace the body with JSON + response.data = json.dumps( + {"status": f"{e.code} {e.name}", "message": e.description,} + ) + response.content_type = "application/json" return response + # @app.errorhandler(NotFound) + # def handle_invalid_usage(error): + # response = jsonify(error=error.code, name=error.name) + # response.status_code = error.code + # return response + return app diff --git a/src/fts3rest/fts3rest/config/routing/base.py b/src/fts3rest/fts3rest/config/routing/base.py index 76df7fbfa92a3f62bcdc99656be75dc239c662b4..70c91384ff1fe90882593f28a472827c4d1ac2d4 100644 --- a/src/fts3rest/fts3rest/config/routing/base.py +++ b/src/fts3rest/fts3rest/config/routing/base.py @@ -101,6 +101,7 @@ def do_connect(app): ) # Jobs + app.add_url_rule("/jobs", "jobs.index", jobs.index, methods=["GET"]) app.add_url_rule("/jobs/", "jobs.index", jobs.index, methods=["GET"]) app.add_url_rule("/jobs/<job_list>", "jobs.get", jobs.get, methods=["GET"]) app.add_url_rule( @@ -140,9 +141,11 @@ def do_connect(app): app.add_url_rule("/jobs", "jobs.submit", jobs.submit, methods=["PUT", "POST"]) # Query directly the transfers + app.add_url_rule("/files", "files.index", files.index, methods=["GET"]) app.add_url_rule("/files/", "files.index", files.index, methods=["GET"]) # Archive + app.add_url_rule("/archive", "archive.index", archive.index, methods=["GET"]) app.add_url_rule("/archive/", "archive.index", archive.index, methods=["GET"]) app.add_url_rule("/archive/<job_id>", "archive.get", archive.get, methods=["GET"]) app.add_url_rule( diff --git a/src/fts3rest/fts3rest/controllers/banning.py b/src/fts3rest/fts3rest/controllers/banning.py index 8081a021e1fefc44faf83be823441598257e83c1..88e12763394a9283ae7e552eff50d2a56cfe2bfa 100644 --- a/src/fts3rest/fts3rest/controllers/banning.py +++ b/src/fts3rest/fts3rest/controllers/banning.py @@ -274,7 +274,7 @@ def ban_se(): """ if request.content_type == "application/json": try: - input_dict = json.loads(request.body) + input_dict = json.loads(request.data) except Exception: raise BadRequest("Malformed input") else: @@ -348,7 +348,7 @@ def ban_dn(): """ if request.content_type == "application/json": try: - input_dict = json.loads(request.body) + input_dict = json.loads(request.data) except Exception: raise BadRequest("Malformed input") else: diff --git a/src/fts3rest/fts3rest/controllers/datamanagement.py b/src/fts3rest/fts3rest/controllers/datamanagement.py index a3aa3605152139656902a59dd963f46980745211..43a2cb5d6514e9d55e8b86c04e38f08eedb79658 100644 --- a/src/fts3rest/fts3rest/controllers/datamanagement.py +++ b/src/fts3rest/fts3rest/controllers/datamanagement.py @@ -247,9 +247,9 @@ 
def mkdir(): try: if request.method == "POST": if request.content_type == "application/json": - unencoded_body = request.body + unencoded_body = request.data else: - unencoded_body = unquote_plus(request.body) + unencoded_body = unquote_plus(request.data.decode("utf-8")) else: raise BadRequest("Unsupported method %s" % request.method) @@ -280,9 +280,9 @@ def unlink(): try: if request.method == "POST": if request.content_type == "application/json": - unencoded_body = request.body + unencoded_body = request.data else: - unencoded_body = unquote_plus(request.body) + unencoded_body = unquote_plus(request.data.decode("utf-8")) else: raise BadRequest("Unsupported method %s" % request.method) @@ -315,9 +315,9 @@ def rmdir(): try: if request.method == "POST": if request.content_type == "application/json": - unencoded_body = request.body + unencoded_body = request.data else: - unencoded_body = unquote_plus(request.body) + unencoded_body = unquote_plus(request.data.decode("utf-8")) else: raise BadRequest("Unsupported method %s" % request.method) @@ -351,9 +351,9 @@ def rename(): if request.method == "POST": if request.content_type == "application/json": - unencoded_body = request.body + unencoded_body = request.data else: - unencoded_body = unquote_plus(request.body) + unencoded_body = unquote_plus(request.data.decode("utf-8")) else: raise BadRequest("Unsupported method %s" % request.method) diff --git a/src/fts3rest/fts3rest/controllers/delegation.py b/src/fts3rest/fts3rest/controllers/delegation.py index 3af7950c47732f8813f92eeb8175d0390e5c0d67..b9cd160a0ed7e1b09301dce285676e2f30e4580f 100644 --- a/src/fts3rest/fts3rest/controllers/delegation.py +++ b/src/fts3rest/fts3rest/controllers/delegation.py @@ -320,7 +320,7 @@ class credential(Delegation): if credential_cache is None: raise BadRequest("No credential cache found") - x509_proxy_pem = flask.request.body + x509_proxy_pem = flask.request.data log.debug("Received delegated credentials for %s" % dlg_id) log.debug(x509_proxy_pem) @@ -363,7 +363,7 @@ class voms(Delegation): raise Forbidden("The requested ID and the credentials ID do not match") try: - voms_list = json.loads(flask.request.body) + voms_list = json.loads(flask.request.data) log.debug( "VOMS request received for %s: %s" % (dlg_id, ", ".join(voms_list)) ) diff --git a/src/fts3rest/fts3rest/controllers/files.py b/src/fts3rest/fts3rest/controllers/files.py index d985a4595f75a0103ccb7d9285546c06286f0c00..46f7bb6506c85d12d2cfebdaaa4ff236bf4fcbbd 100644 --- a/src/fts3rest/fts3rest/controllers/files.py +++ b/src/fts3rest/fts3rest/controllers/files.py @@ -14,7 +14,7 @@ from werkzeug.exceptions import Forbidden from datetime import datetime, timedelta -from flask import request, jsonify +from flask import request from urllib.parse import urlparse import json import logging @@ -25,15 +25,16 @@ from fts3rest.lib.JobBuilder_utils import get_storage_element from fts3rest.lib.middleware.fts3auth.authorization import authorize from fts3rest.lib.middleware.fts3auth.constants import * from fts3rest.lib.http_exceptions import * - +from fts3rest.lib.helpers.jsonify import jsonify log = logging.getLogger(__name__) """ - Operations on Files - """ +Operations on Files +""" +@jsonify def index(): """ Get a list of active jobs, or those that match the filter requirements @@ -86,4 +87,4 @@ def index(): else: files = files.filter(File.finish_time == None) - return jsonify(files[:filter_limit]) + return files[:filter_limit]
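The datamanagement and delegation hunks above all make the same Pylons-to-Flask change: the raw payload moves from request.body to request.data. A minimal sketch of the shared pattern, with a hypothetical helper name (in Flask, request.data is bytes: json.loads accepts bytes directly, but unquote_plus needs str, hence the explicit decode):

```python
import json
from urllib.parse import unquote_plus

from flask import request


def read_json_body():
    # Hypothetical helper mirroring the controllers above
    if request.content_type == "application/json":
        raw = request.data  # bytes; json.loads accepts bytes
    else:
        # Form-encoded payloads must be decoded to str before unquoting
        raw = unquote_plus(request.data.decode("utf-8"))
    return json.loads(raw)
```

diff --git a/src/fts3rest/fts3rest/controllers/jobs.py b/src/fts3rest/fts3rest/controllers/jobs.py index 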
8ff94842d361d485340d7885bb5da38404513b5f..09e480fa50fdb183eed0084776814a71db36b08e 100644 --- a/src/fts3rest/fts3rest/controllers/jobs.py +++ b/src/fts3rest/fts3rest/controllers/jobs.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from flask import request, Response, jsonify +from flask import request, Response from werkzeug.exceptions import Forbidden, BadRequest, NotFound, Conflict from datetime import datetime, timedelta @@ -28,9 +28,10 @@ from fts3.model import Credential, FileRetryLog from fts3rest.model.meta import Session from fts3rest.lib.http_exceptions import * -from fts3rest.lib.middleware.fts3auth.authorization import authorized +from fts3rest.lib.middleware.fts3auth.authorization import authorized, authorize from fts3rest.lib.middleware.fts3auth.constants import TRANSFER, PRIVATE, NONE, VO from fts3rest.lib.helpers.misc import get_input_as_dict +from fts3rest.lib.helpers.jsonify import jsonify from fts3rest.lib.helpers.msgbus import submit_state_change from fts3rest.lib.JobBuilder import JobBuilder @@ -41,6 +42,8 @@ Operations on jobs and transfers """ +@authorize(TRANSFER) +@jsonify def index(): """ Get a list of active jobs, or those that match the filter requirements @@ -74,7 +77,7 @@ def index(): raise BadRequest( "The provided DN and delegation id do not correspond to the same user" ) - if filter_limit is not None and filter_limit < 0 or filter_limit > 500: + if filter_limit is not None and (filter_limit < 0 or filter_limit > 500): raise BadRequest("The limit must be positive and less or equal than 500") # Automatically apply filters depending on granted level @@ -128,7 +131,7 @@ def index(): return Response(_field_subset(), mimetype="application/json") else: - return jsonify(jobs) + return jobs def _get_job(job_id, env=None): @@ -142,6 +145,7 @@ def _get_job(job_id, env=None): return job +@jsonify def get(job_list): """ Get the job with the given ID @@ -197,9 +201,10 @@ def get(job_list): if multistatus: return Response(statuses, status=207, mimetype="application/json") else: - return jsonify(statuses) + return statuses +@jsonify def get_files(job_id): """ Get the files within a job @@ -217,6 +222,7 @@ def get_files(job_id): ) +@jsonify def cancel_files(job_id, file_ids): """ Cancel individual files - comma separated for multiple - within a job @@ -281,19 +287,154 @@ def cancel_files(job_id, file_ids): Session.rollback() raise if len(changed_states) > 1: - return jsonify(changed_states) + return changed_states else: - return jsonify(changed_states[0]) + return changed_states[0] -def cancel_all_by_vo(): - raise NotFound +@jsonify +def cancel_all_by_vo(vo_name): + """ + Cancel all files by the given vo_name + """ + user = request.environ["fts3.User.Credentials"] + + now = datetime.utcnow() + if not user.is_root: + raise Forbidden("User does not have root privileges") + + try: + # FTS3 daemon expects finish_time to be NULL in order to trigger the signal + # to fts_url_copy + file_count = ( + Session.query(File) + .filter(File.vo_name == vo_name) + .filter(File.file_state.in_(FileActiveStates)) + .update( + { + "file_state": "CANCELED", + "reason": "Job canceled by the user", + "dest_surl_uuid": None, + "finish_time": None, + }, + synchronize_session=False, + ) + ) + + # However, for data management operations there is nothing to signal, so + # set job_finished + dm_count = ( + Session.query(DataManagement) + .filter(DataManagement.vo_name == vo_name) + 
.filter(DataManagement.file_state.in_(DataManagementActiveStates)) + .update( + { + "file_state": "CANCELED", + "reason": "Job canceled by the user", + "job_finished": now, + "finish_time": now, + }, + synchronize_session=False, + ) + ) + + job_count = ( + Session.query(Job) + .filter(Job.vo_name == vo_name) + .filter(Job.job_state.in_(JobActiveStates)) + .update( + { + "job_state": "CANCELED", + "reason": "Job canceled by the user", + "job_finished": now, + }, + synchronize_session=False, + ) + ) + Session.commit() + Session.expire_all() + log.info("Active jobs for VO %s canceled" % vo_name) + except Exception: + Session.rollback() + raise + return { + "affected_files": file_count, + "affected_dm": dm_count, + "affected_jobs": job_count, + } +@jsonify def cancel_all(): - raise NotFound + """ + Cancel all files + """ + user = request.environ["fts3.User.Credentials"] + + now = datetime.utcnow() + if not user.is_root: + raise Forbidden("User does not have root privileges") + + try: + # FTS3 daemon expects finish_time to be NULL in order to trigger the signal + # to fts_url_copy + file_count = ( + Session.query(File) + .filter(File.file_state.in_(FileActiveStates)) + .update( + { + "file_state": "CANCELED", + "reason": "Job canceled by the user", + "dest_surl_uuid": None, + "finish_time": None, + }, + synchronize_session=False, + ) + ) + + # However, for data management operations there is nothing to signal, so + # set job_finished + dm_count = ( + Session.query(DataManagement) + .filter(DataManagement.file_state.in_(DataManagementActiveStates)) + .update( + { + "file_state": "CANCELED", + "reason": "Job canceled by the user", + "job_finished": now, + "finish_time": now, + }, + synchronize_session=False, + ) + ) + + job_count = ( + Session.query(Job) + .filter(Job.job_state.in_(JobActiveStates)) + .update( + { + "job_state": "CANCELED", + "reason": "Job canceled by the user", + "job_finished": now, + }, + synchronize_session=False, + ) + ) + Session.commit() + Session.expire_all() + log.info("Active jobs canceled") + except Exception: + Session.rollback() + raise + return { + "affected_files": file_count, + "affected_dm": dm_count, + "affected_jobs": job_count, + } + +@jsonify def get_file_retries(job_id, file_id): """ Get the retries for a given file @@ -310,6 +451,7 @@ def get_file_retries(job_id, file_id): return Response(retries.all(), mimetype="application/json") +@jsonify def get_dm(job_id): """ Get the data management tasks within a job @@ -325,13 +467,14 @@ def get_dm(job_id): ) +@jsonify def get_field(job_id, field): """ Get a specific field from the job identified by id """ job = _get_job(job_id) if hasattr(job, field): - return jsonify(getattr(job, field)) + return getattr(job, field) else: raise NotFound("No such field") @@ -362,6 +505,7 @@ def _multistatus(responses, expecting_multistatus=False): return responses +@jsonify def cancel(job_id_list): """ Cancel the given job @@ -461,6 +605,7 @@ def cancel(job_id_list): return _multistatus(responses, expecting_multistatus=len(requested_job_ids) > 1) +@jsonify def modify(job_id_list): """ Modify a job, or set of jobs @@ -532,6 +677,8 @@ def modify(job_id_list): return _multistatus(responses, expecting_multistatus=len(requested_job_ids) > 1) +@authorize(TRANSFER) +@jsonify def submit(): """ Submits a new job @@ -540,6 +687,7 @@ def submit(): submission, /api-docs/schema/submit gives the expected format encoded as a JSON-schema. 
It can be used to validate (e.g. in Python, jsonschema.validate) """ + log.debug("submitting job") # First, the request has to be valid JSON submitted_dict = get_input_as_dict(request) @@ -581,7 +729,7 @@ def submit(): except IntegrityError as err: Session.rollback() raise Conflict("The submission is duplicated " + str(err)) - except: + except Exception: Session.rollback() raise diff --git a/src/fts3rest/fts3rest/lib/helpers/jsonify.py b/src/fts3rest/fts3rest/lib/helpers/jsonify.py new file mode 100644 index 0000000000000000000000000000000000000000..6cd9c5964aa9ccb0c4264a7cbbf6de136e679847 --- /dev/null +++ b/src/fts3rest/fts3rest/lib/helpers/jsonify.py @@ -0,0 +1,105 @@ +# Copyright Members of the EMI Collaboration, 2013. +# Copyright 2020 CERN +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from datetime import datetime +from fts3.model.base import Base +from sqlalchemy.orm.query import Query +import json +import logging +import types +import functools +from flask import Response + +log = logging.getLogger(__name__) + + +class ClassEncoder(json.JSONEncoder): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.visited = [] + + def default(self, obj): # pylint: disable=E0202 + if isinstance(obj, Base): + self.visited.append(obj) + + if isinstance(obj, datetime): + return obj.strftime("%Y-%m-%dT%H:%M:%S%z") + elif isinstance(obj, set) or isinstance(obj, types.GeneratorType): + return list(obj) + elif isinstance(obj, Base) or hasattr(obj, "__dict__"): + # Trigger sqlalchemy if needed + str(obj) + values = {} + for k, v in obj.__dict__.items(): + if not k.startswith("_") and v not in self.visited: + values[k] = v + if isinstance(v, Base): + self.visited.append(v) + return values + else: + return super().default(obj) + + +def to_json(data, indent=2): + return json.dumps(data, cls=ClassEncoder, indent=indent, sort_keys=False) + + +def stream_response(data): + """ + Serialize an iterable as a json-list using a generator, so we do not need to wait to serialize the full + list before starting to send + """ + log.debug("Yielding json response") + comma = False + yield "[" + for item in data: + if comma: + yield "," + yield json.dumps(item, cls=ClassEncoder, indent=None, sort_keys=False) + comma = True + yield "]" + + +def jsonify(func): + """ + Decorates methods in the controllers, and converts the output to a JSON + serialization + """ + + @functools.wraps(func) + def wrapper(*args, **kwargs): + data = func(*args, **kwargs) + response = None + if isinstance(data, Response): + response = data + data = response.response + + if ( + hasattr(data, "__iter__") + and not isinstance(data, dict) + and not isinstance(data, str) + ): + data = stream_response(data) + else: + log.debug("Sending directly json response") + data = [json.dumps(data, cls=ClassEncoder, indent=None, sort_keys=False)] + + if response: + response.response = data + else: + response = Response(data, mimetype="application/json") + return response + + return wrapper
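For reviewers, a minimal sketch of how this decorator is meant to be used (view function names hypothetical): per the wrapper above, dicts and strings are serialized in one piece, while any other iterable (a generator, a list, a query slice) is streamed element by element through stream_response.

```python
from fts3rest.lib.helpers.jsonify import jsonify


@jsonify
def list_states():
    # Hypothetical controller: a generator is streamed as a JSON list,
    # one element at a time, without buffering the whole result
    return (s for s in ("SUBMITTED", "ACTIVE", "FINISHED"))


@jsonify
def summary():
    # A dict goes through a single json.dumps call instead
    return {"job_state": "ACTIVE", "reason": None}
```

diff --git 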
a/src/fts3rest/fts3rest/lib/helpers/misc.py b/src/fts3rest/fts3rest/lib/helpers/misc.py index c08867a169e4085a2b3bb4170e1af3d146510f76..b744fa5437ffa0117b57ecb3239490880bec63b4 100644 --- a/src/fts3rest/fts3rest/lib/helpers/misc.py +++ b/src/fts3rest/fts3rest/lib/helpers/misc.py @@ -7,16 +7,24 @@ def get_input_as_dict(request, from_query=False): """ Return a valid dictionary from the request input """ + content_type = request.content_type if from_query: input_dict = request.values - elif request.content_type == "application/json, application/x-www-form-urlencoded": - input_dict = json.loads(unquote_plus(request.body)) - elif request.content_type.startswith("application/json") or request.method == "PUT": + elif ( + content_type + and content_type == "application/json, application/x-www-form-urlencoded" + ): + input_dict = json.loads(unquote_plus(request.data.decode("utf-8"))) + elif ( + content_type and content_type.startswith("application/json") + ) or request.method == "PUT": try: - input_dict = json.loads(request.body) + input_dict = json.loads(request.data) except Exception: raise BadRequest("Badly formatted JSON request") - elif request.content_type.startswith("application/x-www-form-urlencoded"): + elif content_type and content_type.startswith( + "application/x-www-form-urlencoded" + ): input_dict = dict(request.values) else: raise BadRequest( diff --git a/src/fts3rest/fts3rest/lib/http_exceptions.py b/src/fts3rest/fts3rest/lib/http_exceptions.py index 55abe38258fa9a007a7124c10addf4fa85b2c066..5f68aafe8c099cdacd19a834e774586426debd4c 100644 --- a/src/fts3rest/fts3rest/lib/http_exceptions.py +++ b/src/fts3rest/fts3rest/lib/http_exceptions.py @@ -22,4 +22,5 @@ from werkzeug.exceptions import HTTPException class HTTPAuthenticationTimeout(HTTPException): code = 419 + name = "Authentication Timeout" description = "The authentication has expired" diff --git a/src/fts3rest/fts3rest/lib/middleware/error_as_json.py b/src/fts3rest/fts3rest/lib/middleware/error_as_json.py new file mode 100644 index 0000000000000000000000000000000000000000..80fe24dd3b715095bf158953609327410d12fc46 --- /dev/null +++ b/src/fts3rest/fts3rest/lib/middleware/error_as_json.py @@ -0,0 +1,55 @@ +# Copyright Members of the EMI Collaboration, 2013. +# Copyright 2020 CERN +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json + + +class ErrorAsJson: + """ + This middleware encodes an error as a json message if json was 
Otherwise, let the error go and someone else catch it + """ + + def __init__(self, wrap_app): + self.app = wrap_app + + def __call__(self, environ, start_response): + accept = environ.get("HTTP_ACCEPT", "application/json") + is_json_accepted = "application/json" in accept + + self._status_msg = None + self._status_code = None + + def override_start_response(status, headers, exc_info=None): + self._status_code = int(status.split()[0]) + if self._status_code >= 400 and is_json_accepted: + headers = [ + h + for h in headers + if h[0].lower() not in ("content-type", "content-length") + ] + headers.append(("Content-Type", "application/json")) + self._status_msg = status + return start_response(status, headers, exc_info) + + response = self.app(environ, override_start_response) + + if self._status_code >= 400 and is_json_accepted: + # todo the problem is this contains html + # check src/fts3rest/fts3rest/lib/JobBuilder.py: raise BadRequest("Invalid value within the request: %s" % str(ex)) + err_msg = "".join(response.decode()) + json_error = {"status": self._status_msg, "message": err_msg} + response = [json.dumps(json_error)] + return response diff --git a/src/fts3rest/fts3rest/lib/middleware/fts3auth/credentials.py b/src/fts3rest/fts3rest/lib/middleware/fts3auth/credentials.py index b0805d99c83c64325f0315d95e8bee1f1d531c1f..a40e8b3cc51574e89b2ba5fef545cb2ed39acea3 100644 --- a/src/fts3rest/fts3rest/lib/middleware/fts3auth/credentials.py +++ b/src/fts3rest/fts3rest/lib/middleware/fts3auth/credentials.py @@ -56,10 +56,10 @@ def generate_delegation_id(dn, fqans): The associated delegation id """ d = hashlib.sha1() - d.update(dn) + d.update(dn.encode("utf-8")) for fqan in fqans: - d.update(fqan) + d.update(fqan.encode("utf-8")) # Original implementation only takes into account first 16 characters return d.hexdigest()[:16] diff --git a/src/fts3rest/fts3rest/lib/middleware/fts3auth/fts3authmiddleware.py b/src/fts3rest/fts3rest/lib/middleware/fts3auth/fts3authmiddleware.py new file mode 100644 index 0000000000000000000000000000000000000000..c4f42bc639501dc00fcb6ac052433bbd0074bdee --- /dev/null +++ b/src/fts3rest/fts3rest/lib/middleware/fts3auth/fts3authmiddleware.py @@ -0,0 +1,109 @@ +# Copyright Members of the EMI Collaboration, 2013. +# Copyright 2020 CERN +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from fts3rest.model.meta import Session +from fts3.model import BannedDN +from .credentials import UserCredentials, InvalidCredentials +from sqlalchemy.exc import DatabaseError +from urllib.parse import urlparse +from werkzeug.exceptions import Unauthorized, Forbidden, HTTPException + + +log = logging.getLogger(__name__) + + +class FTS3AuthMiddleware: + """ + Pylons middleware to wrap the authentication as part of the request + process. 
+ """ + + def __init__(self, wrap_app, config): + self.app = wrap_app + self.config = config + + def _trusted_origin(self, environ, parsed): + allow_origin = environ.get("ACCESS_CONTROL_ORIGIN", None) + if not allow_origin: + return False + return parsed.scheme + "://" + parsed.netloc == allow_origin + + def _validate_origin(self, environ): + origin = environ.get("HTTP_ORIGIN", None) + if not origin: + log.debug("No Origin header found") + return + parsed = urlparse(origin) + if parsed.netloc != environ.get("HTTP_HOST"): + if not self._trusted_origin(environ, parsed): + raise Forbidden("Host and Origin do not match") + log.info("Trusted Origin: %s://%s" % (parsed.scheme, parsed.netloc)) + + def _get_credentials(self, environ): + try: + credentials = UserCredentials( + environ, self.config["fts3.Roles"], self.config + ) + except InvalidCredentials as e: + raise Forbidden("Invalid credentials (%s)" % str(e)) + + if not credentials.user_dn: + raise Unauthorized("A valid X509 certificate or proxy is needed") + + if not self._has_authorized_vo(credentials): + raise Forbidden("The user does not belong to any authorized vo") + + if self._is_banned(credentials): + raise Forbidden("The user has been banned") + + return credentials + + def __call__(self, environ, start_response): + try: + self._validate_origin(environ) + credentials = self._get_credentials(environ) + environ["fts3.User.Credentials"] = credentials + log.info("%s logged in via %s" % (credentials.user_dn, credentials.method)) + except HTTPException as e: + log.exception(e) + return e(environ, start_response) + except DatabaseError as e: + log.error( + "Database error when trying to get user's credentials: %s" % str(e) + ) + Session.remove() + raise + except Exception as e: + log.error( + "Unexpected error when trying to get user's credentials: %s" % str(e) + ) + raise + else: + return self.app(environ, start_response) + + def _has_authorized_vo(self, credentials): + if "*" in self.config["fts3.AuthorizedVO"]: + return True + for v in credentials.vos: + if v in self.config["fts3.AuthorizedVO"]: + log.info("Authorized VO: %s" % str(v)) + return True + return False + + def _is_banned(self, credentials): + banned = Session.query(BannedDN).get(credentials.user_dn) + return banned is not None diff --git a/src/fts3rest/fts3rest/lib/middleware/timeout.py b/src/fts3rest/fts3rest/lib/middleware/timeout.py new file mode 100644 index 0000000000000000000000000000000000000000..4be574ee3e92e03679b4f61c2203f1bc340691f8 --- /dev/null +++ b/src/fts3rest/fts3rest/lib/middleware/timeout.py @@ -0,0 +1,47 @@ +# Copyright 2016-2020 CERN +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from sqlalchemy.exc import TimeoutError +from werkzeug.exceptions import ServiceUnavailable + + +def _asbool(obj): + if isinstance(obj, str): + obj = obj.strip().lower() + if obj in ["true", "yes", "on", "y", "t", "1"]: + return True + elif obj in ["false", "no", "off", "n", "f", "0"]: + return False + else: + raise ValueError("String is not true/false: %r" % obj) + return bool(obj) + + +class TimeoutHandler: + """ + Catch Timeout and similar errors, and return a ServiceUnavailable response instead + """ + + def __init__(self, wrap_app, config): + self.app = wrap_app + self.config = config + + def __call__(self, environ, start_response): + try: + return self.app(environ, start_response) + except TimeoutError: + if _asbool(self.config.get("debug")): + raise + else: + return ServiceUnavailable()(environ, start_response) diff --git a/src/fts3rest/fts3rest/lib/oauth2lib/utils.py b/src/fts3rest/fts3rest/lib/oauth2lib/utils.py index 44a6af7512565490c8cf669de40696834abeb8a9..21da6ce87808be16e26bec037bb5dbe35b2be501 100644 --- a/src/fts3rest/fts3rest/lib/oauth2lib/utils.py +++ b/src/fts3rest/fts3rest/lib/oauth2lib/utils.py @@ -2,9 +2,7 @@ import string from urllib.parse import urlparse, urlencode, urlunparse, parse_qsl from random import SystemRandom -UNICODE_ASCII_CHARACTERS = string.ascii_letters.decode("ascii") + string.digits.decode( - "ascii" -) +UNICODE_ASCII_CHARACTERS = string.ascii_letters + string.digits def random_ascii_string(length): diff --git a/src/fts3rest/fts3rest/lib/oauth2provider.py b/src/fts3rest/fts3rest/lib/oauth2provider.py index cd554ea0e89906989bd65d58c41e5a51f49e4ebf..f22b0784faa86751ef23d2be95241d257e838a97 100644 --- a/src/fts3rest/fts3rest/lib/oauth2provider.py +++ b/src/fts3rest/fts3rest/lib/oauth2provider.py @@ -317,7 +317,12 @@ class FTS3OAuth2ResourceProvider(ResourceProvider): ) else: credential = jwt.decode( - access_token, pub_key.export_to_pem(), algorithms=[algorithm] + access_token, + pub_key.export_to_pem(), + algorithms=[algorithm], + options={ + "verify_aud": False + }, # We don't check the audience for non-WLCG tokens ) log.debug("offline_response::: {}".format(credential)) except Exception as ex: diff --git a/src/fts3rest/fts3rest/tests/__init__.py b/src/fts3rest/fts3rest/tests/__init__.py index 558f1fb62b50f42c4dcfa211dceb4b4e726389f4..3917dbe73b1c8ebe98acb7d8a7770b1616dcc90b 100644 --- a/src/fts3rest/fts3rest/tests/__init__.py +++ b/src/fts3rest/fts3rest/tests/__init__.py @@ -13,7 +13,7 @@ from fts3rest.model.meta import Session from fts3.model import Credential, CredentialCache, DataManagement from fts3.model import Job, File, FileRetryLog, ServerConfig from fts3rest.config.middleware import create_app -from .ftstestclient import FTSTestClient +from .ftstestclient import FTSTestClient, TestResponse def _generate_mock_cert(): @@ -43,10 +43,10 @@ class TestController(TestCase): def setUp(self): self.pkey, self.cert = _generate_mock_cert() - test_config_default = "./fts3testconfig" - self.flask_app = create_app(test_config_default) + self.flask_app = create_app(test=True) self.flask_app.testing = True self.flask_app.test_client_class = FTSTestClient + self.flask_app.response_class = TestResponse self.app = self.flask_app.test_client() def setup_gridsite_environment(self, no_vo=False, dn=None): @@ -194,3 +194,5 @@ class TestController(TestCase): shutil.rmtree(self.flask_app.config["fts3.MessagingDirectory"]) except Exception: pass + + self.flask_app.do_teardown_appcontext()
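With create_app(test=True) reading FTS3TESTCONFIG, the new test bootstrap reduces to roughly the following (a sketch, assuming the environment variable points at the config file added below):

```python
import os

from fts3rest.config.middleware import create_app
from fts3rest.tests.ftstestclient import FTSTestClient, TestResponse

# create_app(test=True) reads FTS3TESTCONFIG instead of FTS3CONFIG
os.environ["FTS3TESTCONFIG"] = "src/fts3rest/fts3rest/tests/fts3testconfig"

app = create_app(test=True)
app.testing = True
app.test_client_class = FTSTestClient
app.response_class = TestResponse
client = app.test_client()  # WebTest-style client: get/post accept url= and status=
```

diff --git a/src/fts3rest/fts3rest/tests/fts3testconfig 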
b/src/fts3rest/fts3rest/tests/fts3testconfig new file mode 100644 index 0000000000000000000000000000000000000000..36f14eea43480a4bc12741fea1ffa264934fb433 --- /dev/null +++ b/src/fts3rest/fts3rest/tests/fts3testconfig @@ -0,0 +1,219 @@ +# Running user and group +User=ftsflask +Group=ftsflask + +# mysql only +DbType=mysql + +#db username +DbUserName=ftsflask + +#db password +DbPassword= + + +#For MySQL, it has to follow the format 'host/db' (i.e. "mysql-server.example.com/fts3db") +DbConnectString=localhost:3306/ftsflask + +#Number of db connections in the pool (use even number, e.g. 2,4,6,8,etc OR 1 for a single connection) +DbThreadsNum=30 + +#The alias used for the FTS endpoint, will be published as such in the dashboard transfers UI http://dashb-wlcg-transfers.cern.ch/ui/ +#Alias=fts3-xdc.cern.ch + +#Infosys, either the fqdn:port of a BDII instance or false to disable BDII access +#Infosys=lcg-bdii.cern.ch:2170 + +#Query the info systems specified in the order given, e.g. glue1;glue2 +InfoProviders=glue1 + +#List of authorized VOs, separated by ; +#Leave * to authorize any VO +AuthorizedVO=* + +# site name +#SiteName=FTS-DEV-XDC + +#Enable/Disable monitoring using messaging monitoring (disabled=false / enabled=true) +MonitoringMessaging=false + +# Profiling interval in seconds. If set to 0, it will be disabled +Profiling=0 + +# Log directories +TransferLogDirectory=/var/log/fts3/transfers +ServerLogDirectory=/var/log/fts3 + +# Log level. Enables logging for messages of level >= than configured +# Possible values are +# TRACE (every detail), DEBUG (internal behaviour), INFO (normal behaviour), +# NOTICE (final states), WARNING (things worth checking), ERR (internal FTS3 errors, as database connectivity), +# CRIT (fatal errors, as segmentation fault) +# It is recommended to use DEBUG or INFO +LogLevel=DEBUG + +# Check for fts_url_copy processes that do not give their progress back +# CheckStalledTransfers = true +# Stalled timeout, in seconds +# CheckStalledTimeout = 900 +CheckStalledTimeout = 900 + +# Minimum required free RAM (in MB) for FTS3 to work normally +# If the amount of free RAM goes below the limit, FTS3 will enter auto-drain mode +# This is intended to protect against system resource exhaustion +# MinRequiredFreeRAM = 50 +MinRequiredFreeRAM = 50 + +# Maximum number of url copy processes that the node can run +# The RAM limitation may not take into account other node limitations (i.e. IO) +# or, depending on the swapping policy, may not even prevent overloads if the kernel +# starts swapping before the free RAM decreases until it reaches the value of MinRequiredFreeRAM +# 0 disables the check. +# The default is 400. +# MaxUrlCopyProcesses = 400 +MaxUrlCopyProcesses = 400 + +# Parameters for Bring Online +# Maximum bulk size. +# If the size is too large, it will take more resources (memory and CPU) to generate the requests and +# parse the responses. Some servers may reject the requests if they are too big. +# If it is too small, performance will be reduced. +# Keep it to a sensible size (between 100 and 1k) +# StagingBulkSize=400 +# Maximum number of concurrent requests. This gives a maximum of files sent to the storage system +# (StagingBulkSize*StagingConcurrentRequests). The larger the number, the more requests will FTS need to keep track of. +# StagingConcurrentRequests=500 +# Seconds to wait before submitting a bulk request, so FTS can accumulate more files per bulk. +# Note that the resolution is 60 seconds. 
+# StagingWaitingFactor=300 +# Retry this number of times if a staging poll fails with ECOMM +# StagingPollRetries=3 + +# In seconds, interval between heartbeats +# HeartBeatInterval=60 +# In seconds, after this interval a host is considered down +# HeartBeatGraceInterval=120 + +# Seconds between optimizer runs +# OptimizerInterval = 60 +# After this time without optimizer updates, force a run +# OptimizerSteadyInterval = 300 +# Maximum number of streams per file +# OptimizerMaxStreams = 16 + +# EMA Alpha factor to reduce the influence of fluctuations +# OptimizerEMAAlpha = 0.1 +# Increase step size when the optimizer considers the performance is good +# OptimizerIncreaseStep = 1 +# Increase step size when the optimizer considers the performance is good, and set to aggressive or normal +# OptimizerAggressiveIncreaseStep = 2 +# Decrease step size when the optimizer considers the performance is bad +# OptimizerDecreaseStep = 1 + + +# Set the bulk size, in number of jobs, used for cleaning the old records +#CleanBulkSize=5000 +# In days. Entries older than this will be purged. +#CleanInterval=7 + +## The higher the values for the following parameters, +## the higher the latency for some operations (such as cancellations), +## but can also reduce the system and/or database load + +# In seconds, how often to purge the messaging directory +#PurgeMessagingDirectoryInterval = 600 +# In seconds, how often to run sanity checks +#CheckSanityStateInterval = 3600 +# In seconds, how often to check for canceled transfers +#CancelCheckInterval = 10 +# In seconds, how often to check for expired queued transfers +#QueueTimeoutCheckInterval = 300 +# In seconds, how often to check for stalled transfers +#ActiveTimeoutCheckInterval = 300 +# In seconds, how often to schedule new transfers +#SchedulingInterval = 2 +# In seconds, how often to check for messages. Should be less than CheckStalledTimeout/2 +#MessagingConsumeInterval = 1 +#Enable or disable auto session reuse +AutoSessionReuse = true +#Max small file size for session reuse in bytes +AutoSessionReuseMaxSmallFileSize = 104857600 +#Max big file size for session reuse in bytes +AutoSessionReuseMaxBigFileSize = 1073741824 +#Max number of files per session reuse +AutoSessionReuseMaxFiles = 1000 +#Max number of big files per session reuse +AutoSessionReuseMaxBigFiles = 2 +BackupTables=false +OptimizerMaxSuccessRate=100 +OptimizerMedSuccessRate=80 +OptimizerLowSuccessRate=75 +OptimizerBaseSuccessRate=74 +Port=8443 +UseFixedJobPriority=0 + +ValidateAccessTokenOffline=True +JWKCacheSeconds=86400 +TokenRefreshDaemonIntervalInSeconds=600 +# Logging configuration +[loggers] +keys = root, routes, fts3rest, sqlalchemy + +[handlers] +keys = console, log_file + +[formatters] +keys = generic + +[logger_root] +level = INFO +handlers = log_file + +[logger_routes] +level = INFO +handlers = +qualname = routes.middleware +# "level = DEBUG" logs the route matched and routing variables. + +[logger_fts3rest] +level = INFO +handlers = +qualname = fts3rest + +[logger_sqlalchemy] +level = WARN +handlers = +qualname = sqlalchemy.engine +# "level = INFO" logs SQL queries. +# "level = DEBUG" logs SQL queries and results. +# "level = WARN" logs neither. (Recommended for production systems.) 
+ +[handler_console] +class = StreamHandler +args = (sys.stderr,) +level = NOTSET +formatter = generic + +[handler_log_file] +# See +# http://docs.python.org/2/library/logging.handlers.html +class = logging.FileHandler +args = ('/var/log/fts3rest/fts3rest.log', 'a') +level = NOTSET +formatter = generic + +[formatter_generic] +format = %(asctime)s,%(msecs)03d %(levelname)-5.5s [%(module)s] %(message)s +datefmt = %H:%M:%S + + + +[roles] +Public = all:transfer +lcgadmin = vo:transfer;vo:config +production = all:config + +[providers] +xdc=https://iam.extreme-datacloud.eu +xdc_ClientId= +xdc_ClientSecret= diff --git a/src/fts3rest/fts3rest/tests/ftstestclient.py b/src/fts3rest/fts3rest/tests/ftstestclient.py index 8420041733b52c17422d40830ae585fe3d6f08aa..07be719c91282347a8843628ba21e1041fd874fd 100644 --- a/src/fts3rest/fts3rest/tests/ftstestclient.py +++ b/src/fts3rest/fts3rest/tests/ftstestclient.py @@ -11,7 +11,7 @@ class TestResponse(Response): def _adapt_test(func): - @functools.wraps + @functools.wraps(func) def wrapper(*args, **kwargs): path = kwargs.pop("url", "/") expected_status = kwargs.pop("status", 200) @@ -29,11 +29,16 @@ class FTSTestClient(FlaskClient): with old functional tests created for Pylon's WebTest """ - get = _adapt_test(super().get) - post = _adapt_test(super().post) - put = _adapt_test(super().put) - delete = _adapt_test(super().delete) + get = _adapt_test(FlaskClient.get) + post = _adapt_test(FlaskClient.post) + put = _adapt_test(FlaskClient.put) + delete = _adapt_test(FlaskClient.delete) - def __init__(self, *args, **kwargs): - kwargs["response_wrapper"] = TestResponse - super().__init__(*args, **kwargs) + def post_json(self, url, params, **kwargs): + params = json.dumps(params) + kwargs["content_type"] = "application/json" + return self.post(url=url, params=params, **kwargs) + + def get_json(self, url, *args, **kwargs): + kwargs["headers"] = [("Accept", "application/json")] + return self.get(url=url, *args, **kwargs) diff --git a/src/fts3rest/fts3rest/tests/functional/test_job_cancel.py b/src/fts3rest/fts3rest/tests/functional/test_job_cancel.py new file mode 100644 index 0000000000000000000000000000000000000000..5ab52a1319cf606b00f1b7a290bbc01e80b45f7f --- /dev/null +++ b/src/fts3rest/fts3rest/tests/functional/test_job_cancel.py @@ -0,0 +1,510 @@ +import json + +from fts3rest.tests import TestController +from fts3rest.model.meta import Session +from fts3.model import ( + Job, + File, + JobActiveStates, + Credential, + FileActiveStates, + FileTerminalStates, +) +from datetime import datetime, timedelta +import random + + +class TestJobCancel(TestController): + """ + Tests for the job cancellation + """ + + def tearDown(self): + super().tearDown() + cert = "SSL_SERVER_S_DN" + if cert in self.app.environ_base: + del self.app.environ_base["SSL_SERVER_S_DN"] + + def _submit(self, count=1, reuse=False): + """ + Submit a valid job + """ + self.setup_gridsite_environment() + self.push_delegation() + + files = [] + for i in range(count): + u = random.randint(500, 50000) + files.append( + { + "sources": ["root://source.es/file%d" % i], + "destinations": ["root://destcancel.ch/file" + str(i) + str(u)], + "selection_strategy": "orderly", + "checksum": "adler32:1234", + "filesize": 1024, + "metadata": {"mykey": "myvalue"}, + } + ) + + job = { + "files": files, + "params": {"overwrite": True, "verify_checksum": True, "reuse": reuse}, + } + + job_id = self.app.put(url="/jobs", params=json.dumps(job), status=200).json[ + "job_id" + ] + + return str(job_id) + + def 
_submit_none_reuse(self, count=1, big_files=0): + """ + Submit a valid job without specifying reuse + """ + self.setup_gridsite_environment() + self.push_delegation() + + files = [] + for i in range(count): + u = random.randint(500001, 1000000) + files.append( + { + "sources": ["root://source.es/file%d" % i], + "destinations": ["root://dest.ch/file" + str(i) + str(u)], + "selection_strategy": "orderly", + "checksum": "adler32:1234", + "filesize": 1024, + "metadata": {"mykey": "myvalue"}, + } + ) + for j in range(big_files): + u = random.randint(100, 1000) + files.append( + { + "sources": ["root://source.es/file%d" % i], + "destinations": ["root://dest.ch/file%d%d" % (i, u)], + "selection_strategy": "orderly", + "checksum": "adler32:1234", + "filesize": 104857600, + "metadata": {"mykey": "myvalue"}, + } + ) + + job = {"files": files, "params": {"overwrite": True, "verify_checksum": True}} + + job_id = self.app.put(url="/jobs", params=json.dumps(job), status=200).json[ + "job_id" + ] + + return str(job_id) + + def test_cancel(self): + """ + Cancel a job + """ + job_id = self._submit() + job = self.app.delete(url="/jobs/%s" % job_id, status=200).json + + self.assertEqual(job["job_id"], job_id) + self.assertEqual(job["job_state"], "CANCELED") + self.assertEqual(job["reason"], "Job canceled by the user") + + # Is it in the database? + job = Session.query(Job).get(job_id) + self.assertEqual(job.job_state, "CANCELED") + self.assertEqual(job.job_type, "N") + + self.assertNotEqual(None, job.job_finished) + for f in job.files: + self.assertEqual(f.file_state, "CANCELED") + self.assertNotEqual(None, f.finish_time) + + def test_cancel_running(self): + """ + Cancel a job, but the transfer is running (pid is set) + """ + job_id = self._submit() + + # Add pid + transfer = Session.query(File).filter(File.job_id == job_id).first() + transfer.pid = 1234 + Session.merge(transfer) + Session.commit() + + job = self.app.delete(url="/jobs/%s" % job_id, status=200).json + + self.assertEqual(job["job_id"], job_id) + self.assertEqual(job["job_state"], "CANCELED") + self.assertEqual(job["reason"], "Job canceled by the user") + + # Is it in the database? + job = Session.query(Job).get(job_id) + self.assertEqual(job.job_state, "CANCELED") + self.assertNotEqual(None, job.job_finished) + for f in job.files: + self.assertEqual(f.file_state, "CANCELED") + self.assertEqual(None, f.finish_time) + + def test_cancel_terminal(self): + """ + Cancel a job with files in terminal state + """ + job_id = self._submit() + + job = Session.query(Job).get(job_id) + job.job_state = "FINISHED" + for f in job.files: + f.file_state = "FINISHED" + Session.merge(job) + Session.commit() + + job = self.app.delete(url="/jobs/%s" % job_id, status=200).json + + self.assertEqual(job["job_id"], job_id) + self.assertEqual(job["job_state"], "FINISHED") + self.assertNotEqual(job["reason"], "Job canceled by the user") + + # Is it in the database? 
+ job = Session.query(Job).get(job_id) + self.assertEqual(job.job_state, "FINISHED") + for f in job.files: + self.assertEqual(f.file_state, "FINISHED") + + def test_cancel_some_terminal(self): + """ + Cancel a job with some files in terminal state + """ + job_id = self._submit(10) + + job = Session.query(Job).get(job_id) + job.job_state = "ACTIVE" + for f in job.files: + if f.file_id % 2 == 0: + f.file_state = "FINISHED" + Session.merge(job) + Session.commit() + + job = self.app.delete(url="/jobs/%s" % job_id, status=200).json + + self.assertEqual(job["job_id"], job_id) + self.assertEqual(job["job_state"], "CANCELED") + self.assertEqual(job["reason"], "Job canceled by the user") + + # Is it in the database? + job = Session.query(Job).get(job_id) + self.assertEqual(job.job_state, "CANCELED") + for f in job.files: + if f.file_id % 2 == 0: + self.assertEqual(f.file_state, "FINISHED") + self.assertNotEqual(f.reason, "Job canceled by the user") + else: + self.assertEqual(f.file_state, "CANCELED") + + def test_cancel_multiple(self): + """ + Cancel multiple jobs at once + """ + job_ids = list() + for i in range(10): + job_ids.append(self._submit()) + + jobs = self.app.delete(url="/jobs/%s" % ",".join(job_ids), status=200).json + + self.assertEqual(len(jobs), 10) + for job in jobs: + self.assertEqual(job["job_state"], "CANCELED") + self.assertEqual(job["reason"], "Job canceled by the user") + + for job_id in job_ids: + job = Session.query(Job).get(job_id) + self.assertEqual(job.job_state, "CANCELED") + self.assertEqual(job.reason, "Job canceled by the user") + for f in job.files: + self.assertEqual(f.file_state, "CANCELED") + self.assertEqual(f.reason, "Job canceled by the user") + + def test_cancel_multiple_one(self): + """ + Use multiple cancellation convention but with only one job + """ + job_id = self._submit() + + jobs = self.app.delete(url="/jobs/%s," % job_id, status=200).json + + self.assertEqual(len(jobs), 1) + self.assertEqual(jobs[0]["job_id"], job_id) + self.assertEqual(jobs[0]["job_state"], "CANCELED") + self.assertEqual(jobs[0]["reason"], "Job canceled by the user") + + job = Session.query(Job).get(job_id) + self.assertEqual(job.job_state, "CANCELED") + self.assertEqual(job.reason, "Job canceled by the user") + + def test_cancel_multiple_one_wrong(self): + """ + Cancel multiple jobs, but one does not exist. + One status per entry + """ + job_id = self._submit() + jobs = self.app.delete(url="/jobs/%s,fake-fake-fake" % job_id, status=207).json + + self.assertEqual(len(jobs), 2) + + for job in jobs: + if job["job_id"] == job_id: + self.assertEqual(job["job_state"], "CANCELED") + self.assertEqual(job["reason"], "Job canceled by the user") + self.assertEqual(job["http_status"], "200 Ok") + else: + self.assertEqual(job["http_status"], "404 Not Found") + + def _test_cancel_file_asserts(self, job_id, expect_job, expect_files): + """ + Helper for test_cancel_remaining_file + """ + job = Session.query(Job).get(job_id) + self.assertEqual(job.job_state, expect_job) + if expect_job in JobActiveStates: + self.assertIsNone(job.job_finished) + else: + self.assertIsNotNone(job.job_finished) + self.assertEqual("CANCELED", job.files[0].file_state) + self.assertIsNotNone(job.files[0].finish_time) + for f in job.files[1:]: + self.assertEqual(expect_files, f.file_state) + + def test_cancel_file(self): + """ + Cancel just one file of a job with multiple files. + The job and other files must remain unaffected. 
+ """ + job_id = self._submit(5) + files = self.app.get(url="/jobs/%s/files" % job_id, status=200).json + + self.app.delete(url="/jobs/%s/files/%s" % (job_id, files[0]["file_id"])) + self._test_cancel_file_asserts(job_id, "SUBMITTED", "SUBMITTED") + + def test_cancel_only_file(self): + """ + Cancel the only file in a job. + The job must go to CANCELED. + """ + job_id = self._submit(1) + files = self.app.get(url="/jobs/%s/files" % job_id, status=200).json + + self.app.delete(url="/jobs/%s/files/%s" % (job_id, files[0]["file_id"])) + + job = Session.query(Job).get(job_id) + self.assertEqual(job.job_state, "CANCELED") + self.assertEqual("CANCELED", job.files[0].file_state) + + def _submit_and_mark_all_but_one(self, count, states): + """ + Helper for test_cancel_remaining_file + Submit a job, mark all files except the first one with the state 'state' + state can be a list with count-1 final states + """ + job_id = self._submit(count) + files = self.app.get(url="/jobs/%s/files" % job_id, status=200).json + + if isinstance(states, str): + states = [states] * (count - 1) + + for i in range(1, count): + fil = Session.query(File).get(files[i]["file_id"]) + fil.file_state = states[i - 1] + Session.merge(fil) + Session.commit() + + return job_id, files + + def test_cancel_remaining_file(self): + """ + Cancel the remaining file of a job. + Depending on the other file states, the job must go to FAILED, CANCELED or FINISHEDDIRTY + """ + # Try first all remaining FAILED + # Final state must be FAILED + job_id, files = self._submit_and_mark_all_but_one(5, "FAILED") + + self.app.delete(url="/jobs/%s/files/%s" % (job_id, files[0]["file_id"])) + self._test_cancel_file_asserts(job_id, "CANCELED", "FAILED") + + # All remaining FINISHED + # Final state must be FINISHED + job_id, files = self._submit_and_mark_all_but_one(5, "FINISHED") + + self.app.delete(url="/jobs/%s/files/%s" % (job_id, files[0]["file_id"])) + self._test_cancel_file_asserts(job_id, "CANCELED", "FINISHED") + + # All remaining CANCELED + # Final state must be CANCELED + job_id, files = self._submit_and_mark_all_but_one(5, "CANCELED") + + self.app.delete(url="/jobs/%s/files/%s" % (job_id, files[0]["file_id"])) + self._test_cancel_file_asserts(job_id, "CANCELED", "CANCELED") + + def test_cancel_multiple_files(self): + """ + Cancel multiple files within a job. 
+ """ + job_id = self._submit(10) + files = self.app.get(url="/jobs/%s/files" % job_id, status=200).json + + file_ids = ",".join(map(lambda f: str(f["file_id"]), files[0:2])) + answer = self.app.delete( + url="/jobs/%s/files/%s" % (job_id, file_ids), status=200 + ) + changed_states = answer.json + + self.assertEqual(changed_states, ["CANCELED", "CANCELED"]) + + job = Session.query(Job).get(job_id) + self.assertEqual(job.job_state, "SUBMITTED") + for file in job.files[2:]: + self.assertEqual(file.file_state, "SUBMITTED") + + def test_cancel_reuse(self): + """ + Jobs with reuse or multihop can not be cancelled file per file + """ + job_id = self._submit(10, reuse=True) + files = self.app.get(url="/jobs/%s/files" % job_id, status=200).json + + file_ids = ",".join(map(lambda f: str(f["file_id"]), files[0:2])) + self.app.delete(url="/jobs/%s/files/%s" % (job_id, file_ids), status=400) + + def test_cancel_reuse_small_files_and_big_files(self): + """ + Cancel a job with small files and one big file is reused + """ + job_id = self._submit_none_reuse(100, 1) + job = self.app.delete(url="/jobs/%s" % job_id, status=200).json + + self.assertEqual(job["job_id"], job_id) + self.assertEqual(job["job_state"], "CANCELED") + self.assertEqual(job["reason"], "Job canceled by the user") + + # Is it in the database? + job = Session.query(Job).get(job_id) + self.assertEqual(job.job_state, "CANCELED") + + auto_session_reuse = self.flask_app.config.get("fts3.AutoSessionReuse", "false") + if auto_session_reuse == "true": + self.assertEqual(job.job_type, "Y") + else: + self.assertEqual(job.job_type, "N") + + self.assertNotEqual(None, job.job_finished) + for f in job.files: + self.assertEqual(f.file_state, "CANCELED") + self.assertNotEqual(None, f.finish_time) + + def _become_root(self): + """ + Helper function to become root superuser + """ + self.app.environ_base.update( + { + "GRST_CRED_AURI_0": "dn:/C=CH/O=CERN/OU=hosts/OU=cern.ch/CN=ftsdummyhost.cern.ch" + } + ) + self.app.environ_base.update( + { + "SSL_SERVER_S_DN": "/C=CH/O=CERN/OU=hosts/OU=cern.ch/CN=ftsdummyhost.cern.ch" + } + ) + + creds = self.get_user_credentials() + delegated = Credential() + delegated.dlg_id = creds.delegation_id + delegated.dn = "/C=CH/O=CERN/OU=hosts/OU=cern.ch/CN=ftsdummyhost.cern.ch" + delegated.proxy = "-NOT USED-" + delegated.voms_attrs = None + delegated.termination_time = datetime.utcnow() + timedelta(hours=7) + + Session.merge(delegated) + Session.commit() + + def _prepare_and_test_created_jobs_to_cancel(self, files_per_job=8): + """ + Helper function to prepare and test created jobs for cancel tests + """ + job_ids = list() + for i in range(len(FileActiveStates) + len(FileTerminalStates)): + job_ids.append(self._submit(files_per_job)) + i = 0 + for state in FileActiveStates + FileTerminalStates: + job = Session.query(Job).get(job_ids[i]) + i += 1 + if state == "STARTED": + job.job_state = "STAGING" + else: + job.job_state = state + for f in job.files: + f.file_state = state + Session.merge(job) + Session.commit() + + i = 0 + for state in FileActiveStates + FileTerminalStates: + job = Session.query(Job).get(job_ids[i]) + state_job = state + if state == "STARTED": + state_job = "STAGING" + self.assertEqual(job.job_state, state_job) + for f in job.files: + self.assertEqual(f.file_state, state) + i += 1 + return job_ids + + def _test_canceled_jobs(self, job_ids): + """ + Helper function to test canceled jobs + """ + i = 0 + for _ in FileActiveStates: + job = Session.query(Job).get(job_ids[i]) + self.assertEqual(job.job_state, 
"CANCELED") + for f in job.files: + self.assertEqual(f.file_state, "CANCELED") + i += 1 + for state in FileTerminalStates: + job = Session.query(Job).get(job_ids[i]) + self.assertEqual(job.job_state, state) + for f in job.files: + self.assertEqual(f.file_state, state) + i += 1 + + def test_cancel_all_by_vo(self): + """ + Cancel all files by vo name. + """ + self.setup_gridsite_environment() + creds = self.get_user_credentials() + if creds.vos: + vo_name = creds.vos[0] + else: + vo_name = "testvo" + + job_ids = self._prepare_and_test_created_jobs_to_cancel(files_per_job=8) + self.app.delete(url="/jobs/vo/%s" % vo_name, status=403) + self._become_root() + response = self.app.delete(url="/jobs/vo/%s" % vo_name, status=200).json + self._test_canceled_jobs(job_ids) + self.assertEqual(response["affected_files"], len(FileActiveStates) * 8) + self.assertEqual(response["affected_dm"], 0) + self.assertEqual(response["affected_jobs"], len(FileActiveStates)) + + def test_cancel_all(self): + """ + Cancel all files. + """ + job_ids = self._prepare_and_test_created_jobs_to_cancel(files_per_job=8) + self.app.delete(url="/jobs/all", status=403) + self._become_root() + response = self.app.delete(url="/jobs/all", status=200).json + self._test_canceled_jobs(job_ids) + self.assertEqual(response["affected_files"], len(FileActiveStates) * 8) + self.assertEqual(response["affected_dm"], 0) + self.assertEqual(response["affected_jobs"], len(FileActiveStates)) diff --git a/src/fts3rest/fts3rest/tests/functional/test_job_deletion.py b/src/fts3rest/fts3rest/tests/functional/test_job_deletion.py new file mode 100644 index 0000000000000000000000000000000000000000..6354216b69c5540d6249bce99121fd3b2a7b5049 --- /dev/null +++ b/src/fts3rest/fts3rest/tests/functional/test_job_deletion.py @@ -0,0 +1,148 @@ +import json + +from fts3rest.tests import TestController +from fts3rest.model.meta import Session +from fts3.model import Job, DataManagement + + +class TestJobDeletion(TestController): + """ + Test DELETE jobs + """ + + def test_simple_delete(self): + """ + Simple deletion job + """ + self.setup_gridsite_environment() + self.push_delegation() + + job = { + "delete": [ + "root://source.es/file", + {"surl": "root://source.es/file2", "metadata": {"a": "b"}}, + ] + } + + job_id = self.app.put(url="/jobs", params=json.dumps(job), status=200).json[ + "job_id" + ] + + self.assertIsNotNone(job_id) + + job = Session.query(Job).get(job_id) + + self.assertEqual(job.vo_name, "testvo") + self.assertEqual(job.user_dn, self.TEST_USER_DN) + self.assertEqual(job.source_se, "root://source.es") + self.assertEqual("DELETE", job.job_state) + self.assertIsNotNone(job.cred_id) + + dm = Session.query(DataManagement).filter(DataManagement.job_id == job_id).all() + self.assertEqual(2, len(dm)) + + self.assertEqual(dm[0].source_surl, "root://source.es/file") + self.assertEqual(dm[1].source_surl, "root://source.es/file2") + + self.assertEqual(dm[1].file_metadata["a"], "b") + + self.assertEqual(dm[0].hashed_id, dm[1].hashed_id) + + for d in dm: + self.assertEqual(d.vo_name, "testvo") + self.assertEqual(d.file_state, "DELETE") + self.assertEqual(d.source_se, "root://source.es") + + return str(job_id) + + def test_get_delete_job(self): + """ + Submit a deletion job, get info via REST + """ + job_id = self.test_simple_delete() + + job = self.app.get_json(url="/jobs/%s" % job_id, status=200).json + files = self.app.get_json(url="/jobs/%s/dm" % job_id, status=200).json + + self.assertEqual(job["job_state"], "DELETE") + 
self.assertEqual(files[0]["source_surl"], "root://source.es/file") + self.assertEqual(files[1]["source_surl"], "root://source.es/file2") + + def test_cancel_delete(self): + """ + Submit deletion job, then cancel + """ + job_id = self.test_simple_delete() + + self.app.delete(url="/jobs/%s" % job_id, status=200) + + job = Session.query(Job).get(job_id) + + self.assertEqual("CANCELED", job.job_state) + self.assertEqual(job.reason, "Job canceled by the user") + self.assertIsNotNone(job.job_finished) + + dm = Session.query(DataManagement).filter(DataManagement.job_id == job_id).all() + for d in dm: + self.assertEqual("CANCELED", d.file_state) + self.assertIsNotNone(d.finish_time) + self.assertIsNotNone(d.job_finished) + + def test_delete_repeated(self): + """ + Submit a deletion job with files repeated multiple times, + they must land only once in the db + """ + self.setup_gridsite_environment() + self.push_delegation() + + job = { + "delete": [ + "root://source.es/file", + {"surl": "root://source.es/file2", "metadata": {"a": "b"}}, + "root://source.es/file", + "root://source.es/file2", + "root://source.es/file3", + ] + } + + job_id = self.app.put(url="/jobs", params=json.dumps(job), status=200).json[ + "job_id" + ] + + self.assertIsNotNone(job_id) + + dm = Session.query(DataManagement).filter(DataManagement.job_id == job_id).all() + + self.assertEqual(3, len(dm)) + registered = set() + for f in dm: + registered.add(f.source_surl) + self.assertEqual( + { + "root://source.es/file", + "root://source.es/file2", + "root://source.es/file3", + }, + registered, + ) + + def test_delete_file(self): + """ + Submit a deletion job with a file:/// + Must be denied + """ + self.setup_gridsite_environment() + self.push_delegation() + + job = { + "delete": [ + "root://source.es/file", + {"surl": "root://source.es/file2", "metadata": {"a": "b"}}, + "root://source.es/file", + "root://source.es/file2", + "file:///etc/passwd", + ] + } + + self.app.put(url="/jobs", params=json.dumps(job), status=400) diff --git a/src/fts3rest/fts3rest/tests/functional/test_job_invalid_submit.py b/src/fts3rest/fts3rest/tests/functional/test_job_invalid_submit.py new file mode 100644 index 0000000000000000000000000000000000000000..cc7930fea3c7ea23892d0f379f5680bdfe438e9e --- /dev/null +++ b/src/fts3rest/fts3rest/tests/functional/test_job_invalid_submit.py @@ -0,0 +1,464 @@ +from datetime import timedelta +import json + +from fts3rest.tests import TestController + + +class TestJobInvalidSubmits(TestController): + """ + Collection of invalid submissions. Intended to check if the + job controller filters properly malformed and/or invalid requests. 
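+    All of them must be rejected with a 4xx error code.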
+ """ + + def test_submit_malformed(self): + """ + Submit a piece of data that is not well-formed json + """ + self.setup_gridsite_environment() + + error = self.app.put(url="/jobs", params="thisXisXnotXjson", status=400).json + + self.assertEqual(error["status"], "400 Bad Request") + self.assertTrue(error["message"].startswith("Badly formatted JSON request")) + + def test_submit_no_transfers(self): + """ + Submit valid json data, but without actual transfers + """ + self.setup_gridsite_environment() + self.push_delegation() + job = {"parameters": {}} + + error = self.app.put(url="/jobs", params=json.dumps(job), status=400).json + + self.assertEqual(error["status"], "400 Bad Request") + self.assertEqual( + error["message"], "No transfers or namespace operations specified" + ) + + def test_no_protocol(self): + """ + Submit a valid transfer, but with urls with no protocol + """ + self.setup_gridsite_environment() + self.push_delegation() + + job = {"files": [{"sources": ["/etc/passwd"], "destinations": ["/srv/pub"],}]} + + error = self.app.post( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=400, + ).json + + self.assertEqual(error["status"], "400 Bad Request") + self.assertEqual( + error["message"], + "Invalid value within the request: Missing scheme (/etc/passwd)", + ) + + def test_no_file(self): + """ + Submit a valid transfer, but using file:// + """ + self.setup_gridsite_environment() + self.push_delegation() + + job = { + "files": [ + { + "sources": ["file:///etc/passwd"], + "destinations": ["file:///srv/pub"], + } + ] + } + + error = self.app.post( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=400, + ).json + + self.assertEqual(error["status"], "400 Bad Request") + self.assertEqual( + error["message"], + "Invalid value within the request: Can not transfer local files (file:///etc/passwd)", + ) + + def test_one_single_slash(self): + """ + Well-formed json, but source url is malformed + """ + self.setup_gridsite_environment() + self.push_delegation() + + job = { + "files": [ + { + "sources": ["gsiftp:/source.es:8446/file"], + "destinations": ["gsiftp://dest.ch:8446/file"], + "selection_strategy": "orderly", + "checksum": "adler32:1234", + "filesize": 1024, + "metadata": {"mykey": "myvalue"}, + } + ], + "params": {"overwrite": True, "verify_checksum": True}, + } + + error = self.app.post( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=400, + ).json + + self.assertEqual(error["status"], "400 Bad Request") + self.assertEqual( + error["message"], + "Invalid value within the request: Missing host (gsiftp:/source.es:8446/file)", + ) + + def test_empty_path(self): + """ + Well-formed json, but source path is missing + """ + self.setup_gridsite_environment() + self.push_delegation() + + job = { + "files": [ + { + "sources": ["gsiftp://source.es:8446/"], + "destinations": ["gsiftp://dest.ch:8446/file"], + "selection_strategy": "orderly", + "checksum": "adler32:1234", + "filesize": 1024, + "metadata": {"mykey": "myvalue"}, + } + ], + "params": {"overwrite": True, "verify_checksum": True}, + } + + error = self.app.post( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=400, + ).json + + self.assertEqual(error["status"], "400 Bad Request") + self.assertEqual( + error["message"], + "Invalid value within the request: Missing path (gsiftp://source.es:8446/)", + ) + + def test_submit_missing_surl(self): + """ + Well-formed json, but files is missing + """ 
+ self.setup_gridsite_environment() + self.push_delegation() + job = {"transfers": [{"destinations": ["root://dest.ch/file"]}]} + + self.app.put(url="/jobs", params=json.dumps(job), status=400) + + job = {"transfers": [{"source": "root://source.es/file"}]} + + error = self.app.put( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=400, + ).json + + self.assertEqual(error["status"], "400 Bad Request") + self.assertEqual( + error["message"], "No transfers or namespace operations specified" + ) + + def test_invalid_surl(self): + """ + Well-formed json, but the urls are malformed + """ + self.setup_gridsite_environment() + self.push_delegation() + job = { + "files": [ + { + "sources": ["http: //source.es/file"], # Note the space! + "destinations": ["http: //dest.ch/file"], + "selection_strategy": "orderly", + "checksum": "adler32:1234", + "filesize": 1024, + "metadata": {"mykey": "myvalue"}, + } + ] + } + + error = self.app.put( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=400, + ).json + + self.assertEqual(error["status"], "400 Bad Request") + self.assertEqual( + error["message"], + "Invalid value within the request: Missing host (http:/// //source.es/file)", + ) + + def test_submit_no_creds(self): + """ + Submission without valid credentials is forbidden + """ + self.assertFalse("GRST_CRED_AURI_0" in self.app.environ_base) + error = self.app.put(url="/jobs", params="thisXisXnotXjson", status=403).json + + self.assertEqual(error["status"], "403 Forbidden") + + def test_submit_no_delegation(self): + """ + Submission with valid credentials, but without a delegated proxy, + must request a delegation + """ + self.setup_gridsite_environment() + + job = { + "files": [ + { + "sources": ["root://source/file"], + "destinations": ["root://dest/file"], + } + ] + } + + error = self.app.put(url="/jobs", params=json.dumps(job), status=419).json + + self.assertEqual(error["status"], "419 Authentication Timeout") + self.assertEqual( + error["message"], + 'No delegation found for "%s"' % TestController.TEST_USER_DN, + ) + + def test_submit_expired_credentials(self): + """ + Submission with an expired proxy must request a delegation + """ + self.setup_gridsite_environment() + self.push_delegation(lifetime=timedelta(hours=-1)) + + job = { + "files": [ + { + "sources": ["root://source/file"], + "destinations": ["root://dest/file"], + } + ] + } + + error = self.app.put(url="/jobs", params=json.dumps(job), status=419).json + + self.assertEqual(error["status"], "419 Authentication Timeout") + self.assertEqual(error["message"][0:33], "The delegated credentials expired") + + def test_submit_almost_expired_credentials(self): + """ + Submission with an proxy that expires in minutes + """ + self.setup_gridsite_environment() + self.push_delegation(lifetime=timedelta(minutes=30)) + + job = { + "files": [ + { + "sources": ["root://source/file"], + "destinations": ["root://dest/file"], + } + ] + } + + error = self.app.put(url="/jobs", params=json.dumps(job), status=419).json + + self.assertEqual(error["status"], "419 Authentication Timeout") + self.assertTrue( + error["message"].startswith( + "The delegated credentials has less than one hour left" + ) + ) + + def test_submit_missing_path(self): + """ + Submit with a url that has no path + """ + self.setup_gridsite_environment() + self.push_delegation() + + job = { + "files": [ + { + "sources": ["http://google.com"], + "destinations": ["root://dest/file"], + } + ] + } + + error = 
self.app.put(url="/jobs", params=json.dumps(job), status=400).json + + self.assertEqual(error["status"], "400 Bad Request") + self.assertEqual( + error["message"], + "Invalid value within the request: Missing path (http://google.com)", + ) + + def test_submit_no_files(self): + """ + Submit with empty files + """ + self.setup_gridsite_environment() + self.push_delegation() + + job = {"files": []} + + error = self.app.put(url="/jobs", params=json.dumps(job), status=400).json + + self.assertEqual(error["status"], "400 Bad Request") + self.assertEqual(error["message"], "No valid pairs available") + + def test_invalid_files(self): + """ + Set something completely wrong in files + """ + self.setup_gridsite_environment() + self.push_delegation() + + job = { + "files": 48, + } + + error = self.app.put(url="/jobs", params=json.dumps(job), status=400).json + + self.assertEqual(error["status"], "400 Bad Request") + self.assertEqual(error["message"][0:17], "Malformed request") + + def test_invalid_protocol_params(self): + """ + Submit a transfer specifying some invalid protocol parameters + """ + self.setup_gridsite_environment() + self.push_delegation() + + job = { + "files": [ + { + "sources": ["http://source.es:8446/file"], + "destinations": ["root://dest.ch:8447/file"], + "selection_strategy": "orderly", + "checksum": "adler32:1234", + "filesize": 1024, + "metadata": {"mykey": "myvalue"}, + } + ], + "params": { + "overwrite": True, + "verify_checksum": True, + "timeout": "this-is-a-string", + "nostreams": 42, + "buffer_size": 1025, + "strict_copy": True, + }, + } + error = self.app.post( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=400, + ).json + + self.assertEqual(error["status"], "400 Bad Request") + self.assertEqual(error["message"][0:32], "Invalid value within the request") + + job["params"]["timeout"] = 0 + job["params"]["nostreams"] = "another-string" + self.app.post( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=400, + ) + + job["params"]["nostreams"] = 4 + job["params"]["buffer_size"] = "and-yet-another-string" + + self.app.post( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=400, + ) + + def test_transfer_and_deletion(self): + """ + Jobs must be either deletion or transfer, not both + """ + self.setup_gridsite_environment() + self.push_delegation() + + job = { + "files": [ + { + "sources": ["root://source.es/file"], + "destinations": ["root://dest.ch/file"], + "filesize": 1024, + } + ], + "delete": [ + "root://source.es/file", + {"surl": "root://source.es/file2", "metadata": {"a": "b"}}, + ], + } + + self.app.put(url="/jobs", params=json.dumps(job), status=400) + + def test_deletion_bad_surl(self): + """ + Submit a deletion job with an invalid surl + """ + self.setup_gridsite_environment() + self.push_delegation() + + job = {"delete": ["xx"]} + + self.app.put(url="/jobs", params=json.dumps(job), status=400) + + def test_submit_reuse_different_hosts(self): + """ + Submit a reuse job with different hosts in the surls + Regression for FTS-291 + """ + self.setup_gridsite_environment() + self.push_delegation() + + job = { + "files": [ + { + "sources": ["root://source.es/file"], + "destinations": ["root://dest.ch/file"], + }, + { + "sources": ["root://somewhere.else.fr/file"], + "destinations": ["root://dest.ch/file"], + }, + ], + "params": {"overwrite": True, "verify_checksum": True, "reuse": True}, + } + + self.app.put( + url="/jobs", + content_type="application/json", + 
params=json.dumps(job), + status=400, + ) diff --git a/src/fts3rest/fts3rest/tests/functional/test_job_listing.py b/src/fts3rest/fts3rest/tests/functional/test_job_listing.py new file mode 100644 index 0000000000000000000000000000000000000000..ff3824f8bd352e1c19d174b4e8e7e9efce9142fc --- /dev/null +++ b/src/fts3rest/fts3rest/tests/functional/test_job_listing.py @@ -0,0 +1,605 @@ +import json +from datetime import datetime, timedelta + +from fts3.model import FileRetryLog, Job, File +from fts3rest.model.meta import Session +from fts3rest.lib.middleware.fts3auth.credentials import UserCredentials +from fts3rest.lib.middleware.fts3auth import constants +from fts3rest.tests import TestController +import random + + +class TestJobListing(TestController): + """ + Tests for the job controller, listing, stating, etc. + """ + + def _submit( + self, file_metadata=None, dest_surl="root://dest.ch/file", random_url=True + ): + if random_url: + dest_surl = dest_surl + str(random.randint(200, 1000)) + job = { + "files": [ + { + "sources": ["root://source.es/file"], + "destinations": [dest_surl], + "selection_strategy": "orderly", + "checksum": "adler32:1234", + "filesize": 1024, + "metadata": {"mykey": "myvalue"}, + } + ], + "params": {"overwrite": True, "verify_checksum": True}, + } + if file_metadata: + job["files"][0]["metadata"] = file_metadata + + job_id = self.app.put(url="/jobs", params=json.dumps(job), status=200).json[ + "job_id" + ] + + # Make sure it was commited to the DB + self.assertGreater(len(job_id), 0) + return job_id + + def _terminal(self, state, window): + job_id = self._submit() + job = Session.query(Job).get(job_id) + files = Session.query(File).filter(File.job_id == job_id) + finish_time = datetime.utcnow() - window + job.finish_time = finish_time + job.job_finished = finish_time + job.job_state = state + Session.merge(job) + for f in files: + f.finish_time = finish_time + f.job_finished = finish_time + f.file_state = state + Session.merge(f) + Session.commit() + return job_id + + def test_show_job(self): + """ + Get information about a job + """ + self.setup_gridsite_environment() + self.push_delegation() + + job_id = self._submit() + job = self.app.get(url="/jobs/%s" % job_id, status=200).json + + self.assertEqual(job["job_id"], job_id) + self.assertEqual(job["job_state"], "SUBMITTED") + + def test_missing_job(self): + """ + Request an invalid job + """ + self.setup_gridsite_environment() + error = self.app.get(url="/jobs/1234x", status=404).json + + self.assertEqual(error["status"], "404 Not Found") + self.assertEqual(error["message"], 'No job with the id "1234x" has been found') + + def test_list_job_default(self): + """ + List active jobs + """ + self.setup_gridsite_environment() + self.push_delegation() + + job_id = self._submit() + + job_list = self.app.get(url="/jobs", status=200).json + self.assertTrue(job_id in map(lambda j: j["job_id"], job_list)) + + def test_list_with_dlg_id(self): + """ + List active jobs with the right delegation id + """ + self.setup_gridsite_environment() + self.push_delegation() + creds = self.get_user_credentials() + + job_id = self._submit() + + job_list = self.app.get( + url="/jobs", params={"dlg_id": creds.delegation_id}, status=200 + ).json + self.assertTrue(job_id in map(lambda j: j["job_id"], job_list)) + + def test_list_with_dn(self): + """ + List active jobs with the right user DN + """ + self.setup_gridsite_environment() + self.push_delegation() + creds = self.get_user_credentials() + + job_id = self._submit() + + job_list = 
self.app.get( + url="/jobs", params={"user_dn": creds.user_dn}, status=200 + ).json + self.assertTrue(job_id in map(lambda j: j["job_id"], job_list)) + + def test_list_with_vo(self): + """ + List active jobs with the right VO + """ + self.setup_gridsite_environment() + self.push_delegation() + creds = self.get_user_credentials() + + job_id = self._submit() + + job_list = self.app.get( + url="/jobs", params={"vo_name": creds.vos[0]}, status=200 + ).json + self.assertTrue(job_id in map(lambda j: j["job_id"], job_list)) + + def test_list_bad_dlg_id(self): + """ + Trying to list jobs belonging to a different delegation id + must be forbidden + """ + self.setup_gridsite_environment() + self.push_delegation() + creds = self.get_user_credentials() + + self.app.get( + url="/jobs", params={"dlg_id": creds.delegation_id + "1234"}, status=403 + ) + + def test_list_bad_dn(self): + """ + Trying to list with the right delegation id mismatched bad DN is a bad + request + """ + self.setup_gridsite_environment() + self.push_delegation() + creds = self.get_user_credentials() + + self.app.get( + url="/jobs", + params={"dlg_id": creds.delegation_id, "user_dn": "/CN=1234"}, + status=400, + ) + + def test_list_with_state_empty(self): + """ + Filter by state (no match) + """ + self.setup_gridsite_environment() + self.push_delegation() + creds = self.get_user_credentials() + + job_id = self._submit() + + job_list = self.app.get( + url="/jobs", + params={ + "dlg_id": creds.delegation_id, + "state_in": "FINISHED,FAILED,CANCELED", + }, + status=200, + ).json + self.assertFalse(job_id in map(lambda j: j["job_id"], job_list)) + + def test_list_with_state_match(self): + """ + Filter by state (match) + """ + self.setup_gridsite_environment() + self.push_delegation() + creds = self.get_user_credentials() + + job_id = self._submit() + + job_list = self.app.get( + url="/jobs", + params={"dlg_id": creds.delegation_id, "state_in": "ACTIVE,SUBMITTED"}, + status=200, + ).json + self.assertTrue(job_id in map(lambda j: j["job_id"], job_list)) + + def test_list_with_source_se(self): + """ + Filter by source storage + """ + self.setup_gridsite_environment() + self.push_delegation() + + job_id = self._submit() + + job_list = self.app.get( + url="/jobs", params={"source_se": "root://source.es"}, status=200 + ).json + self.assertTrue(job_id in map(lambda j: j["job_id"], job_list)) + + job_list = self.app.get( + url="/jobs", params={"source_se": "gsiftp://source.es"}, status=200 + ).json + self.assertTrue(job_id not in map(lambda j: j["job_id"], job_list)) + + def test_list_with_dest_se(self): + """ + Filter by destination storage + """ + self.setup_gridsite_environment() + self.push_delegation() + + job_id = self._submit() + + job_list = self.app.get( + url="/jobs", params={"dest_se": "root://dest.ch"}, status=200 + ).json + self.assertTrue(job_id in map(lambda j: j["job_id"], job_list)) + + job_list = self.app.get( + url="/jobs", params={"dest_se": "gsiftp://dest.ch"}, status=200 + ).json + self.assertTrue(job_id not in map(lambda j: j["job_id"], job_list)) + + def test_list_no_vo(self): + """ + Submit a valid job with no VO data in the credentials. Listing it should be possible + afterwards (regression test for FTS-18) + """ + self.setup_gridsite_environment(no_vo=True) + self.push_delegation() + + job_id = self._submit() + + # Must be in the listings! 
+ job_list = self.app.get(url="/jobs", status=200).json + self.assertTrue(job_id in map(lambda j: j["job_id"], job_list)) + + def test_get_no_vo(self): + """ + Submit a valid job with no VO data in the credentials. Stating it should be possible + afterwards (regression test for FTS-18) + """ + self.setup_gridsite_environment(no_vo=True) + self.push_delegation() + + job_id = self._submit() + + # Must be in the listings! + job_info = self.app.get(url="/jobs/%s" % job_id, status=200).json + self.assertEqual(job_id, job_info["job_id"]) + self.assertEqual(self.get_user_credentials().vos[0], job_info["vo_name"]) + + def test_get_field(self): + """ + Request a field from a job + """ + self.setup_gridsite_environment(no_vo=True) + self.push_delegation() + + job_id = self._submit() + + state = self.app.get(url="/jobs/%s/job_state" % job_id, status=200).json + self.assertEqual(state, "SUBMITTED") + + def test_get_job_forbidden(self): + """ + Ask for a job we don't have permission to get + """ + self.setup_gridsite_environment() + self.push_delegation() + job_id = self._submit() + + old_granted = UserCredentials.get_granted_level_for + UserCredentials.get_granted_level_for = lambda self_, op: None + + error = self.app.get(url="/jobs/%s" % job_id, status=403).json + + UserCredentials.get_granted_level_for = old_granted + + self.assertEqual(error["status"], "403 Forbidden") + + def test_get_missing_field(self): + """ + Ask for a field that doesn't exist + """ + self.setup_gridsite_environment() + self.push_delegation() + job_id = self._submit() + + error = self.app.get( + url="/jobs/%s/not_really_a_field" % job_id, status=404 + ).json + + self.assertEqual(error["status"], "404 Not Found") + self.assertEqual(error["message"], "No such field") + + def test_get_files(self): + """ + Get the files within a job + """ + self.setup_gridsite_environment() + self.push_delegation() + job_id = self._submit() + + files = self.app.get(url="/jobs/%s/files" % job_id, status=200).json + + self.assertEqual(1, len(files)) + self.assertEqual("root://source.es/file", files[0]["source_surl"]) + self.assertEqual(1024, files[0]["user_filesize"]) + + def test_no_retries(self): + """ + Get the retries for a file, without retries + """ + self.setup_gridsite_environment() + self.push_delegation() + job_id = self._submit() + + files = self.app.get(url="/jobs/%s/files" % job_id, status=200).json + file_id = files[0]["file_id"] + + retries = self.app.get( + url="/jobs/%s/files/%d/retries" % (job_id, file_id), status=200 + ).json + self.assertEqual(0, len(retries)) + + def test_get_retries(self): + """ + Get the retries for a file, forcing one + """ + self.setup_gridsite_environment() + self.push_delegation() + job_id = self._submit() + + files = self.app.get(url="/jobs/%s/files" % job_id, status=200).json + file_id = files[0]["file_id"] + + retry = FileRetryLog() + retry.file_id = file_id + retry.attempt = 1 + retry.datetime = datetime.utcnow() + retry.reason = "Blahblahblah" + + Session.merge(retry) + Session.commit() + + retries = self.app.get( + url="/jobs/%s/files/%d/retries" % (job_id, file_id), status=200 + ).json + + self.assertEqual(1, len(retries)) + self.assertEqual(1, retries[0]["attempt"]) + self.assertEqual("Blahblahblah", retries[0]["reason"]) + + def test_get_files_in_job(self): + """ + Users may want to get information about nested files in one go, so let them ask for + some of their fields. 
+ See FTS-137 + """ + self.setup_gridsite_environment() + self.push_delegation() + job_id = self._submit() + + job = self.app.get( + url="/jobs/%s?files=start_time,source_surl" % job_id, status=200 + ).json + + self.assertIn("files", job) + self.assertEqual(1, len(job["files"])) + f = job["files"][0] + self.assertIn("start_time", f) + self.assertIn("source_surl", f) + self.assertNotIn("finish_time", f) + + self.assertEqual("root://source.es/file", f["source_surl"]) + + def test_get_multiple_jobs(self): + """ + Query multiple jobs at once + See FTS-147 + """ + self.setup_gridsite_environment() + self.push_delegation() + + N_JOBS = 5 + job_ids = list() + for i in range(N_JOBS): + job_ids.append(self._submit()) + + job_list = self.app.get( + url="/jobs/%s?files=start_time,source_surl,file_state" % ",".join(job_ids), + status=200, + ).json + + self.assertEqual(list, type(job_list)) + self.assertEqual(N_JOBS, len(job_list)) + + matches = 0 + for job in job_list: + if job["job_id"] in job_ids: + matches += 1 + for f in job["files"]: + self.assertIn("start_time", f) + self.assertIn("source_surl", f) + self.assertNotIn("finish_time", f) + self.assertIn("file_state", f) + self.assertEqual("SUBMITTED", f["file_state"]) + + self.assertEqual(N_JOBS, matches) + + def test_get_multiple_jobs_one_missing(self): + """ + Same as test_get_multiple_jobs, but push one missing job + """ + self.setup_gridsite_environment() + self.push_delegation() + + N_JOBS = 5 + job_ids = list() + for i in range(N_JOBS): + job_ids.append(self._submit()) + job_ids.append("12345-BADBAD-09876") + + job_list = self.app.get( + url="/jobs/%s?files=start_time,source_surl,file_state" % ",".join(job_ids), + status=207, + ).json + + self.assertEqual(list, type(job_list)) + self.assertEqual(N_JOBS + 1, len(job_list)) + + for job in job_list: + if job["job_id"] == "12345-BADBAD-09876": + self.assertEqual("404 Not Found", job["http_status"]) + else: + self.assertEqual("200 Ok", job["http_status"]) + + def test_filter_by_time(self): + """ + Filter by time_window + """ + self.setup_gridsite_environment() + self.push_delegation() + creds = self.get_user_credentials() + + job_id = self._terminal("FINISHED", timedelta(minutes=30)) + + # Try one hour + job_list = self.app.get( + url="/jobs", + params={ + "dlg_id": creds.delegation_id, + "state_in": "FINISHED", + "time_window": "1", + }, + status=200, + ).json + + self.assertIn(job_id, [j["job_id"] for j in job_list]) + + # Try 15 minutes + job_list = self.app.get( + url="/jobs", + params={ + "dlg_id": creds.delegation_id, + "state_in": "FINISHED", + "time_window": "0:15", + }, + status=200, + ).json + + self.assertNotIn(job_id, [j["job_id"] for j in job_list]) + + def test_get_multiple_with_files(self): + """ + Query multiple jobs at once, asking for a subset of the fields of their files + Regression for FTS-265 + """ + self.setup_gridsite_environment() + self.push_delegation() + + job1 = self._submit(file_metadata="a") + job2 = self._submit(file_metadata="5") + job3 = self._submit(file_metadata="?") + + jobs = self.app.get( + url="/jobs/%s?files=job_id,file_metadata" % ",".join([job1, job2, job3]), + status=200, + ).json + + self.assertEqual(3, len(jobs)) + self.assertEqual(jobs[0]["job_id"], jobs[0]["files"][0]["job_id"]) + self.assertEqual("a", jobs[0]["files"][0]["file_metadata"]) + self.assertEqual(jobs[1]["job_id"], jobs[1]["files"][0]["job_id"]) + self.assertEqual("5", jobs[1]["files"][0]["file_metadata"]) + self.assertEqual(jobs[2]["job_id"], jobs[2]["files"][0]["job_id"]) + 
self.assertEqual("?", jobs[2]["files"][0]["file_metadata"]) + + def test_query_something_running(self): + """ + Query if there are any active or submitted files for a given destination surl + Requested by ATLAS to query if a given destination file landed on the DB + """ + self.setup_gridsite_environment() + self.push_delegation() + + job1 = self._submit(dest_surl="gsiftp://test-query1/path", random_url=False) + job2 = self._submit(dest_surl="gsiftp://test-query2/path", random_url=False) + + files = self.app.get( + url="/files?dest_surl=gsiftp://test-query2/path", status=200 + ).json + + self.assertNotIn(job1, list(map(lambda f: f["job_id"], files))) + self.assertIn(job2, list(map(lambda f: f["job_id"], files))) + + files = self.app.get( + url="/files?dest_surl=gsiftp://test-query1/path", status=200 + ).json + + self.assertIn(job1, list(map(lambda f: f["job_id"], files))) + self.assertNotIn(job2, list(map(lambda f: f["job_id"], files))) + + def test_list_granted_private(self): + """ + Configure access level to PRIVATE, so the user can only see their own transfers + """ + self.setup_gridsite_environment(dn="/CN=fakeson") + self.push_delegation() + + self._submit() + self._submit() + + self.setup_gridsite_environment() + self.push_delegation() + + job3 = self._submit() + + old_granted = UserCredentials.get_granted_level_for + UserCredentials.get_granted_level_for = lambda self_, op: constants.PRIVATE + + jobs = self.app.get(url="/jobs", status=200).json + + UserCredentials.get_granted_level_for = old_granted + + self.assertEqual(1, len(jobs)) + self.assertEqual(job3, jobs[0]["job_id"]) + + def test_list_granted_vo(self): + """ + Configure access level to VO, so the user can see their own transfers and others, if they + belong to the same vo + """ + self.setup_gridsite_environment(dn="/CN=fakeson", no_vo=True) + self.push_delegation() + + self._submit() + self._submit() + + self.setup_gridsite_environment(dn="/CN=jameson") + self.push_delegation() + + job1 = self._submit() + job2 = self._submit() + + self.setup_gridsite_environment() + self.push_delegation() + + job3 = self._submit() + + old_granted = UserCredentials.get_granted_level_for + UserCredentials.get_granted_level_for = lambda self_, op: constants.VO + + jobs = self.app.get(url="/jobs", status=200).json + + UserCredentials.get_granted_level_for = old_granted + + self.assertEqual(3, len(jobs)) + job_ids = list(map(lambda j: j["job_id"], jobs)) + self.assertIn(job1, job_ids) + self.assertIn(job2, job_ids) + self.assertIn(job3, job_ids) diff --git a/src/fts3rest/fts3rest/tests/functional/test_job_modify.py b/src/fts3rest/fts3rest/tests/functional/test_job_modify.py new file mode 100644 index 0000000000000000000000000000000000000000..449eebd0adac7a091859e1cf9fe0cc26028d8e99 --- /dev/null +++ b/src/fts3rest/fts3rest/tests/functional/test_job_modify.py @@ -0,0 +1,64 @@ +from fts3rest.tests import TestController +from fts3rest.model.meta import Session +from fts3.model import Job + + +class TestJobModify(TestController): + """ + Tests job modification + """ + + def test_job_priority(self): + """ + Submit a job, change priority later + """ + self.setup_gridsite_environment() + self.push_delegation() + + job = { + "files": [ + { + "sources": ["root://source.es/file"], + "destinations": ["root://dest.ch/file"], + } + ], + "params": {"priority": 2}, + } + + job_id = self.app.post_json(url="/jobs", params=job, status=200).json["job_id"] + + job = Session.query(Job).get(job_id) + self.assertEqual(2, job.priority) + + mod = {"params": {"priority": 
4}} + + self.app.post_json(url="/jobs/%s" % str(job_id), params=mod, status=200) + + job = Session.query(Job).get(job_id) + self.assertEqual(4, job.priority) + + def test_job_priority_invalid(self): + """ + Submit a job, try to change priority to an invalid value later + """ + self.setup_gridsite_environment() + self.push_delegation() + + job = { + "files": [ + { + "sources": ["root://source.es/file"], + "destinations": ["root://dest.ch/file"], + } + ], + "params": {"priority": 2}, + } + + job_id = self.app.post_json(url="/jobs", params=job, status=200).json["job_id"] + + job = Session.query(Job).get(job_id) + self.assertEqual(2, job.priority) + + mod = {"params": {"priority": "axxx"}} + + self.app.post_json(url="/jobs/%s" % str(job_id), params=mod, status=400) diff --git a/src/fts3rest/fts3rest/tests/functional/test_job_submission.py b/src/fts3rest/fts3rest/tests/functional/test_job_submission.py new file mode 100644 index 0000000000000000000000000000000000000000..6dc3829c2a1b873e7c75e6858bf992aa866e6260 --- /dev/null +++ b/src/fts3rest/fts3rest/tests/functional/test_job_submission.py @@ -0,0 +1,1120 @@ +import json +import socket +import time + +from fts3rest.tests import TestController +from fts3rest.model.meta import Session +from fts3.model import File, Job +import random +import unittest +from math import ceil + + +def _ceil_time(): + return ceil(time.time()) + + +class TestJobSubmission(TestController): + """ + Tests job submission + """ + + def _validate_submitted(self, job, no_vo=False, dn=TestController.TEST_USER_DN): + self.assertNotEqual(job, None) + files = job.files + self.assertNotEqual(files, None) + + self.assertEqual(job.user_dn, dn) + if no_vo: + self.assertEqual(job.vo_name, "TestUser@cern.ch") + else: + self.assertEqual(job.vo_name, "testvo") + self.assertEqual(job.job_state, "SUBMITTED") + + self.assertEqual(job.source_se, "root://source.es") + self.assertEqual(job.dest_se, "root://dest.ch") + self.assertEqual(job.overwrite_flag, True) + self.assertEqual(job.verify_checksum, "b") + self.assertEqual(job.job_type, "N") + self.assertEqual(job.priority, 3) + self.assertIsNone(job.max_time_in_queue) + + self.assertEqual(len(files), 1) + self.assertEqual(files[0].file_state, "SUBMITTED") + self.assertEqual(files[0].source_surl, "root://source.es/file") + self.assertEqual(files[0].source_se, "root://source.es") + self.assertEqual(files[0].dest_se, "root://dest.ch") + self.assertEqual(files[0].file_index, 0) + self.assertEqual(files[0].selection_strategy, "orderly") + self.assertEqual(files[0].user_filesize, 1024) + self.assertEqual(files[0].checksum, "adler32:1234") + self.assertEqual(files[0].file_metadata["mykey"], "myvalue") + if no_vo: + self.assertEqual(files[0].vo_name, "TestUser@cern.ch") + else: + self.assertEqual(files[0].vo_name, "testvo") + + self.assertEqual(files[0].activity, "default") + + # Validate submitter + self.assertEqual(socket.getfqdn(), job.submit_host) + + def test_submit(self): + """ + Submit a valid job + """ + self.setup_gridsite_environment() + self.push_delegation() + dest_surl = "root://dest.ch/file" + str(random.randint(0, 100)) + job = { + "files": [ + { + "sources": ["root://source.es/file"], + "destinations": [dest_surl], + "selection_strategy": "orderly", + "checksum": "adler32:1234", + "filesize": 1024, + "metadata": {"mykey": "myvalue"}, + } + ], + "params": {"overwrite": True, "verify_checksum": True}, + } + + job_id = self.app.put( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + 
).json["job_id"] + + # Make sure it was committed to the DB + self.assertTrue(job_id) # not empty + + self._validate_submitted(Session.query(Job).get(job_id)) + + return str(job_id) + + def test_submit_no_reuse(self): + """ + Submit a valid job no reuse + """ + self.setup_gridsite_environment() + self.push_delegation() + dest_surl = "root://dest.ch/file" + str(random.randint(0, 100)) + job = { + "files": [ + { + "sources": ["root://source.es/file"], + "destinations": [dest_surl], + "selection_strategy": "orderly", + "checksum": "adler32:1234", + "filesize": 1024, + "metadata": {"mykey": "myvalue"}, + } + ], + "params": {"overwrite": True, "verify_checksum": True, "reuse": False}, + } + + job_id = self.app.put( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + # Make sure it was committed to the DB + self.assertTrue(job_id) + + self._validate_submitted(Session.query(Job).get(job_id)) + + return str(job_id) + + def test_submit_no_reuse_N(self): + """ + Submit a valid job, using 'N' instead of False + """ + self.setup_gridsite_environment() + self.push_delegation() + dest_surl = "root://dest.ch/file" + str(random.randint(0, 100)) + job = { + "files": [ + { + "sources": ["root://source.es/file"], + "destinations": [dest_surl], + "selection_strategy": "orderly", + "checksum": "adler32:1234", + "filesize": 1024, + "metadata": {"mykey": "myvalue"}, + } + ], + "params": {"overwrite": True, "verify_checksum": True, "reuse": "N"}, + } + + job_id = self.app.put( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + # Make sure it was committed to the DB + self.assertTrue(job_id) + + self._validate_submitted(Session.query(Job).get(job_id)) + + return str(job_id) + + def test_submit_reuse(self): + """ + Submit a valid reuse job + """ + self.setup_gridsite_environment() + self.push_delegation() + dest_surl = "root://dest.ch/file" + str(random.randint(0, 100)) + job = { + "files": [ + { + "sources": ["root://source.es/file"], + "destinations": [dest_surl], + "selection_strategy": "orderly", + "checksum": "adler32:1234", + "filesize": 1024, + "metadata": {"mykey": "myvalue"}, + } + ], + "params": {"overwrite": True, "verify_checksum": True, "reuse": True}, + } + + job_id = self.app.put( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + # Make sure it was commited to the DB + self.assertTrue(job_id) + + job = Session.query(Job).get(job_id) + self.assertEqual(job.job_type, "Y") + + return job_id + + def test_submit_Y(self): + """ + Submit a valid reuse job, using 'Y' instead of True + """ + self.setup_gridsite_environment() + self.push_delegation() + dest_surl = "root://dest.ch/file" + str(random.randint(0, 100)) + job = { + "files": [ + { + "sources": ["root://source.es/file"], + "destinations": [dest_surl], + "selection_strategy": "orderly", + "checksum": "adler32:1234", + "filesize": 1024, + "metadata": {"mykey": "myvalue"}, + } + ], + "params": {"overwrite": "Y", "verify_checksum": "Y", "reuse": "Y"}, + } + + job_id = self.app.put( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + # Make sure it was commited to the DB + self.assertTrue(job_id) + + job = Session.query(Job).get(job_id) + self.assertEqual(job.job_type, "Y") + + def test_submit_post(self): + """ + Submit a valid job using POST instead of PUT + """ + self.setup_gridsite_environment() + 
self.push_delegation() + dest_surl = "root://dest.ch/file" + str(random.randint(0, 100)) + job = { + "files": [ + { + "sources": ["root://source.es/file"], + "destinations": [dest_surl], + "selection_strategy": "orderly", + "checksum": "adler32:1234", + "filesize": 1024, + "metadata": {"mykey": "myvalue"}, + } + ], + "params": {"overwrite": True, "verify_checksum": True}, + } + + job_id = self.app.put( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + # Make sure it was committed to the DB + self.assertGreater(len(job_id), 0) + + self._validate_submitted(Session.query(Job).get(job_id)) + + return job_id + + def test_submit_with_port(self): + """ + Submit a valid job where the port is explicit in the url. + source_se and dest_se must exclude this port + """ + self.setup_gridsite_environment() + self.push_delegation() + dest_surl = "srm://dest.ch:8447/file" + str(random.randint(0, 100)) + job = { + "files": [ + { + "sources": ["srm://source.es:8446/file"], + "destinations": [dest_surl], + "selection_strategy": "orderly", + "checksum": "adler32:1234", + "filesize": 1024, + "metadata": {"mykey": "myvalue"}, + } + ], + "params": {"overwrite": True, "verify_checksum": True}, + } + + job_id = self.app.put( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + # Make sure it was committed to the DB + self.assertGreater(len(job_id), 0) + + db_job = Session.query(Job).get(job_id) + + self.assertEqual(db_job.source_se, "srm://source.es") + self.assertEqual(db_job.dest_se, "srm://dest.ch") + + self.assertEqual(db_job.files[0].source_se, "srm://source.es") + self.assertEqual(db_job.files[0].dest_se, "srm://dest.ch") + + return job_id + + def test_submit_only_query(self): + """ + Submit a valid job, without a path, but with a query in the url. + This is valid for some protocols (i.e. 
srm) + """ + self.setup_gridsite_environment() + self.push_delegation() + dest_surl = "root://dest.ch/file" + str(random.randint(0, 100)) + job = { + "files": [ + { + "sources": ["srm://source.es/?SFN=/path/"], + "destinations": [dest_surl], + "selection_strategy": "orderly", + "checksum": "adler32:1234", + "filesize": 1024, + "metadata": {"mykey": "myvalue"}, + } + ], + "params": { + "overwrite": True, + "copy_pin_lifetime": 3600, + "bring_online": 60, + "verify_checksum": True, + }, + } + + job_id = self.app.put( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + # Make sure it was committed to the DB + self.assertGreater(len(job_id), 0) + + db_job = Session.query(Job).get(job_id) + self.assertEqual(db_job.job_state, "STAGING") + self.assertEqual(db_job.files[0].file_state, "STAGING") + self.assertEqual(db_job.copy_pin_lifetime, 3600) + self.assertEqual(db_job.bring_online, 60) + + return job_id + + def test_null_checksum(self): + """ + Valid job, with checksum explicitly set to null + """ + self.setup_gridsite_environment() + self.push_delegation() + dest_surl = "root://dest.ch/file" + str(random.randint(0, 100)) + job = { + "files": [ + { + "sources": ["root://source.es/file"], + "destinations": [dest_surl], + "selection_strategy": "orderly", + "checksum": None, + "filesize": 1024, + "metadata": {"mykey": "myvalue"}, + } + ], + "params": {"overwrite": True, "verify_checksum": True}, + } + + job_id = self.app.put( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + # Make sure it was committed to the DB + self.assertGreater(len(job_id), 0) + + job = Session.query(Job).get(job_id) + self.assertEqual(job.files[0].checksum, "ADLER32") + + return job_id + + def test_checksum_no_verify(self): + """ + Valid job, with checksum, but verify_checksum is not set + In the DB, it must end as 'r' (compatibility with FTS3 behaviour) + """ + self.setup_gridsite_environment() + self.push_delegation() + dest_surl = "root://dest.ch/file" + str(random.randint(0, 100)) + job = { + "files": [ + { + "sources": ["root://source.es/file"], + "destinations": [dest_surl], + "selection_strategy": "orderly", + "checksum": "1234F", + "filesize": 1024, + "metadata": {"mykey": "myvalue"}, + } + ], + "params": {"overwrite": True}, + } + + job_id = self.app.post( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + # Make sure it was committed to the DB + self.assertGreater(len(job_id), 0) + + job = Session.query(Job).get(job_id) + self.assertEqual(job.files[0].checksum, "1234F") + self.assertEqual(job.verify_checksum, "t") + + return job_id + + def test_verify_checksum_target(self): + """ + Valid job, verify checksum in destination. 
+        In the DB it must be stored as 't' (verify on the destination)
+        """
+        self.setup_gridsite_environment()
+        self.push_delegation()
+        dest_surl = "root://dest.ch/file" + str(random.randint(0, 100))
+        job = {
+            "files": [
+                {
+                    "sources": ["root://source.es/file"],
+                    "destinations": [dest_surl],
+                    "selection_strategy": "orderly",
+                    "checksum": "1234F",
+                    "filesize": 1024,
+                    "metadata": {"mykey": "myvalue"},
+                }
+            ],
+            "params": {"overwrite": True, "verify_checksum": "target"},
+        }
+
+        job_id = self.app.post(
+            url="/jobs",
+            content_type="application/json",
+            params=json.dumps(job),
+            status=200,
+        ).json["job_id"]
+
+        # Make sure it was committed to the DB
+        self.assertGreater(len(job_id), 0)
+
+        job = Session.query(Job).get(job_id)
+        self.assertEqual(job.files[0].checksum, "1234F")
+        self.assertEqual(job.verify_checksum, "t")
+
+        return job_id
+
+    def test_verify_checksum_source(self):
+        """
+        Valid job, verify checksum in source.
+        """
+        self.setup_gridsite_environment()
+        self.push_delegation()
+        dest_surl = "root://dest.ch/file" + str(random.randint(0, 100))
+        job = {
+            "files": [
+                {
+                    "sources": ["root://source.es/file"],
+                    "destinations": [dest_surl],
+                    "selection_strategy": "orderly",
+                    "checksum": "1234F",
+                    "filesize": 1024,
+                    "metadata": {"mykey": "myvalue"},
+                }
+            ],
+            "params": {"overwrite": True, "verify_checksum": "source"},
+        }
+
+        job_id = self.app.post(
+            url="/jobs",
+            content_type="application/json",
+            params=json.dumps(job),
+            status=200,
+        ).json["job_id"]
+
+        # Make sure it was committed to the DB
+        self.assertGreater(len(job_id), 0)
+
+        job = Session.query(Job).get(job_id)
+        self.assertEqual(job.files[0].checksum, "1234F")
+        self.assertEqual(job.verify_checksum, "s")
+
+        return job_id
+
+    def test_verify_checksum_both(self):
+        """
+        Valid job, verify checksum in both source and destination.
+        """
+        self.setup_gridsite_environment()
+        self.push_delegation()
+        dest_surl = "root://dest.ch/file" + str(random.randint(0, 100))
+        job = {
+            "files": [
+                {
+                    "sources": ["root://source.es/file"],
+                    "destinations": [dest_surl],
+                    "selection_strategy": "orderly",
+                    "checksum": "1234F",
+                    "filesize": 1024,
+                    "metadata": {"mykey": "myvalue"},
+                }
+            ],
+            "params": {"overwrite": True, "verify_checksum": "both"},
+        }
+
+        job_id = self.app.post(
+            url="/jobs",
+            content_type="application/json",
+            params=json.dumps(job),
+            status=200,
+        ).json["job_id"]
+
+        # Make sure it was committed to the DB
+        self.assertGreater(len(job_id), 0)
+
+        job = Session.query(Job).get(job_id)
+        self.assertEqual(job.files[0].checksum, "1234F")
+        self.assertEqual(job.verify_checksum, "b")
+
+        return job_id
+
+    def test_verify_checksum_none(self):
+        """
+        Valid job, verify checksum none.
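+        In the DB it must be stored as 'n'.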
+ """ + self.setup_gridsite_environment() + self.push_delegation() + dest_surl = "root://dest.ch/file" + str(random.randint(0, 100)) + job = { + "files": [ + { + "sources": ["root://source.es/file"], + "destinations": [dest_surl], + "selection_strategy": "orderly", + "filesize": 1024, + "metadata": {"mykey": "myvalue"}, + } + ], + "params": {"overwrite": True, "verify_checksum": "none"}, + } + + job_id = self.app.post( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + # Make sure it was committed to the DB + self.assertGreater(len(job_id), 0) + + job = Session.query(Job).get(job_id) + self.assertEqual(job.verify_checksum, "n") + + return job_id + + def test_null_user_filesize(self): + """ + Valid job, with filesize explicitly set to null + """ + self.setup_gridsite_environment() + self.push_delegation() + dest_surl = "root://dest.ch/file" + str(random.randint(0, 100)) + job = { + "files": [ + { + "sources": ["root://source.es/file"], + "destinations": [dest_surl], + "selection_strategy": "orderly", + "filesize": None, + "metadata": {"mykey": "myvalue"}, + } + ], + "params": {"overwrite": True, "verify_checksum": True}, + } + + job_id = self.app.post( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + # Make sure it was committed to the DB + self.assertGreater(len(job_id), 0) + + job = Session.query(Job).get(job_id) + self.assertEqual(job.files[0].user_filesize, 0) + + return job_id + + def test_no_vo(self): + """ + Submit a valid job with no VO data in the credentials (could happen with plain SSL!) + The job must be accepted, but assigned to the user's 'virtual' vo. + """ + self.setup_gridsite_environment(no_vo=True) + self.push_delegation() + dest_surl = "root://dest.ch/file" + str(random.randint(0, 100)) + job = { + "files": [ + { + "sources": ["root://source.es/file"], + "destinations": [dest_surl], + "selection_strategy": "orderly", + "checksum": "adler32:1234", + "filesize": 1024, + "metadata": {"mykey": "myvalue"}, + } + ], + "params": {"overwrite": True, "verify_checksum": True}, + } + + job_id = self.app.put( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + # Make sure it was commited to the DB + self.assertGreater(len(job_id), 0) + + self._validate_submitted(Session.query(Job).get(job_id), no_vo=True) + + def test_no_vo_proxy(self): + """ + Submit a valid job with no VO data in the credentials, but still being a proxy. + The job must be accepted, but assigned to the user's 'virtual' vo. 
+ """ + proxy_dn = self.TEST_USER_DN + "/CN=proxy" + self.setup_gridsite_environment(no_vo=True, dn=proxy_dn) + self.push_delegation() + dest_surl = "root://dest.ch/file" + str(random.randint(0, 100)) + job = { + "files": [ + { + "sources": ["root://source.es/file"], + "destinations": [dest_surl], + "selection_strategy": "orderly", + "checksum": "adler32:1234", + "filesize": 1024, + "metadata": {"mykey": "myvalue"}, + } + ], + "params": {"overwrite": True, "verify_checksum": True}, + } + + job_id = self.app.put( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + # Make sure it was commited to the DB + self.assertGreater(len(job_id), 0) + + self._validate_submitted( + Session.query(Job).get(job_id), no_vo=True, dn=proxy_dn + ) + + def test_retry(self): + """ + Submit with a specific retry value + """ + self.setup_gridsite_environment() + self.push_delegation() + dest_surl = "root://dest.ch/file" + str(random.randint(0, 100)) + job = { + "files": [ + { + "sources": ["root://source.es/file"], + "destinations": [dest_surl], + "selection_strategy": "orderly", + "checksum": "adler32:1234", + "filesize": 1024, + "metadata": {"mykey": "myvalue"}, + } + ], + "params": { + "overwrite": True, + "verify_checksum": True, + "retry": 42, + "retry_delay": 123, + }, + } + + job_id = self.app.put( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + # Make sure it was commited to the DB + self.assertGreater(len(job_id), 0) + + job = Session.query(Job).get(job_id) + self._validate_submitted(job) + self.assertEqual(job.retry, 42) + self.assertEqual(job.retry_delay, 123) + + def test_with_activity(self): + """ + Submit a job specifiying activities for the files + """ + self.setup_gridsite_environment() + self.push_delegation() + dest_surl = "root://dest.ch/file" + str(random.randint(0, 100)) + job = { + "files": [ + { + "sources": ["root://source.es/file"], + "destinations": [dest_surl], + "activity": "my-activity", + }, + { + "sources": ["https://source.es/file2"], + "destinations": ["https://dest.ch/file2"], + "activity": "my-second-activity", + }, + ] + } + + job_id = self.app.put( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + # Make sure it was commited to the DB + self.assertGreater(len(job_id), 0) + + job = Session.query(Job).get(job_id) + self.assertEqual(job.files[0].activity, "my-activity") + self.assertEqual(job.files[1].activity, "my-second-activity") + + def test_surl_with_spaces(self): + """ + Submit a job where the surl has spaces at the beginning and at the end + """ + self.setup_gridsite_environment() + self.push_delegation() + dest_surl = "root://dest.ch/file" + str(random.randint(0, 100)) + job = { + "files": [ + { + "sources": ["root://source.es/file\n \r "], + "destinations": ["\r\n" + dest_surl + "\n\n \n"], + "selection_strategy": "orderly", + "checksum": "adler32:1234", + "filesize": 1024.0, + "metadata": {"mykey": "myvalue"}, + } + ], + "params": {"overwrite": True, "verify_checksum": True}, + } + + job_id = self.app.put( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + # Make sure it was commited to the DB + self.assertGreater(len(job_id), 0) + + job = Session.query(Job).get(job_id) + self._validate_submitted(job) + + def test_submit_different_protocols(self): + """ + Source and destination protocol mismatch + For REST <= 3.2.3, this used 
to be forbidden, but it was decided to allow it + https://its.cern.ch/jira/browse/FTS-97 + """ + self.setup_gridsite_environment() + self.push_delegation() + dest_surl = "root://dest.ch:8447/file" + str(random.randint(0, 100)) + job = { + "files": [ + { + "sources": ["http://source.es:8446/file"], + "destinations": [dest_surl], + "selection_strategy": "orderly", + "checksum": "adler32:1234", + "filesize": 1024, + "metadata": {"mykey": "myvalue"}, + } + ], + "params": {"overwrite": True, "verify_checksum": True}, + } + + job_id = self.app.post( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + job = Session.query(Job).get(job_id) + self.assertEqual(1, len(job.files)) + self.assertEqual("http://source.es:8446/file", job.files[0].source_surl) + self.assertEqual(dest_surl, job.files[0].dest_surl) + + def test_submit_with_cloud_cred(self): + """ + Submit a job specifying cloud credentials + """ + self.setup_gridsite_environment() + self.push_delegation() + dest_surl = "root://dest.ch:8447/file" + str(random.randint(0, 100)) + job = { + "files": [ + { + "sources": ["dropbox://dropbox.com/file"], + "destinations": [dest_surl], + "selection_strategy": "orderly", + "checksum": "adler32:1234", + "filesize": 1024, + "metadata": {"mykey": "myvalue"}, + } + ], + "params": {"overwrite": True, "verify_checksum": True}, + } + + job_id = self.app.post( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + job = Session.query(Job).get(job_id) + self.assertEqual(1, len(job.files)) + self.assertEqual("dropbox://dropbox.com/file", job.files[0].source_surl) + self.assertEqual(dest_surl, job.files[0].dest_surl) + + def test_submit_protocol_params(self): + """ + Submit a transfer specifying some protocol parameters + """ + self.setup_gridsite_environment() + self.push_delegation() + dest_surl = "root://dest.ch:8447/file" + str(random.randint(0, 100)) + job = { + "files": [ + { + "sources": ["http://source.es:8446/file"], + "destinations": [dest_surl], + "selection_strategy": "orderly", + "checksum": "adler32:1234", + "filesize": 1024, + "metadata": {"mykey": "myvalue"}, + } + ], + "params": { + "overwrite": True, + "verify_checksum": True, + "timeout": 1234, + "nostreams": 42, + "buffer_size": 1025, + "strict_copy": True, + }, + } + job_id = self.app.post( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + job = Session.query(Job).get(job_id) + self.assertTrue(job.internal_job_params is not None) + params = job.internal_job_params.split(",") + self.assertIn("timeout:1234", params) + self.assertIn("nostreams:42", params) + self.assertIn("buffersize:1025", params) + self.assertIn("strict", params) + + def test_submit_with_priority(self): + """ + Submit a job specifying the priority + """ + self.setup_gridsite_environment() + self.push_delegation() + dest_surl = "root://dest.ch:8447/file" + str(random.randint(0, 100)) + job = { + "files": [ + { + "sources": ["http://source.es:8446/file"], + "destinations": [dest_surl], + } + ], + "params": {"priority": 5,}, + } + job_id = self.app.post( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + job = Session.query(Job).get(job_id) + self.assertEqual(job.priority, 5) + + def test_submit_max_time_in_queue(self): + """ + Submits a job specifying the maximum time it should stay in the queue. 
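+        Since FTS-311, the field stores the absolute timestamp at which the job expires.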
+ """ + self.setup_gridsite_environment() + self.push_delegation() + dest_surl = "root://dest.ch:8447/file" + str(random.randint(0, 100)) + job = { + "files": [ + { + "sources": ["http://source.es:8446/file"], + "destinations": [dest_surl], + } + ], + "params": {"max_time_in_queue": 8}, + } + job_id = self.app.post( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + # See FTS-311 + # max_time_in_queue was effectively ignored by FTS3 + # Since FTS-311, this field stores the timestamp when the job expires + job = Session.query(Job).get(job_id) + self.assertGreater(job.max_time_in_queue, _ceil_time()) + self.assertLessEqual(job.max_time_in_queue, (8 * 60 * 60) + _ceil_time()) + + def test_submit_max_time_in_queue_suffix(self): + """ + Submits a job specifying the maximum time it should stay in the queue. + Use a suffix. + """ + self.setup_gridsite_environment() + self.push_delegation() + dest_surl = "root://dest.ch:8447/file" + str(random.randint(0, 100)) + job = { + "files": [ + { + "sources": ["http://source.es:8446/file"], + "destinations": [dest_surl], + } + ], + "params": {"max_time_in_queue": "4s"}, + } + job_id = self.app.post( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + job = Session.query(Job).get(job_id) + self.assertGreater(job.max_time_in_queue, _ceil_time()) + self.assertLessEqual(job.max_time_in_queue, 8 + _ceil_time()) + + def test_submit_max_time_in_queue_suffix2(self): + """ + Submits a job specifying the maximum time it should stay in the queue. + Use a suffix. + """ + self.setup_gridsite_environment() + self.push_delegation() + dest_surl = "root://dest" + str(random.randint(0, 100)) + ".ch:8447/file" + job = { + "files": [ + { + "sources": ["http://source.es:8446/file"], + "destinations": [dest_surl], + } + ], + "params": {"max_time_in_queue": "2m"}, + } + job_id = self.app.post( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + job = Session.query(Job).get(job_id) + self.assertGreater(job.max_time_in_queue, _ceil_time()) + self.assertLessEqual(job.max_time_in_queue, 120 + _ceil_time()) + + def test_submit_ipv4(self): + """ + Submit a job with IPv4 only + """ + self.setup_gridsite_environment() + self.push_delegation() + + job = { + "files": [ + { + "sources": ["http://source.es:8446/file"], + "destinations": ["root://destipv4.ch:8447/file"], + } + ], + "params": {"ipv4": True}, + } + job_id = self.app.post( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + jobdb = Session.query(Job).get(job_id) + self.assertIn("ipv4", jobdb.internal_job_params) + self.assertNotIn("ipv6", jobdb.internal_job_params) + + job = { + "files": [ + { + "sources": ["http://source.es:8446/file"], + "destinations": ["root://destipv4tofalse.ch:8447/file"], + } + ], + "params": {"ipv4": False}, + } + + job_id = self.app.post( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + jobdb = Session.query(Job).get(job_id) + self.assertTrue( + jobdb.internal_job_params is None or "ipv4" not in jobdb.internal_job_params + ) + self.assertTrue( + jobdb.internal_job_params is None or "ipv6" not in jobdb.internal_job_params + ) + + def test_submit_ipv6(self): + """ + Submit a job with IPv6 only + """ + self.setup_gridsite_environment() + self.push_delegation() + + job = { + "files": [ + { + "sources": 
["http://source.es:8446/file"], + "destinations": ["root://destipv6.ch:8447/file"], + } + ], + "params": {"ipv6": True}, + } + job_id = self.app.post( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + jobdb = Session.query(Job).get(job_id) + self.assertIn("ipv6", jobdb.internal_job_params) + self.assertNotIn("ipv4", jobdb.internal_job_params) + + job = { + "files": [ + { + "sources": ["http://source.es:8446/file"], + "destinations": ["root://destipv6tofalse.ch:8447/file"], + } + ], + "params": {"ipv6": False}, + } + + job_id = self.app.post( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + jobdb = Session.query(Job).get(job_id) + self.assertTrue( + jobdb.internal_job_params is None or "ipv4" not in jobdb.internal_job_params + ) + self.assertTrue( + jobdb.internal_job_params is None or "ipv6" not in jobdb.internal_job_params + )