Commit 53dfd28b authored by Pablo Panero's avatar Pablo Panero
Browse files

Add support for permissions: ALLOW ALL for testing. TODO: Fix SQL database...

Add support for permissions: ALLOW ALL for testing. TODO: Fix SQL database connection/no such table error
parent 865211e7
#!/usr/bin/python
# -*- coding: utf-8 -*-
""" Record API"""
from invenio_records.api import Record
from invenio_pidstore.models import PersistentIdentifier
from .fetchers import recid_fetcher
class CernSearchRecord(Record):
"""CERN Searc Record."""
record_fetcher = staticmethod(recid_fetcher)
@property
def pid(self):
"""Return an instance of record PID."""
pid = self.record_fetcher(self.id, self)
return PersistentIdentifier.get(pid.pid_type, pid.pid_value)
"""
@property
def depid(self):
# Return depid of the record.
return PersistentIdentifier.get(
pid_type='recid',
pid_value=self.get('_deposit', {}).get('id')
)
"""
\ No newline at end of file
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""Persistent identifier fetcher."""
from invenio_pidstore.fetchers import FetchedPID
from .providers import CERNSearchRecordIdProvider
def recid_fetcher(record_uuid, data):
"""Fetch PID from record."""
return FetchedPID(
provider=CERNSearchRecordIdProvider,
pid_type='recid',
pid_value=str(data['recid'])
)
\ No newline at end of file
#!/usr/bin/python
# -*- coding: utf-8 -*-
\ No newline at end of file
# -*- coding: utf-8 -*-
from flask_security import current_user
from flask_principal import ActionNeed
from invenio_access import DynamicPermission
from .utils import get_user_provides
"""Access control for CERN Search."""
def record_permission_factory(record=None, action=None):
"""Record permission factory."""
return RecordPermission.create(record, action)
def record_create_permission_factory(record=None):
"""Create permission factory."""
return record_permission_factory(record=record, action='create')
def record_read_permission_factory(record=None):
"""Read permission factory."""
return record_permission_factory(record=record, action='read')
def record_update_permission_factory(record=None):
"""Update permission factory."""
return record_permission_factory(record=record, action='update')
def record_delete_permission_factory(record=None):
"""Delete permission factory."""
return record_permission_factory(record=record, action='delete')
class RecordPermission(object):
"""Record permission.
- Create action given to admins only.
- Read access given to everyone if public, according to a record if not.
- Update access given to record owners.
- Delete access given to admins only.
"""
create_actions = ['create']
read_actions = ['read']
read_files_actions = ['read-files']
read_eos_path_actions = ['read-eos-path']
update_actions = ['update']
delete_actions = ['delete']
def __init__(self, record, func, user):
"""Initialize a file permission object."""
self.record = record
self.func = func
self.user = user or current_user
def can(self):
"""Determine access."""
return self.func(self.user, self.record)
@classmethod
def create(cls, record, action, user=None):
"""Create a record permission."""
# Allow everything for testing
if action in cls.create_actions:
return cls(record, allow, user) # return cls(record, has_admin_permission, user)
elif action in cls.read_actions:
return cls(record, allow, user) # return cls(record, has_read_record_permission, user)
elif action in cls.update_actions:
return cls(record, allow, user) # return cls(record, has_update_permission, user)
elif action in cls.delete_actions:
return cls(record, allow, user) # return cls(record, has_admin_permission, user)
else:
return cls(record, deny, user)
def has_read_record_permission(user, record):
"""Check if user has read access to the record."""
# Allow everyone for public records
if is_public(record, 'read'):
return True
# Allow e-group members
user_provides = get_user_provides()
read_access_groups = record['_access']['read']
if not set(user_provides).isdisjoint(set(read_access_groups)):
return True
return has_admin_permission()
def has_update_permission(user, record):
"""Check if user has update access to the record."""
user_id = int(user.get_id()) if user.is_authenticated else None
# Allow owners
deposit_creator = record.get('_deposit', {}).get('created_by', -1)
if user_id == deposit_creator:
return True
# Allow based in the '_access' key
user_provides = get_user_provides()
# set.isdisjoint() is faster than set.intersection()
allowed_users = record.get('_access', {}).get('update', [])
if allowed_users and not set(user_provides).isdisjoint(set(allowed_users)):
return True
return has_admin_permission()
def has_admin_permission(user=None, record=None):
"""Check if user has admin access to record.
This function has to accept 2 parameters (as all other has_foo_permissions,
to allow for dynamic dispatch.
"""
# Allow administrators
# Taken from invenio_desposit.permissions.py due to incompatibilities in the ES version support
return DynamicPermission(
ActionNeed('index-admin-access')).can() # TODO Update for invenio_access.permissions.Permission
#
# Utility functions
#
def deny(user, record):
"""Deny access."""
return False
def allow(user, record):
"""Allow access."""
return True
def is_public(data, action):
"""Check if the record is fully public.
In practice this means that the record doesn't have the ``access`` key or
the action is not inside access or is empty.
"""
return '_access' not in data or not data.get('_access', {}).get(action)
#!/usr/bin/python
# -*- coding: utf-8 -*-
import requests
from flask import current_app
from invenio_pidstore.errors import PersistentIdentifierError
from invenio_pidstore.models import PIDStatus, RecordIdentifier
from invenio_pidstore.providers.base import BaseProvider
class CERNSearchRecordIdProvider(BaseProvider):
"""Record identifier provider."""
pid_type = 'recid'
"""Type of persistent identifier."""
pid_provider = None
"""Provider name.
The provider name is not recorded in the PID since the provider does not
provide any additional features besides creation of record ids.
"""
default_status = PIDStatus.RESERVED
"""Record IDs are by default registered immediately."""
@classmethod
def create(cls, object_type=None, object_uuid=None, **kwargs):
"""Create a new record identifier."""
# Request next integer in recid sequence.
assert 'pid_value' not in kwargs
provider_url = current_app.config.get('RECORDS_ID_PROVIDER_ENDPOINT',
None)
if not provider_url:
# Don't query external service in DEBUG mode
kwargs['pid_value'] = str(RecordIdentifier.next())
else:
response = requests.get(
provider_url, headers={'User-Agent': 'cernsearch'})
if not response.ok or response.text.strip().startswith('[ERROR]'):
raise PersistentIdentifierError(response.text)
kwargs['pid_value'] = response.text
kwargs.setdefault('status', cls.default_status)
if object_type and object_uuid:
kwargs['status'] = PIDStatus.REGISTERED
return super(CERNSearchRecordIdProvider, cls).create(
object_type=object_type, object_uuid=object_uuid, **kwargs)
\ No newline at end of file
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""Helper methods for CERN Search records."""
from flask import g
def get_user_provides():
"""Extract the user's provides from g."""
return [need.value for need in g.identity.provides]
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment