Commit 3f5af787 authored by Pablo Panero's avatar Pablo Panero
Browse files

Refactor: Permissions changed to accept on cascade (admin>owner>delete>update>read)

parent 3c9abe87
......@@ -126,12 +126,22 @@ def has_update_permission(user, record):
# Allow based in the '_access' key
user_provides = get_user_provides()
# set.isdisjoint() is faster than set.intersection()
update_access_groups = record['_access']['update']
if check_elasticsearch(record) and user_provides and has_owner_permission(user) and \
(
not set(user_provides).isdisjoint(set(update_access_groups))
or is_admin(user)
):
update_access_groups = get_access_set(record['_access'], 'update')
delete_access_groups = get_access_set(record['_access'], 'delete')
owner_access_groups = get_access_set(record['_access'], 'owner')
# If the record exists in ES
# The user provides egroup/membership
# The user is owner of the collection/schema
# The user is admin of the instance or any access group list is disjoint
# Then grant access
if is_admin(user) or (
check_elasticsearch(record) and user_provides and has_owner_permission(user) and \
(
not set(user_provides).isdisjoint(set(update_access_groups))
or not set(user_provides).isdisjoint(set(delete_access_groups))
or not set(user_provides).isdisjoint(set(owner_access_groups))
)
):
current_app.logger.debug('Group sets not disjoint, user allowed')
return True
return False
......@@ -144,16 +154,28 @@ def has_read_record_permission(user, record):
# Allow based in the '_access' key
user_provides = get_user_provides()
# set.isdisjoint() is faster than set.intersection()
try:
read_access_groups = record['_access']['read']
if check_elasticsearch(record) and user_provides and has_owner_permission(user) and \
(
not set(user_provides).isdisjoint(set(read_access_groups))
or is_admin(user)
):
current_app.logger.debug('Group sets not disjoint, user allowed')
return True
except KeyError:
read_access_groups = get_access_set(record['_access'], 'read')
update_access_groups = get_access_set(record['_access'], 'update')
delete_access_groups = get_access_set(record['_access'], 'delete')
owner_access_groups = get_access_set(record['_access'], 'owner')
if not read_access_groups:
return True
# If the record exists in ES
# The user provides egroup/membership
# The user is owner of the collection/schema
# The user is admin of the instance or any access group list is disjoint
# Then grant access
if is_admin(user) or (
check_elasticsearch(record) and user_provides and has_owner_permission(user) and \
(
not set(user_provides).isdisjoint(set(read_access_groups))
or not set(user_provides).isdisjoint(set(update_access_groups))
or not set(user_provides).isdisjoint(set(delete_access_groups))
or not set(user_provides).isdisjoint(set(owner_access_groups))
)
):
current_app.logger.debug('Group sets not disjoint, user allowed')
return True
return False
......@@ -165,12 +187,20 @@ def has_delete_permission(user, record):
# Allow based in the '_access' key
user_provides = get_user_provides()
# set.isdisjoint() is faster than set.intersection()
delete_access_groups = record['_access']['delete']
if check_elasticsearch(record) and user_provides and has_owner_permission(user) and \
(
not set(user_provides).isdisjoint(set(delete_access_groups))
or is_admin(user)
):
delete_access_groups = get_access_set(record['_access'], 'delete')
owner_access_groups = get_access_set(record['_access'], 'owner')
# If the record exists in ES
# The user provides egroup/membership
# The user is owner of the collection/schema
# The user is admin of the instance or any access group list is disjoint
# Then grant access
if is_admin(user) or (
check_elasticsearch(record) and user_provides and has_owner_permission(user) and \
(
not set(user_provides).isdisjoint(set(delete_access_groups))
or not set(user_provides).isdisjoint(set(owner_access_groups))
)
):
current_app.logger.debug('Group sets not disjoint, user allowed')
return True
return False
......@@ -218,7 +248,6 @@ def has_admin_view_permission(user):
# Utility functions
def deny(user, record):
"""Deny access."""
return False
......@@ -229,6 +258,13 @@ def allow(user, record):
return True
def get_access_set(access, set):
try:
return access[set]
except KeyError:
return []
def is_admin(user):
"""Check if the user is administrator"""
admin_user = current_app.config['ADMIN_USER']
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment