Commit 779ebb03 authored by Carina Antunes's avatar Carina Antunes
Browse files

[NI] Add options to reindex cli

parent c325b42e
......@@ -13,6 +13,7 @@ from cern_search_rest_api.modules.cernsearch.indexer import CernSearchRecordInde
from flask.cli import with_appcontext
from invenio_indexer.tasks import process_bulk_queue
from invenio_pidstore.models import PersistentIdentifier, PIDStatus
from invenio_records.models import RecordMetadata
def abort_if_false(ctx, param, value):
......@@ -29,6 +30,9 @@ def utils():
@utils.command('runindex')
@click.option(
'--delayed', '-d', is_flag=True, help='Run indexing in background.')
@click.option(
'--chunk_size', '-s', default=500, type=int,
help='Chunks size.')
@click.option(
'--concurrency', '-c', default=1, type=int,
help='Number of concurrent indexing tasks to start.')
......@@ -39,14 +43,14 @@ def utils():
'--raise-on-error/--skip-errors', default=True,
help='Controls if Elasticsearch bulk indexing errors raise an exception.')
@with_appcontext
def run(delayed, concurrency, version_type=None, queue=None,
def run(delayed, concurrency, chunk_size, version_type=None, queue=None,
raise_on_error=True):
"""Run bulk record indexing."""
if delayed:
celery_kwargs = {
'kwargs': {
'version_type': version_type,
'es_bulk_kwargs': {'raise_on_error': raise_on_error},
'es_bulk_kwargs': {'raise_on_error': raise_on_error, 'chunk_size': chunk_size},
}
}
click.secho(
......@@ -59,7 +63,7 @@ def run(delayed, concurrency, version_type=None, queue=None,
else:
click.secho('Indexing records...', fg='green')
CernSearchRecordIndexer(version_type=version_type).process_bulk_queue(
es_bulk_kwargs={'raise_on_error': raise_on_error})
es_bulk_kwargs={'raise_on_error': raise_on_error, 'chunk_size': chunk_size})
@utils.command('reindex')
......@@ -67,21 +71,42 @@ def run(delayed, concurrency, version_type=None, queue=None,
expose_value=False,
prompt='Do you really want to reindex all records?')
@click.option('-t', '--pid-type', multiple=True, required=True)
@click.option(
'-i',
'--id',
'id_list',
help='List of ids.',
multiple=True
)
@click.option('-d', '--doc-type', required=False)
@with_appcontext
def reindex(pid_type):
def reindex(pid_type, id_list, doc_type=None):
"""Reindex all records.
:param pid_type: Pid type.
:param id_list: List of ids.
:param doc_type: Doc type
"""
click.secho('Sending records to indexing queue ...', fg='green')
query = (x[0] for x in PersistentIdentifier.query.filter_by(
object_type='rec', status=PIDStatus.REGISTERED
).filter(
PersistentIdentifier.pid_type.in_(pid_type)
).values(
PersistentIdentifier.object_uuid
))
query = id_list
if not query:
query = PersistentIdentifier.query.filter_by(
object_type='rec', status=PIDStatus.REGISTERED
).join(
RecordMetadata, PersistentIdentifier.object_uuid == RecordMetadata.id
).filter(
PersistentIdentifier.pid_type.in_(pid_type)
)
if doc_type:
query = query.filter(
RecordMetadata.json.op('->>')('$schema').contains(doc_type)
)
query = (x[0] for x in query.values(PersistentIdentifier.object_uuid))
CernSearchRecordIndexer().bulk_index(query)
click.secho('Execute "run" command to process the queue!',
fg='yellow')
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment