Commit 2b8dabf7 authored by Carina Antunes's avatar Carina Antunes
Browse files

tests: refactor

parent c86265c9
PAErpplpKIyNRlbpzLB5srPpmSTX3sXGOMiDHKmKsFu1aOzbcDgfFSVQonxP
......@@ -5,16 +5,3 @@
#
# CERN Search is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
import pytest
import requests
@pytest.mark.unit
def test_ui(endpoint, api_key):
    """Check that the UI applications settings page answers with HTTP 200.

    ``endpoint`` is used as the URL prefix of the request. ``api_key`` is
    accepted but not used by this test.
    """
    # Check the UI does not generate 50X Errors
    url = '{endpoint}/account/settings/applications/'.format(endpoint=endpoint)
    resp = requests.post(url)
    assert resp.status_code == 200
# -*- coding: utf-8 -*-
#
# This file is part of CERN Search.
# Copyright (C) 2018-2019 CERN.
#
# CERN Search is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
"""Pytest configuration.
See https://pytest-invenio.readthedocs.io/ for documentation on which test
fixtures are available.
"""
from __future__ import absolute_import, print_function
import pytest
from invenio_app.factory import create_api
@pytest.fixture(scope='module')
def create_app(instance_path):
    """Application factory fixture.

    Returns the ``create_api`` factory itself (not an application instance).
    ``instance_path`` is requested as a fixture but not used in the body.
    """
    return create_api
# -*- coding: utf-8 -*-
#
# This file is part of CERN Search.
# Copyright (C) 2018-2019 CERN.
#
# CERN Search is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
"""Test helpers."""
import os
from urllib.parse import urlunparse
from pytest_invenio.fixtures import appctx
def get_headers():
    """Build the JSON request headers carrying the API bearer token.

    The token is read from the ``API_TOKEN`` environment variable on every
    call, so a missing variable raises ``KeyError``.

    Returns:
        dict: ``Accept``, ``Content-Type`` and ``Authorization`` headers.
    """
    api_token = os.environ['API_TOKEN']
    return {
        'Accept': 'application/json',
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {api_token}',
    }
def get_schemas_endpoint(appctx, schema):
    """Return the absolute URL of a JSON schema.

    The URL is assembled from three entries of ``appctx.config``:
    ``JSONSCHEMAS_URL_SCHEME`` (scheme), ``JSONSCHEMAS_HOST`` (network
    location) and ``JSONSCHEMAS_ENDPOINT`` (path prefix), followed by
    ``schema``.

    Args:
        appctx: Object exposing a ``config`` mapping with the keys above.
        schema: Schema path relative to the schemas endpoint.

    Returns:
        str: The schema URL, without params, query string or fragment.
    """
    host_setting = appctx.config['JSONSCHEMAS_HOST']
    url_scheme = appctx.config['JSONSCHEMAS_URL_SCHEME']
    schemas_path = appctx.config['JSONSCHEMAS_ENDPOINT']
    url_path = "{path}/{schema}".format(path=schemas_path, schema=schema)
    # The last three components (params, query, fragment) are left empty.
    return urlunparse((url_scheme, host_setting, url_path, "", "", ""))
# -*- coding: utf-8 -*-
#
# This file is part of CERN Search.
# Copyright (C) 2018-2019 CERN.
#
# CERN Search is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
"""API tests."""
from .helpers import get_headers
def test_testclient(base_client):
    """Check that listing ``/records/`` returns a result page with no hits."""
    expected = {
        "aggregations": {},
        "hits": {
            "hits": [],
            "total": 0
        },
        "links": {
            "self": "http://localhost/records/?page=1&size=10"
        }
    }
    res = base_client.get('/records/', headers=get_headers())
    assert res.json == expected
......@@ -9,21 +9,14 @@
import json
import pytest
import requests
HEADERS = {
"Accept": "application/json",
"Content-Type": "application/json; charset=utf-8",
"Authorization": ''
}
from tests.api.helpers import get_headers, get_schemas_endpoint
BINARY_CONTENT = "e1xydGYxXGFuc2kNCkxvcmVtIGlwc3VtIGRvbG9yIHNpdCBhbWV0DQpccGFyIH0"
@pytest.mark.unit
def test_binary_es_ocr(endpoint, api_key):
HEADERS['Authorization'] = 'Bearer {credentials}'.format(credentials=api_key)
@pytest.mark.skip(reason=None)
def test_binary_es_ocr(appctx, base_client):
body = {
"_access": {
"owner": ["CernSearch-Administrators@cern.ch"],
......@@ -34,27 +27,26 @@ def test_binary_es_ocr(endpoint, api_key):
"title": "Test binary",
"description": "Test binary with ingest pipeline",
"link": "localhost/test",
"b64": "{binary_content}".format(binary_content=BINARY_CONTENT)
"b64": BINARY_CONTENT
},
"$schema": "{endpoint}/schemas/test/binary_v0.0.2.json".format(
endpoint=endpoint
)
"$schema": get_schemas_endpoint(appctx, "test/binary_v0.0.2.json")
}
# Create test record
resp = requests.post('{endpoint}/api/records/'.format(endpoint=endpoint),
headers=HEADERS, data=json.dumps(body))
resp = base_client.post('/records/', headers=get_headers(), data=json.dumps(body))
assert resp.status_code == 201
print(resp.data)
# Check non presence of OCR content in DB record
resp_body = resp.json()['metadata']
resp_body = resp.json['metadata']
assert resp_body.get('control_number') is not None
resp_data = resp_body.get("_data")
assert resp_data.get('content') is None # Content is the target field in ES pipeline
assert resp_data.get('title') == 'Test binary'
assert resp_data.get('description') == 'Test binary with ingest pipeline'
assert resp_data.get('link') == 'localhost/test'
# TODO: is failing
assert resp_data.get('b64', None) is None
control_number = resp_body.get("control_number")
......@@ -62,13 +54,15 @@ def test_binary_es_ocr(endpoint, api_key):
# Needed to allow ES to process the file
import time
time.sleep(2)
resp = requests.get('{endpoint}/api/records/?q=control_number:{control_number}'
.format(endpoint=endpoint, control_number=control_number),
headers=HEADERS, data=json.dumps(body))
resp = base_client.get(
'/records/?q=control_number:{control_number}'.format(control_number=control_number),
headers=get_headers(),
data=json.dumps(body)
)
assert resp.status_code == 200
resp_hits = resp.json()['hits']
resp_hits = resp.json['hits']
assert resp_hits.get('total') == 1
content = resp_hits['hits'][0]['metadata'].get("_data").get('content')
......@@ -77,13 +71,11 @@ def test_binary_es_ocr(endpoint, api_key):
assert content.get('content_type') == 'application/rtf'
# Test search over extracted fields
resp = requests.get('{endpoint}/api/records/?q=lorem'
.format(endpoint=endpoint),
headers=HEADERS, data=json.dumps(body))
resp = base_client.get('/records/?q=lorem', headers=get_headers(), data=json.dumps(body))
assert resp.status_code == 200
resp_hits = resp.json()['hits']
resp_hits = resp.json['hits']
assert resp_hits.get('total') == 1
content = resp_hits['hits'][0]['metadata'].get("_data").get('content')
......@@ -92,8 +84,10 @@ def test_binary_es_ocr(endpoint, api_key):
assert content.get('content_type') == 'application/rtf'
# Clean the instance. Delete record
resp = requests.delete('{endpoint}/api/record/{control_number}'
.format(endpoint=endpoint, control_number=control_number),
headers=HEADERS, data=json.dumps(body))
resp = base_client.delete(
'/record/{control_number}'.format(control_number=control_number),
headers=get_headers(),
data=json.dumps(body)
)
assert resp.status_code == 204
......@@ -9,20 +9,11 @@
import json
import pytest
import requests
HEADERS = {
"Accept": "application/json",
"Content-Type": "application/json; charset=utf-8",
"Authorization": ''
}
from tests.api.helpers import get_headers, get_schemas_endpoint
@pytest.mark.unit
def test_control_number_update(endpoint, api_key):
HEADERS['Authorization'] = 'Bearer {credentials}'.format(credentials=api_key)
def test_control_number_update(appctx, base_client):
body = {
"_access": {
"owner": ["CernSearch-Administrators@cern.ch"],
......@@ -32,25 +23,26 @@ def test_control_number_update(endpoint, api_key):
"_data": {
"title": "test_control_number_update",
"description": "Not updated document"
}
},
}
# Create test record
resp = requests.post('{endpoint}/api/records/'.format(endpoint=endpoint),
headers=HEADERS, data=json.dumps(body))
resp = base_client.post('/records/', headers=get_headers(), data=json.dumps(body))
print(resp.data)
assert resp.status_code == 201
orig_record = resp.json()['metadata']
orig_record = resp.json['metadata']
# Update without control_number
body["_data"]['description'] = 'Update with no control number'
resp = requests.put('{endpoint}/api/record/{control_number}'.format(
endpoint=endpoint,
control_number=orig_record['control_number']),
headers=HEADERS, data=json.dumps(body))
resp = base_client.put(
'/record/{control_number}'.format(control_number=orig_record['control_number']),
headers=get_headers(),
data=json.dumps(body)
)
put_record = resp.json()['metadata']
put_record = resp.json['metadata']
assert resp.status_code == 200
assert put_record.get('control_number') is not None
assert put_record.get('control_number') == orig_record['control_number']
......@@ -58,30 +50,28 @@ def test_control_number_update(endpoint, api_key):
# Update with a wrong control_number
body["_data"]['description'] = 'Update with wrong control number'
resp = requests.put('{endpoint}/api/record/{control_number}'.format(
endpoint=endpoint,
control_number=orig_record['control_number']),
headers=HEADERS, data=json.dumps(body))
resp = base_client.put(
'/record/{control_number}'.format(control_number=orig_record['control_number']),
headers=get_headers(),
data=json.dumps(body)
)
put_record = resp.json()['metadata']
put_record = resp.json['metadata']
assert resp.status_code == 200
assert put_record.get('control_number') is not None
assert put_record.get('control_number') == orig_record['control_number']
assert put_record["_data"]['description'] == body["_data"]['description']
# Delete test record
resp = requests.delete('{endpoint}/api/record/{control_number}'.format(
endpoint=endpoint,
control_number=orig_record['control_number']),
headers=HEADERS)
resp = base_client.delete(
'/record/{control_number}'.format(control_number=orig_record['control_number']),
headers=get_headers())
assert resp.status_code == 204
@pytest.mark.unit
def test_access_fields_existence(endpoint, api_key):
HEADERS['Authorization'] = 'Bearer {credentials}'.format(credentials=api_key)
def test_access_fields_existence(appctx, base_client):
# POST and PUT should follow the same workflow. Only checking POST.
# Without _access field
body = {
......@@ -90,11 +80,10 @@ def test_access_fields_existence(endpoint, api_key):
"description": "No _access field"
}
}
resp = requests.post('{endpoint}/api/records/'.format(endpoint=endpoint),
headers=HEADERS, data=json.dumps(body))
resp = base_client.post('/records/', headers=get_headers(), data=json.dumps(body))
assert resp.status_code == 400
assert {"field": "_schema", "message": "Missing field _access"} in resp.json()['errors']
assert {"field": "_schema", "message": "Missing field _access"} in resp.json['errors']
# Without _access.delete field
body = {
......@@ -107,11 +96,10 @@ def test_access_fields_existence(endpoint, api_key):
"description": "No _access.delete field"
}
}
resp = requests.post('{endpoint}/api/records/'.format(endpoint=endpoint),
headers=HEADERS, data=json.dumps(body))
resp = base_client.post('/records/', headers=get_headers(), data=json.dumps(body))
assert resp.status_code == 400
assert {"field": "_schema", "message": "Missing or wrong type (not an array) in field _access.delete"} in resp.json()['errors']
assert {"field": "_schema", "message": "Missing or wrong type (not an array) in field _access.delete"} in resp.json['errors']
# Without _access.update field
body = {
......@@ -124,11 +112,10 @@ def test_access_fields_existence(endpoint, api_key):
"description": "No _access.update field"
}
}
resp = requests.post('{endpoint}/api/records/'.format(endpoint=endpoint),
headers=HEADERS, data=json.dumps(body))
resp = base_client.post('/records/', headers=get_headers(), data=json.dumps(body))
assert resp.status_code == 400
assert {"field": "_schema", "message": "Missing or wrong type (not an array) in field _access.update"} in resp.json()['errors']
assert {"field": "_schema", "message": "Missing or wrong type (not an array) in field _access.update"} in resp.json['errors']
# Without _access.owner field
body = {
......@@ -141,17 +128,14 @@ def test_access_fields_existence(endpoint, api_key):
"description": "No _access.owner field"
}
}
resp = requests.post('{endpoint}/api/records/'.format(endpoint=endpoint),
headers=HEADERS, data=json.dumps(body))
resp = base_client.post('/records/', headers=get_headers(), data=json.dumps(body))
assert resp.status_code == 400
assert {"field": "_schema", "message": "Missing or wrong type (not an array) in field _access.owner"} in resp.json()['errors']
assert {"field": "_schema", "message": "Missing or wrong type (not an array) in field _access.owner"} in resp.json['errors']
@pytest.mark.unit
def test_data_field_existence(endpoint, api_key):
HEADERS['Authorization'] = 'Bearer {credentials}'.format(credentials=api_key)
def test_data_field_existence(appctx, base_client):
# Create test record without _data field
body = {
"_access": {
......@@ -163,8 +147,7 @@ def test_data_field_existence(endpoint, api_key):
"description": "No _access field"
}
resp = requests.post('{endpoint}/api/records/'.format(endpoint=endpoint),
headers=HEADERS, data=json.dumps(body))
resp = base_client.post('/records/', headers=get_headers(), data=json.dumps(body))
assert resp.status_code == 400
assert {"field": "_schema", "message": "Missing field _data"} in resp.json()['errors']
assert {"field": "_schema", "message": "Missing field _data"} in resp.json['errors']
......@@ -8,23 +8,15 @@
import json
import pytest
import requests
from pytest_invenio.fixtures import appctx, base_client
from tests.api.helpers import get_headers, get_schemas_endpoint
HEADERS = {
"Accept": "application/json",
"Content-Type": "application/json; charset=utf-8",
"Authorization": ''
}
@pytest.mark.unit
def test_search(endpoint, api_key):
def test_testclient(appctx, base_client):
"""
Test search over public documents. Test that the ``_access.*`` field is
not searched over.
"""
HEADERS['Authorization'] = 'Bearer {credentials}'.format(credentials=api_key)
body = {
"_access": {
......@@ -36,19 +28,18 @@ def test_search(endpoint, api_key):
"title": "Test default search field",
"description": "This contains CernSearch and should appear"
},
"$schema": "{endpoint}/schemas/test/doc_v0.0.2.json".format(
endpoint=endpoint
)
"$schema": get_schemas_endpoint(appctx, "test/doc_v0.0.2.json")
}
# Create first test record
resp = requests.post('{endpoint}/api/records/'.format(endpoint=endpoint),
headers=HEADERS, data=json.dumps(body))
resp = base_client.post('/records/', headers=get_headers(), data=json.dumps(body))
print(resp.data)
assert resp.status_code == 201
# Check non presence of OCR content in DB record
resp_body = resp.json()['metadata']
resp_body = resp.json['metadata']
assert resp_body.get('control_number') is not None
resp_data = resp_body.get("_data")
assert resp_data.get('title') == 'Test default search field'
......@@ -60,13 +51,12 @@ def test_search(endpoint, api_key):
body["_data"]['description'] = 'This does not contains the magic word and should not appear'
# Create test record
resp = requests.post('{endpoint}/api/records/'.format(endpoint=endpoint),
headers=HEADERS, data=json.dumps(body))
resp = base_client.post('/records/', headers=get_headers(), data=json.dumps(body))
assert resp.status_code == 201
# Check non presence of OCR content in DB record
resp_body = resp.json()['metadata']
resp_body = resp.json['metadata']
assert resp_body.get('control_number') is not None
resp_data = resp_body.get("_data")
assert resp_data.get('title') == 'Test default search field'
......@@ -76,24 +66,22 @@ def test_search(endpoint, api_key):
# # Needed to allow ES to process the file
import time
time.sleep(1)
time.sleep(3)
# Search records
# Test search with no query
resp = requests.get('{endpoint}/api/records/'.format(endpoint=endpoint),
headers=HEADERS, data=json.dumps(body))
resp = base_client.get('/records/', headers=get_headers(), data=json.dumps(body))
assert resp.status_code == 200
resp_hits = resp.json()['hits']
resp_hits = resp.json['hits']
assert resp_hits.get('total') == 2
resp = requests.get('{endpoint}/api/records/?q=CernSearch'
.format(endpoint=endpoint),
headers=HEADERS, data=json.dumps(body))
resp = base_client.get('/records/?q=CernSearch', headers=get_headers(), data=json.dumps(body))
assert resp.status_code == 200
resp_hits = resp.json()['hits']
resp_hits = resp.json['hits']
assert resp_hits.get('total') == 1
description = resp_hits['hits'][0]['metadata'].get("_data").get('description')
......@@ -101,14 +89,19 @@ def test_search(endpoint, api_key):
assert description == 'This contains CernSearch and should appear'
# Clean the instance. Delete record
resp = requests.delete('{endpoint}/api/record/{control_number}'
.format(endpoint=endpoint, control_number=control_number_one),
headers=HEADERS, data=json.dumps(body))
resp = base_client.delete(
'/record/{control_number}'.format(control_number=control_number_one),
headers=get_headers(),
data=json.dumps(body)
)
assert resp.status_code == 204
resp = requests.delete('{endpoint}/api/record/{control_number}'
.format(endpoint=endpoint, control_number=control_number_two),
headers=HEADERS, data=json.dumps(body))
resp = base_client.delete(
'/record/{control_number}'
.format(control_number=control_number_two),
headers=get_headers(),
data=json.dumps(body)
)
assert resp.status_code == 204
......@@ -10,18 +10,10 @@ import json
import time
import pytest
import requests
from tests.api.helpers import get_headers, get_schemas_endpoint
HEADERS = {
"Accept": "application/json",
"Content-Type": "application/json; charset=utf-8",
"Authorization": ''
}
def create_record(endpoint, api_key, title):
HEADERS['Authorization'] = 'Bearer {credentials}'.format(credentials=api_key)
def create_record(appctx, base_client, title):
body = {
"_access": {
"owner": ["CernSearch-Administrators@cern.ch"],
......@@ -31,19 +23,16 @@ def create_record(endpoint, api_key, title):
"_data": {
"title": title
},
"$schema": "{endpoint}/schemas/test/suggest_v0.0.2.json".format(
endpoint=endpoint
)
"$schema": get_schemas_endpoint(appctx, "test/suggest_v0.0.2.json")
}
# Create test record
resp = requests.post('{endpoint}/api/records/'.format(endpoint=endpoint),
headers=HEADERS, data=json.dumps(body))
resp = base_client.post('/records/', headers=get_headers(), data=json.dumps(body))
assert resp.status_code == 201
# Check non presence of OCR content in DB record
resp_body = resp.json()['metadata']
resp_body = resp.json['metadata']
assert resp_body.get('control_number') is not None
resp_data = resp_body.get("_data")
assert resp_data.get('title') == title
......@@ -52,20 +41,19 @@ def create_record(endpoint, api_key, title):
@pytest.mark.unit
def test_suggester(endpoint, api_key):
def test_suggester(appctx, base_client):
"""
Test search over public documents. Test that the ``_access.*`` field is
not searched over.
"""
HEADERS['Authorization'] = 'Bearer {credentials}'.format(credentials=api_key)
# Create records
control_numbers = [
create_record(endpoint, api_key, 'The First Suggestion'),
create_record(endpoint, api_key, 'Documentation site title'),
create_record(endpoint, api_key, 'CERN Search Documentation'),
create_record(endpoint, api_key, 'Invenio docs site'),
create_record(endpoint, api_key, 'The final suggester')
create_record(appctx, base_client, 'The First Suggestion'),
create_record(appctx, base_client, 'Documentation site title'),
create_record(appctx, base_client, 'CERN Search Documentation'),
create_record(appctx, base_client, 'Invenio docs site'),
create_record(appctx, base_client, 'The final suggester')
]
time.sleep(3)
......@@ -75,42 +63,34 @@ def test_suggester(endpoint, api_key):
}
# 'the f' should return 1st and 5th record
resp = requests.get('{endpoint}/api/records/'.format(endpoint=endpoint),
params=query,
headers=HEADERS)
resp = base_client.get('/records/', query_string=query, headers=get_headers())
assert resp.status_code == 200
resp_hits = resp.json()['hits']
resp_hits = resp.json['hits']
assert resp_hits.get('total') == 2
# 'doc' should return 2nd, 3rd and 4th record
query['q'] = 'suggest:doc'
resp = requests.get('{endpoint}/api/records/'.format(endpoint=endpoint),
params=query,
headers=HEADERS)
resp = base_client.get('/records/', query_string=query, headers=get_headers())
assert resp.status_code == 200
resp_hits = resp.json()['hits']
resp_hits = resp.json['hits']
assert resp_hits.get('total') == 3
# 'f sugg' should return 1st and 5th record
query['q'] = 'suggest:f sugg'
resp = requests.get('{endpoint}/api/records/'.format(endpoint=endpoint),
params=query,