From 3d536b40b096ee9725187f702078bf0a9a1c902e Mon Sep 17 00:00:00 2001 From: Carles Garcia Cabot <carles.garcia.cabot@cern.ch> Date: Wed, 18 Mar 2020 15:15:35 +0100 Subject: [PATCH 1/5] Migrate auth --- requirements.in | 5 +- .../lib/middleware/fts3auth/credentials.py | 223 +++++++ .../middleware/fts3auth/methods/__init__.py | 48 ++ .../lib/middleware/fts3auth/methods/http.py | 4 + .../lib/middleware/fts3auth/methods/oauth2.py | 71 +++ .../lib/middleware/fts3auth/methods/ssl.py | 89 +++ src/fts3rest/fts3rest/lib/oauth2lib/LICENSE | 14 + src/fts3rest/fts3rest/lib/oauth2lib/client.py | 83 +++ .../fts3rest/lib/oauth2lib/provider.py | 596 ++++++++++++++++++ src/fts3rest/fts3rest/lib/oauth2lib/utils.py | 64 ++ src/fts3rest/fts3rest/lib/oauth2provider.py | 360 +++++++++++ src/fts3rest/fts3rest/lib/openidconnect.py | 220 +++++++ 12 files changed, 1776 insertions(+), 1 deletion(-) create mode 100644 src/fts3rest/fts3rest/lib/middleware/fts3auth/credentials.py create mode 100644 src/fts3rest/fts3rest/lib/middleware/fts3auth/methods/__init__.py create mode 100644 src/fts3rest/fts3rest/lib/middleware/fts3auth/methods/http.py create mode 100644 src/fts3rest/fts3rest/lib/middleware/fts3auth/methods/oauth2.py create mode 100644 src/fts3rest/fts3rest/lib/middleware/fts3auth/methods/ssl.py create mode 100644 src/fts3rest/fts3rest/lib/oauth2lib/LICENSE create mode 100644 src/fts3rest/fts3rest/lib/oauth2lib/client.py create mode 100644 src/fts3rest/fts3rest/lib/oauth2lib/provider.py create mode 100644 src/fts3rest/fts3rest/lib/oauth2lib/utils.py create mode 100644 src/fts3rest/fts3rest/lib/oauth2provider.py create mode 100644 src/fts3rest/fts3rest/lib/openidconnect.py diff --git a/requirements.in b/requirements.in index cd72b233..3b4271d9 100644 --- a/requirements.in +++ b/requirements.in @@ -4,4 +4,7 @@ M2Crypto # requests mysqlclient # requires python3-devel mysql-devel dirq - +python-dateutil +PyJWT +jwcrypto +oic \ No newline at end of file diff --git a/src/fts3rest/fts3rest/lib/middleware/fts3auth/credentials.py b/src/fts3rest/fts3rest/lib/middleware/fts3auth/credentials.py new file mode 100644 index 00000000..f45c410c --- /dev/null +++ b/src/fts3rest/fts3rest/lib/middleware/fts3auth/credentials.py @@ -0,0 +1,223 @@ +# Copyright Members of the EMI Collaboration, 2013. +# Copyright 2020 CERN +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import copy +import hashlib +import logging +import re + +from fts3.model import AuthorizationByDn +from fts3rest.model.meta import Session +from .methods import Authenticator + +log = logging.getLogger(__name__) + + +def vo_from_fqan(fqan): + """ + Get the VO from a full FQAN + + Args: + fqan: A single fqans (i.e. /dteam/cern/Role=lcgadmin) + Returns: + The vo + group (i.e. dteam/cern) + """ + components = fqan.split("/")[1:] + groups = [] + for c in components: + if c.lower().startswith("role="): + break + groups.append(c) + return "/".join(groups) + + +def generate_delegation_id(dn, fqans): + """ + Generate a delegation ID from the user DN and FQANS + Adapted from FTS3 + See https://svnweb.cern.ch/trac/fts3/browser/trunk/src/server/ws/delegation/GSoapDelegationHandler.cpp + + Args: + dn: The user DN + fqans: A list of fqans + Returns: + The associated delegation id + """ + d = hashlib.sha1() + d.update(dn) + + for fqan in fqans: + d.update(fqan) + + # Original implementation only takes into account first 16 characters + return d.hexdigest()[:16] + + +def build_vo_from_dn(user_dn): + """ + Generate an 'anonymous' VO from the user_dn + """ + components = filter( + lambda c: len(c) == 2, map(lambda c: tuple(c.split("=")), user_dn.split("/")) + ) + domain = [] + uname = "" + for key, value in components: + if key.upper() == "CN" and not uname: + uname = value + elif key.upper() == "DC": + domain.append(value) + # Normalize name + uname = "".join(uname.split()) + return uname + "@" + ".".join(reversed(domain)) + + +class InvalidCredentials(Exception): + """ + Credentials have been provided, but they are invalid + """ + + pass + + +class UserCredentials(object): + """ + Handles the user credentials and privileges + """ + + authenticator = Authenticator() + role_regex = re.compile("(/.+)*/Role=(\\w+)(/.*)?", re.IGNORECASE) + + def _anonymous(self): + """ + Not authenticated access + """ + self.user_dn = "anon" + self.method = "unauthenticated" + self.dn.append(self.user_dn) + + def __init__(self, env, role_permissions=None, config=None): + """ + Constructor + + Args: + env: Environment (i.e. os.environ) + role_permissions: The role permissions as configured in the FTS3 config file + """ + # Default + self.user_dn = None + self.dn = [] + self.base_id = [] + self.voms_cred = [] + self.vos = [] + self.vos_id = [] + self.roles = [] + self.level = [] + self.delegation_id = None + self.method = None + self.is_root = False + + got_creds = self.authenticator(self, env, config) + + # Last resort: anonymous access + if not got_creds: + self._anonymous() + else: + # Populate roles + self.roles = self._populate_roles() + # And granted level + self.level = self._granted_level(role_permissions) + + def _populate_roles(self): + """ + Get roles out of the FQANS + """ + roles = [] + for fqan in self.voms_cred: + match = UserCredentials.role_regex.match(fqan) + if match and match.group(2).upper() != "NULL": + roles.append(match.group(2)) + return roles + + def _granted_level(self, role_permissions): + """ + Get all granted levels for this user out of the configuration + (all levels authorized for public, plus those for the given Roles) + """ + if self.is_root: + return { + "transfer": "all", + "deleg": "all", + "config": "all", + "datamanagement": "all", + } + + granted_level = dict() + + # Public apply to anyone + if role_permissions is not None: + if "public" in role_permissions: + granted_level = copy.deepcopy(role_permissions["public"]) + + # Roles from the proxy + for grant in self.roles: + if grant in role_permissions: + granted_level.update(copy.deepcopy(role_permissions[grant])) + + # DB Configuration + for grant in ( + Session.query(AuthorizationByDn) + .filter(AuthorizationByDn.dn == self.user_dn) + .all() + ): + log.info( + '%s granted to "%s" because it is configured in the database' + % (grant.operation, self.user_dn) + ) + granted_level[grant.operation] = "all" + + return granted_level + + def get_granted_level_for(self, operation): + """ + Check if the user can perform the operation 'operation' + + Args: + operation: The operation to check (see constants.py) + + Returns: + None if the user can not perform the operation + constants.VO if only can perform it on same VO resources + constants.ALL if can perform on any resource + constants.PRIVATE if can perform only on his/her own resources + """ + if operation in self.level: + return self.level[operation] + elif "*" in self.level: + return self.level["*"] + else: + return None + + def has_vo(self, vo): + """ + Check if the user belongs to the given VO + + Args: + vo: The VO name (i.e. dteam) + + Returns: + True if the user credentials include the given VO + """ + return vo in self.vos diff --git a/src/fts3rest/fts3rest/lib/middleware/fts3auth/methods/__init__.py b/src/fts3rest/fts3rest/lib/middleware/fts3auth/methods/__init__.py new file mode 100644 index 00000000..b2d43f14 --- /dev/null +++ b/src/fts3rest/fts3rest/lib/middleware/fts3auth/methods/__init__.py @@ -0,0 +1,48 @@ +# Copyright Members of the EMI Collaboration, 2013. +# Copyright 2020 CERN +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pkgutil +import sys + + +class Authenticator: + """ + Wraps different authentication methods installed in this directory. + The 'do_authentication' call must: + * Return True and set the credentials parameter if + the recognised method is used and valid. + * Return False if the recognised method is _NOT_ in use + * Raise InvalidCredentials is the recognised method _IS_ in use, but invalid + """ + + def __init__(self): + _current_module = sys.modules[__name__] + _prefix = _current_module.__name__ + "." + self._auth_modules = list() + for importer, modname, ispkg in pkgutil.iter_modules( + _current_module.__path__, _prefix + ): + _authnmod = importer.find_module(modname).load_module(modname) + if hasattr(_authnmod, "do_authentication"): + self._auth_modules.append(_authnmod) + + def __call__(self, credentials, env, config): + """ + Iterate through pluggable authentication modules + """ + for authnmod in self._auth_modules: + if authnmod.do_authentication(credentials, env, config): + return True + return False diff --git a/src/fts3rest/fts3rest/lib/middleware/fts3auth/methods/http.py b/src/fts3rest/fts3rest/lib/middleware/fts3auth/methods/http.py new file mode 100644 index 00000000..ea3028ae --- /dev/null +++ b/src/fts3rest/fts3rest/lib/middleware/fts3auth/methods/http.py @@ -0,0 +1,4 @@ +""" +This cannot be migrated becaue m2ext is a 9 year old obsolete package +Apparently it's used by WebFTS +""" diff --git a/src/fts3rest/fts3rest/lib/middleware/fts3auth/methods/oauth2.py b/src/fts3rest/fts3rest/lib/middleware/fts3auth/methods/oauth2.py new file mode 100644 index 00000000..19fbfef8 --- /dev/null +++ b/src/fts3rest/fts3rest/lib/middleware/fts3auth/methods/oauth2.py @@ -0,0 +1,71 @@ +# Copyright Members of the EMI Collaboration, 2013. +# Copyright 2020 CERN +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import types +from fts3rest.lib.oauth2provider import FTS3OAuth2ResourceProvider +from fts3rest.lib.middleware.fts3auth.credentials import ( + vo_from_fqan, + build_vo_from_dn, + InvalidCredentials, + generate_delegation_id, +) +import logging + +log = logging.getLogger(__name__) + + +def _oauth2_get_granted_level_for(self, operation): + # All users authenticated through IAM will be considered root users + return "all" + + +def do_authentication(credentials, env, config): + """ + The user will be the one who gave the bearer token + """ + res_provider = FTS3OAuth2ResourceProvider(env, config) + authn = res_provider.get_authorization() + if authn is None: + return False + if not authn.is_valid: + if authn.error == "access_denied": + log.info("about to raise invalid credentials") + raise InvalidCredentials("Invalid OAuth2 credentials") + return False + + credentials.dn.append(authn.credentials.dn) + credentials.user_dn = authn.credentials.dn + credentials.delegation_id = authn.credentials.dlg_id + if authn.credentials.voms_attrs: + for fqan in authn.credentials.voms_attrs.split(): + credentials.voms_cred.append(fqan) + credentials.vos.append(fqan) + credentials.method = "oauth2" + + # Override get_granted_level_for so we can filter by the scope + setattr(credentials, "oauth2_scope", authn.scope) + setattr( + credentials, + "get_granted_level_for_overriden", + credentials.get_granted_level_for, + ) + setattr( + credentials, + "get_granted_level_for", + types.MethodType(_oauth2_get_granted_level_for, credentials), + ) + + return True diff --git a/src/fts3rest/fts3rest/lib/middleware/fts3auth/methods/ssl.py b/src/fts3rest/fts3rest/lib/middleware/fts3auth/methods/ssl.py new file mode 100644 index 00000000..cf7f9253 --- /dev/null +++ b/src/fts3rest/fts3rest/lib/middleware/fts3auth/methods/ssl.py @@ -0,0 +1,89 @@ +# Copyright Members of the EMI Collaboration, 2013. +# Copyright 2020 CERN +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from urllib.parse import unquote_plus + +from fts3rest.lib.middleware.fts3auth.credentials import ( + vo_from_fqan, + build_vo_from_dn, + generate_delegation_id, +) + + +def _mod_gridsite_authn(credentials, env): + """ + Retrieve credentials from GRST_ variables set by mod_gridsite + """ + grst_index = 0 + grst_env = "GRST_CRED_AURI_%d" % grst_index + while grst_env in env: + cred = env[grst_env] + + if cred.startswith("dn:"): + credentials.dn.append(unquote_plus(cred[3:])) + elif cred.startswith("fqan:"): + fqan = unquote_plus(cred[5:]) + vo = vo_from_fqan(fqan) + credentials.voms_cred.append(fqan) + if vo not in credentials.vos and vo: + credentials.vos.append(vo) + + grst_index += 1 + grst_env = "GRST_CRED_AURI_%d" % grst_index + return len(credentials.dn) > 0 + + +def _mod_ssl_authn(credentials, env): + """ + Retrieve credentials from SSL_ variables set by mod_ssl + """ + if "SSL_CLIENT_S_DN" in env: + credentials.dn.append(unquote_plus(env["SSL_CLIENT_S_DN"])) + return True + return False + + +def do_authentication(credentials, env, config=None): + """ + Try with a proxy or certificate, via mod_gridsite or mod_ssl + """ + got_creds = _mod_gridsite_authn(credentials, env) or _mod_ssl_authn( + credentials, env + ) + if not got_creds: + return False + # If more than one dn, pick first one + if len(credentials.dn) > 0: + credentials.user_dn = credentials.dn[0] + # Generate the delegation ID + if credentials.user_dn is not None: + credentials.delegation_id = generate_delegation_id( + credentials.user_dn, credentials.voms_cred + ) + # If no vo information is available, build a 'virtual vo' for this user + if not credentials.vos and credentials.user_dn: + credentials.vos.append(build_vo_from_dn(credentials.user_dn)) + credentials.method = "certificate" + # If the user's DN matches the host DN, then grant all + host_dn = env.get("SSL_SERVER_S_DN", None) + if host_dn and host_dn == credentials.user_dn: + credentials.is_root = True + else: + if "/" not in str(host_dn): + host_dn = str(host_dn).replace(",", "/") + host_dn = "/" + "/".join(reversed(str(host_dn).split("/"))) + if host_dn == credentials.user_dn: + credentials.is_root = True + return True diff --git a/src/fts3rest/fts3rest/lib/oauth2lib/LICENSE b/src/fts3rest/fts3rest/lib/oauth2lib/LICENSE new file mode 100644 index 00000000..dc572bf1 --- /dev/null +++ b/src/fts3rest/fts3rest/lib/oauth2lib/LICENSE @@ -0,0 +1,14 @@ +Copyright (c) 2012-2013 SHIFT, Inc. + +Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the Software without restriction, including without limitation the +rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit +persons to whom the Software is furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all copies or substantial portions of the +Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE +WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/src/fts3rest/fts3rest/lib/oauth2lib/client.py b/src/fts3rest/fts3rest/lib/oauth2lib/client.py new file mode 100644 index 00000000..6a5521e4 --- /dev/null +++ b/src/fts3rest/fts3rest/lib/oauth2lib/client.py @@ -0,0 +1,83 @@ +import requests +from . import utils + + +class Client(object): + def __init__( + self, client_id, client_secret, redirect_uri, authorization_uri, token_uri + ): + """Constructor for OAuth 2.0 Client. + + :param client_id: Client ID. + :type client_id: str + :param client_secret: Client secret. + :type client_secret: str + :param redirect_uri: Client redirect URI: handle provider response. + :type redirect_uri: str + :param authorization_uri: Provider authorization URI. + :type authorization_uri: str + :param token_uri: Provider token URI. + :type token_uri: str + """ + self.client_id = client_id + self.client_secret = client_secret + self.redirect_uri = redirect_uri + self.authorization_uri = authorization_uri + self.token_uri = token_uri + + @property + def default_response_type(self): + return "code" + + @property + def default_grant_type(self): + return "authorization_code" + + def http_post(self, url, data=None): + """POST to URL and get result as a response object. + + :param url: URL to POST. + :type url: str + :param data: Data to send in the form body. + :type data: str + :rtype: requests.Response + """ + if not url.startswith("https://"): + raise ValueError("Protocol must be HTTPS, invalid URL: %s" % url) + return requests.post(url, data, verify=True) + + def get_authorization_code_uri(self, **params): + """Construct a full URL that can be used to obtain an authorization + code from the provider authorization_uri. Use this URI in a client + frame to cause the provider to generate an authorization code. + + :rtype: str + """ + if "response_type" not in params: + params["response_type"] = self.default_response_type + params.update({"client_id": self.client_id, "redirect_uri": self.redirect_uri}) + return utils.build_url(self.authorization_uri, params) + + def get_token(self, code, **params): + """Get an access token from the provider token URI. + + :param code: Authorization code. + :type code: str + :return: Dict containing access token, refresh token, etc. + :rtype: dict + """ + params["code"] = code + if "grant_type" not in params: + params["grant_type"] = self.default_grant_type + params.update( + { + "client_id": self.client_id, + "client_secret": self.client_secret, + "redirect_uri": self.redirect_uri, + } + ) + response = self.http_post(self.token_uri, params) + try: + return response.json() + except TypeError: + return response.json diff --git a/src/fts3rest/fts3rest/lib/oauth2lib/provider.py b/src/fts3rest/fts3rest/lib/oauth2lib/provider.py new file mode 100644 index 00000000..8deca593 --- /dev/null +++ b/src/fts3rest/fts3rest/lib/oauth2lib/provider.py @@ -0,0 +1,596 @@ +import json +import logging +from requests import Response +from io import StringIO +from werkzeug.exceptions import Unauthorized +from . import utils + +log = logging.getLogger(__name__) + + +class Provider(object): + """Base provider class for different types of OAuth 2.0 providers.""" + + def _handle_exception(self, exc): + """Handle an internal exception that was caught and suppressed. + + :param exc: Exception to process. + :type exc: Exception + """ + logger = logging.getLogger(__name__) + logger.exception(exc) + + def _make_response(self, body="", headers=None, status_code=200): + """Return a response object from the given parameters. + + :param body: Buffer/string containing the response body. + :type body: str + :param headers: Dict of headers to include in the requests. + :type headers: dict + :param status_code: HTTP status code. + :type status_code: int + :rtype: requests.Response + """ + res = Response() + res.status_code = status_code + if headers is not None: + res.headers.update(headers) + res.raw = StringIO(body) + return res + + def _make_redirect_error_response(self, redirect_uri, err): + """Return a HTTP 302 redirect response object containing the error. + + :param redirect_uri: Client redirect URI. + :type redirect_uri: str + :param err: OAuth error message. + :type err: str + :rtype: requests.Response + """ + params = { + "error": err, + "response_type": None, + "client_id": None, + "redirect_uri": None, + } + redirect = utils.build_url(redirect_uri, params) + return self._make_response(headers={"Location": redirect}, status_code=302) + + def _make_json_response(self, data, headers=None, status_code=200): + """Return a response object from the given JSON data. + + :param data: Data to JSON-encode. + :type data: mixed + :param headers: Dict of headers to include in the requests. + :type headers: dict + :param status_code: HTTP status code. + :type status_code: int + :rtype: requests.Response + """ + response_headers = {} + if headers is not None: + response_headers.update(headers) + response_headers["Content-Type"] = "application/json;charset=UTF-8" + response_headers["Cache-Control"] = "no-store" + response_headers["Pragma"] = "no-cache" + return self._make_response(json.dumps(data), response_headers, status_code) + + def _make_json_error_response(self, err): + """Return a JSON-encoded response object representing the error. + + :param err: OAuth error message. + :type err: str + :rtype: requests.Response + """ + return self._make_json_response({"error": err}, status_code=400) + + def _invalid_redirect_uri_response(self): + """What to return when the redirect_uri parameter is missing. + + :rtype: requests.Response + """ + return self._make_json_error_response("invalid_request") + + +class AuthorizationProvider(Provider): + """OAuth 2.0 authorization provider. This class manages authorization + codes and access tokens. Certain methods MUST be overridden in a + subclass, thus this class cannot be directly used as a provider. + + These are the methods that must be implemented in a subclass: + + validate_client_id(self, client_id) + # Return True or False + + validate_client_secret(self, client_id, client_secret) + # Return True or False + + validate_scope(self, client_id, scope) + # Return True or False + + validate_redirect_uri(self, client_id, redirect_uri) + # Return True or False + + validate_access(self) # Use this to validate your app session user + # Return True or False + + from_authorization_code(self, client_id, code, scope) + # Return mixed data or None on invalid + + from_refresh_token(self, client_id, refresh_token, scope) + # Return mixed data or None on invalid + + persist_authorization_code(self, client_id, code, scope) + # Return value ignored + + persist_token_information(self, client_id, scope, access_token, + token_type, expires_in, refresh_token, + data) + # Return value ignored + + discard_authorization_code(self, client_id, code) + # Return value ignored + + discard_refresh_token(self, client_id, refresh_token) + # Return value ignored + + Optionally, the following may be overridden to achieve desired behavior: + + @property + token_length(self) + + @property + token_type(self) + + @property + token_expires_in(self) + + generate_authorization_code(self) + + generate_access_token(self) + + generate_refresh_token(self) + + """ + + @property + def token_length(self): + """Property method to get the length used to generate tokens. + + :rtype: int + """ + return 40 + + @property + def token_type(self): + """Property method to get the access token type. + + :rtype: str + """ + return "Bearer" + + @property + def token_expires_in(self): + """Property method to get the token expiration time in seconds. + + :rtype: int + """ + return 3600 + + def generate_authorization_code(self): + """Generate a random authorization code. + + :rtype: str + """ + return utils.random_ascii_string(self.token_length) + + def generate_access_token(self): + """Generate a random access token. + + :rtype: str + """ + return utils.random_ascii_string(self.token_length) + + def generate_refresh_token(self): + """Generate a random refresh token. + + :rtype: str + """ + return utils.random_ascii_string(self.token_length) + + def get_authorization_code(self, response_type, client_id, redirect_uri, **params): + """Generate authorization code HTTP response. + + :param response_type: Desired response type. Must be exactly "code". + :type response_type: str + :param client_id: Client ID. + :type client_id: str + :param redirect_uri: Client redirect URI. + :type redirect_uri: str + :rtype: requests.Response + """ + + # Ensure proper response_type + if response_type != "code": + err = "unsupported_response_type" + return self._make_redirect_error_response(redirect_uri, err) + + # Check redirect URI + is_valid_redirect_uri = self.validate_redirect_uri(client_id, redirect_uri) + if not is_valid_redirect_uri: + return self._invalid_redirect_uri_response() + + # Check conditions + is_valid_client_id = self.validate_client_id(client_id) + is_valid_access = self.validate_access() + scope = params.get("scope", "") + is_valid_scope = self.validate_scope(client_id, scope) + + # Return proper error responses on invalid conditions + if not is_valid_client_id: + err = "unauthorized_client" + return self._make_redirect_error_response(redirect_uri, err) + + if not is_valid_access: + err = "access_denied" + return self._make_redirect_error_response(redirect_uri, err) + + if not is_valid_scope: + err = "invalid_scope" + return self._make_redirect_error_response(redirect_uri, err) + + # Generate authorization code + code = self.generate_authorization_code() + + # Save information to be used to validate later requests + self.persist_authorization_code(client_id=client_id, code=code, scope=scope) + + # Return redirection response + params.update( + { + "code": code, + "response_type": None, + "client_id": None, + "redirect_uri": None, + } + ) + redirect = utils.build_url(redirect_uri, params) + return self._make_response(headers={"Location": redirect}, status_code=302) + + def refresh_token( + self, grant_type, client_id, client_secret, refresh_token, **params + ): + """Generate access token HTTP response from a refresh token. + + :param grant_type: Desired grant type. Must be "refresh_token". + :type grant_type: str + :param client_id: Client ID. + :type client_id: str + :param client_secret: Client secret. + :type client_secret: str + :param refresh_token: Refresh token. + :type refresh_token: str + :rtype: requests.Response + """ + + # Ensure proper grant_type + if grant_type != "refresh_token": + return self._make_json_error_response("unsupported_grant_type") + + # Check conditions + is_valid_client_id = self.validate_client_id(client_id) + is_valid_client_secret = self.validate_client_secret(client_id, client_secret) + scope = params.get("scope", "") + is_valid_scope = self.validate_scope(client_id, scope) + data = self.from_refresh_token(client_id, refresh_token, scope) + is_valid_refresh_token = data is not None + + # Return proper error responses on invalid conditions + if not (is_valid_client_id and is_valid_client_secret): + return self._make_json_error_response("invalid_client") + + if not is_valid_scope: + return self._make_json_error_response("invalid_scope") + + if not is_valid_refresh_token: + return self._make_json_error_response("invalid_grant") + + # Discard original refresh token + self.discard_refresh_token(client_id, refresh_token) + + # Generate access tokens once all conditions have been met + access_token = self.generate_access_token() + token_type = self.token_type + expires_in = self.token_expires_in + refresh_token = self.generate_refresh_token() + + # Save information to be used to validate later requests + self.persist_token_information( + client_id=client_id, + scope=scope, + access_token=access_token, + token_type=token_type, + expires_in=expires_in, + refresh_token=refresh_token, + data=data, + ) + + # Return json response + return self._make_json_response( + { + "access_token": access_token, + "token_type": token_type, + "expires_in": expires_in, + "refresh_token": refresh_token, + } + ) + + def get_token( + self, grant_type, client_id, client_secret, redirect_uri, code, **params + ): + """Generate access token HTTP response. + + :param grant_type: Desired grant type. Must be "authorization_code". + :type grant_type: str + :param client_id: Client ID. + :type client_id: str + :param client_secret: Client secret. + :type client_secret: str + :param redirect_uri: Client redirect URI. + :type redirect_uri: str + :param code: Authorization code. + :type code: str + :rtype: requests.Response + """ + + # Ensure proper grant_type + if grant_type != "authorization_code": + return self._make_json_error_response("unsupported_grant_type") + + # Check conditions + is_valid_client_id = self.validate_client_id(client_id) + is_valid_client_secret = self.validate_client_secret(client_id, client_secret) + is_valid_redirect_uri = self.validate_redirect_uri(client_id, redirect_uri) + + scope = params.get("scope", "") + is_valid_scope = self.validate_scope(client_id, scope) + data = self.from_authorization_code(client_id, code, scope) + is_valid_grant = data is not None + + # Return proper error responses on invalid conditions + if not (is_valid_client_id and is_valid_client_secret): + return self._make_json_error_response("invalid_client") + + if not is_valid_grant or not is_valid_redirect_uri: + return self._make_json_error_response("invalid_grant") + + if not is_valid_scope: + return self._make_json_error_response("invalid_scope") + + # Discard original authorization code + self.discard_authorization_code(client_id, code) + + # Generate access tokens once all conditions have been met + access_token = self.generate_access_token() + token_type = self.token_type + expires_in = self.token_expires_in + refresh_token = self.generate_refresh_token() + + # Save information to be used to validate later requests + self.persist_token_information( + client_id=client_id, + scope=scope, + access_token=access_token, + token_type=token_type, + expires_in=expires_in, + refresh_token=refresh_token, + data=data, + ) + + # Return json response + return self._make_json_response( + { + "access_token": access_token, + "token_type": token_type, + "expires_in": expires_in, + "refresh_token": refresh_token, + } + ) + + def get_authorization_code_from_uri(self, uri): + """Get authorization code response from a URI. This method will + ignore the domain and path of the request, instead + automatically parsing the query string parameters. + + :param uri: URI to parse for authorization information. + :type uri: str + :rtype: requests.Response + """ + params = utils.url_query_params(uri) + try: + if "response_type" not in params: + raise TypeError("Missing parameter response_type in URL query") + + if "client_id" not in params: + raise TypeError("Missing parameter client_id in URL query") + + if "redirect_uri" not in params: + raise TypeError("Missing parameter redirect_uri in URL query") + + return self.get_authorization_code(**params) + except TypeError as exc: + self._handle_exception(exc) + + # Catch missing parameters in request + err = "invalid_request" + if "redirect_uri" in params: + u = params["redirect_uri"] + return self._make_redirect_error_response(u, err) + else: + return self._invalid_redirect_uri_response() + except Exception as exc: + self._handle_exception(exc) + + # Catch all other server errors + err = "server_error" + u = params["redirect_uri"] + return self._make_redirect_error_response(u, err) + + def get_token_from_post_data(self, data): + """Get a token response from POST data. + + :param data: POST data containing authorization information. + :type data: dict + :rtype: requests.Response + """ + try: + # Verify OAuth 2.0 Parameters + for x in ["grant_type", "client_id", "client_secret"]: + if not data.get(x): + raise TypeError( + "Missing required OAuth 2.0 POST param: {0}".format(x) + ) + + # Handle get token from refresh_token + if "refresh_token" in data: + return self.refresh_token(**data) + + # Handle get token from authorization code + for x in ["redirect_uri", "code"]: + if not data.get(x): + raise TypeError( + "Missing required OAuth 2.0 POST param: {0}".format(x) + ) + return self.get_token(**data) + except TypeError as exc: + self._handle_exception(exc) + + # Catch missing parameters in request + return self._make_json_error_response("invalid_request") + except Exception as exc: + self._handle_exception(exc) + + # Catch all other server errors + return self._make_json_error_response("server_error") + + def validate_client_id(self, client_id): + raise NotImplementedError("Subclasses must implement " "validate_client_id.") + + def validate_client_secret(self, client_id, client_secret): + raise NotImplementedError( + "Subclasses must implement " "validate_client_secret." + ) + + def validate_redirect_uri(self, client_id, redirect_uri): + raise NotImplementedError("Subclasses must implement " "validate_redirect_uri.") + + def validate_scope(self, client_id, scope): + raise NotImplementedError("Subclasses must implement " "validate_scope.") + + def validate_access(self): + raise NotImplementedError("Subclasses must implement " "validate_access.") + + def from_authorization_code(self, client_id, code, scope): + raise NotImplementedError( + "Subclasses must implement " "from_authorization_code." + ) + + def from_refresh_token(self, client_id, refresh_token, scope): + raise NotImplementedError("Subclasses must implement " "from_refresh_token.") + + def persist_authorization_code(self, client_id, code, scope): + raise NotImplementedError( + "Subclasses must implement " "persist_authorization_code." + ) + + def persist_token_information( + self, + client_id, + scope, + access_token, + token_type, + expires_in, + refresh_token, + data, + ): + raise NotImplementedError( + "Subclasses must implement " "persist_token_information." + ) + + def discard_authorization_code(self, client_id, code): + raise NotImplementedError( + "Subclasses must implement " "discard_authorization_code." + ) + + def discard_refresh_token(self, client_id, refresh_token): + raise NotImplementedError("Subclasses must implement " "discard_refresh_token.") + + +class OAuthError(Unauthorized): + """OAuth error, including the OAuth error reason.""" + + def __init__(self, reason, *args, **kwargs): + self.reason = reason + super(OAuthError, self).__init__(*args, **kwargs) + + +class ResourceAuthorization(object): + """A class containing an OAuth 2.0 authorization.""" + + is_oauth = False + is_valid = None + token = None + client_id = None + expires_in = None + error = None + + def raise_error_if_invalid(self): + if not self.is_valid: + raise OAuthError(self.error, "OAuth authorization error") + + +class ResourceProvider(Provider): + """OAuth 2.0 resource provider. This class provides an interface + to validate an incoming request and authenticate resource access. + Certain methods MUST be overridden in a subclass, thus this + class cannot be directly used as a resource provider. + + These are the methods that must be implemented in a subclass: + + get_authorization_header(self) + # Return header string for key "Authorization" or None + + validate_access_token(self, access_token, authorization) + # Set is_valid=True, client_id, and expires_in attributes + # on authorization if authorization was successful. + # Return value is ignored + """ + + @property + def authorization_class(self): + return ResourceAuthorization + + def get_authorization(self): + """Get authorization object representing status of authentication.""" + log.info("get_authorization") + auth = self.authorization_class() + header = self.get_authorization_header() + if not header or not header.split: + log.info("no header") + return auth + header = header.split() + if len(header) > 1 and header[0] == "Bearer": + auth.is_oauth = True + access_token = header[1] + self.validate_access_token(access_token, auth) + if not auth.is_valid: + auth.error = "access_denied" + return auth + + def get_authorization_header(self): + raise NotImplementedError( + "Subclasses must implement " "get_authorization_header." + ) + + def validate_access_token(self, access_token, authorization): + raise NotImplementedError("Subclasses must implement " "validate_token.") diff --git a/src/fts3rest/fts3rest/lib/oauth2lib/utils.py b/src/fts3rest/fts3rest/lib/oauth2lib/utils.py new file mode 100644 index 00000000..44a6af75 --- /dev/null +++ b/src/fts3rest/fts3rest/lib/oauth2lib/utils.py @@ -0,0 +1,64 @@ +import string +from urllib.parse import urlparse, urlencode, urlunparse, parse_qsl +from random import SystemRandom + +UNICODE_ASCII_CHARACTERS = string.ascii_letters.decode("ascii") + string.digits.decode( + "ascii" +) + + +def random_ascii_string(length): + random = SystemRandom() + return "".join([random.choice(UNICODE_ASCII_CHARACTERS) for x in range(length)]) + + +def url_query_params(url): + """Return query parameters as a dict from the specified URL. + + :param url: URL. + :type url: str + :rtype: dict + """ + return dict(parse_qsl(urlparse(url).query, True)) + + +def url_dequery(url): + """Return a URL with the query component removed. + + :param url: URL to dequery. + :type url: str + :rtype: str + """ + url = urlparse(url) + return urlunparse((url.scheme, url.netloc, url.path, url.params, "", url.fragment)) + + +def build_url(base, additional_params=None): + """Construct a URL based off of base containing all parameters in + the query portion of base plus any additional parameters. + + :param base: Base URL + :type base: str + ::param additional_params: Additional query parameters to include. + :type additional_params: dict + :rtype: str + """ + url = urlparse(base) + query_params = {} + query_params.update(parse_qsl(url.query, True)) + if additional_params is not None: + query_params.update(additional_params) + for k, v in additional_params.items(): + if v is None: + query_params.pop(k) + + return urlunparse( + ( + url.scheme, + url.netloc, + url.path, + url.params, + urlencode(query_params), + url.fragment, + ) + ) diff --git a/src/fts3rest/fts3rest/lib/oauth2provider.py b/src/fts3rest/fts3rest/lib/oauth2provider.py new file mode 100644 index 00000000..cd554ea0 --- /dev/null +++ b/src/fts3rest/fts3rest/lib/oauth2provider.py @@ -0,0 +1,360 @@ +# Copyright 2014-2020 CERN +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from datetime import datetime, timedelta +import json +import jwt +from fts3rest.model.meta import Session +from fts3rest.lib.middleware.fts3auth.constants import VALID_OPERATIONS +from fts3rest.lib.middleware.fts3auth.credentials import generate_delegation_id +from fts3rest.lib.oauth2lib.provider import ( + AuthorizationProvider, + ResourceAuthorization, + ResourceProvider, +) +from fts3rest.lib.openidconnect import oidc_manager +from jwcrypto.jwk import JWK +from flask import request + +from fts3.model.credentials import Credential, CredentialCache +from fts3.model.oauth2 import OAuth2Application, OAuth2Code, OAuth2Token + +log = logging.getLogger(__name__) + + +class FTS3OAuth2AuthorizationProvider(AuthorizationProvider): + """ + OAuth2 Authorization provider, specific methods + """ + + def validate_client_id(self, client_id): + app = Session.query(OAuth2Application).get(client_id) + return app is not None + + def validate_client_secret(self, client_id, client_secret): + app = Session.query(OAuth2Application).get(client_id) + if not app: + return False + return app.client_secret == client_secret + + def validate_scope(self, client_id, scope): + app = Session.query(OAuth2Application).get(client_id) + for s in scope: + if s not in VALID_OPERATIONS or s not in app.scope: + return False + return True + + def validate_redirect_uri(self, client_id, redirect_uri): + app = Session.query(OAuth2Application).get(client_id) + if not app: + return False + return redirect_uri in app.redirect_to.split() + + def validate_access(self): + user = request.environ["fts3.User.Credentials"] + return user is not None + + def from_authorization_code(self, client_id, code, scope): + code = Session.query(OAuth2Code).get(code) + if not code: + return None + return {"dlg_id": code.dlg_id} + + def from_refresh_token(self, client_id, refresh_token, scope): + code = Session.query(OAuth2Token).get((client_id, refresh_token)) + if not code: + return None + return {"dlg_id": code.dlg_id} + + def _insert_user(self, user): + # We will need the user in t_credential_cache at least! + cred = ( + Session.query(CredentialCache) + .filter(CredentialCache.dlg_id == user.delegation_id) + .first() + ) + if not cred: + cred = CredentialCache( + dlg_id=user.delegation_id, + dn=user.user_dn, + cert_request=None, + priv_key=None, + voms_attrs="\n".join(user.voms_cred), + ) + Session.merge(cred) + + def persist_authorization_code(self, client_id, code, scope): + user = request.environ["fts3.User.Credentials"] + self._insert_user(user) + # Remove previous codes + Session.query(OAuth2Code).filter( + (OAuth2Code.client_id == client_id) + & (OAuth2Code.dlg_id == user.delegation_id) + ).delete() + # Token + code = OAuth2Code( + client_id=client_id, code=code, scope=scope, dlg_id=user.delegation_id + ) + Session.merge(code) + Session.commit() + + def is_already_authorized(self, dlg_id, client_id, scope): + code = Session.query(OAuth2Token).filter( + (OAuth2Token.client_id == client_id) & (OAuth2Token.dlg_id == dlg_id) + ) + if scope: + code = code.filter(OAuth2Token.scope == scope) + code = code.all() + if len(code) > 0: + return True + else: + return None + + def persist_token_information( + self, + client_id, + scope, + access_token, + token_type, + expires_in, + refresh_token, + data, + ): + # Remove previous tokens + Session.query(OAuth2Token).filter( + (OAuth2Token.dlg_id == data["dlg_id"]) + & (OAuth2Token.client_id == client_id) + ).delete() + # Add new + token = OAuth2Token( + client_id=client_id, + scope=scope, + access_token=access_token, + token_type=token_type, + expires=datetime.utcnow() + timedelta(seconds=expires_in), + refresh_token=refresh_token, + dlg_id=data["dlg_id"], + ) + Session.merge(token) + Session.commit() + + def discard_authorization_code(self, client_id, code): + auth_code = Session.query(OAuth2Code).get(code) + if auth_code is not None: + Session.delete(auth_code) + Session.commit() + + def discard_refresh_token(self, client_id, refresh_token): + token = Session.query(OAuth2Token).get((client_id, refresh_token)) + if token is not None: + Session.delete(token) + Session.commit() + + +class FTS3ResourceAuthorization(ResourceAuthorization): + dlg_id = None + credentials = None + scope = None + + +class FTS3OAuth2ResourceProvider(ResourceProvider): + """ + OAuth2 resource provider + """ + + def __init__(self, environ, config): + self.environ = environ + self.config = config + + @property + def authorization_class(self): + return FTS3ResourceAuthorization + + def get_authorization_header(self): + return self.environ.get("HTTP_AUTHORIZATION", None) + + def validate_access_token(self, access_token, authorization): + """ + Validate access token offline or online + + Description of the algorithm: + - Validate access token offline (using cached keys) or online (using introspection RFC 7662). + - If a credential already exists in the DB and has not expired, the new token is discarded and the old + credential is used. + - If a credential already exists in the DB but has expired, delete it. + - If there's no credential, Instrospect the token to get additional information (if not done before). Then, + exchange the access token with a refresh token. Store both tokens in the DB. + + :param access_token: + :param authorization: attribute .is_valid is set to True if validation successful + """ + authorization.is_valid = False + + if self._should_validate_offline(): + valid, credential = self._validate_token_offline(access_token) + else: + valid, credential = self._validate_token_online(access_token) + if not valid: + log.warning("Access token provided is not valid") + return + + # Check if a credential already exists in the DB + credential_db = ( + Session.query(Credential).filter(Credential.dn == credential["sub"]).first() + ) + if credential_db and credential_db.expired(): + log.debug("credential_db_has_expired") + Session.delete(credential_db) + Session.commit() + credential_db = None + + if not credential_db: + if self._should_validate_offline(): + log.debug("offline and not in db") + # Introspect to obtain additional information + valid, credential = self._validate_token_online(access_token) + if not valid: + log.debug("Access token provided is not valid") + return + # Store credential in DB + log.debug("Store credential in DB") + dlg_id = generate_delegation_id(credential["sub"], "") + try: + if "wlcg" in credential["iss"]: + # Hardcoded scope and audience for wlcg tokens. To change once the wlcg standard evolves + scope = "offline_access openid storage.read:/ storage.modify:/" + audience = "https://wlcg.cern.ch/jwt/v1/any" + ( + access_token, + refresh_token, + ) = oidc_manager.generate_token_with_scope( + credential["iss"], access_token, scope, audience + ) + else: + refresh_token = oidc_manager.generate_refresh_token( + credential["iss"], access_token + ) + except Exception: + return + credential_db = self._save_credential( + dlg_id, + credential["sub"], + str(access_token) + ":" + str(refresh_token), + self._generate_voms_attrs(credential), + datetime.utcfromtimestamp(credential["exp"]), + ) + + authorization.is_oauth = True + authorization.token = credential_db.proxy.split(":")[0] + authorization.dlg_id = credential_db.dlg_id + authorization.expires_in = credential_db.termination_time - datetime.utcnow() + if authorization.expires_in > timedelta(seconds=0): + authorization.credentials = self._get_credentials(credential_db.dlg_id) + if authorization.credentials: + authorization.is_valid = True + + def _get_credentials(self, dlg_id): + """ + Get the user credentials bound to the authorization token + """ + return Session.query(Credential).filter(Credential.dlg_id == dlg_id).first() + + def _generate_voms_attrs(self, credential): + if "email" in credential: + if "username" in credential: + # 'username' is never there whether offline or online + return credential["email"] + " " + credential["username"] + else: + # 'user_id' is there only online + return credential["email"] + " " + credential["user_id"] + else: + if "username" in credential: + return credential["username"] + " " + else: + return credential["user_id"] + " " + + def _validate_token_offline(self, access_token): + """ + Validate access token using cached information from the provider + :param access_token: + :return: tuple(valid, credential) or tuple(False, None) + """ + log.debug("entered validate_token_offline") + try: + unverified_payload = jwt.decode(access_token, verify=False) + unverified_header = jwt.get_unverified_header(access_token) + issuer = unverified_payload["iss"] + key_id = unverified_header["kid"] + log.debug("issuer={}, key_id={}".format(issuer, key_id)) + + algorithm = unverified_header.get("alg", "RS256") + log.debug("alg={}".format(algorithm)) + + pub_key = oidc_manager.get_provider_key(issuer, key_id) + log.debug("key={}".format(pub_key)) + pub_key = JWK.from_json(json.dumps(pub_key.to_dict())) + log.debug("pubkey={}".format(pub_key)) + # Verify & Validate + if "wlcg" in issuer: + audience = "https://wlcg.cern.ch/jwt/v1/any" + credential = jwt.decode( + access_token, + pub_key.export_to_pem(), + algorithms=[algorithm], + audience=[audience], + ) + else: + credential = jwt.decode( + access_token, pub_key.export_to_pem(), algorithms=[algorithm] + ) + log.debug("offline_response::: {}".format(credential)) + except Exception as ex: + log.debug(ex) + log.debug("return False (exception)") + return False, None + log.debug("return True, credential") + return True, credential + + def _validate_token_online(self, access_token): + """ + Validate access token using Introspection (RFC 7662) + :param access_token: + :return: tuple(valid, credential) or tuple(False, None) + """ + try: + unverified_payload = jwt.decode(access_token, verify=False) + issuer = unverified_payload["iss"] + log.debug("issuer={}".format(issuer)) + response = oidc_manager.introspect(issuer, access_token) + log.debug("online_response::: {}".format(response)) + return response["active"], response + except Exception as ex: + log.debug("exception {}".format(ex)) + return False, None + + def _save_credential(self, dlg_id, dn, proxy, voms_attrs, termination_time): + credential = Credential( + dlg_id=dlg_id, + dn=dn, + proxy=proxy, + voms_attrs=voms_attrs, + termination_time=termination_time, + ) + Session.add(credential) + Session.commit() + return credential + + def _should_validate_offline(self): + return "fts3.ValidateAccessTokenOffline" in self.config.keys() diff --git a/src/fts3rest/fts3rest/lib/openidconnect.py b/src/fts3rest/fts3rest/lib/openidconnect.py new file mode 100644 index 00000000..2a48bcf1 --- /dev/null +++ b/src/fts3rest/fts3rest/lib/openidconnect.py @@ -0,0 +1,220 @@ +import logging +from datetime import datetime + +import jwt +from oic import rndstr +from oic.extension.message import TokenIntrospectionRequest, TokenIntrospectionResponse +from oic.oic import Client, Grant, Token +from oic.oic.message import AccessTokenResponse, Message, RegistrationResponse +from oic.utils import time_util +from oic.utils.authn.client import CLIENT_AUTHN_METHOD + +log = logging.getLogger(__name__) + + +class OIDCmanager: + """ + Class that interfaces with PyOIDC + + It is supposed to have a unique instance which provides all operations that require + information from the OIDC issuers. + """ + + def __init__(self): + self.clients = {} + self.config = None + + def setup(self, config): + self.config = config + self._configure_clients(config["fts3.Providers"]) + self._set_keys_cache_time(int(config["fts3.JWKCacheSeconds"])) + self._retrieve_clients_keys() + + def _configure_clients(self, providers_config): + # log.debug('provider_info::: {}'.format(client.provider_info)) + for provider in providers_config: + client = Client(client_authn_method=CLIENT_AUTHN_METHOD) + # Retrieve well-known configuration + client.provider_config(provider) + # Register + client_reg = RegistrationResponse( + client_id=providers_config[provider]["client_id"], + client_secret=providers_config[provider]["client_secret"], + ) + client.store_registration_info(client_reg) + self.clients[provider] = client + + def _retrieve_clients_keys(self): + for provider in self.clients: + client = self.clients[provider] + client.keyjar.get_issuer_keys(provider) + + def _set_keys_cache_time(self, cache_time): + for provider in self.clients: + client = self.clients[provider] + keybundles = client.keyjar.issuer_keys[provider] + for keybundle in keybundles: + keybundle.cache_time = cache_time + + def get_provider_key(self, issuer, kid): + """ + Get a Provider Key by ID + :param issuer: provider + :param kid: Key ID + :return: key + :raise ValueError: if key not found + """ + client = self.clients[issuer] + keys = client.keyjar.get_issuer_keys(issuer) # List of Keys (from pyjwkest) + for key in keys: + if key.kid == kid: + return key + raise ValueError("Key with kid {} not found".format(kid)) + + def introspect(self, issuer, access_token): + """ + Make a Token Introspection request + :param issuer: issuer of the token + :param access_token: token to introspect + :return: JSON response + """ + client = self.clients[issuer] + response = client.do_any( + request_args={"token": access_token}, + request=TokenIntrospectionRequest, + response=TokenIntrospectionResponse, + body_type="json", + method="POST", + authn_method="client_secret_basic", + ) + log.debug("introspect_response::: {}".format(response)) + return response + + def generate_refresh_token(self, issuer, access_token): + """ + Exchange an access token for a refresh token + :param issuer: issuer of the access token + :param access_token: + :return: refresh token + :raise Exception: If refresh token cannot be obtained + """ + log.debug("enter generate_refresh_token") + client = self.clients[issuer] + body = { + "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", + "subject_token_type": "urn:ietf:params:oauth:token-type:access_token", + "subject_token": access_token, + "scope": "offline_access openid profile", + "audience": client.client_id, + } + try: + response = client.do_any( + Message, + request_args=body, + endpoint=client.provider_info["token_endpoint"], + body_type="json", + method="POST", + authn_method="client_secret_basic", + ) + log.debug("after do any") + response = response.json() + log.debug("response: {}".format(response)) + refresh_token = response["refresh_token"] + log.debug("REFRESH TOKEN IS {}".format(refresh_token)) + except Exception as ex: + log.warning("Exception raised when requesting refresh token") + log.warning(ex) + raise ex + log.debug("refresh_token_response::: {}".format(refresh_token)) + return refresh_token + + def request_token_exchange(self, issuer, access_token, scope=None, audience=None): + """ + Do a token exchange request + :param issuer: issuer of the access token + :param access_token: token to exchange + :param scope: string containing scopes separated by space + :return: provider response in json + :raise Exception: if request fails + """ + client = self.clients[issuer] + body = { + "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", + "subject_token_type": "urn:ietf:params:oauth:token-type:access_token", + "subject_token": access_token, + } + if scope: + body["scope"] = scope + if audience: + body["audience"] = audience + + try: + response = client.do_any( + Message, + request_args=body, + endpoint=client.provider_info["token_endpoint"], + body_type="json", + method="POST", + authn_method="client_secret_basic", + ) + response = response.json() + log.debug("response: {}".format(response)) + except Exception as ex: + log.warning("Exception raised when exchanging token") + log.warning(ex) + raise ex + return response + + def generate_token_with_scope(self, issuer, access_token, scope, audience=None): + """ + Exchange an access token for another access token with the specified scope + :param issuer: issuer of the access token + :param access_token: + :param scope: string containing scopes separated by space + :return: new access token and optional refresh_token + :raise Exception: If token cannot be obtained + """ + response = self.request_token_exchange( + issuer, access_token, scope=scope, audience=audience + ) + access_token = response["access_token"] + refresh_token = response.get("access_token", None) + return access_token, refresh_token + + def refresh_access_token(self, credential): + """ + Request new access token + :param credential: Credential from DB containing an access token and a refresh token + :return: Updated credential containing new access token + """ + access_token, refresh_token = credential.proxy.split(":") + unverified_payload = jwt.decode(access_token, verify=False) + issuer = unverified_payload["iss"] + client = self.clients[issuer] + log.debug("refresh_access_token for {}".format(issuer)) + + # Prepare and make request + refresh_session_state = rndstr(50) + client.grant[refresh_session_state] = Grant() + client.grant[refresh_session_state].grant_expiration_time = ( + time_util.utc_time_sans_frac() + 60 + ) + resp = AccessTokenResponse() + resp["refresh_token"] = refresh_token + client.grant[refresh_session_state].tokens.append(Token(resp)) + new_credential = client.do_access_token_refresh( + authn_method="client_secret_basic", state=refresh_session_state + ) + # A new refresh token is optional + refresh_token = new_credential.get("refresh_token", refresh_token) + access_token = new_credential.get("access_token") + unverified_payload = jwt.decode(access_token, verify=False) + expiration_time = unverified_payload["exp"] + credential.proxy = new_credential["access_token"] + ":" + refresh_token + credential.termination_time = datetime.utcfromtimestamp(expiration_time) + + return credential + + +# Should be the only instance, called during the middleware initialization +oidc_manager = OIDCmanager() -- GitLab From 1464ca31502edf088209b449671e899ab8dce094 Mon Sep 17 00:00:00 2001 From: Carles Garcia Cabot <carles.garcia.cabot@cern.ch> Date: Thu, 19 Mar 2020 17:27:14 +0100 Subject: [PATCH 2/5] Create FTSTestClient to adapt Pylons tests --- src/fts3rest/fts3rest/lib/JobBuilder_utils.py | 4 +- .../lib/middleware/fts3auth/credentials.py | 2 +- src/fts3rest/fts3rest/tests/__init__.py | 221 ++++ src/fts3rest/fts3rest/tests/ftstestclient.py | 39 + .../tests/functional/test_job_submission.py | 1157 +++++++++++++++++ 5 files changed, 1420 insertions(+), 3 deletions(-) create mode 100644 src/fts3rest/fts3rest/tests/__init__.py create mode 100644 src/fts3rest/fts3rest/tests/ftstestclient.py create mode 100644 src/fts3rest/fts3rest/tests/functional/test_job_submission.py diff --git a/src/fts3rest/fts3rest/lib/JobBuilder_utils.py b/src/fts3rest/fts3rest/lib/JobBuilder_utils.py index a8cffe01..ccb8c379 100644 --- a/src/fts3rest/fts3rest/lib/JobBuilder_utils.py +++ b/src/fts3rest/fts3rest/lib/JobBuilder_utils.py @@ -6,7 +6,7 @@ reduce the complexity of the module. import logging import random import uuid -from flask import request +from flask import current_app as app from werkzeug.exceptions import BadRequest, Forbidden, InternalServerError from fts3.model import BannedSE @@ -66,7 +66,7 @@ def is_dest_surl_uuid_enabled(vo_name): Args: vo_name: Name of the vo """ - list_of_vos = request.config.get("fts3.CheckDuplicates", "None") + list_of_vos = app.config.get("fts3.CheckDuplicates", "None") if not list_of_vos: return False if vo_name in list_of_vos or "*" in list_of_vos: diff --git a/src/fts3rest/fts3rest/lib/middleware/fts3auth/credentials.py b/src/fts3rest/fts3rest/lib/middleware/fts3auth/credentials.py index f45c410c..b0805d99 100644 --- a/src/fts3rest/fts3rest/lib/middleware/fts3auth/credentials.py +++ b/src/fts3rest/fts3rest/lib/middleware/fts3auth/credentials.py @@ -92,7 +92,7 @@ class InvalidCredentials(Exception): pass -class UserCredentials(object): +class UserCredentials: """ Handles the user credentials and privileges """ diff --git a/src/fts3rest/fts3rest/tests/__init__.py b/src/fts3rest/fts3rest/tests/__init__.py new file mode 100644 index 00000000..d366d7b4 --- /dev/null +++ b/src/fts3rest/fts3rest/tests/__init__.py @@ -0,0 +1,221 @@ +import os +import shutil +import time + +from datetime import datetime, timedelta +from unittest import TestCase +from M2Crypto import ASN1, X509, RSA, EVP +from M2Crypto.ASN1 import UTC + + +from fts3rest.lib.middleware.fts3auth.credentials import UserCredentials +from fts3rest.model.meta import Session +from fts3.model import Credential, CredentialCache, DataManagement +from fts3.model import Job, File, FileRetryLog, ServerConfig +from fts3rest.config.middleware import create_app +from .ftstestclient import FTSTestClient + + +environ = {} + + +def _generate_mock_cert(): + rsa_key = RSA.gen_key(512, 65537) + pkey = EVP.PKey() + pkey.assign_rsa(rsa_key) + + cert = X509.X509() + cert.set_pubkey(pkey) + not_before = ASN1.ASN1_UTCTIME() + not_before.set_datetime(datetime.now(UTC)) + not_after = ASN1.ASN1_UTCTIME() + not_after.set_datetime(datetime.now(UTC) + timedelta(hours=24)) + cert.set_not_before(not_before) + cert.set_not_after(not_after) + cert.sign(pkey, "md5") + + return pkey, cert + + +def _app_post_json(self, url, params, **kwargs): + """ + To be injected into TestApp if it doesn't have an post_json method available + """ + from json import dumps + + params = dumps(params) + kwargs["content_type"] = "application/json" + return self.post(url, params=params, **kwargs) + + +def _app_get_json(self, url, *args, **kwargs): + """ + Add get_json to TestApp for convenience + """ + headers = kwargs.pop("headers", dict()) + headers["Accept"] = "application/json" + kwargs["headers"] = headers + return self.get(url, *args, **kwargs) + + +class TestController(TestCase): + """ + Base class for the tests + """ + + TEST_USER_DN = "/DC=ch/DC=cern/CN=Test User" + + def setUp(self): + self.pkey, self.cert = _generate_mock_cert() + test_config_default = "./fts3testconfig" + self.flask_app = create_app(test_config_default) + self.flask_app.testing = True + self.flask_app.test_client_class = FTSTestClient + self.app = self.flask_app.test_client() + # todo adapt extra environ + + def setup_gridsite_environment(self, no_vo=False, dn=None): + """ + Add to the test environment mock values of the variables + set by mod_gridsite. + + Args: + noVo: If True, no VO attributes will be set + dn: Override default user DN + """ + if dn is None: + dn = TestController.TEST_USER_DN + self.flask_app.extra_environ["GRST_CRED_AURI_0"] = "dn:" + dn + + if not no_vo: + self.flask_app.extra_environ.update( + { + "GRST_CRED_AURI_1": "fqan:/testvo/Role=NULL/Capability=NULL", + "GRST_CRED_AURI_2": "fqan:/testvo/Role=myrole/Capability=NULL", + "GRST_CRED_AURI_3": "fqan:/testvo/Role=lcgadmin/Capability=NULL", + } + ) + else: + for grst in ["GRST_CRED_AURI_1", "GRST_CRED_AURI_2", "GRST_CRED_AURI_3"]: + if grst in self.flask_app.extra_environ: + del self.flask_app.extra_environ[grst] + + def get_user_credentials(self): + """ + Get the user credentials from the environment + """ + return UserCredentials(self.flask_app.extra_environ, {"public": {"*": "all"}}) + + def push_delegation(self, lifetime=timedelta(hours=7)): + """ + Push into the database a mock delegated credential + + Args: + lifetime: The mock credential lifetime + """ + creds = self.get_user_credentials() + delegated = Credential() + delegated.dlg_id = creds.delegation_id + delegated.dn = creds.user_dn + delegated.proxy = "-NOT USED-" + delegated.voms_attrs = None + delegated.termination_time = datetime.utcnow() + lifetime + + Session.merge(delegated) + Session.commit() + + def pop_delegation(self): + """ + Remove the mock proxy from the database + """ + cred = self.get_user_credentials() + if cred and cred.delegation_id: + delegated = Session.query(Credential).get( + (cred.delegation_id, cred.user_dn) + ) + if delegated: + Session.delete(delegated) + Session.commit() + + def get_x509_proxy(self, request_pem, issuer=None, subject=None, private_key=None): + """ + Generate a X509 proxy based on the request + + Args: + requestPEM: The request PEM encoded + issuer: The issuer user + subject: The subject of the proxy. If None, issuer/CN=proxy will be used + + Returns: + A X509 proxy PEM encoded + """ + if issuer is None: + issuer = [("DC", "ch"), ("DC", "cern"), ("CN", "Test User")] + if subject is None: + subject = issuer + [("CN", "proxy")] + + x509_request = X509.load_request_string(str(request_pem)) + + not_before = ASN1.ASN1_UTCTIME() + not_before.set_datetime(datetime.now(UTC)) + not_after = ASN1.ASN1_UTCTIME() + not_after.set_datetime(datetime.now(UTC) + timedelta(hours=3)) + + issuer_subject = X509.X509_Name() + for c in issuer: + issuer_subject.add_entry_by_txt(c[0], 0x1000, c[1], -1, -1, 0) + + proxy_subject = X509.X509_Name() + for c in subject: + proxy_subject.add_entry_by_txt(c[0], 0x1000, c[1], -1, -1, 0) + + proxy = X509.X509() + proxy.set_version(2) + proxy.set_subject(proxy_subject) + proxy.set_serial_number(int(time.time())) + proxy.set_version(x509_request.get_version()) + proxy.set_issuer(issuer_subject) + proxy.set_pubkey(x509_request.get_pubkey()) + + proxy.set_not_after(not_after) + proxy.set_not_before(not_before) + + if not private_key: + proxy.sign(self.pkey, "sha1") + else: + proxy.sign(private_key, "sha1") + + return proxy.as_pem() + self.cert.as_pem() + + def get_real_x509_proxy(self): + """ + Get a real X509 proxy + + Returns: + The content of the file pointed by X509_USER_PROXY, + None otherwise + """ + proxy_path = os.environ.get("X509_USER_PROXY", None) + if not proxy_path: + return None + return open(proxy_path).read() + + def tearDown(self): + """ + Called by the test framework at the end of each test + """ + Session.query(Credential).delete() + Session.query(CredentialCache).delete() + Session.query(FileRetryLog).delete() + Session.query(File).delete() + Session.query(DataManagement).delete() + Session.query(Job).delete() + Session.query(ServerConfig).delete() + Session.commit() + + # Delete messages + if "fts3.MessagingDirectory" in app.config: + try: + shutil.rmtree(self.flask_app.config["fts3.MessagingDirectory"]) + except Exception: + pass diff --git a/src/fts3rest/fts3rest/tests/ftstestclient.py b/src/fts3rest/fts3rest/tests/ftstestclient.py new file mode 100644 index 00000000..84200417 --- /dev/null +++ b/src/fts3rest/fts3rest/tests/ftstestclient.py @@ -0,0 +1,39 @@ +from flask.testing import FlaskClient +import functools +from flask import Response +import json + + +class TestResponse(Response): + @property + def json(self): + return json.loads(self.data) + + +def _adapt_test(func): + @functools.wraps + def wrapper(*args, **kwargs): + path = kwargs.pop("url", "/") + expected_status = kwargs.pop("status", 200) + data = kwargs.pop("params", None) + res = func(*args, **kwargs, path=path, data=data) + assert res.status_code == expected_status + return res + + return wrapper + + +class FTSTestClient(FlaskClient): + """ + This Flask Test Client adapts the request methods so they can be used + with old functional tests created for Pylon's WebTest + """ + + get = _adapt_test(super().get) + post = _adapt_test(super().post) + put = _adapt_test(super().put) + delete = _adapt_test(super().delete) + + def __init__(self, *args, **kwargs): + kwargs["response_wrapper"] = TestResponse + super().__init__(*args, **kwargs) diff --git a/src/fts3rest/fts3rest/tests/functional/test_job_submission.py b/src/fts3rest/fts3rest/tests/functional/test_job_submission.py new file mode 100644 index 00000000..6760f9ee --- /dev/null +++ b/src/fts3rest/fts3rest/tests/functional/test_job_submission.py @@ -0,0 +1,1157 @@ +import json +import mock +import socket +import time +from nose.plugins.skip import SkipTest +from sqlalchemy.orm import scoped_session, sessionmaker + +from fts3rest.tests import TestController +from fts3rest.lib.base import Session +from fts3.model import File, Job +import random + + +class TestJobSubmission(TestController): + """ + Tests job submission + """ + + def _validate_submitted(self, job, no_vo=False, dn=TestController.TEST_USER_DN): + self.assertNotEqual(job, None) + files = job.files + self.assertNotEqual(files, None) + + self.assertEqual(job.user_dn, dn) + if no_vo: + self.assertEqual(job.vo_name, "TestUser@cern.ch") + else: + self.assertEqual(job.vo_name, "testvo") + self.assertEqual(job.job_state, "SUBMITTED") + + self.assertEqual(job.source_se, "root://source.es") + self.assertEqual(job.dest_se, "root://dest.ch") + self.assertEqual(job.overwrite_flag, True) + self.assertEqual(job.verify_checksum, "b") + self.assertEqual(job.job_type, "N") + self.assertEqual(job.priority, 3) + self.assertIsNone(job.max_time_in_queue) + + self.assertEqual(len(files), 1) + self.assertEqual(files[0].file_state, "SUBMITTED") + self.assertEqual(files[0].source_surl, "root://source.es/file") + self.assertEqual(files[0].source_se, "root://source.es") + self.assertEqual(files[0].dest_se, "root://dest.ch") + self.assertEqual(files[0].file_index, 0) + self.assertEqual(files[0].selection_strategy, "orderly") + self.assertEqual(files[0].user_filesize, 1024) + self.assertEqual(files[0].checksum, "adler32:1234") + self.assertEqual(files[0].file_metadata["mykey"], "myvalue") + if no_vo: + self.assertEqual(files[0].vo_name, "TestUser@cern.ch") + else: + self.assertEqual(files[0].vo_name, "testvo") + + self.assertEquals(files[0].activity, "default") + + # Validate submitter + self.assertEqual(socket.getfqdn(), job.submit_host) + + def test_submit(self): + """ + Submit a valid job + """ + self.setup_gridsite_environment() + self.push_delegation() + dest_surl = "root://dest.ch/file" + str(random.randint(0, 100)) + job = { + "files": [ + { + "sources": ["root://source.es/file"], + "destinations": [dest_surl], + "selection_strategy": "orderly", + "checksum": "adler32:1234", + "filesize": 1024, + "metadata": {"mykey": "myvalue"}, + } + ], + "params": {"overwrite": True, "verify_checksum": True}, + } + + job_id = self.flask_app.put( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + # Make sure it was committed to the DB + self.assertGreater(job_id, 0) + + self._validate_submitted(Session.query(Job).get(job_id)) + + return str(job_id) + + def test_submit_no_reuse(self): + """ + Submit a valid job no reuse + """ + self.setup_gridsite_environment() + self.push_delegation() + dest_surl = "root://dest.ch/file" + str(random.randint(0, 100)) + job = { + "files": [ + { + "sources": ["root://source.es/file"], + "destinations": [dest_surl], + "selection_strategy": "orderly", + "checksum": "adler32:1234", + "filesize": 1024, + "metadata": {"mykey": "myvalue"}, + } + ], + "params": {"overwrite": True, "verify_checksum": True, "reuse": False}, + } + + job_id = self.flask_app.put( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + # Make sure it was committed to the DB + self.assertGreater(job_id, 0) + + self._validate_submitted(Session.query(Job).get(job_id)) + + return str(job_id) + + def test_submit_no_reuse_N(self): + """ + Submit a valid job, using 'N' instead of False + """ + self.setup_gridsite_environment() + self.push_delegation() + dest_surl = "root://dest.ch/file" + str(random.randint(0, 100)) + job = { + "files": [ + { + "sources": ["root://source.es/file"], + "destinations": [dest_surl], + "selection_strategy": "orderly", + "checksum": "adler32:1234", + "filesize": 1024, + "metadata": {"mykey": "myvalue"}, + } + ], + "params": {"overwrite": True, "verify_checksum": True, "reuse": "N"}, + } + + job_id = self.flask_app.put( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + # Make sure it was committed to the DB + self.assertGreater(job_id, 0) + + self._validate_submitted(Session.query(Job).get(job_id)) + + return str(job_id) + + def test_submit_reuse(self): + """ + Submit a valid reuse job + """ + self.setup_gridsite_environment() + self.push_delegation() + dest_surl = "root://dest.ch/file" + str(random.randint(0, 100)) + job = { + "files": [ + { + "sources": ["root://source.es/file"], + "destinations": [dest_surl], + "selection_strategy": "orderly", + "checksum": "adler32:1234", + "filesize": 1024, + "metadata": {"mykey": "myvalue"}, + } + ], + "params": {"overwrite": True, "verify_checksum": True, "reuse": True}, + } + + job_id = self.flask_app.put( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + # Make sure it was commited to the DB + self.assertGreater(len(job_id), 0) + + job = Session.query(Job).get(job_id) + self.assertEqual(job.job_type, "Y") + + return job_id + + def test_submit_Y(self): + """ + Submit a valid reuse job, using 'Y' instead of True + """ + self.setup_gridsite_environment() + self.push_delegation() + dest_surl = "root://dest.ch/file" + str(random.randint(0, 100)) + job = { + "files": [ + { + "sources": ["root://source.es/file"], + "destinations": [dest_surl], + "selection_strategy": "orderly", + "checksum": "adler32:1234", + "filesize": 1024, + "metadata": {"mykey": "myvalue"}, + } + ], + "params": {"overwrite": "Y", "verify_checksum": "Y", "reuse": "Y"}, + } + + job_id = self.flask_app.put( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + # Make sure it was commited to the DB + self.assertGreater(len(job_id), 0) + + job = Session.query(Job).get(job_id) + self.assertEqual(job.job_type, "Y") + + def test_submit_post(self): + """ + Submit a valid job using POST instead of PUT + """ + self.setup_gridsite_environment() + self.push_delegation() + dest_surl = "root://dest.ch/file" + str(random.randint(0, 100)) + job = { + "files": [ + { + "sources": ["root://source.es/file"], + "destinations": [dest_surl], + "selection_strategy": "orderly", + "checksum": "adler32:1234", + "filesize": 1024, + "metadata": {"mykey": "myvalue"}, + } + ], + "params": {"overwrite": True, "verify_checksum": True}, + } + + job_id = self.flask_app.put( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + # Make sure it was committed to the DB + self.assertGreater(len(job_id), 0) + + self._validate_submitted(Session.query(Job).get(job_id)) + + return job_id + + def test_submit_with_port(self): + """ + Submit a valid job where the port is explicit in the url. + source_se and dest_se must exclude this port + """ + self.setup_gridsite_environment() + self.push_delegation() + dest_surl = "srm://dest.ch:8447/file" + str(random.randint(0, 100)) + job = { + "files": [ + { + "sources": ["srm://source.es:8446/file"], + "destinations": [dest_surl], + "selection_strategy": "orderly", + "checksum": "adler32:1234", + "filesize": 1024, + "metadata": {"mykey": "myvalue"}, + } + ], + "params": {"overwrite": True, "verify_checksum": True}, + } + + job_id = self.flask_app.put( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + # Make sure it was committed to the DB + self.assertGreater(len(job_id), 0) + + db_job = Session.query(Job).get(job_id) + + self.assertEqual(db_job.source_se, "srm://source.es") + self.assertEqual(db_job.dest_se, "srm://dest.ch") + + self.assertEqual(db_job.files[0].source_se, "srm://source.es") + self.assertEqual(db_job.files[0].dest_se, "srm://dest.ch") + + return job_id + + def test_submit_only_query(self): + """ + Submit a valid job, without a path, but with a query in the url. + This is valid for some protocols (i.e. srm) + """ + self.setup_gridsite_environment() + self.push_delegation() + dest_surl = "root://dest.ch/file" + str(random.randint(0, 100)) + job = { + "files": [ + { + "sources": ["srm://source.es/?SFN=/path/"], + "destinations": [dest_surl], + "selection_strategy": "orderly", + "checksum": "adler32:1234", + "filesize": 1024, + "metadata": {"mykey": "myvalue"}, + } + ], + "params": { + "overwrite": True, + "copy_pin_lifetime": 3600, + "bring_online": 60, + "verify_checksum": True, + }, + } + + job_id = self.flask_app.put( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + # Make sure it was committed to the DB + self.assertGreater(len(job_id), 0) + + db_job = Session.query(Job).get(job_id) + self.assertEqual(db_job.job_state, "STAGING") + self.assertEqual(db_job.files[0].file_state, "STAGING") + self.assertEqual(db_job.copy_pin_lifetime, 3600) + self.assertEqual(db_job.bring_online, 60) + + return job_id + + def test_null_checksum(self): + """ + Valid job, with checksum explicitly set to null + """ + self.setup_gridsite_environment() + self.push_delegation() + dest_surl = "root://dest.ch/file" + str(random.randint(0, 100)) + job = { + "files": [ + { + "sources": ["root://source.es/file"], + "destinations": [dest_surl], + "selection_strategy": "orderly", + "checksum": None, + "filesize": 1024, + "metadata": {"mykey": "myvalue"}, + } + ], + "params": {"overwrite": True, "verify_checksum": True}, + } + + job_id = self.flask_app.put( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + # Make sure it was committed to the DB + self.assertGreater(len(job_id), 0) + + job = Session.query(Job).get(job_id) + self.assertEqual(job.files[0].checksum, "ADLER32") + + return job_id + + def test_checksum_no_verify(self): + """ + Valid job, with checksum, but verify_checksum is not set + In the DB, it must end as 'r' (compatibility with FTS3 behaviour) + """ + self.setup_gridsite_environment() + self.push_delegation() + dest_surl = "root://dest.ch/file" + str(random.randint(0, 100)) + job = { + "files": [ + { + "sources": ["root://source.es/file"], + "destinations": [dest_surl], + "selection_strategy": "orderly", + "checksum": "1234F", + "filesize": 1024, + "metadata": {"mykey": "myvalue"}, + } + ], + "params": {"overwrite": True}, + } + + job_id = self.flask_app.post( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + # Make sure it was committed to the DB + self.assertGreater(len(job_id), 0) + + job = Session.query(Job).get(job_id) + self.assertEqual(job.files[0].checksum, "1234F") + self.assertEqual(job.verify_checksum, "t") + + return job_id + + def test_verify_checksum_target(self): + """ + Valid job, verify checksum in destination. + In the DB, it must end as 'r' (compatibility with FTS3 behaviour) or destination + """ + self.setup_gridsite_environment() + self.push_delegation() + dest_surl = "root://dest.ch/file" + str(random.randint(0, 100)) + job = { + "files": [ + { + "sources": ["root://source.es/file"], + "destinations": [dest_surl], + "selection_strategy": "orderly", + "checksum": "1234F", + "filesize": 1024, + "metadata": {"mykey": "myvalue"}, + } + ], + "params": {"overwrite": True, "verify_checksum": "target"}, + } + + job_id = self.flask_app.post( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + # Make sure it was committed to the DB + self.assertGreater(len(job_id), 0) + + job = Session.query(Job).get(job_id) + self.assertEqual(job.files[0].checksum, "1234F") + self.assertEqual(job.verify_checksum, "t") + + return job_id + + def test_verify_checksum_source(self): + """ + Valid job, verify checksum in source. + """ + self.setup_gridsite_environment() + self.push_delegation() + dest_surl = "root://dest.ch/file" + str(random.randint(0, 100)) + job = { + "files": [ + { + "sources": ["root://source.es/file"], + "destinations": [dest_surl], + "selection_strategy": "orderly", + "checksum": "1234F", + "filesize": 1024, + "metadata": {"mykey": "myvalue"}, + } + ], + "params": {"overwrite": True, "verify_checksum": "source"}, + } + + job_id = self.flask_app.post( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + # Make sure it was committed to the DB + self.assertGreater(len(job_id), 0) + + job = Session.query(Job).get(job_id) + self.assertEqual(job.files[0].checksum, "1234F") + self.assertEqual(job.verify_checksum, "s") + + return job_id + + def test_verify_checksum_both(self): + """ + Valid job, verify checksum in source. + """ + self.setup_gridsite_environment() + self.push_delegation() + dest_surl = "root://dest.ch/file" + str(random.randint(0, 100)) + job = { + "files": [ + { + "sources": ["root://source.es/file"], + "destinations": [dest_surl], + "selection_strategy": "orderly", + "checksum": "1234F", + "filesize": 1024, + "metadata": {"mykey": "myvalue"}, + } + ], + "params": {"overwrite": True, "verify_checksum": "both"}, + } + + job_id = self.flask_app.post( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + # Make sure it was committed to the DB + self.assertGreater(len(job_id), 0) + + job = Session.query(Job).get(job_id) + self.assertEqual(job.files[0].checksum, "1234F") + self.assertEqual(job.verify_checksum, "b") + + return job_id + + def test_verify_checksum_none(self): + """ + Valid job, verify checksum none. + """ + self.setup_gridsite_environment() + self.push_delegation() + dest_surl = "root://dest.ch/file" + str(random.randint(0, 100)) + job = { + "files": [ + { + "sources": ["root://source.es/file"], + "destinations": [dest_surl], + "selection_strategy": "orderly", + "filesize": 1024, + "metadata": {"mykey": "myvalue"}, + } + ], + "params": {"overwrite": True, "verify_checksum": "none"}, + } + + job_id = self.flask_app.post( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + # Make sure it was committed to the DB + self.assertGreater(len(job_id), 0) + + job = Session.query(Job).get(job_id) + self.assertEqual(job.verify_checksum, "n") + + return job_id + + def test_null_user_filesize(self): + """ + Valid job, with filesize explicitly set to null + """ + self.setup_gridsite_environment() + self.push_delegation() + dest_surl = "root://dest.ch/file" + str(random.randint(0, 100)) + job = { + "files": [ + { + "sources": ["root://source.es/file"], + "destinations": [dest_surl], + "selection_strategy": "orderly", + "filesize": None, + "metadata": {"mykey": "myvalue"}, + } + ], + "params": {"overwrite": True, "verify_checksum": True}, + } + + job_id = self.flask_app.post( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + # Make sure it was committed to the DB + self.assertGreater(len(job_id), 0) + + job = Session.query(Job).get(job_id) + self.assertEqual(job.files[0].user_filesize, 0) + + return job_id + + def test_no_vo(self): + """ + Submit a valid job with no VO data in the credentials (could happen with plain SSL!) + The job must be accepted, but assigned to the user's 'virtual' vo. + """ + self.setup_gridsite_environment(no_vo=True) + self.push_delegation() + dest_surl = "root://dest.ch/file" + str(random.randint(0, 100)) + job = { + "files": [ + { + "sources": ["root://source.es/file"], + "destinations": [dest_surl], + "selection_strategy": "orderly", + "checksum": "adler32:1234", + "filesize": 1024, + "metadata": {"mykey": "myvalue"}, + } + ], + "params": {"overwrite": True, "verify_checksum": True}, + } + + job_id = self.flask_app.put( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + # Make sure it was commited to the DB + self.assertGreater(len(job_id), 0) + + self._validate_submitted(Session.query(Job).get(job_id), no_vo=True) + + def test_no_vo_proxy(self): + """ + Submit a valid job with no VO data in the credentials, but still being a proxy. + The job must be accepted, but assigned to the user's 'virtual' vo. + """ + proxy_dn = self.TEST_USER_DN + "/CN=proxy" + self.setup_gridsite_environment(no_vo=True, dn=proxy_dn) + self.push_delegation() + dest_surl = "root://dest.ch/file" + str(random.randint(0, 100)) + job = { + "files": [ + { + "sources": ["root://source.es/file"], + "destinations": [dest_surl], + "selection_strategy": "orderly", + "checksum": "adler32:1234", + "filesize": 1024, + "metadata": {"mykey": "myvalue"}, + } + ], + "params": {"overwrite": True, "verify_checksum": True}, + } + + job_id = self.flask_app.put( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + # Make sure it was commited to the DB + self.assertGreater(len(job_id), 0) + + self._validate_submitted( + Session.query(Job).get(job_id), no_vo=True, dn=proxy_dn + ) + + def test_retry(self): + """ + Submit with a specific retry value + """ + self.setup_gridsite_environment() + self.push_delegation() + dest_surl = "root://dest.ch/file" + str(random.randint(0, 100)) + job = { + "files": [ + { + "sources": ["root://source.es/file"], + "destinations": [dest_surl], + "selection_strategy": "orderly", + "checksum": "adler32:1234", + "filesize": 1024, + "metadata": {"mykey": "myvalue"}, + } + ], + "params": { + "overwrite": True, + "verify_checksum": True, + "retry": 42, + "retry_delay": 123, + }, + } + + job_id = self.flask_app.put( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + # Make sure it was commited to the DB + self.assertGreater(len(job_id), 0) + + job = Session.query(Job).get(job_id) + self._validate_submitted(job) + self.assertEqual(job.retry, 42) + self.assertEqual(job.retry_delay, 123) + + def test_with_activity(self): + """ + Submit a job specifiying activities for the files + """ + self.setup_gridsite_environment() + self.push_delegation() + dest_surl = "root://dest.ch/file" + str(random.randint(0, 100)) + job = { + "files": [ + { + "sources": ["root://source.es/file"], + "destinations": [dest_surl], + "activity": "my-activity", + }, + { + "sources": ["https://source.es/file2"], + "destinations": ["https://dest.ch/file2"], + "activity": "my-second-activity", + }, + ] + } + + job_id = self.flask_app.put( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + # Make sure it was commited to the DB + self.assertGreater(len(job_id), 0) + + job = Session.query(Job).get(job_id) + self.assertEqual(job.files[0].activity, "my-activity") + self.assertEqual(job.files[1].activity, "my-second-activity") + + def test_surl_with_spaces(self): + """ + Submit a job where the surl has spaces at the beginning and at the end + """ + self.setup_gridsite_environment() + self.push_delegation() + dest_surl = "root://dest.ch/file" + str(random.randint(0, 100)) + job = { + "files": [ + { + "sources": ["root://source.es/file\n \r "], + "destinations": ["\r\n" + dest_surl + "\n\n \n"], + "selection_strategy": "orderly", + "checksum": "adler32:1234", + "filesize": 1024.0, + "metadata": {"mykey": "myvalue"}, + } + ], + "params": {"overwrite": True, "verify_checksum": True}, + } + + job_id = self.flask_app.put( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + # Make sure it was commited to the DB + self.assertGreater(len(job_id), 0) + + job = Session.query(Job).get(job_id) + self._validate_submitted(job) + + def test_submit_different_protocols(self): + """ + Source and destination protocol mismatch + For REST <= 3.2.3, this used to be forbidden, but it was decided to allow it + https://its.cern.ch/jira/browse/FTS-97 + """ + self.setup_gridsite_environment() + self.push_delegation() + dest_surl = "root://dest.ch:8447/file" + str(random.randint(0, 100)) + job = { + "files": [ + { + "sources": ["http://source.es:8446/file"], + "destinations": [dest_surl], + "selection_strategy": "orderly", + "checksum": "adler32:1234", + "filesize": 1024, + "metadata": {"mykey": "myvalue"}, + } + ], + "params": {"overwrite": True, "verify_checksum": True}, + } + + job_id = self.flask_app.post( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + job = Session.query(Job).get(job_id) + self.assertEqual(1, len(job.files)) + self.assertEqual("http://source.es:8446/file", job.files[0].source_surl) + self.assertEqual(dest_surl, job.files[0].dest_surl) + + def test_submit_with_cloud_cred(self): + """ + Submit a job specifying cloud credentials + """ + self.setup_gridsite_environment() + self.push_delegation() + dest_surl = "root://dest.ch:8447/file" + str(random.randint(0, 100)) + job = { + "files": [ + { + "sources": ["dropbox://dropbox.com/file"], + "destinations": [dest_surl], + "selection_strategy": "orderly", + "checksum": "adler32:1234", + "filesize": 1024, + "metadata": {"mykey": "myvalue"}, + } + ], + "params": {"overwrite": True, "verify_checksum": True}, + } + + job_id = self.flask_app.post( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + job = Session.query(Job).get(job_id) + self.assertEqual(1, len(job.files)) + self.assertEqual("dropbox://dropbox.com/file", job.files[0].source_surl) + self.assertEqual(dest_surl, job.files[0].dest_surl) + + def test_submit_protocol_params(self): + """ + Submit a transfer specifying some protocol parameters + """ + self.setup_gridsite_environment() + self.push_delegation() + dest_surl = "root://dest.ch:8447/file" + str(random.randint(0, 100)) + job = { + "files": [ + { + "sources": ["http://source.es:8446/file"], + "destinations": [dest_surl], + "selection_strategy": "orderly", + "checksum": "adler32:1234", + "filesize": 1024, + "metadata": {"mykey": "myvalue"}, + } + ], + "params": { + "overwrite": True, + "verify_checksum": True, + "timeout": 1234, + "nostreams": 42, + "buffer_size": 1025, + "strict_copy": True, + }, + } + job_id = self.flask_app.post( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + job = Session.query(Job).get(job_id) + self.assertTrue(job.internal_job_params is not None) + params = job.internal_job_params.split(",") + self.assertIn("timeout:1234", params) + self.assertIn("nostreams:42", params) + self.assertIn("buffersize:1025", params) + self.assertIn("strict", params) + + def test_submit_with_priority(self): + """ + Submit a job specifying the priority + """ + self.setup_gridsite_environment() + self.push_delegation() + dest_surl = "root://dest.ch:8447/file" + str(random.randint(0, 100)) + job = { + "files": [ + { + "sources": ["http://source.es:8446/file"], + "destinations": [dest_surl], + } + ], + "params": {"priority": 5,}, + } + job_id = self.flask_app.post( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + job = Session.query(Job).get(job_id) + self.assertEqual(job.priority, 5) + + def test_submit_max_time_in_queue(self): + """ + Submits a job specifying the maximum time it should stay in the queue. + """ + self.setup_gridsite_environment() + self.push_delegation() + dest_surl = "root://dest.ch:8447/file" + str(random.randint(0, 100)) + job = { + "files": [ + { + "sources": ["http://source.es:8446/file"], + "destinations": [dest_surl], + } + ], + "params": {"max_time_in_queue": 8}, + } + job_id = self.flask_app.post( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + # See FTS-311 + # max_time_in_queue was effectively ignored by FTS3 + # Since FTS-311, this field stores the timestamp when the job expires + job = Session.query(Job).get(job_id) + self.assertGreater(job.max_time_in_queue, time.time()) + self.assertLessEqual(job.max_time_in_queue, (8 * 60 * 60) + time.time()) + + def test_submit_max_time_in_queue_suffix(self): + """ + Submits a job specifying the maximum time it should stay in the queue. + Use a suffix. + """ + self.setup_gridsite_environment() + self.push_delegation() + dest_surl = "root://dest.ch:8447/file" + str(random.randint(0, 100)) + job = { + "files": [ + { + "sources": ["http://source.es:8446/file"], + "destinations": [dest_surl], + } + ], + "params": {"max_time_in_queue": "4s"}, + } + job_id = self.flask_app.post( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + job = Session.query(Job).get(job_id) + self.assertGreater(job.max_time_in_queue, time.time()) + self.assertLessEqual(job.max_time_in_queue, 8 + time.time()) + + def test_submit_max_time_in_queue_suffix2(self): + """ + Submits a job specifying the maximum time it should stay in the queue. + Use a suffix. + """ + self.setup_gridsite_environment() + self.push_delegation() + dest_surl = "root://dest" + str(random.randint(0, 100)) + ".ch:8447/file" + job = { + "files": [ + { + "sources": ["http://source.es:8446/file"], + "destinations": [dest_surl], + } + ], + "params": {"max_time_in_queue": "2m"}, + } + job_id = self.flask_app.post( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + job = Session.query(Job).get(job_id) + self.assertGreater(job.max_time_in_queue, time.time()) + self.assertLessEqual(job.max_time_in_queue, 120 + time.time()) + + def test_submit_ipv4(self): + """ + Submit a job with IPv4 only + """ + self.setup_gridsite_environment() + self.push_delegation() + + job = { + "files": [ + { + "sources": ["http://source.es:8446/file"], + "destinations": ["root://destipv4.ch:8447/file"], + } + ], + "params": {"ipv4": True}, + } + job_id = self.flask_app.post( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + jobdb = Session.query(Job).get(job_id) + self.assertIn("ipv4", jobdb.internal_job_params) + self.assertNotIn("ipv6", jobdb.internal_job_params) + + job = { + "files": [ + { + "sources": ["http://source.es:8446/file"], + "destinations": ["root://destipv4tofalse.ch:8447/file"], + } + ], + "params": {"ipv4": False}, + } + + job_id = self.flask_app.post( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + jobdb = Session.query(Job).get(job_id) + self.assertTrue( + jobdb.internal_job_params is None or "ipv4" not in jobdb.internal_job_params + ) + self.assertTrue( + jobdb.internal_job_params is None or "ipv6" not in jobdb.internal_job_params + ) + + def test_submit_ipv6(self): + """ + Submit a job with IPv6 only + """ + self.setup_gridsite_environment() + self.push_delegation() + + job = { + "files": [ + { + "sources": ["http://source.es:8446/file"], + "destinations": ["root://destipv6.ch:8447/file"], + } + ], + "params": {"ipv6": True}, + } + job_id = self.flask_app.post( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + jobdb = Session.query(Job).get(job_id) + self.assertIn("ipv6", jobdb.internal_job_params) + self.assertNotIn("ipv4", jobdb.internal_job_params) + + job = { + "files": [ + { + "sources": ["http://source.es:8446/file"], + "destinations": ["root://destipv6tofalse.ch:8447/file"], + } + ], + "params": {"ipv6": False}, + } + + job_id = self.flask_app.post( + url="/jobs", + content_type="application/json", + params=json.dumps(job), + status=200, + ).json["job_id"] + + jobdb = Session.query(Job).get(job_id) + self.assertTrue( + jobdb.internal_job_params is None or "ipv4" not in jobdb.internal_job_params + ) + self.assertTrue( + jobdb.internal_job_params is None or "ipv6" not in jobdb.internal_job_params + ) + + def test_files_balanced(self): + """ + Checks the distribution of the file 'hashed ids' is reasonably uniformely distributed. + hashed_id is a legacy name, its purpose is balance the transfers between hosts + regardless of the number running in a giving moment + """ + raise SkipTest("Disabled as it is not very reliable") + self.setup_gridsite_environment() + self.push_delegation() + + files = [] + for r in xrange(5000): + files.append( + { + "sources": ["root://source.es/file%d" % r], + "destinations": [ + "root://dest.ch/file%d%d" % (r, random.randint(0, 100)) + ], + } + ) + + job = {"files": files} + + job_id = self.app.put(url="/jobs", params=json.dumps(job), status=200).json[ + "job_id" + ] + + files = Session.query(File.hashed_id).filter(File.job_id == job_id) + hashed_ids = map(lambda f: f.hashed_id, files) + + # Null hypothesis: the distribution of hashed_ids is uniform + histogram, min_value, binsize, outsiders = scipy.stats.histogram( + hashed_ids, defaultlimits=(0, 2 ** 16 - 1) + ) + chisq, pvalue = scipy.stats.chisquare(histogram) + + self.assertGreater(min_value, -1) + self.assertEqual(outsiders, 0) + self.assertGreater(pvalue, 0.1) -- GitLab From 792f54831cb1fc19f362a6be03142b598c38b20d Mon Sep 17 00:00:00 2001 From: Carles Garcia Cabot <carles.garcia.cabot@cern.ch> Date: Fri, 20 Mar 2020 14:34:01 +0100 Subject: [PATCH 3/5] Adapt test environ --- src/fts3rest/fts3rest/tests/__init__.py | 35 ++++--------------------- 1 file changed, 5 insertions(+), 30 deletions(-) diff --git a/src/fts3rest/fts3rest/tests/__init__.py b/src/fts3rest/fts3rest/tests/__init__.py index d366d7b4..257b3cf3 100644 --- a/src/fts3rest/fts3rest/tests/__init__.py +++ b/src/fts3rest/fts3rest/tests/__init__.py @@ -16,9 +16,6 @@ from fts3rest.config.middleware import create_app from .ftstestclient import FTSTestClient -environ = {} - - def _generate_mock_cert(): rsa_key = RSA.gen_key(512, 65537) pkey = EVP.PKey() @@ -37,27 +34,6 @@ def _generate_mock_cert(): return pkey, cert -def _app_post_json(self, url, params, **kwargs): - """ - To be injected into TestApp if it doesn't have an post_json method available - """ - from json import dumps - - params = dumps(params) - kwargs["content_type"] = "application/json" - return self.post(url, params=params, **kwargs) - - -def _app_get_json(self, url, *args, **kwargs): - """ - Add get_json to TestApp for convenience - """ - headers = kwargs.pop("headers", dict()) - headers["Accept"] = "application/json" - kwargs["headers"] = headers - return self.get(url, *args, **kwargs) - - class TestController(TestCase): """ Base class for the tests @@ -72,7 +48,6 @@ class TestController(TestCase): self.flask_app.testing = True self.flask_app.test_client_class = FTSTestClient self.app = self.flask_app.test_client() - # todo adapt extra environ def setup_gridsite_environment(self, no_vo=False, dn=None): """ @@ -85,10 +60,10 @@ class TestController(TestCase): """ if dn is None: dn = TestController.TEST_USER_DN - self.flask_app.extra_environ["GRST_CRED_AURI_0"] = "dn:" + dn + self.app.environ_base["GRST_CRED_AURI_0"] = "dn:" + dn if not no_vo: - self.flask_app.extra_environ.update( + self.app.environ_base.update( { "GRST_CRED_AURI_1": "fqan:/testvo/Role=NULL/Capability=NULL", "GRST_CRED_AURI_2": "fqan:/testvo/Role=myrole/Capability=NULL", @@ -97,14 +72,14 @@ class TestController(TestCase): ) else: for grst in ["GRST_CRED_AURI_1", "GRST_CRED_AURI_2", "GRST_CRED_AURI_3"]: - if grst in self.flask_app.extra_environ: - del self.flask_app.extra_environ[grst] + if grst in self.app.environ_base: + del self.app.environ_base[grst] def get_user_credentials(self): """ Get the user credentials from the environment """ - return UserCredentials(self.flask_app.extra_environ, {"public": {"*": "all"}}) + return UserCredentials(self.app.environ_base, {"public": {"*": "all"}}) def push_delegation(self, lifetime=timedelta(hours=7)): """ -- GitLab From fc0cc8dfbd2b99811ff19fa187573f38882aee7e Mon Sep 17 00:00:00 2001 From: Carles Garcia Cabot <carles.garcia.cabot@cern.ch> Date: Fri, 20 Mar 2020 14:39:54 +0100 Subject: [PATCH 4/5] Untrack test --- .../tests/functional/test_job_submission.py | 1157 ----------------- 1 file changed, 1157 deletions(-) delete mode 100644 src/fts3rest/fts3rest/tests/functional/test_job_submission.py diff --git a/src/fts3rest/fts3rest/tests/functional/test_job_submission.py b/src/fts3rest/fts3rest/tests/functional/test_job_submission.py deleted file mode 100644 index 6760f9ee..00000000 --- a/src/fts3rest/fts3rest/tests/functional/test_job_submission.py +++ /dev/null @@ -1,1157 +0,0 @@ -import json -import mock -import socket -import time -from nose.plugins.skip import SkipTest -from sqlalchemy.orm import scoped_session, sessionmaker - -from fts3rest.tests import TestController -from fts3rest.lib.base import Session -from fts3.model import File, Job -import random - - -class TestJobSubmission(TestController): - """ - Tests job submission - """ - - def _validate_submitted(self, job, no_vo=False, dn=TestController.TEST_USER_DN): - self.assertNotEqual(job, None) - files = job.files - self.assertNotEqual(files, None) - - self.assertEqual(job.user_dn, dn) - if no_vo: - self.assertEqual(job.vo_name, "TestUser@cern.ch") - else: - self.assertEqual(job.vo_name, "testvo") - self.assertEqual(job.job_state, "SUBMITTED") - - self.assertEqual(job.source_se, "root://source.es") - self.assertEqual(job.dest_se, "root://dest.ch") - self.assertEqual(job.overwrite_flag, True) - self.assertEqual(job.verify_checksum, "b") - self.assertEqual(job.job_type, "N") - self.assertEqual(job.priority, 3) - self.assertIsNone(job.max_time_in_queue) - - self.assertEqual(len(files), 1) - self.assertEqual(files[0].file_state, "SUBMITTED") - self.assertEqual(files[0].source_surl, "root://source.es/file") - self.assertEqual(files[0].source_se, "root://source.es") - self.assertEqual(files[0].dest_se, "root://dest.ch") - self.assertEqual(files[0].file_index, 0) - self.assertEqual(files[0].selection_strategy, "orderly") - self.assertEqual(files[0].user_filesize, 1024) - self.assertEqual(files[0].checksum, "adler32:1234") - self.assertEqual(files[0].file_metadata["mykey"], "myvalue") - if no_vo: - self.assertEqual(files[0].vo_name, "TestUser@cern.ch") - else: - self.assertEqual(files[0].vo_name, "testvo") - - self.assertEquals(files[0].activity, "default") - - # Validate submitter - self.assertEqual(socket.getfqdn(), job.submit_host) - - def test_submit(self): - """ - Submit a valid job - """ - self.setup_gridsite_environment() - self.push_delegation() - dest_surl = "root://dest.ch/file" + str(random.randint(0, 100)) - job = { - "files": [ - { - "sources": ["root://source.es/file"], - "destinations": [dest_surl], - "selection_strategy": "orderly", - "checksum": "adler32:1234", - "filesize": 1024, - "metadata": {"mykey": "myvalue"}, - } - ], - "params": {"overwrite": True, "verify_checksum": True}, - } - - job_id = self.flask_app.put( - url="/jobs", - content_type="application/json", - params=json.dumps(job), - status=200, - ).json["job_id"] - - # Make sure it was committed to the DB - self.assertGreater(job_id, 0) - - self._validate_submitted(Session.query(Job).get(job_id)) - - return str(job_id) - - def test_submit_no_reuse(self): - """ - Submit a valid job no reuse - """ - self.setup_gridsite_environment() - self.push_delegation() - dest_surl = "root://dest.ch/file" + str(random.randint(0, 100)) - job = { - "files": [ - { - "sources": ["root://source.es/file"], - "destinations": [dest_surl], - "selection_strategy": "orderly", - "checksum": "adler32:1234", - "filesize": 1024, - "metadata": {"mykey": "myvalue"}, - } - ], - "params": {"overwrite": True, "verify_checksum": True, "reuse": False}, - } - - job_id = self.flask_app.put( - url="/jobs", - content_type="application/json", - params=json.dumps(job), - status=200, - ).json["job_id"] - - # Make sure it was committed to the DB - self.assertGreater(job_id, 0) - - self._validate_submitted(Session.query(Job).get(job_id)) - - return str(job_id) - - def test_submit_no_reuse_N(self): - """ - Submit a valid job, using 'N' instead of False - """ - self.setup_gridsite_environment() - self.push_delegation() - dest_surl = "root://dest.ch/file" + str(random.randint(0, 100)) - job = { - "files": [ - { - "sources": ["root://source.es/file"], - "destinations": [dest_surl], - "selection_strategy": "orderly", - "checksum": "adler32:1234", - "filesize": 1024, - "metadata": {"mykey": "myvalue"}, - } - ], - "params": {"overwrite": True, "verify_checksum": True, "reuse": "N"}, - } - - job_id = self.flask_app.put( - url="/jobs", - content_type="application/json", - params=json.dumps(job), - status=200, - ).json["job_id"] - - # Make sure it was committed to the DB - self.assertGreater(job_id, 0) - - self._validate_submitted(Session.query(Job).get(job_id)) - - return str(job_id) - - def test_submit_reuse(self): - """ - Submit a valid reuse job - """ - self.setup_gridsite_environment() - self.push_delegation() - dest_surl = "root://dest.ch/file" + str(random.randint(0, 100)) - job = { - "files": [ - { - "sources": ["root://source.es/file"], - "destinations": [dest_surl], - "selection_strategy": "orderly", - "checksum": "adler32:1234", - "filesize": 1024, - "metadata": {"mykey": "myvalue"}, - } - ], - "params": {"overwrite": True, "verify_checksum": True, "reuse": True}, - } - - job_id = self.flask_app.put( - url="/jobs", - content_type="application/json", - params=json.dumps(job), - status=200, - ).json["job_id"] - - # Make sure it was commited to the DB - self.assertGreater(len(job_id), 0) - - job = Session.query(Job).get(job_id) - self.assertEqual(job.job_type, "Y") - - return job_id - - def test_submit_Y(self): - """ - Submit a valid reuse job, using 'Y' instead of True - """ - self.setup_gridsite_environment() - self.push_delegation() - dest_surl = "root://dest.ch/file" + str(random.randint(0, 100)) - job = { - "files": [ - { - "sources": ["root://source.es/file"], - "destinations": [dest_surl], - "selection_strategy": "orderly", - "checksum": "adler32:1234", - "filesize": 1024, - "metadata": {"mykey": "myvalue"}, - } - ], - "params": {"overwrite": "Y", "verify_checksum": "Y", "reuse": "Y"}, - } - - job_id = self.flask_app.put( - url="/jobs", - content_type="application/json", - params=json.dumps(job), - status=200, - ).json["job_id"] - - # Make sure it was commited to the DB - self.assertGreater(len(job_id), 0) - - job = Session.query(Job).get(job_id) - self.assertEqual(job.job_type, "Y") - - def test_submit_post(self): - """ - Submit a valid job using POST instead of PUT - """ - self.setup_gridsite_environment() - self.push_delegation() - dest_surl = "root://dest.ch/file" + str(random.randint(0, 100)) - job = { - "files": [ - { - "sources": ["root://source.es/file"], - "destinations": [dest_surl], - "selection_strategy": "orderly", - "checksum": "adler32:1234", - "filesize": 1024, - "metadata": {"mykey": "myvalue"}, - } - ], - "params": {"overwrite": True, "verify_checksum": True}, - } - - job_id = self.flask_app.put( - url="/jobs", - content_type="application/json", - params=json.dumps(job), - status=200, - ).json["job_id"] - - # Make sure it was committed to the DB - self.assertGreater(len(job_id), 0) - - self._validate_submitted(Session.query(Job).get(job_id)) - - return job_id - - def test_submit_with_port(self): - """ - Submit a valid job where the port is explicit in the url. - source_se and dest_se must exclude this port - """ - self.setup_gridsite_environment() - self.push_delegation() - dest_surl = "srm://dest.ch:8447/file" + str(random.randint(0, 100)) - job = { - "files": [ - { - "sources": ["srm://source.es:8446/file"], - "destinations": [dest_surl], - "selection_strategy": "orderly", - "checksum": "adler32:1234", - "filesize": 1024, - "metadata": {"mykey": "myvalue"}, - } - ], - "params": {"overwrite": True, "verify_checksum": True}, - } - - job_id = self.flask_app.put( - url="/jobs", - content_type="application/json", - params=json.dumps(job), - status=200, - ).json["job_id"] - - # Make sure it was committed to the DB - self.assertGreater(len(job_id), 0) - - db_job = Session.query(Job).get(job_id) - - self.assertEqual(db_job.source_se, "srm://source.es") - self.assertEqual(db_job.dest_se, "srm://dest.ch") - - self.assertEqual(db_job.files[0].source_se, "srm://source.es") - self.assertEqual(db_job.files[0].dest_se, "srm://dest.ch") - - return job_id - - def test_submit_only_query(self): - """ - Submit a valid job, without a path, but with a query in the url. - This is valid for some protocols (i.e. srm) - """ - self.setup_gridsite_environment() - self.push_delegation() - dest_surl = "root://dest.ch/file" + str(random.randint(0, 100)) - job = { - "files": [ - { - "sources": ["srm://source.es/?SFN=/path/"], - "destinations": [dest_surl], - "selection_strategy": "orderly", - "checksum": "adler32:1234", - "filesize": 1024, - "metadata": {"mykey": "myvalue"}, - } - ], - "params": { - "overwrite": True, - "copy_pin_lifetime": 3600, - "bring_online": 60, - "verify_checksum": True, - }, - } - - job_id = self.flask_app.put( - url="/jobs", - content_type="application/json", - params=json.dumps(job), - status=200, - ).json["job_id"] - - # Make sure it was committed to the DB - self.assertGreater(len(job_id), 0) - - db_job = Session.query(Job).get(job_id) - self.assertEqual(db_job.job_state, "STAGING") - self.assertEqual(db_job.files[0].file_state, "STAGING") - self.assertEqual(db_job.copy_pin_lifetime, 3600) - self.assertEqual(db_job.bring_online, 60) - - return job_id - - def test_null_checksum(self): - """ - Valid job, with checksum explicitly set to null - """ - self.setup_gridsite_environment() - self.push_delegation() - dest_surl = "root://dest.ch/file" + str(random.randint(0, 100)) - job = { - "files": [ - { - "sources": ["root://source.es/file"], - "destinations": [dest_surl], - "selection_strategy": "orderly", - "checksum": None, - "filesize": 1024, - "metadata": {"mykey": "myvalue"}, - } - ], - "params": {"overwrite": True, "verify_checksum": True}, - } - - job_id = self.flask_app.put( - url="/jobs", - content_type="application/json", - params=json.dumps(job), - status=200, - ).json["job_id"] - - # Make sure it was committed to the DB - self.assertGreater(len(job_id), 0) - - job = Session.query(Job).get(job_id) - self.assertEqual(job.files[0].checksum, "ADLER32") - - return job_id - - def test_checksum_no_verify(self): - """ - Valid job, with checksum, but verify_checksum is not set - In the DB, it must end as 'r' (compatibility with FTS3 behaviour) - """ - self.setup_gridsite_environment() - self.push_delegation() - dest_surl = "root://dest.ch/file" + str(random.randint(0, 100)) - job = { - "files": [ - { - "sources": ["root://source.es/file"], - "destinations": [dest_surl], - "selection_strategy": "orderly", - "checksum": "1234F", - "filesize": 1024, - "metadata": {"mykey": "myvalue"}, - } - ], - "params": {"overwrite": True}, - } - - job_id = self.flask_app.post( - url="/jobs", - content_type="application/json", - params=json.dumps(job), - status=200, - ).json["job_id"] - - # Make sure it was committed to the DB - self.assertGreater(len(job_id), 0) - - job = Session.query(Job).get(job_id) - self.assertEqual(job.files[0].checksum, "1234F") - self.assertEqual(job.verify_checksum, "t") - - return job_id - - def test_verify_checksum_target(self): - """ - Valid job, verify checksum in destination. - In the DB, it must end as 'r' (compatibility with FTS3 behaviour) or destination - """ - self.setup_gridsite_environment() - self.push_delegation() - dest_surl = "root://dest.ch/file" + str(random.randint(0, 100)) - job = { - "files": [ - { - "sources": ["root://source.es/file"], - "destinations": [dest_surl], - "selection_strategy": "orderly", - "checksum": "1234F", - "filesize": 1024, - "metadata": {"mykey": "myvalue"}, - } - ], - "params": {"overwrite": True, "verify_checksum": "target"}, - } - - job_id = self.flask_app.post( - url="/jobs", - content_type="application/json", - params=json.dumps(job), - status=200, - ).json["job_id"] - - # Make sure it was committed to the DB - self.assertGreater(len(job_id), 0) - - job = Session.query(Job).get(job_id) - self.assertEqual(job.files[0].checksum, "1234F") - self.assertEqual(job.verify_checksum, "t") - - return job_id - - def test_verify_checksum_source(self): - """ - Valid job, verify checksum in source. - """ - self.setup_gridsite_environment() - self.push_delegation() - dest_surl = "root://dest.ch/file" + str(random.randint(0, 100)) - job = { - "files": [ - { - "sources": ["root://source.es/file"], - "destinations": [dest_surl], - "selection_strategy": "orderly", - "checksum": "1234F", - "filesize": 1024, - "metadata": {"mykey": "myvalue"}, - } - ], - "params": {"overwrite": True, "verify_checksum": "source"}, - } - - job_id = self.flask_app.post( - url="/jobs", - content_type="application/json", - params=json.dumps(job), - status=200, - ).json["job_id"] - - # Make sure it was committed to the DB - self.assertGreater(len(job_id), 0) - - job = Session.query(Job).get(job_id) - self.assertEqual(job.files[0].checksum, "1234F") - self.assertEqual(job.verify_checksum, "s") - - return job_id - - def test_verify_checksum_both(self): - """ - Valid job, verify checksum in source. - """ - self.setup_gridsite_environment() - self.push_delegation() - dest_surl = "root://dest.ch/file" + str(random.randint(0, 100)) - job = { - "files": [ - { - "sources": ["root://source.es/file"], - "destinations": [dest_surl], - "selection_strategy": "orderly", - "checksum": "1234F", - "filesize": 1024, - "metadata": {"mykey": "myvalue"}, - } - ], - "params": {"overwrite": True, "verify_checksum": "both"}, - } - - job_id = self.flask_app.post( - url="/jobs", - content_type="application/json", - params=json.dumps(job), - status=200, - ).json["job_id"] - - # Make sure it was committed to the DB - self.assertGreater(len(job_id), 0) - - job = Session.query(Job).get(job_id) - self.assertEqual(job.files[0].checksum, "1234F") - self.assertEqual(job.verify_checksum, "b") - - return job_id - - def test_verify_checksum_none(self): - """ - Valid job, verify checksum none. - """ - self.setup_gridsite_environment() - self.push_delegation() - dest_surl = "root://dest.ch/file" + str(random.randint(0, 100)) - job = { - "files": [ - { - "sources": ["root://source.es/file"], - "destinations": [dest_surl], - "selection_strategy": "orderly", - "filesize": 1024, - "metadata": {"mykey": "myvalue"}, - } - ], - "params": {"overwrite": True, "verify_checksum": "none"}, - } - - job_id = self.flask_app.post( - url="/jobs", - content_type="application/json", - params=json.dumps(job), - status=200, - ).json["job_id"] - - # Make sure it was committed to the DB - self.assertGreater(len(job_id), 0) - - job = Session.query(Job).get(job_id) - self.assertEqual(job.verify_checksum, "n") - - return job_id - - def test_null_user_filesize(self): - """ - Valid job, with filesize explicitly set to null - """ - self.setup_gridsite_environment() - self.push_delegation() - dest_surl = "root://dest.ch/file" + str(random.randint(0, 100)) - job = { - "files": [ - { - "sources": ["root://source.es/file"], - "destinations": [dest_surl], - "selection_strategy": "orderly", - "filesize": None, - "metadata": {"mykey": "myvalue"}, - } - ], - "params": {"overwrite": True, "verify_checksum": True}, - } - - job_id = self.flask_app.post( - url="/jobs", - content_type="application/json", - params=json.dumps(job), - status=200, - ).json["job_id"] - - # Make sure it was committed to the DB - self.assertGreater(len(job_id), 0) - - job = Session.query(Job).get(job_id) - self.assertEqual(job.files[0].user_filesize, 0) - - return job_id - - def test_no_vo(self): - """ - Submit a valid job with no VO data in the credentials (could happen with plain SSL!) - The job must be accepted, but assigned to the user's 'virtual' vo. - """ - self.setup_gridsite_environment(no_vo=True) - self.push_delegation() - dest_surl = "root://dest.ch/file" + str(random.randint(0, 100)) - job = { - "files": [ - { - "sources": ["root://source.es/file"], - "destinations": [dest_surl], - "selection_strategy": "orderly", - "checksum": "adler32:1234", - "filesize": 1024, - "metadata": {"mykey": "myvalue"}, - } - ], - "params": {"overwrite": True, "verify_checksum": True}, - } - - job_id = self.flask_app.put( - url="/jobs", - content_type="application/json", - params=json.dumps(job), - status=200, - ).json["job_id"] - - # Make sure it was commited to the DB - self.assertGreater(len(job_id), 0) - - self._validate_submitted(Session.query(Job).get(job_id), no_vo=True) - - def test_no_vo_proxy(self): - """ - Submit a valid job with no VO data in the credentials, but still being a proxy. - The job must be accepted, but assigned to the user's 'virtual' vo. - """ - proxy_dn = self.TEST_USER_DN + "/CN=proxy" - self.setup_gridsite_environment(no_vo=True, dn=proxy_dn) - self.push_delegation() - dest_surl = "root://dest.ch/file" + str(random.randint(0, 100)) - job = { - "files": [ - { - "sources": ["root://source.es/file"], - "destinations": [dest_surl], - "selection_strategy": "orderly", - "checksum": "adler32:1234", - "filesize": 1024, - "metadata": {"mykey": "myvalue"}, - } - ], - "params": {"overwrite": True, "verify_checksum": True}, - } - - job_id = self.flask_app.put( - url="/jobs", - content_type="application/json", - params=json.dumps(job), - status=200, - ).json["job_id"] - - # Make sure it was commited to the DB - self.assertGreater(len(job_id), 0) - - self._validate_submitted( - Session.query(Job).get(job_id), no_vo=True, dn=proxy_dn - ) - - def test_retry(self): - """ - Submit with a specific retry value - """ - self.setup_gridsite_environment() - self.push_delegation() - dest_surl = "root://dest.ch/file" + str(random.randint(0, 100)) - job = { - "files": [ - { - "sources": ["root://source.es/file"], - "destinations": [dest_surl], - "selection_strategy": "orderly", - "checksum": "adler32:1234", - "filesize": 1024, - "metadata": {"mykey": "myvalue"}, - } - ], - "params": { - "overwrite": True, - "verify_checksum": True, - "retry": 42, - "retry_delay": 123, - }, - } - - job_id = self.flask_app.put( - url="/jobs", - content_type="application/json", - params=json.dumps(job), - status=200, - ).json["job_id"] - - # Make sure it was commited to the DB - self.assertGreater(len(job_id), 0) - - job = Session.query(Job).get(job_id) - self._validate_submitted(job) - self.assertEqual(job.retry, 42) - self.assertEqual(job.retry_delay, 123) - - def test_with_activity(self): - """ - Submit a job specifiying activities for the files - """ - self.setup_gridsite_environment() - self.push_delegation() - dest_surl = "root://dest.ch/file" + str(random.randint(0, 100)) - job = { - "files": [ - { - "sources": ["root://source.es/file"], - "destinations": [dest_surl], - "activity": "my-activity", - }, - { - "sources": ["https://source.es/file2"], - "destinations": ["https://dest.ch/file2"], - "activity": "my-second-activity", - }, - ] - } - - job_id = self.flask_app.put( - url="/jobs", - content_type="application/json", - params=json.dumps(job), - status=200, - ).json["job_id"] - - # Make sure it was commited to the DB - self.assertGreater(len(job_id), 0) - - job = Session.query(Job).get(job_id) - self.assertEqual(job.files[0].activity, "my-activity") - self.assertEqual(job.files[1].activity, "my-second-activity") - - def test_surl_with_spaces(self): - """ - Submit a job where the surl has spaces at the beginning and at the end - """ - self.setup_gridsite_environment() - self.push_delegation() - dest_surl = "root://dest.ch/file" + str(random.randint(0, 100)) - job = { - "files": [ - { - "sources": ["root://source.es/file\n \r "], - "destinations": ["\r\n" + dest_surl + "\n\n \n"], - "selection_strategy": "orderly", - "checksum": "adler32:1234", - "filesize": 1024.0, - "metadata": {"mykey": "myvalue"}, - } - ], - "params": {"overwrite": True, "verify_checksum": True}, - } - - job_id = self.flask_app.put( - url="/jobs", - content_type="application/json", - params=json.dumps(job), - status=200, - ).json["job_id"] - # Make sure it was commited to the DB - self.assertGreater(len(job_id), 0) - - job = Session.query(Job).get(job_id) - self._validate_submitted(job) - - def test_submit_different_protocols(self): - """ - Source and destination protocol mismatch - For REST <= 3.2.3, this used to be forbidden, but it was decided to allow it - https://its.cern.ch/jira/browse/FTS-97 - """ - self.setup_gridsite_environment() - self.push_delegation() - dest_surl = "root://dest.ch:8447/file" + str(random.randint(0, 100)) - job = { - "files": [ - { - "sources": ["http://source.es:8446/file"], - "destinations": [dest_surl], - "selection_strategy": "orderly", - "checksum": "adler32:1234", - "filesize": 1024, - "metadata": {"mykey": "myvalue"}, - } - ], - "params": {"overwrite": True, "verify_checksum": True}, - } - - job_id = self.flask_app.post( - url="/jobs", - content_type="application/json", - params=json.dumps(job), - status=200, - ).json["job_id"] - - job = Session.query(Job).get(job_id) - self.assertEqual(1, len(job.files)) - self.assertEqual("http://source.es:8446/file", job.files[0].source_surl) - self.assertEqual(dest_surl, job.files[0].dest_surl) - - def test_submit_with_cloud_cred(self): - """ - Submit a job specifying cloud credentials - """ - self.setup_gridsite_environment() - self.push_delegation() - dest_surl = "root://dest.ch:8447/file" + str(random.randint(0, 100)) - job = { - "files": [ - { - "sources": ["dropbox://dropbox.com/file"], - "destinations": [dest_surl], - "selection_strategy": "orderly", - "checksum": "adler32:1234", - "filesize": 1024, - "metadata": {"mykey": "myvalue"}, - } - ], - "params": {"overwrite": True, "verify_checksum": True}, - } - - job_id = self.flask_app.post( - url="/jobs", - content_type="application/json", - params=json.dumps(job), - status=200, - ).json["job_id"] - - job = Session.query(Job).get(job_id) - self.assertEqual(1, len(job.files)) - self.assertEqual("dropbox://dropbox.com/file", job.files[0].source_surl) - self.assertEqual(dest_surl, job.files[0].dest_surl) - - def test_submit_protocol_params(self): - """ - Submit a transfer specifying some protocol parameters - """ - self.setup_gridsite_environment() - self.push_delegation() - dest_surl = "root://dest.ch:8447/file" + str(random.randint(0, 100)) - job = { - "files": [ - { - "sources": ["http://source.es:8446/file"], - "destinations": [dest_surl], - "selection_strategy": "orderly", - "checksum": "adler32:1234", - "filesize": 1024, - "metadata": {"mykey": "myvalue"}, - } - ], - "params": { - "overwrite": True, - "verify_checksum": True, - "timeout": 1234, - "nostreams": 42, - "buffer_size": 1025, - "strict_copy": True, - }, - } - job_id = self.flask_app.post( - url="/jobs", - content_type="application/json", - params=json.dumps(job), - status=200, - ).json["job_id"] - - job = Session.query(Job).get(job_id) - self.assertTrue(job.internal_job_params is not None) - params = job.internal_job_params.split(",") - self.assertIn("timeout:1234", params) - self.assertIn("nostreams:42", params) - self.assertIn("buffersize:1025", params) - self.assertIn("strict", params) - - def test_submit_with_priority(self): - """ - Submit a job specifying the priority - """ - self.setup_gridsite_environment() - self.push_delegation() - dest_surl = "root://dest.ch:8447/file" + str(random.randint(0, 100)) - job = { - "files": [ - { - "sources": ["http://source.es:8446/file"], - "destinations": [dest_surl], - } - ], - "params": {"priority": 5,}, - } - job_id = self.flask_app.post( - url="/jobs", - content_type="application/json", - params=json.dumps(job), - status=200, - ).json["job_id"] - - job = Session.query(Job).get(job_id) - self.assertEqual(job.priority, 5) - - def test_submit_max_time_in_queue(self): - """ - Submits a job specifying the maximum time it should stay in the queue. - """ - self.setup_gridsite_environment() - self.push_delegation() - dest_surl = "root://dest.ch:8447/file" + str(random.randint(0, 100)) - job = { - "files": [ - { - "sources": ["http://source.es:8446/file"], - "destinations": [dest_surl], - } - ], - "params": {"max_time_in_queue": 8}, - } - job_id = self.flask_app.post( - url="/jobs", - content_type="application/json", - params=json.dumps(job), - status=200, - ).json["job_id"] - - # See FTS-311 - # max_time_in_queue was effectively ignored by FTS3 - # Since FTS-311, this field stores the timestamp when the job expires - job = Session.query(Job).get(job_id) - self.assertGreater(job.max_time_in_queue, time.time()) - self.assertLessEqual(job.max_time_in_queue, (8 * 60 * 60) + time.time()) - - def test_submit_max_time_in_queue_suffix(self): - """ - Submits a job specifying the maximum time it should stay in the queue. - Use a suffix. - """ - self.setup_gridsite_environment() - self.push_delegation() - dest_surl = "root://dest.ch:8447/file" + str(random.randint(0, 100)) - job = { - "files": [ - { - "sources": ["http://source.es:8446/file"], - "destinations": [dest_surl], - } - ], - "params": {"max_time_in_queue": "4s"}, - } - job_id = self.flask_app.post( - url="/jobs", - content_type="application/json", - params=json.dumps(job), - status=200, - ).json["job_id"] - - job = Session.query(Job).get(job_id) - self.assertGreater(job.max_time_in_queue, time.time()) - self.assertLessEqual(job.max_time_in_queue, 8 + time.time()) - - def test_submit_max_time_in_queue_suffix2(self): - """ - Submits a job specifying the maximum time it should stay in the queue. - Use a suffix. - """ - self.setup_gridsite_environment() - self.push_delegation() - dest_surl = "root://dest" + str(random.randint(0, 100)) + ".ch:8447/file" - job = { - "files": [ - { - "sources": ["http://source.es:8446/file"], - "destinations": [dest_surl], - } - ], - "params": {"max_time_in_queue": "2m"}, - } - job_id = self.flask_app.post( - url="/jobs", - content_type="application/json", - params=json.dumps(job), - status=200, - ).json["job_id"] - - job = Session.query(Job).get(job_id) - self.assertGreater(job.max_time_in_queue, time.time()) - self.assertLessEqual(job.max_time_in_queue, 120 + time.time()) - - def test_submit_ipv4(self): - """ - Submit a job with IPv4 only - """ - self.setup_gridsite_environment() - self.push_delegation() - - job = { - "files": [ - { - "sources": ["http://source.es:8446/file"], - "destinations": ["root://destipv4.ch:8447/file"], - } - ], - "params": {"ipv4": True}, - } - job_id = self.flask_app.post( - url="/jobs", - content_type="application/json", - params=json.dumps(job), - status=200, - ).json["job_id"] - - jobdb = Session.query(Job).get(job_id) - self.assertIn("ipv4", jobdb.internal_job_params) - self.assertNotIn("ipv6", jobdb.internal_job_params) - - job = { - "files": [ - { - "sources": ["http://source.es:8446/file"], - "destinations": ["root://destipv4tofalse.ch:8447/file"], - } - ], - "params": {"ipv4": False}, - } - - job_id = self.flask_app.post( - url="/jobs", - content_type="application/json", - params=json.dumps(job), - status=200, - ).json["job_id"] - - jobdb = Session.query(Job).get(job_id) - self.assertTrue( - jobdb.internal_job_params is None or "ipv4" not in jobdb.internal_job_params - ) - self.assertTrue( - jobdb.internal_job_params is None or "ipv6" not in jobdb.internal_job_params - ) - - def test_submit_ipv6(self): - """ - Submit a job with IPv6 only - """ - self.setup_gridsite_environment() - self.push_delegation() - - job = { - "files": [ - { - "sources": ["http://source.es:8446/file"], - "destinations": ["root://destipv6.ch:8447/file"], - } - ], - "params": {"ipv6": True}, - } - job_id = self.flask_app.post( - url="/jobs", - content_type="application/json", - params=json.dumps(job), - status=200, - ).json["job_id"] - - jobdb = Session.query(Job).get(job_id) - self.assertIn("ipv6", jobdb.internal_job_params) - self.assertNotIn("ipv4", jobdb.internal_job_params) - - job = { - "files": [ - { - "sources": ["http://source.es:8446/file"], - "destinations": ["root://destipv6tofalse.ch:8447/file"], - } - ], - "params": {"ipv6": False}, - } - - job_id = self.flask_app.post( - url="/jobs", - content_type="application/json", - params=json.dumps(job), - status=200, - ).json["job_id"] - - jobdb = Session.query(Job).get(job_id) - self.assertTrue( - jobdb.internal_job_params is None or "ipv4" not in jobdb.internal_job_params - ) - self.assertTrue( - jobdb.internal_job_params is None or "ipv6" not in jobdb.internal_job_params - ) - - def test_files_balanced(self): - """ - Checks the distribution of the file 'hashed ids' is reasonably uniformely distributed. - hashed_id is a legacy name, its purpose is balance the transfers between hosts - regardless of the number running in a giving moment - """ - raise SkipTest("Disabled as it is not very reliable") - self.setup_gridsite_environment() - self.push_delegation() - - files = [] - for r in xrange(5000): - files.append( - { - "sources": ["root://source.es/file%d" % r], - "destinations": [ - "root://dest.ch/file%d%d" % (r, random.randint(0, 100)) - ], - } - ) - - job = {"files": files} - - job_id = self.app.put(url="/jobs", params=json.dumps(job), status=200).json[ - "job_id" - ] - - files = Session.query(File.hashed_id).filter(File.job_id == job_id) - hashed_ids = map(lambda f: f.hashed_id, files) - - # Null hypothesis: the distribution of hashed_ids is uniform - histogram, min_value, binsize, outsiders = scipy.stats.histogram( - hashed_ids, defaultlimits=(0, 2 ** 16 - 1) - ) - chisq, pvalue = scipy.stats.chisquare(histogram) - - self.assertGreater(min_value, -1) - self.assertEqual(outsiders, 0) - self.assertGreater(pvalue, 0.1) -- GitLab From 6afd56840e5c018c15651a462b0ed73a8a0dc6d0 Mon Sep 17 00:00:00 2001 From: Carles Garcia Cabot <carles.garcia.cabot@cern.ch> Date: Fri, 20 Mar 2020 14:44:10 +0100 Subject: [PATCH 5/5] Fix syntax error --- src/fts3rest/fts3rest/tests/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/fts3rest/fts3rest/tests/__init__.py b/src/fts3rest/fts3rest/tests/__init__.py index 257b3cf3..558f1fb6 100644 --- a/src/fts3rest/fts3rest/tests/__init__.py +++ b/src/fts3rest/fts3rest/tests/__init__.py @@ -189,7 +189,7 @@ class TestController(TestCase): Session.commit() # Delete messages - if "fts3.MessagingDirectory" in app.config: + if "fts3.MessagingDirectory" in self.flask_app.config: try: shutil.rmtree(self.flask_app.config["fts3.MessagingDirectory"]) except Exception: -- GitLab