Commit 9987b74f authored by Carina Antunes's avatar Carina Antunes
Browse files

[SEARCH-88] Permissions improvements

parent 5a438720
......@@ -164,11 +164,9 @@ def has_owner_permission(user, record=None):
def has_list_permission(user, record=None):
"""Check if user is authenticated and has create access."""
if user:
log_action(user, 'LIST')
return user.is_authenticated
else:
return False
# permissions are handled in the ES QUERY
log_action(user, 'LIST')
return True
def has_update_permission(user, record):
......@@ -196,16 +194,16 @@ def has_update_permission(user, record):
def has_read_record_permission(user, record):
"""Check if user is authenticated and has read access. This implies reading one document."""
log_action(user, 'READ')
if user.is_authenticated:
# Allow based in the '_access' key
read_access = get_access_set(record['_access'], 'read')
update_access = get_access_set(record['_access'], 'update')
delete_access = get_access_set(record['_access'], 'delete')
owner_access = get_access_set(record['_access'], 'owner')
if not read_access:
return True
# Allow based in the '_access' key
read_access = get_access_set(record['_access'], 'read')
update_access = get_access_set(record['_access'], 'update')
delete_access = get_access_set(record['_access'], 'delete')
owner_access = get_access_set(record['_access'], 'owner')
if not read_access:
return True
if user.is_authenticated:
# The user is owner of the collection/schema
# The user belongs to any access group, meaning the list is disjoint
# Then grant access
......@@ -272,13 +270,9 @@ def has_admin_view_permission(user):
"""Check if has admin permission."""
admin_access_groups = current_app.config['ADMIN_VIEW_ACCESS_GROUPS']
if user.is_authenticated and admin_access_groups:
# Allow based in the '_access' key
user_provides = get_user_provides()
# set.isdisjoint() is faster than set.intersection()
admin_access_groups = admin_access_groups.split(',')
if user_provides and not set(user_provides).isdisjoint(set(admin_access_groups)):
current_app.logger.debug('User has admin view access')
return True
admin_access = admin_access_groups.split(',')
return _user_granted(admin_access)
return False
......
......@@ -7,10 +7,11 @@
# Citadel Search is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
"""Search utilities."""
from cern_search_rest_api.modules.cernsearch.permissions import has_admin_view_permission
from cern_search_rest_api.modules.cernsearch.utils import get_user_provides
from elasticsearch_dsl import Q
from flask import current_app, request
from flask_login import current_user
from invenio_records_rest.query import default_search_factory
from invenio_search import RecordsSearch
from invenio_search.api import DefaultFilter
......@@ -66,8 +67,8 @@ def cern_search_filter():
def get_egroups():
"""Get egroups from access param, config or authenticated user."""
egroups = request.args.get('access', None)
# If access rights are sent or is a search query
if egroups or (request.path == '/records/' and request.method == 'GET'):
# If access rights are sent and is admin_view_account
if egroups and has_admin_view_permission(current_user):
try:
if current_app.config['SEARCH_USE_EGROUPS']:
return ['{0}@cern.ch'.format(egroup) for egroup in egroups.split(',')]
......@@ -75,6 +76,7 @@ def get_egroups():
return egroups.split(',')
except AttributeError:
return None
# Else use user's token ACLs
return get_user_provides()
......
......@@ -21,6 +21,7 @@ def test_file_ops(app, appctx, db, client, user, location):
body = {
"_access": {
"read": ["CernSearch-Administrators@cern.ch"],
"owner": ["CernSearch-Administrators@cern.ch"],
"update": ["CernSearch-Administrators@cern.ch"],
"delete": ["CernSearch-Administrators@cern.ch"]
......@@ -84,7 +85,11 @@ def test_file_ops(app, appctx, db, client, user, location):
import time
time.sleep(2)
res = client.get(f'/records/?q={quote_plus(case["content"])}', headers=get_headers())
res = client.get(
'/records/',
query_string={'q': quote_plus(case["content"]), 'access': 'CernSearch-Administrators'},
headers=get_headers()
)
assert res.status_code == HTTPStatus.OK
res_hits = res.json['hits']
......
......@@ -15,15 +15,16 @@ from cern_search_rest_api.modules.cernsearch.permissions import (has_delete_perm
def test_has_list_permission(appctx, anonymous_user, authenticated_user):
"""Test list records (search) permission."""
assert not has_list_permission(anonymous_user)
assert has_list_permission(anonymous_user)
assert has_list_permission(authenticated_user)
def test_has_read_record_permission(appctx, mocker, anonymous_user, authenticated_user,
private_access_record, public_access_record):
"""Test read record permission."""
# Anonymous user cannot read
assert not has_read_record_permission(anonymous_user, public_access_record)
# Anonymous usertest_file_ops
assert has_read_record_permission(anonymous_user, public_access_record)
assert not has_read_record_permission(anonymous_user, private_access_record)
# User with no rights over private record and public record
patched_g = mocker.patch('cern_search_rest_api.modules.cernsearch.utils.g')
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment