Commit 1b65e984 authored by Pablo Panero's avatar Pablo Panero
Browse files

[48] permissions: make schema owner configurable and refactor logic

parent 3dd28154
......@@ -4,7 +4,8 @@ url = "https://pypi.org/simple"
verify_ssl = true
[dev-packages]
pytest = "*"
pytest = ">=3.8.1"
pytest-mock = ">=1.6.0"
flake8 = ">=3.7.8"
flake8-docstrings = ">=1.5.0"
isort = "==4.3.21"
......
{
"_meta": {
"hash": {
"sha256": "c80aaa51a85f385ce614a1ccd3450cfb9559ccf37546221a664ab050d28996a9"
"sha256": "5aa9419591b659fd9a0488970c7fc1cc3972fdb20025ce9f8d592e630074b8b3"
},
"pipfile-spec": 6,
"requires": {
......@@ -18,9 +18,9 @@
"default": {
"alembic": {
"hashes": [
"sha256:9f907d7e8b286a1cfb22db9084f9ce4fde7ad7956bb496dc7c952e10ac90e36a"
"sha256:e6c6a4243e89c8d3e2342a1562b2388f3b524c9cac2fccc4d2c461a1320cc1c1"
],
"version": "==1.2.1"
"version": "==1.3.0"
},
"amqp": {
"hashes": [
......@@ -31,10 +31,10 @@
},
"arrow": {
"hashes": [
"sha256:10257c5daba1a88db34afa284823382f4963feca7733b9107956bed041aff24f",
"sha256:c2325911fcd79972cf493cfd957072f9644af8ad25456201ae1ede3316576eb4"
"sha256:01a16d8a93eddf86a29237f32ae36b29c27f047e79312eb4df5d55fd5a2b3183",
"sha256:e1a318a4c0b787833ae46302c02488b6eeef413c6a13324b3261ad320f21ec1e"
],
"version": "==0.15.2"
"version": "==0.15.4"
},
"attrs": {
"hashes": [
......@@ -100,38 +100,40 @@
},
"cffi": {
"hashes": [
"sha256:00d890313797d9fe4420506613384b43099ad7d2b905c0752dbcc3a6f14d80fa",
"sha256:0cf9e550ac6c5e57b713437e2f4ac2d7fd0cd10336525a27224f5fc1ec2ee59a",
"sha256:0ea23c9c0cdd6778146a50d867d6405693ac3b80a68829966c98dd5e1bbae400",
"sha256:193697c2918ecdb3865acf6557cddf5076bb39f1f654975e087b67efdff83365",
"sha256:1ae14b542bf3b35e5229439c35653d2ef7d8316c1fffb980f9b7647e544baa98",
"sha256:1e389e069450609c6ffa37f21f40cce36f9be7643bbe5051ab1de99d5a779526",
"sha256:263242b6ace7f9cd4ea401428d2d45066b49a700852334fd55311bde36dcda14",
"sha256:33142ae9807665fa6511cfa9857132b2c3ee6ddffb012b3f0933fc11e1e830d5",
"sha256:364f8404034ae1b232335d8c7f7b57deac566f148f7222cef78cf8ae28ef764e",
"sha256:47368f69fe6529f8f49a5d146ddee713fc9057e31d61e8b6dc86a6a5e38cecc1",
"sha256:4895640844f17bec32943995dc8c96989226974dfeb9dd121cc45d36e0d0c434",
"sha256:558b3afef987cf4b17abd849e7bedf64ee12b28175d564d05b628a0f9355599b",
"sha256:5ba86e1d80d458b338bda676fd9f9d68cb4e7a03819632969cf6d46b01a26730",
"sha256:63424daa6955e6b4c70dc2755897f5be1d719eabe71b2625948b222775ed5c43",
"sha256:6381a7d8b1ebd0bc27c3bc85bc1bfadbb6e6f756b4d4db0aa1425c3719ba26b4",
"sha256:6381ab708158c4e1639da1f2a7679a9bbe3e5a776fc6d1fd808076f0e3145331",
"sha256:6fd58366747debfa5e6163ada468a90788411f10c92597d3b0a912d07e580c36",
"sha256:728ec653964655d65408949b07f9b2219df78badd601d6c49e28d604efe40599",
"sha256:7cfcfda59ef1f95b9f729c56fe8a4041899f96b72685d36ef16a3440a0f85da8",
"sha256:819f8d5197c2684524637f940445c06e003c4a541f9983fd30d6deaa2a5487d8",
"sha256:825ecffd9574557590e3225560a8a9d751f6ffe4a49e3c40918c9969b93395fa",
"sha256:9009e917d8f5ef780c2626e29b6bc126f4cb2a4d43ca67aa2b40f2a5d6385e78",
"sha256:9c77564a51d4d914ed5af096cd9843d90c45b784b511723bd46a8a9d09cf16fc",
"sha256:a19089fa74ed19c4fe96502a291cfdb89223a9705b1d73b3005df4256976142e",
"sha256:a40ed527bffa2b7ebe07acc5a3f782da072e262ca994b4f2085100b5a444bbb2",
"sha256:bb75ba21d5716abc41af16eac1145ab2e471deedde1f22c6f99bd9f995504df0",
"sha256:e22a00c0c81ffcecaf07c2bfb3672fa372c50e2bd1024ffee0da191c1b27fc71",
"sha256:e55b5a746fb77f10c83e8af081979351722f6ea48facea79d470b3731c7b2891",
"sha256:ec2fa3ee81707a5232bf2dfbd6623fdb278e070d596effc7e2d788f2ada71a05",
"sha256:fd82eb4694be712fcae03c717ca2e0fc720657ac226b80bbb597e971fc6928c2"
],
"version": "==1.13.1"
"sha256:0b49274afc941c626b605fb59b59c3485c17dc776dc3cc7cc14aca74cc19cc42",
"sha256:0e3ea92942cb1168e38c05c1d56b0527ce31f1a370f6117f1d490b8dcd6b3a04",
"sha256:135f69aecbf4517d5b3d6429207b2dff49c876be724ac0c8bf8e1ea99df3d7e5",
"sha256:19db0cdd6e516f13329cba4903368bff9bb5a9331d3410b1b448daaadc495e54",
"sha256:2781e9ad0e9d47173c0093321bb5435a9dfae0ed6a762aabafa13108f5f7b2ba",
"sha256:291f7c42e21d72144bb1c1b2e825ec60f46d0a7468f5346841860454c7aa8f57",
"sha256:2c5e309ec482556397cb21ede0350c5e82f0eb2621de04b2633588d118da4396",
"sha256:2e9c80a8c3344a92cb04661115898a9129c074f7ab82011ef4b612f645939f12",
"sha256:32a262e2b90ffcfdd97c7a5e24a6012a43c61f1f5a57789ad80af1d26c6acd97",
"sha256:3c9fff570f13480b201e9ab69453108f6d98244a7f495e91b6c654a47486ba43",
"sha256:415bdc7ca8c1c634a6d7163d43fb0ea885a07e9618a64bda407e04b04333b7db",
"sha256:4424e42199e86b21fc4db83bd76909a6fc2a2aefb352cb5414833c030f6ed71b",
"sha256:4a43c91840bda5f55249413037b7a9b79c90b1184ed504883b72c4df70778579",
"sha256:599a1e8ff057ac530c9ad1778293c665cb81a791421f46922d80a86473c13346",
"sha256:5c4fae4e9cdd18c82ba3a134be256e98dc0596af1e7285a3d2602c97dcfa5159",
"sha256:5ecfa867dea6fabe2a58f03ac9186ea64da1386af2159196da51c4904e11d652",
"sha256:62f2578358d3a92e4ab2d830cd1c2049c9c0d0e6d3c58322993cc341bdeac22e",
"sha256:6471a82d5abea994e38d2c2abc77164b4f7fbaaf80261cb98394d5793f11b12a",
"sha256:6d4f18483d040e18546108eb13b1dfa1000a089bcf8529e30346116ea6240506",
"sha256:71a608532ab3bd26223c8d841dde43f3516aa5d2bf37b50ac410bb5e99053e8f",
"sha256:74a1d8c85fb6ff0b30fbfa8ad0ac23cd601a138f7509dc617ebc65ef305bb98d",
"sha256:7b93a885bb13073afb0aa73ad82059a4c41f4b7d8eb8368980448b52d4c7dc2c",
"sha256:7d4751da932caaec419d514eaa4215eaf14b612cff66398dd51129ac22680b20",
"sha256:7f627141a26b551bdebbc4855c1157feeef18241b4b8366ed22a5c7d672ef858",
"sha256:8169cf44dd8f9071b2b9248c35fc35e8677451c52f795daa2bb4643f32a540bc",
"sha256:aa00d66c0fab27373ae44ae26a66a9e43ff2a678bf63a9c7c1a9a4d61172827a",
"sha256:ccb032fda0873254380aa2bfad2582aedc2959186cce61e3a17abc1a55ff89c3",
"sha256:d754f39e0d1603b5b24a7f8484b22d2904fa551fe865fd0d4c3332f078d20d4e",
"sha256:d75c461e20e29afc0aee7172a0950157c704ff0dd51613506bd7d82b718e7410",
"sha256:dcd65317dd15bc0451f3e01c80da2216a31916bdcffd6221ca1202d96584aa25",
"sha256:e570d3ab32e2c2861c4ebe6ffcad6a8abf9347432a37608fe1fbd157b3f0036b",
"sha256:fd43a88e045cf992ed09fa724b5315b790525f2676883a6ea64e3263bae6549d"
],
"version": "==1.13.2"
},
"chardet": {
"hashes": [
......@@ -182,10 +184,10 @@
},
"decorator": {
"hashes": [
"sha256:86156361c50488b84a3f148056ea716ca587df2f0de1d34750d35c21312725de",
"sha256:f069f3a01830ca754ba5258fde2278454a0b5b79e0d7f5c13b3b97e57d4acff6"
"sha256:54c38050039232e1db4ad7375cfce6748d7b41c29e95a081c8a6d2c30364a2ce",
"sha256:5d19b92a3c8f7f101c8dd86afd86b0f061a8ce4540ab8cd401fa2542756bce6d"
],
"version": "==4.4.0"
"version": "==4.4.1"
},
"elasticsearch": {
"hashes": [
......@@ -364,9 +366,9 @@
},
"future": {
"hashes": [
"sha256:858e38522e8fd0d3ce8f0c1feaf0603358e366d5403209674c7b617fa0c24093"
"sha256:b1bead90b70cf6ec3f0710ae53a525360fa360d306a86583adc6bf83a4db537d"
],
"version": "==0.18.1"
"version": "==0.18.2"
},
"html5lib": {
"hashes": [
......@@ -600,10 +602,10 @@
},
"ipython": {
"hashes": [
"sha256:c4ab005921641e40a68e405e286e7a1fcc464497e14d81b6914b4fd95e5dee9b",
"sha256:dd76831f065f17bddd7eaa5c781f5ea32de5ef217592cf019e34043b56895aa1"
"sha256:dfd303b270b7b5232b3d08bd30ec6fd685d8a58cabd54055e3d69d8f029f7280",
"sha256:ed7ebe1cba899c1c3ccad6f7f1c2d2369464cc77dba8eebc65e2043e19cda995"
],
"version": "==7.8.0"
"version": "==7.9.0"
},
"ipython-genutils": {
"hashes": [
......@@ -938,16 +940,16 @@
},
"pyrsistent": {
"hashes": [
"sha256:34b47fa169d6006b32e99d4b3c4031f155e6e68ebcc107d6454852e8e0ee6533"
"sha256:eb6545dbeb1aa69ab1fb4809bfbf5a8705e44d92ef8fc7c2361682a47c46c778"
],
"version": "==0.15.4"
"version": "==0.15.5"
},
"python-dateutil": {
"hashes": [
"sha256:7e6584c74aeed623791615e26efd690f29817a27c73085b78e4bad02493df2fb",
"sha256:c89805f6f4d64db21ed966fda138f8a5ed7a4fdbc1a8ee329ce1b74e3c74da9e"
"sha256:73ebfe9dbf22e832286dafa60473e4cd239f8592f699aa5adaf10050e6e1823c",
"sha256:75bb3f31ea686f1197762692a9ee6a7550b59fc6ca3a1f4b5d7e32fb98e2da2a"
],
"version": "==2.8.0"
"version": "==2.8.1"
},
"python-editor": {
"hashes": [
......@@ -1010,11 +1012,11 @@
},
"simplekv": {
"hashes": [
"sha256:58bbb99c3d3a55ab69a2dabb064a309fd7089730cb5f9d3eecb47b110e3fc5b0",
"sha256:a2f8cdde73b1f9ae60540f9bc0b49c564226d8a46f6a1bd98d1b6169f98274ed",
"sha256:ae52e8f48618ecf9f81631c152ea8fbed8b88fb7c17870db984a295481483c06"
"sha256:0700a30ecd9e19dd03dd8df3533f0f6b32a1d8d4fdf7ad5540f009ac3273db13",
"sha256:573c704748ad48fcd54add5eac565167e85f022cf3a36cb459431155061401bc",
"sha256:f2048a59edf46c72c96d6405902afd18735c7104241284be2903f54e0dbbf037"
],
"version": "==0.13.0"
"version": "==0.13.1"
},
"six": {
"hashes": [
......@@ -1046,9 +1048,9 @@
"encrypted"
],
"hashes": [
"sha256:6689b29d7951c5c7c4d79fa6b8c95f9ff9ec708b07aa53f82060599bd14dcc88"
"sha256:01f0f0ebed696386bc7bf9231cd6894087baba374dd60f40eb1b07512d6b1a5e"
],
"version": "==0.34.2"
"version": "==0.35.0"
},
"traitlets": {
"hashes": [
......@@ -1199,11 +1201,11 @@
},
"flake8": {
"hashes": [
"sha256:19241c1cbc971b9962473e4438a2ca19749a7dd002dd1a946eaba171b4114548",
"sha256:8e9dfa3cecb2400b3738a42c54c3043e821682b9c840b0448c0503f781130696"
"sha256:45681a117ecc81e870cbf1262835ae4af5e7a8b08e40b944a8a6e6b895914cfb",
"sha256:49356e766643ad15072a789a20915d3c91dc89fd313ccd71802303fd67e4deca"
],
"index": "pypi",
"version": "==3.7.8"
"version": "==3.7.9"
},
"flake8-docstrings": {
"hashes": [
......@@ -1300,6 +1302,14 @@
"index": "pypi",
"version": "==5.2.2"
},
"pytest-mock": {
"hashes": [
"sha256:b3514caac35fe3f05555923eabd9546abce11571cc2ddf7d8615959d04f2c89e",
"sha256:ea502c3891599c26243a3a847ccf0b1d20556678c528f86c98e3cd6d40c5cf11"
],
"index": "pypi",
"version": "==1.11.2"
},
"six": {
"hashes": [
"sha256:3350809f0555b11f552448330d0b52d5f24c91a322ea4a15ef22629740f3761c",
......
......@@ -4,14 +4,13 @@
# This file is part of CERN Search.
# Copyright (C) 2018-2019 CERN.
#
# CERN Search is free software; you can redistribute it and/or modify it
# Citadel Search is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
from cern_search_rest_api.modules.cernsearch.utils import get_user_provides
from flask import current_app, request
from flask import current_app
from flask_security import current_user
from invenio_indexer.utils import default_record_to_index
from invenio_search import current_search_client
from functools import partial
"""Access control for CERN Search."""
......@@ -88,6 +87,14 @@ class RecordPermission(object):
return cls(record, deny, user)
def _granted(provides, needs):
return provides and not set(provides).isdisjoint(set(needs))
def _user_granted(needs):
return _granted(provides=get_user_provides(), needs=needs)
def has_owner_permission(user, record=None):
"""Check if user is authenticated and has create access"""
log_action(user, 'CREATE/OWNER')
......@@ -97,21 +104,11 @@ def has_owner_permission(user, record=None):
return user.is_authenticated
# Second authentication phase, record level
if user.is_authenticated:
# Allow based in the '_access' key
user_provides = get_user_provides()
es_index, doc = default_record_to_index(record)
current_app.logger.debug('Using index {idx} and doc {doc}'.format(idx=es_index, doc=doc))
if current_search_client.indices.exists([es_index]):
mapping = current_search_client.indices.get_mapping([es_index])
if mapping is not None:
current_app.logger.debug('Using mapping for {idx}'.format(idx=es_index))
current_app.logger.debug('Mapping {mapping}'.format(mapping=mapping))
# set.isdisjoint() is faster than set.intersection()
create_access_groups = mapping[es_index]['mappings'][doc]['_meta']['_owner'].split(',')
if user_provides and not set(user_provides).isdisjoint(set(create_access_groups)):
current_app.logger.debug('User authenticated correctly')
return True
current_app.logger.debug('Could not authenticate user, group sets are disjoint')
admin_access = current_app.config.get('ADMIN_ACCESS_GROUPS', '')
admin_access = admin_access.split(',')
return _user_granted(admin_access)
return False
......@@ -125,27 +122,21 @@ def has_list_permission(user, record=None):
def has_update_permission(user, record):
"""Check if user is authenticated and has update access"""
"""Check if user is authenticated and has update access."""
log_action(user, 'UPDATE')
if user.is_authenticated:
# Allow based in the '_access' key
user_provides = get_user_provides()
# set.isdisjoint() is faster than set.intersection()
update_access_groups = get_access_set(record['_access'], 'update')
delete_access_groups = get_access_set(record['_access'], 'delete')
owner_access_groups = get_access_set(record['_access'], 'owner')
# If the record exists in ES
# The user provides egroup/membership
update_access = get_access_set(record['_access'], 'update')
delete_access = get_access_set(record['_access'], 'delete')
owner_access = get_access_set(record['_access'], 'owner')
# The user is owner of the collection/schema
# The user is admin of the instance or any access group list is disjoint
# The user belongs to any access group, meaning the list is disjoint
# Then grant access
if is_admin(user) or (
check_elasticsearch(record) and user_provides and has_owner_permission(user) and \
(
not set(user_provides).isdisjoint(set(update_access_groups))
or not set(user_provides).isdisjoint(set(delete_access_groups))
or not set(user_provides).isdisjoint(set(owner_access_groups))
)
if has_owner_permission(user, record) or (
_user_granted(update_access) or
_user_granted(delete_access) or
_user_granted(owner_access)
):
current_app.logger.debug('Group sets not disjoint, user allowed')
return True
......@@ -153,32 +144,26 @@ def has_update_permission(user, record):
def has_read_record_permission(user, record):
"""Check if user is authenticated and has read access. This implies reading one document"""
"""Check if user is authenticated and has read access. This implies reading one document."""
log_action(user, 'READ')
if user.is_authenticated:
# Allow based in the '_access' key
user_provides = get_user_provides()
# set.isdisjoint() is faster than set.intersection()
read_access_groups = get_access_set(record['_access'], 'read')
update_access_groups = get_access_set(record['_access'], 'update')
delete_access_groups = get_access_set(record['_access'], 'delete')
owner_access_groups = get_access_set(record['_access'], 'owner')
read_access = get_access_set(record['_access'], 'read')
update_access = get_access_set(record['_access'], 'update')
delete_access = get_access_set(record['_access'], 'delete')
owner_access = get_access_set(record['_access'], 'owner')
if not read_access_groups:
if not read_access:
return True
# If the record exists in ES
# The user provides egroup/membership
# The user is owner of the collection/schema
# The user is admin of the instance or any access group list is disjoint
# The user belongs to any access group, meaning the list is disjoint
# Then grant access
if is_admin(user) or (
check_elasticsearch(record) and user_provides and has_owner_permission(user) and \
(
not set(user_provides).isdisjoint(set(read_access_groups))
or not set(user_provides).isdisjoint(set(update_access_groups))
or not set(user_provides).isdisjoint(set(delete_access_groups))
or not set(user_provides).isdisjoint(set(owner_access_groups))
)
if has_owner_permission(user, record) or (
_user_granted(read_access) or
_user_granted(update_access) or
_user_granted(delete_access) or
_user_granted(owner_access)
):
current_app.logger.debug('Group sets not disjoint, user allowed')
return True
......@@ -186,25 +171,19 @@ def has_read_record_permission(user, record):
def has_delete_permission(user, record):
"""Check if user is authenticated and has delete access"""
"""Check if user is authenticated and has delete access."""
log_action(user, 'DELETE')
if user.is_authenticated:
# Allow based in the '_access' key
user_provides = get_user_provides()
# set.isdisjoint() is faster than set.intersection()
delete_access_groups = get_access_set(record['_access'], 'delete')
owner_access_groups = get_access_set(record['_access'], 'owner')
# If the record exists in ES
# The user provides egroup/membership
delete_access = get_access_set(record['_access'], 'delete')
owner_access = get_access_set(record['_access'], 'owner')
# The user is owner of the collection/schema
# The user is admin of the instance or any access group list is disjoint
# The user belongs to any access group, meaning the list is disjoint
# Then grant access
if is_admin(user) or (
check_elasticsearch(record) and user_provides and has_owner_permission(user) and \
(
not set(user_provides).isdisjoint(set(delete_access_groups))
or not set(user_provides).isdisjoint(set(owner_access_groups))
)
if has_owner_permission(user, record) or (
_user_granted(delete_access) or
_user_granted(owner_access)
):
current_app.logger.debug('Group sets not disjoint, user allowed')
return True
......@@ -270,15 +249,6 @@ def get_access_set(access, set):
return []
def is_admin(user):
"""Check if the user is administrator"""
admin_user = current_app.config['ADMIN_USER']
if user.email == admin_user or user.email.replace('@cern.ch', '') == admin_user:
current_app.logger.debug('User {user} is admin'.format(user=user.email))
return True
return False
def is_public(data, action):
"""Check if the record is fully public.
In practice this means that the record doesn't have the ``access`` key or
......@@ -287,15 +257,6 @@ def is_public(data, action):
return '_access' not in data or not data.get('_access', {}).get(action)
def check_elasticsearch(record=None):
if record is not None:
"""Try to search for given record."""
search = request._methodview.search_class()
search = search.get_record(str(record.id))
return search.count() == 1
return False
def log_action(user, action):
try:
email = user.email
......
# -*- coding: utf-8 -*-
#
# This file is part of CERN Search.
# Copyright (C) 2019 CERN.
#
# Citadel Search is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
# -*- coding: utf-8 -*-
#
# This file is part of CERN Search.
# Copyright (C) 2019 CERN.
#
# Citadel Search is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
"""Unit tests fixtures."""
import pytest
from invenio_app.factory import create_app
from flask_login import AnonymousUserMixin, UserMixin
@pytest.fixture(scope='module')
def app():
"""Application factory fixture."""
# FIXME: use pytest-invenio
# Applies to all calls of `with_app_context`
return create_app()
@pytest.fixture()
def anonymous_user():
"""Anonymous user (not logged in)."""
class User(AnonymousUserMixin):
def __init__(self):
super().__init__()
return User()
@pytest.fixture()
def authenticated_user():
"""Authenticated user (logged in)."""
class User(UserMixin):
def __init__(self):
super().__init__()
return User()
@pytest.fixture(scope='function')
def private_access_record():
"""Private access record."""
return {
'_access': {
'read': ['read-perm'],
'update': ['update-perm'],
'delete': ['delete-perm'],
'owner': ['owner-perm']
}
}
@pytest.fixture(scope='function')
def public_access_record():
"""Public access record."""
return {
'_access': {
'update': ['update-perm'],
'delete': ['delete-perm'],
'owner': ['owner-perm']
}
}
# -*- coding: utf-8 -*-
#
# This file is part of CERN Search.
# Copyright (C) 2019 CERN.
#
# Citadel Search is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
"""Tests for access/CRUD permissions."""
from cern_search_rest_api.modules.cernsearch.permissions import (
has_list_permission, has_read_record_permission, has_update_permission,
has_delete_permission, has_owner_permission
)
def test_has_list_permission(app, anonymous_user, authenticated_user):
"""Test list records (search) permission."""
with app.app_context():
assert not has_list_permission(anonymous_user)
assert has_list_permission(authenticated_user)
def test_has_read_record_permission(app, mocker, anonymous_user, authenticated_user,
private_access_record, public_access_record):
"""Test read record permission."""
with app.app_context():
# Anonymous user cannot read
assert not has_read_record_permission(anonymous_user, public_access_record)
# User with no rights over private record and public record
patched_g = mocker.patch('cern_search_rest_api.modules.cernsearch.utils.g')
patched_g.identity.provides = []
assert has_read_record_permission(authenticated_user, public_access_record)
assert not has_read_record_permission(authenticated_user, private_access_record)
# User with read permissions
patched_g.identity.provides = [mocker.Mock(method='perm', value='read-perm')]
assert has_read_record_permission(authenticated_user, private_access_record)
# User with update permissions
patched_g.identity.provides = [mocker.Mock(method='perm', value='update-perm')]
assert has_read_record_permission(authenticated_user, private_access_record)
# User with delete permissions
patched_g.identity.provides = [mocker.Mock(method='perm', value='delete-perm')]
assert has_read_record_permission(authenticated_user, private_access_record)
# User with owner permissions
patched_g.identity.provides = [mocker.Mock(method='perm', value='owner-perm')]
assert has_read_record_permission(authenticated_user, private_access_record)
def test_has_update_permission(app, mocker, anonymous_user, authenticated_user,
private_access_record, public_access_record):
"""Test update record permission."""
with app.app_context():
# Anonymous user cannot update
assert not has_update_permission(anonymous_user, public_access_record)
# User with no rights over private record and public record
patched_g = mocker.patch('cern_search_rest_api.modules.cernsearch.utils.g')
patched_g.identity.provides = []
assert not has_update_permission(authenticated_user, public_access_record)
assert not has_update_permission(authenticated_user, private_access_record)
# User with read permissions
patched_g.identity.provides = [mocker.Mock(method='perm', value='read-perm')]
assert not has_update_permission(authenticated_user, private_access_record)
# User with update permissions
patched_g.identity.provides = [mocker.Mock(method='perm', value='update-perm')]
assert has_update_permission(authenticated_user, private_access_record)
# User with delete permissions
patched_g.identity.provides = [mocker.Mock(method='perm', value='delete-perm')]
assert has_update_permission(authenticated_user, private_access_record)
# User with owner permissions
patched_g.identity.provides = [mocker.Mock(method='perm', value='owner-perm')]
assert has_update_permission(authenticated_user, private_access_record)
def test_has_delete_permission(app, mocker, anonymous_user, authenticated_user,
private_access_record, public_access_record):
"""Test delete record permission."""
with app.app_context():
# Anonymous user cannot update
assert not has_delete_permission(anonymous_user, public_access_record)
# User with no rights over private record and public record