Commit b16b602f authored by Carina Antunes's avatar Carina Antunes
Browse files

[SEARCH-114] Limit access to alias instances GET /records

parent 44aca735
......@@ -8,6 +8,7 @@ CERN_SEARCH_FILES_PROCESSOR_QUEUE=files_processor
CERN_SEARCH_FILES_PROCESSOR_QUEUE_DLX=files_processor_dlx
CERN_SEARCH_FILES_PROCESSOR_EXCHANGE=default
CERN_SEARCH_FILES_PROCESSOR_EXCHANGE_DLX=dlx
CERN_SEARCH_INSTANCE_IMMUTABLE='False'
CONTAINER_NAME=cern-search-rest-api
WORKER_APP=invenio_app.celery
......
......@@ -96,6 +96,9 @@ SEARCH_MAPPINGS = [os.getenv('CERN_SEARCH_INSTANCE', 'test')]
SEARCH_USE_EGROUPS = ast.literal_eval(os.getenv('CERN_SEARCH_USE_EGROUPS', 'True'))
SEARCH_DOC_PIPELINES = ast.literal_eval(os.getenv('CERN_SEARCH_DOC_PIPELINES', '{}'))
# Alias instance - don't allow updates, allow only search
SEARCH_INSTANCE_IMMUTABLE = ast.literal_eval(os.getenv('CERN_SEARCH_INSTANCE_IMMUTABLE', 'False'))
# Records REST configuration
# ===========================
......
......@@ -17,6 +17,10 @@ from invenio_records_files.api import FileObject
from invenio_records_files.models import RecordsBuckets
def _is_instance_immutable():
return current_app.config['SEARCH_INSTANCE_IMMUTABLE']
def record_permission_factory(record=None, action=None):
"""Record permission factory."""
return RecordPermission.create(record, action)
......@@ -149,6 +153,9 @@ def has_owner_permission(user, record=None):
"""Check if user is authenticated and has create access."""
log_action(user, 'CREATE/OWNER')
if _is_instance_immutable():
return False
# First authentication phase, decorator level
if not record:
return user.is_authenticated
......@@ -172,6 +179,10 @@ def has_list_permission(user, record=None):
def has_update_permission(user, record):
"""Check if user is authenticated and has update access."""
log_action(user, 'UPDATE')
if _is_instance_immutable():
return False
if user.is_authenticated:
# Allow based in the '_access' key
update_access = get_access_set(record['_access'], 'update')
......@@ -195,6 +206,9 @@ def has_read_record_permission(user, record):
"""Check if user is authenticated and has read access. This implies reading one document."""
log_action(user, 'READ')
if _is_instance_immutable():
return False
# Allow based in the '_access' key
read_access = get_access_set(record['_access'], 'read')
update_access = get_access_set(record['_access'], 'update')
......@@ -221,6 +235,10 @@ def has_read_record_permission(user, record):
def has_delete_permission(user, record):
"""Check if user is authenticated and has delete access."""
log_action(user, 'DELETE')
if _is_instance_immutable():
return False
if user.is_authenticated:
# Allow based in the '_access' key
delete_access = get_access_set(record['_access'], 'delete')
......
# -*- coding: utf-8 -*-
#
# This file is part of CERN Search.
# Copyright (C) 2019 CERN.
#
# Citadel Search is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
"""Tests for access/CRUD permissions."""
import pytest
from cern_search_rest_api.modules.cernsearch.permissions import (has_delete_permission, has_list_permission,
has_owner_permission, has_read_record_permission,
has_update_permission)
@pytest.fixture(scope='module')
def app_config(app_config):
"""Application configuration fixture."""
app_config['SEARCH_INSTANCE_IMMUTABLE'] = True
return app_config
def test_has_list_permission(appctx, anonymous_user, authenticated_user):
"""Test list records (search) permission."""
assert has_list_permission(anonymous_user)
assert has_list_permission(authenticated_user)
def test_has_read_record_permission(appctx, mocker, anonymous_user, authenticated_user,
private_access_record, public_access_record):
"""Test read record permission."""
# Anonymous usertest_file_ops
assert not has_read_record_permission(anonymous_user, public_access_record)
assert not has_read_record_permission(anonymous_user, private_access_record)
# User with no rights over private record and public record
patched_g = mocker.patch('cern_search_rest_api.modules.cernsearch.utils.g')
patched_g.identity.provides = []
assert not has_read_record_permission(authenticated_user, public_access_record)
assert not has_read_record_permission(authenticated_user, private_access_record)
# User with read permissions
patched_g.identity.provides = [mocker.Mock(method='perm', value='read-perm')]
assert not has_read_record_permission(authenticated_user, private_access_record)
# User with update permissions
patched_g.identity.provides = [mocker.Mock(method='perm', value='update-perm')]
assert not has_read_record_permission(authenticated_user, private_access_record)
# User with delete permissions
patched_g.identity.provides = [mocker.Mock(method='perm', value='delete-perm')]
assert not has_read_record_permission(authenticated_user, private_access_record)
# User with owner permissions
patched_g.identity.provides = [mocker.Mock(method='perm', value='owner-perm')]
assert not has_read_record_permission(authenticated_user, private_access_record)
def test_has_update_permission(appctx, mocker, anonymous_user, authenticated_user,
private_access_record, public_access_record):
"""Test update record permission."""
# Anonymous user cannot update
assert not has_update_permission(anonymous_user, public_access_record)
# User with no rights over private record and public record
patched_g = mocker.patch('cern_search_rest_api.modules.cernsearch.utils.g')
patched_g.identity.provides = []
assert not has_update_permission(authenticated_user, public_access_record)
assert not has_update_permission(authenticated_user, private_access_record)
# User with read permissions
patched_g.identity.provides = [mocker.Mock(method='perm', value='read-perm')]
assert not has_update_permission(authenticated_user, private_access_record)
# User with update permissions
patched_g.identity.provides = [mocker.Mock(method='perm', value='update-perm')]
assert not has_update_permission(authenticated_user, private_access_record)
# User with delete permissions
patched_g.identity.provides = [mocker.Mock(method='perm', value='delete-perm')]
assert not has_update_permission(authenticated_user, private_access_record)
# User with owner permissions
patched_g.identity.provides = [mocker.Mock(method='perm', value='owner-perm')]
assert not has_update_permission(authenticated_user, private_access_record)
def test_has_delete_permission(appctx, mocker, anonymous_user, authenticated_user,
private_access_record, public_access_record):
"""Test delete record permission."""
# Anonymous user cannot update
assert not has_delete_permission(anonymous_user, public_access_record)
# User with no rights over private record and public record
patched_g = mocker.patch('cern_search_rest_api.modules.cernsearch.utils.g')
patched_g.identity.provides = []
assert not has_delete_permission(authenticated_user, public_access_record)
assert not has_delete_permission(authenticated_user, private_access_record)
# User with read permissions
patched_g.identity.provides = [mocker.Mock(method='perm', value='read-perm')]
assert not has_delete_permission(authenticated_user, private_access_record)
# User with update permissions
patched_g.identity.provides = [mocker.Mock(method='perm', value='update-perm')]
assert not has_delete_permission(authenticated_user, private_access_record)
# User with delete permissions
patched_g.identity.provides = [mocker.Mock(method='perm', value='delete-perm')]
assert not has_delete_permission(authenticated_user, private_access_record)
# User with owner permissions
patched_g.identity.provides = [mocker.Mock(method='perm', value='owner-perm')]
assert not has_delete_permission(authenticated_user, private_access_record)
def test_has_owner_permission(appctx, mocker, anonymous_user, authenticated_user,
private_access_record, public_access_record):
"""Test colletion/schema owner permission."""
appctx.config['ADMIN_ACCESS_GROUPS'] = 'non-existing-perm,owner-perm'
# Anonymous user cannot update
assert not has_owner_permission(anonymous_user, public_access_record)
# Authenticated user when theres no record
assert not has_owner_permission(authenticated_user, None)
assert not has_owner_permission(authenticated_user)
# User with no rights over private record and public record
patched_g = mocker.patch('cern_search_rest_api.modules.cernsearch.utils.g')
patched_g.identity.provides = []
assert not has_owner_permission(authenticated_user, public_access_record)
assert not has_owner_permission(authenticated_user, private_access_record)
# User with read permissions
patched_g.identity.provides = [mocker.Mock(method='perm', value='read-perm')]
assert not has_owner_permission(authenticated_user, private_access_record)
# User with update permissions
patched_g.identity.provides = [mocker.Mock(method='perm', value='update-perm')]
assert not has_owner_permission(authenticated_user, private_access_record)
# User with delete permissions
patched_g.identity.provides = [mocker.Mock(method='perm', value='delete-perm')]
assert not has_owner_permission(authenticated_user, private_access_record)
# User with owner permissions
patched_g.identity.provides = [mocker.Mock(method='perm', value='owner-perm')]
assert not has_owner_permission(authenticated_user, private_access_record)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment