Commit 0d38aae3 authored by Carina Antunes
Browse files

Release 1.7.0-beta

parent a59724b5
**Version 1.7.0-beta (released 2020-04-07)**
- Decrease max file content size to 1MB
- [EDMS] Document mapping weights update
**Version 1.6.1-beta (released 2020-01-19)**
......@@ -32,7 +32,7 @@ build-env:
.PHONY: build-env
docker-compose -f $(DOCKER_FILE) up -d --build --remove-orphans
docker-compose -f $(DOCKER_FILE) build --no-cache --parallel
.PHONY: rebuild-env
......@@ -13,8 +13,8 @@ documents and search among them when needed!
1. Run `make env MODE=test`
2. Follow [instructions](#tls---how-to-install-certificate) to install certificate.
3. Chrome https://localhost
Read more on the makefile.
Note: When sending records in this setup set the `$schema` field as `"<instance>/<record>.json"`
Read more on the makefile and the Docs.
- **Docker + Poetry: Read more on the makefile**
1. Run `make local-env MODE=test`
......@@ -7,14 +7,17 @@
# Citadel Search is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
"""Click command-line utilities."""
import json
import click
from flask.cli import with_appcontext
from invenio_indexer.tasks import process_bulk_queue
from invenio_pidstore.models import PersistentIdentifier, PIDStatus
from invenio_records.models import RecordMetadata
from invenio_search import current_search
from invenio_search.cli import es_version_check
from cern_search_rest_api.modules.cernsearch.indexer import CernSearchRecordIndexer
from cern_search_rest_api.modules.cernsearch.indexer_tasks import process_bulk_queue
def abort_if_false(ctx, param, value):
......@@ -125,7 +128,20 @@ def reindex(pid_type, id_list, doc_type=None):
if doc_type:
query = query.filter(RecordMetadata.json.op("->>")("$schema").contains(doc_type))
query = (x[0] for x in query.values(PersistentIdentifier.object_uuid))
query = (x[0] for x in query.yield_per(100).values(PersistentIdentifier.object_uuid))
click.secho('Execute "run" command to process the queue!', fg="yellow")
@click.option("-f", "--force", is_flag=True, default=False)
@click.option("-v", "--verbose", is_flag=True, default=False)
def index_init(index_name, force, verbose):
"""Init index by its name."""
results = list(current_search.create(index_list=[index_name], ignore_existing=force))
if verbose:
......@@ -8,13 +8,16 @@
# under the terms of the MIT License; see LICENSE file for more details.
"""Indexer utilities."""
import json as json_lib
from json import JSONDecodeError
from celery.utils.log import get_task_logger
from flask import current_app
from import FileStorage
from invenio_indexer.api import RecordIndexer
from cern_search_rest_api.modules.cernsearch.api import CernSearchRecord
from cern_search_rest_api.modules.cernsearch.file_meta import extract_metadata_from_processor
from cern_search_rest_api.modules.cernsearch.tasks import process_file_async
......@@ -28,9 +31,11 @@ COLLECTION_KEY = "collection"
NAME_KEY = "name"
KEYWORDS_KEY = "keywords"
CREATION_KEY = "creation_date"
# Hard limit on content on 99.9MB due to ES limitations
# Hard limit of 1MB on content due to ES limitations
# Ref:
CONTENT_HARD_LIMIT = int(99.9 * 1024 * 1024)
CONTENT_HARD_LIMIT = int(1 * 1024 * 1024)
logger = get_task_logger(__name__)
class CernSearchRecordIndexer(RecordIndexer):
......@@ -38,6 +43,37 @@ class CernSearchRecordIndexer(RecordIndexer):
record_cls = CernSearchRecord
# Add ensure connection
def _bulk_op(self, record_id_iterator, op_type, index=None, doc_type=None):
"""Index record in Elasticsearch asynchronously.
:param record_id_iterator: Iterator that yields record UUIDs.
:param op_type: Indexing operation (one of ``index``, ``create``,
``delete`` or ``update``).
:param index: The Elasticsearch index. (Default: ``None``)
:param doc_type: The Elasticsearch doc_type. (Default: ``None``)
def errback(exc, interval):
current_app.logging.exception(exc)"Retry in %s seconds.", interval)
with self.create_producer() as producer:
producer.connection.ensure_connection(errback=errback, max_retries=3, timeout=5)
for rec in record_id_iterator:
def index_file_content(
......@@ -49,19 +85,31 @@ def index_file_content(
"""Index file content in search."""
if not record.files and not record.files_content:
if not record.files_content:
file_obj = next(record.files)
logger.warning("Not file content, retrying file: %s in %s", file_obj.obj.basename,
process_file_async.delay(str(file_obj.obj.bucket_id), file_obj.obj.key)
for file_obj in record.files_content:
current_app.logger.debug("Index file content: %s in %s", file_obj.obj.basename,
logger.debug("Index file content: %s in %s", file_obj.obj.basename,
json[FILE_KEY] = file_obj.obj.basename
storage = # type: FileStorage
with as fp:
file_content = json_lib.load(fp)
file_content = json_lib.load(fp)
except JSONDecodeError:
logger.error("File content contains invalid json: %s in %s", file_obj.obj.basename,
check_file_content_limit(file_content, file_obj.obj.basename,
json[DATA_KEY][CONTENT_KEY] = file_content["content"]
json[FILE_KEY] = file_obj.obj.basename
if current_app.config.get("PROCESS_FILE_META"):
index_metadata(file_content, json, file_obj.obj.basename)
......@@ -91,6 +139,10 @@ def index_metadata(file_content, json, file_name):
def check_file_content_limit(file_content, file_name, record_id):
"""Check file content limit and truncate if necessary."""
file_content["content"] = file_content.get("content", "")
if not file_content["content"]:
logger.error("No file content: %s in %s", file_name, record_id)
if len(str(file_content["content"])) > CONTENT_HARD_LIMIT:
current_app.logger.warning("Truncated file content: %s in %s", file_name, record_id)
logger.warning("Truncated file content: %s in %s", file_name, record_id)
file_content["content"] = str(file_content["content"])[:CONTENT_HARD_LIMIT]
# -*- coding: utf-8 -*-
# This file is part of Invenio.
# Copyright (C) 2016-2018 CERN.
# Invenio is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
"""Celery tasks to index records."""
from __future__ import absolute_import, print_function
from celery import shared_task
from .indexer import CernSearchRecordIndexer
def process_bulk_queue(version_type=None, es_bulk_kwargs=None):
"""Process bulk indexing queue.
:param str version_type: Elasticsearch version type.
:param dict es_bulk_kwargs: Passed to :func:`elasticsearch:elasticsearch.helpers.bulk`.
Note: You can start multiple versions of this task.
def index_record(record_uuid):
"""Index a single record.
:param record_uuid: The record UUID.
def delete_record(record_uuid):
"""Delete a single record.
:param record_uuid: The record UUID.
......@@ -53,22 +53,22 @@
"document_id": {
"type": "keyword",
"normalizer": "case_accent_normalizer",
"boost": 2
"boost": 5
"document_id_version": {
"type": "keyword",
"normalizer": "case_accent_normalizer",
"boost": 2
"boost": 5
"cern_id": {
"type": "keyword",
"normalizer": "case_accent_normalizer",
"boost": 2
"boost": 5
"cern_id_version": {
"type": "keyword",
"normalizer": "case_accent_normalizer",
"boost": 2
"boost": 5
"external_reference": {
"type": "text",
......@@ -10,12 +10,14 @@
from celery import shared_task
from celery.exceptions import MaxRetriesExceededError, Reject
from flask import current_app
from celery.utils.log import get_task_logger
from invenio_files_processor.errors import InvalidProcessor
from invenio_files_processor.processors.tika.unpack import UnpackProcessor
from invenio_files_processor.proxies import current_processors
from invenio_files_rest.models import ObjectVersion
logger = get_task_logger(__name__)
......@@ -27,16 +29,16 @@ from invenio_files_rest.models import ObjectVersion
def process_file_async(self, bucket_id, key_id):
"""Process file with processor tika."""
current_app.logger.debug("Processing file %s:%s", bucket_id, key_id)
logger.debug("Processing file %s:%s", bucket_id, key_id)
obj = ObjectVersion.get(bucket_id, key_id) # type: ObjectVersion
processor = current_processors.get_processor( # type: UnpackProcessor
current_app.logger.debug("Processed file %s:%s", bucket_id, key_id)
logger.debug("Processed file %s:%s", bucket_id, key_id)
except InvalidProcessor:
# Because we use reject_on_worker_lost, we need to handle occasional processed files being requeued.
current_app.logger.debug("Requeued file %s:%s already processed", bucket_id, key_id)
logger.warning("Requeued file %s:%s already processed", bucket_id, key_id)
except Exception:
raise self.retry()
......@@ -15,4 +15,4 @@ and parsed by ``setup.py``.
from __future__ import absolute_import, print_function
__version__ = "1.6.1-beta"
__version__ = "1.7.0-beta"
......@@ -69,7 +69,10 @@ setup(
"invenio_base.blueprints": [
"health_check = cern_search_rest_api.modules.cernsearch.views:build_health_blueprint"
"invenio_celery.tasks": ["cern-search = cern_search_rest_api.modules.cernsearch.tasks"],
"invenio_celery.tasks": [
"cern-search = cern_search_rest_api.modules.cernsearch.tasks",
"cern-search-indexer_tasks = cern_search_rest_api.modules.cernsearch.indexer_tasks",
"flask.commands": ["utils = cern_search_rest_api.modules.cernsearch.cli:utils"],
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment