Commit c22c1b7c authored by Carina Antunes's avatar Carina Antunes
Browse files

[SEARCH-85] Fix reindex cli

parent c3e08265
......@@ -7,7 +7,7 @@
# under the terms of the MIT License; see LICENSE file for more details.
# Use CentOS7:
FROM gitlab-registry.cern.ch/webservices/cern-search/cern-search-rest-api/cern-search-rest-api-base:dafbe485df036940ba6ceb2e5a216db33e816872
FROM gitlab-registry.cern.ch/webservices/cern-search/cern-search-rest-api/cern-search-rest-api-base:bfdd86117598a031f427328c9d276f7f1b782520
ARG build_env
# CERN Search installation
......
......@@ -19,12 +19,12 @@ invenio-app = "<1.3.0,>=1.2.0"
invenio-base = "<1.3.0,>=1.2.0"
invenio-config = "<1.1.0,>=1.0.2"
invenio-db = {version = ">=1.0.0,<1.1.0",extras = ["postgresql", "versioning"]}
invenio-indexer = {extras = ["elasticsearch7"],version = "<1.2.0,>=1.1.1"}
invenio-indexer = ">=1.1.2,<1.2.0"
invenio-jsonschemas = ">=1.0.0,<1.1.0"
invenio-logging = ">=1.0.0,<1.1.0"
invenio-records-rest = {extras = ["elasticsearch7"],version = "<1.7.0,>=1.6.0"}
invenio-records-rest = "<=1.7.0,>1.6.5"
invenio-records = {extras = ["postgresql"],version = ">=1.0.0,<1.3.0"}
invenio-rest = {extras = ["cors"],version = "<1.2.0,>=1.1.0"}
invenio-rest = "<1.3,>=1.2.0"
invenio-oauthclient = ">=1.0.0,<1.1.0"
invenio-oauth2server = ">=1.0.0,<1.1.0"
invenio-search = {extras = ["elasticsearch7"],version = ">=1.2.0,<1.3.0"}
......@@ -46,6 +46,7 @@ invenio-files-processor = {extras = ["tika"],git = "https://github.com/carantune
Werkzeug = ">=0.15,<1.0.0"
pip = "*"
eventlet = "*"
wtforms = "<2.3.0"
[requires]
python_version = "3.6"
This diff is collapsed.
#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# This file is part of CERN Search.
# Copyright (C) 2018-2019 CERN.
#
# Citadel Search is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
"""Click command-line utilities."""
import click
from cern_search_rest_api.modules.cernsearch.indexer import CernSearchRecordIndexer
from flask.cli import with_appcontext
from invenio_indexer.tasks import process_bulk_queue
from invenio_pidstore.models import PersistentIdentifier, PIDStatus
def abort_if_false(ctx, param, value):
    """Abort the command if ``value`` is falsy.

    Has the ``(ctx, param, value)`` signature of a click option callback.

    :param ctx: Context object; its ``abort()`` method is called on a
        falsy ``value``.
    :param param: Unused; accepted only to satisfy the callback signature.
    :param value: Value of the option being checked.
    """
    if value:
        # Confirmation given: nothing to do.
        return
    ctx.abort()
# Root click group of this module; subcommands attach to it through
# ``@utils.command(...)``. The group name is spelled out explicitly and is
# the same one click would derive from the function name.
@click.group(name='utils')
def utils():
    """Misc management commands."""
@utils.command('runindex')
@click.option(
    '--delayed', '-d', is_flag=True, help='Run indexing in background.')
@click.option(
    '--concurrency', '-c', default=1, type=int,
    help='Number of concurrent indexing tasks to start.')
@click.option('--queue', '-q', type=str,
              help='Name of the celery queue used to put the tasks into.')
@click.option('--version-type', help='Elasticsearch version type to use.')
@click.option(
    '--raise-on-error/--skip-errors', default=True,
    help='Controls if Elasticsearch bulk indexing errors raise an exception.')
@with_appcontext
def run(delayed, concurrency, version_type=None, queue=None,
        raise_on_error=True):
    """Run bulk record indexing."""
    # The docstring above is kept short on purpose: click shows it as the
    # command's help text.
    #
    # delayed:        dispatch celery tasks instead of indexing inline.
    # concurrency:    number of tasks dispatched when ``delayed`` is set.
    # version_type:   forwarded to the indexer / task unchanged.
    # queue:          optional celery queue name for the dispatched tasks.
    # raise_on_error: forwarded as ``es_bulk_kwargs['raise_on_error']``.
    bulk_kwargs = {'raise_on_error': raise_on_error}

    if not delayed:
        # Foreground mode: drain the bulk queue in this process.
        click.secho('Indexing records...', fg='green')
        indexer = CernSearchRecordIndexer(version_type=version_type)
        indexer.process_bulk_queue(es_bulk_kwargs=bulk_kwargs)
        return

    # Background mode: hand the work over to celery.
    # NOTE(review): the task dispatched here is the one imported from
    # ``invenio_indexer.tasks``; whether it goes through
    # CernSearchRecordIndexer cannot be told from this file -- confirm.
    task_options = {
        'kwargs': {
            'version_type': version_type,
            'es_bulk_kwargs': bulk_kwargs,
        }
    }
    click.secho(
        'Starting {0} tasks for indexing records...'.format(concurrency),
        fg='green')
    if queue is not None:
        task_options['queue'] = queue
    for _ in range(concurrency):
        process_bulk_queue.apply_async(**task_options)
@utils.command('reindex')
@click.option('--yes-i-know', is_flag=True, callback=abort_if_false,
              expose_value=False,
              prompt='Do you really want to reindex all records?')
@click.option('-t', '--pid-type', multiple=True, required=True)
@with_appcontext
def reindex(pid_type):
    """Reindex all records.

    Queues every record that has a registered persistent identifier of one
    of the given types; the queue itself is processed by a separate command.

    :param pid_type: PID types to select (``-t`` may be given several times).
    """
    click.secho('Sending records to indexing queue ...', fg='green')

    # Lazily yield the record UUID of each registered 'rec' PID whose type
    # is among the requested ones; each row holds only the UUID column.
    record_uuids = (row[0] for row in PersistentIdentifier.query.filter_by(
        object_type='rec', status=PIDStatus.REGISTERED
    ).filter(
        PersistentIdentifier.pid_type.in_(pid_type)
    ).values(
        PersistentIdentifier.object_uuid
    ))
    CernSearchRecordIndexer().bulk_index(record_uuids)

    # The queue-processing command of this group is registered as
    # 'runindex', so the hint names that command.
    click.secho('Execute "runindex" command to process the queue!',
                fg='yellow')
......@@ -11,10 +11,10 @@ from cern_search_rest_api.modules.cernsearch.api import CernSearchRecord
from cern_search_rest_api.modules.cernsearch.files import (delete_all_record_files, delete_file_instance,
delete_previous_record_file_if_exists, delete_record_file,
persist_file_content, record_from_object_version)
from cern_search_rest_api.modules.cernsearch.indexer import CernSearchRecordIndexer
from cern_search_rest_api.modules.cernsearch.tasks import process_file_async
from flask import current_app
from invenio_files_rest.models import ObjectVersion
from invenio_indexer.api import RecordIndexer
def file_uploaded_listener(obj: ObjectVersion = None):
......@@ -38,7 +38,7 @@ def file_processed_listener(app, processor_id, file: ObjectVersion, data):
record = record_from_object_version(file)
persist_file_content(record, file_content, file.basename)
RecordIndexer().index(record)
CernSearchRecordIndexer().index(record)
# delete file from filesystem only after indexing successfully
delete_file_instance(file)
......@@ -49,7 +49,7 @@ def file_deleted_listener(obj: ObjectVersion = None):
delete_record_file(obj)
record = record_from_object_version(obj)
RecordIndexer().index(record)
CernSearchRecordIndexer().index(record)
def record_deleted_listener(sender, record: CernSearchRecord, *args, **kwargs):
......
......@@ -46,6 +46,9 @@ setup(
include_package_data=True,
platforms='any',
entry_points={
'console_scripts': [
'cern_search_rest_api = invenio_app.cli:cli',
],
'invenio_config.module': [
'cern_search_rest_api = cern_search_rest_api.config'
],
......@@ -74,7 +77,10 @@ setup(
],
'invenio_celery.tasks': [
'cern-search = cern_search_rest_api.modules.cernsearch.tasks'
]
],
'flask.commands': [
'utils = cern_search_rest_api.modules.cernsearch.cli:utils'
],
},
classifiers=[
'Environment :: Web Environment',
......
......@@ -31,7 +31,7 @@ def test_file_uploaded_listener(
@patch('cern_search_rest_api.modules.cernsearch.receivers.delete_file_instance')
@patch('cern_search_rest_api.modules.cernsearch.receivers.RecordIndexer')
@patch('cern_search_rest_api.modules.cernsearch.receivers.CernSearchRecordIndexer')
@patch('cern_search_rest_api.modules.cernsearch.receivers.persist_file_content')
@patch('cern_search_rest_api.modules.cernsearch.receivers.record_from_object_version')
def test_file_processed_listener(
......@@ -63,7 +63,7 @@ def test_file_processed_listener(
delete_file_mock.assert_called_once_with(file.obj)
@patch('cern_search_rest_api.modules.cernsearch.receivers.RecordIndexer')
@patch('cern_search_rest_api.modules.cernsearch.receivers.CernSearchRecordIndexer')
@patch('cern_search_rest_api.modules.cernsearch.receivers.delete_record_file')
@patch('cern_search_rest_api.modules.cernsearch.receivers.record_from_object_version')
def test_file_deleted_listener(
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment