Commit ec0adc17 authored by Chris Burr's avatar Chris Burr
Browse files

Merge branch 'use-lbprodrun-and-lbapi' into 'master'

Support new SSO and require Python 3.9+

See merge request !53
parents a953bf97 e58851df
Pipeline #3597922 passed with stages
in 3 minutes and 38 seconds
......@@ -34,17 +34,12 @@ test:
stage: test
- mamba create --name test-env --yes python=${PY_VER} pytest xrootd uproot pip${PIP_VER} boost-histogram requests-kerberos
- mamba env create --name test-env --file environment.yaml
- eval "$(python -m conda shell.bash hook)"
- conda activate test-env
- pip install '.[testing]'
- pytest -vvv --cov-report xml
- PY_VER: ["3.8", "3.9"]
cobertura: coverage.xml
......@@ -53,13 +48,11 @@ test:
stage: deploy
- tags
dependencies: []
- tags@lhcb-dpa/analysis-productions/lbapcommon
- pip install build twine
- python sdist --dist-dir public/
- python bdist_wheel --dist-dir public/
- python -m build
- if [ -z "$TWINE_PASSWORD" ] ; then echo "Set TWINE_PASSWORD in CI variables" ; exit 1 ; fi
- twine upload -u __token__ public/*
before_script: []
after_script: []
- twine upload -u __token__ dist/*
# See for more information
# See for more hooks
exclude: ^(tests/example-logs)
exclude: ^(tests/example-logs|src/LbAPCommon/$)
python: python3
......@@ -32,11 +32,7 @@ repos:
- id: flake8
additional_dependencies: [flake8-bugbear]
- repo: local
- repo: ""
rev: 73edff357d446703066009ecd6ae4074740a3157
- id: check-copyright
name: check-copyright
entry: ./lb-check-copyright
language: script
files: ''
pass_filenames: false
- id: lb-add-copyright
name: lbaplocal-dev
- conda-forge
- nodefaults
- python =3.9
- pip
# Runtime dependencies
- strictyaml
- jinja2
- lxml
- pyyaml
- setuptools
- boost-histogram
- hist
- requests
- requests-kerberos
- uproot >=4.0.0
- awkward >=1.0.0
- matplotlib
- requests-gssapi
- beautifulsoup4
# Development dependencies
- pre-commit
- pytest
- pytest-cov
- pytest-mock
- pytest-timeout
......@@ -33,13 +33,13 @@ setup(
"Development Status :: 3 - Alpha",
"License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
keywords="LHCb HEP CERN",
package_dir={"": "src"},
......@@ -54,6 +54,8 @@ setup(
extras_require={"testing": ["pytest", "pytest-cov", "pytest-timeout"]},
package_data={"LbAPCommon": package_data},
......@@ -16,6 +16,7 @@ __all__ = [
......@@ -23,6 +24,7 @@ from os.path import join
from . import hacks, validators
from .linting import lint_all
from .new_cern_sso import get_sso_token
from .parsing import parse_yaml, render_yaml, validate_yaml
......@@ -31,14 +31,14 @@ def project_uses_cmt(app_name, app_version):
def setup_lbrun_environment(siteroot, repository_dir, setup_cmt):
"""Set up the fake siteroot for lb-run to use when testing"""
os.environ["CMAKE_PREFIX_PATH"] = siteroot
os.environ["CMAKE_PREFIX_PATH"] = str(siteroot)
fake_dbase = join(siteroot, "DBASE")
fake_install_dir = join(fake_dbase, "AnalysisProductions", "v999999999999")
os.symlink(repository_dir, fake_install_dir)
if setup_cmt:
print("Applying fallback hacks for CMT style projects")
os.environ["User_release_area"] = siteroot
os.environ["User_release_area"] = str(siteroot)
LHCB_DBASE_ROOT = "/cvmfs/"
for dname in os.listdir(LHCB_DBASE_ROOT):
if dname == "AnalysisProductions":
import logging
import requests
import time
import uuid
from requests_gssapi import HTTPSPNEGOAuth, OPTIONAL
from bs4 import BeautifulSoup
from http.cookiejar import MozillaCookieJar, Cookie
from urllib.parse import parse_qs
def save_cookies_lwp(cookiejar, filename):
"""Saves cookies from a requests.Session cookies member into a file in the Netscape format"""
lwp_cookiejar = MozillaCookieJar()
for c in cookiejar:
args = dict(vars(c).items())
args["rest"] = args["_rest"]
del args["_rest"]
if args["expires"] == None:
args["expires"] = int(time.time()) + 86400
c = Cookie(**args)
lwp_cookiejar.set_cookie(c), ignore_discard=True)
def post_session_saml(session, response):
"""Performs the SAML POST request given a session and a successful Keycloak authentication response in SAML"""
soup_saml = BeautifulSoup(response.text, features="html.parser")
action = soup_saml.form.get("action")
post_key = soup_saml.form.input.get("name")
post_value = soup_saml.form.input.get("value"), data={post_key: post_value})
def login_with_kerberos(login_page, verify_cert, auth_hostname, silent):
"""Simulates a browser session to log in using SPNEGO protocol"""
session = requests.Session()
if not silent:"Fetching target URL and its redirects")
r_login_page = session.get(login_page, verify=verify_cert)
if "" in r_login_page.url:
raise NotImplementedError("Tried to hit old CERN SSO")
if not silent:
logging.debug("Landing page: {}".format(r_login_page.url))"Parsing landing page to get the Kerberos login URL")
soup = BeautifulSoup(r_login_page.text, features="html.parser")
kerberos_button = soup.find(id="zocial-kerberos")
if not kerberos_button:
error_message = get_error_message(r_login_page.text)
if error_message:
raise Exception("Login failed: {}".format(error_message))
raise Exception(
"Login failed: Landing page not recognized as the CERN SSO login page.")
kerberos_path = kerberos_button.get("href")
if not silent:"Fetching Kerberos login URL")
r_kerberos_redirect = session.get(
"https://{}{}".format(auth_hostname, kerberos_path)
if not silent:"Logging in using Kerberos Auth")
r_kerberos_auth = session.get(
while (
r_kerberos_auth.status_code == 302
and auth_hostname in r_kerberos_auth.headers["Location"]
r_kerberos_auth = session.get(
r_kerberos_auth.headers["Location"], allow_redirects=False
if r_kerberos_auth.status_code != 302:
if "login-actions/consent" in r_kerberos_auth.text:
raise Exception(
"Login failed: This application requires consent. Please accept it manually before using this tool.")
error_message = get_error_message(r_kerberos_auth.text)
if not error_message:
"Not automatically redirected: trying SAML authentication")
post_session_saml(session, r_kerberos_auth)
raise Exception("Login failed: {}".format(error_message))
return session, r_kerberos_auth
def get_error_message(response_html):
soup_err_page = BeautifulSoup(response_html, features="html.parser")
error_message = soup_err_page.find(id="kc-error-message")
if not error_message:
return None
return error_message.find("p").text
def save_sso_cookie(url, file, verify_cert, auth_hostname, silent=False):
"""Log in into a URL that redirects to the SSO and save the session cookies"""
session, response = login_with_kerberos(
url, verify_cert, auth_hostname, silent=silent)
if response.status_code == 302:
redirect_uri = response.headers["Location"]
if not silent:
"Logged in. Fetching redirect URL to get application cookies")
session.get(redirect_uri, verify=verify_cert)
if not silent:"Saving cookies in {}".format(file))
save_cookies_lwp(session.cookies, file)
except Exception as e:
"An error occurred while trying to log in and save cookies.")
raise e
def get_sso_token(url, clientid, verify_cert=True, auth_hostname="", auth_realm="cern", silent=False):
"""Get an OIDC token by logging in in the Auhtorization URL using Kerberos
:param url: Application or Redirect URL. Required for the OAuth request.
:param clientid: Client ID of a client with implicit flow enabled.
:param verify_cert: Verify certificate.
:param auth_hostname: Keycloak hostname.
:param auth_realm: Authentication realm.
:param silent: Flag for printing log messages (default: False).
random_state = str(uuid.uuid4()).split("-")[0]
authz_url = "https://{}/auth/realms/{}/protocol/openid-connect/auth?client_id={}&response_type=code&state={}&redirect_uri={}".format(
auth_hostname, auth_realm, clientid, random_state, url
login_response = login_with_kerberos(
authz_url, verify_cert, auth_hostname, silent=silent)[1]
authz_response = parse_qs(
if authz_response["state"][0] != random_state:
raise Exception(
"The authorization response doesn't contain the expected state value.")
r =
auth_hostname, auth_realm
"client_id": clientid,
"grant_type": "authorization_code",
"code": authz_response["code"][0],
"redirect_uri": url,
if not silent:
if not r.ok:
"The token response was not successful: {}".format(r.json()))
token_response = r.json()
return token_response["access_token"]
except Exception as e:
if not silent:
"An error occurred while trying to fetch user token."
raise e
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment