diff --git a/pyjapc/_japc.py b/pyjapc/_japc.py index 77177f523d50439be54e7e30dc1483821fa74da2..8441cbe3a153d6e3730410c0aedfc2416b533d65 100644 --- a/pyjapc/_japc.py +++ b/pyjapc/_japc.py @@ -392,12 +392,17 @@ class PyJapc: username: typing.Optional[str] = None, password: typing.Optional[str] = None, loginDialog: bool = False, - readEnv: bool = True + readEnv: bool = True, + *, + token: typing.Union[bytes, pyjapc_types.PyRbacTokenLike, None], ) -> None: """Perform RBAC authentication. This is required to work with access-protected parameters. + If ``token`` (a bytes-like object or PyRbac token) is provided, it will be used to + authenticate. + If the environment variable ``RBAC_TOKEN_SERIALIZED`` is set and ``readEnv`` is ``True`` then a login will be attempted with the RBAC token contained in the environment. @@ -439,6 +444,8 @@ class PyJapc: - ``rbacLogin(loginDialog=True)`` * - By username and password (consider acquiring this through :meth:`getpass.getpass`) - ``rbacLogin(username=some_value, password=some_password)`` + * - Using a pre-generated token (e.g. from pyrbac) + - ``rbacLogin(token=pyrbac_token)`` Args: username (str): The RBAC username for an "explicit" login. @@ -451,29 +458,33 @@ class PyJapc: readEnv (bool): Read the RBAC token from the environment variable ``RBAC_TOKEN_SERIALIZED`` if it is set (default True). + + token(bytes|Token): If specified, the RBAC token to use. + A serialized token (a bytes-like object) or a PyRbac ``Token``. """ - cern = jp.JPackage("cern") - byLoc = False + if token: + try: + self.log.info("Authenticating using provided RBAC token.") + if isinstance(token, bytes): + token_bytes = token + else: + token_bytes = token.encode() + self._doSetTokenToJava(token_bytes) + return + except Exception as e: + self.log.warning(e) env = os.getenv("RBAC_TOKEN_SERIALIZED", "") if readEnv and env: try: self.log.info("Reusing RBAC token from environment") - # type ignore because of bug in stubgenj (bytes -> List[int]) - token_bytes: typing.List[int] = base64.b64decode(env) # type: ignore - token = cern.rbac.common.RbaToken(token_bytes) - self._close_rbaLoginService() - cern.rbac.util.holder.ClientTierTokenHolder.setRbaToken(token) - - if cern.rbac.util.lookup.RbaTokenLookup.findRbaToken() is None: - raise RuntimeError("Could not reuse RBAC token, maybe it has expired?") - - self.log.info("RBAC login successful") + self._doSetTokenToJava(base64.b64decode(env)) return except Exception as e: self.log.warning(e) + byLoc = False if loginDialog: import pyjapc.rbac_dialog as rbac_dialog if username is None: @@ -495,6 +506,17 @@ class PyJapc: self._close_rbaLoginService() raise e + def _doSetTokenToJava(self, token_bytes: bytes) -> None: + cern = jp.JPackage("cern") + token = cern.rbac.common.RbaToken(token_bytes) + self._close_rbaLoginService() + cern.rbac.util.holder.ClientTierTokenHolder.setRbaToken(token) + + if cern.rbac.util.lookup.RbaTokenLookup.findRbaToken() is None: + raise RuntimeError("Could not reuse RBAC token, maybe it has expired?") + + self.log.info("RBAC login successful") + def _doLogin( self, byLoc: bool, diff --git a/pyjapc/_types.pyi b/pyjapc/_types.pyi index 717b0977da5ea601738a108237b80b2461c99994..4063025ac3872a98bdcac49c3ae4aa3c573f64ac 100644 --- a/pyjapc/_types.pyi +++ b/pyjapc/_types.pyi @@ -5,6 +5,13 @@ import java import datetime +import sys + +if sys.version_info > (3, 7): + from typing import Protocol +else: + # For Python 3.7, allow use of Protocol. When py37 dropped, move to typing.Protocol form. + from typing_extensions import Protocol # Input types JString = typing.Union[java.lang.String, str] @@ -45,3 +52,5 @@ ParameterTypes = typing.Union[ cern.japc.core.group.ParameterGroup, ] +class PyRbacTokenLike(Protocol): + def encode(self) -> bytes: ...