indexer.py 5.7 KB
Newer Older
1
2
#!/usr/bin/python
# -*- coding: utf-8 -*-
3
4
#
# This file is part of CERN Search.
Carina Antunes's avatar
Carina Antunes committed
5
# Copyright (C) 2018-2021 CERN.
6
#
7
# CERN Search is free software; you can redistribute it and/or modify it
8
# under the terms of the MIT License; see LICENSE file for more details.
9
"""Indexer utilities."""
10
import json as json_lib
Carina Antunes's avatar
Carina Antunes committed
11
from json import JSONDecodeError
12

13
from flask import current_app
14
from invenio_files_rest.storage import FileStorage
15
from invenio_indexer.api import RecordIndexer
16

Carina Antunes's avatar
Carina Antunes committed
17
18
from cern_search_rest_api.modules.cernsearch.api import CernSearchRecord
from cern_search_rest_api.modules.cernsearch.file_meta import extract_metadata_from_processor
Carina Antunes's avatar
Carina Antunes committed
19
from cern_search_rest_api.modules.cernsearch.tasks import process_file_async
Carina Antunes's avatar
Carina Antunes committed
20
21
22
23
24
25
26
27
28
29
30
31
32

# File open modes passed to the storage backend.
READ_MODE_BINARY = "rb"
READ_WRITE_MODE_BINARY = "rb+"

# Keys written into the indexed document (top level or its ``_data`` section)
# from processed file content and file metadata.
CONTENT_KEY = "content"
FILE_KEY = "file"
FILE_FORMAT_KEY = "file_extension"
DATA_KEY = "_data"
AUTHORS_KEY = "authors"
COLLECTION_KEY = "collection"
NAME_KEY = "name"
KEYWORDS_KEY = "keywords"
CREATION_KEY = "creation_date"

# Hard limit of 1 MiB on indexed file content, to keep documents within
# Elasticsearch document-size recommendations.
# Ref: https://www.elastic.co/guide/en/elasticsearch/reference/7.1/general-recommendations.html#maximum-document-size
CONTENT_HARD_LIMIT = 1024 * 1024

37

38
39
40
41
42
class CernSearchRecordIndexer(RecordIndexer):
    """Record Indexer.

    Uses :class:`CernSearchRecord` as the record class and overrides bulk
    queueing so the producer connection is checked before publishing.
    """

    record_cls = CernSearchRecord

    def _bulk_op(self, record_id_iterator, op_type, index=None, doc_type=None):
        """Queue one indexing operation message per record id.

        Before publishing, ensures the producer connection is available
        (``max_retries=3``, ``timeout=5``), then publishes a message with the
        record id, operation, index and doc_type for every yielded id.

        :param record_id_iterator: Iterator that yields record UUIDs.
        :param op_type: Indexing operation (one of ``index``, ``create``,
            ``delete`` or ``update``).
        :param index: The Elasticsearch index. (Default: ``None``)
        :param doc_type: The Elasticsearch doc_type. (Default: ``None``)
        """

        def errback(exc, interval):
            # Invoked by ``ensure_connection`` after each failed attempt.
            # Flask exposes the application logger as ``logger``; the previous
            # ``current_app.logging`` raised AttributeError here and hid the
            # actual connection error. ``exc_info=exc`` keeps the traceback even
            # if this callback is not invoked inside an ``except`` block.
            current_app.logger.error("Connection error: %s", exc, exc_info=exc)
            current_app.logger.info("Retry in %s seconds.", interval)

        with self.create_producer() as producer:
            # Bounded retries so an unreachable broker surfaces as an error
            # instead of stalling the caller.
            producer.connection.ensure_connection(errback=errback, max_retries=3, timeout=5)

            for rec in record_id_iterator:
                current_app.logger.debug(rec)
                producer.publish(
                    dict(
                        id=str(rec),
                        op=op_type,
                        index=index,
                        doc_type=doc_type,
                    ),
                )

74

Carina Antunes's avatar
Carina Antunes committed
75
76
77
78
79
80
81
82
83
def index_file_content(
    sender,
    json=None,
    record: CernSearchRecord = None,
    index=None,
    doc_type=None,
    arguments=None,
    **kwargs,
):
    """Index file content in search.

    Copies the processed content (and, when ``PROCESS_FILE_META`` is set, the
    file metadata) of the record's first processed file into ``json``, which
    is mutated in place. If the record has files but no processed content yet,
    re-queues processing of its first file and leaves ``json`` untouched.

    :param sender: Caller/sender; not used here.
    :param json: Document being indexed; mutated in place.
    :param record: Record being indexed.
    :param index: Not used here.
    :param doc_type: Not used here.
    :param arguments: Not used here.
    """
    if not record.files and not record.files_content:
        return

    # Reindex in case file_processor job is lost
    if not record.files_content:
        file_obj = next(iter(record.files))
        current_app.logger.warning("No file content, retrying file: %s in %s", file_obj.obj.basename, record.id)
        process_file_async.delay(str(file_obj.obj.bucket_id), file_obj.obj.key)
        return

    # Index first or none
    file_obj = next(iter(record.files_content))
    file_name = file_obj.obj.basename
    if not file_obj.obj.file.readable:
        current_app.logger.warning("Could not index file not readable: %s in %s", file_name, record.id)
        return

    current_app.logger.debug("Index file content: %s in %s", file_name, record.id)

    json[FILE_KEY] = file_name

    storage = file_obj.obj.file.storage()  # type: FileStorage
    # The content file is only read, so open it read-only: "rb+" would
    # needlessly require write access to the stored file.
    with storage.open(mode=READ_MODE_BINARY) as fp:
        try:
            file_content = json_lib.load(fp)
        except (JSONDecodeError, UnicodeDecodeError):
            # json.load on bytes raises UnicodeDecodeError for undecodable input.
            current_app.logger.error("File content contains invalid json: %s in %s", file_name, record.id)
            return

    # Valid JSON that is not an object (e.g. a list) cannot hold a "content" key.
    if not isinstance(file_content, dict):
        current_app.logger.error("File content is not a json object: %s in %s", file_name, record.id)
        return

    check_file_content_limit(file_content, file_name, record.id)

    json[DATA_KEY][CONTENT_KEY] = file_content[CONTENT_KEY]

    if current_app.config.get("PROCESS_FILE_META"):
        index_metadata(file_content, json, file_name)
119
120


Carina Antunes's avatar
Carina Antunes committed
121
122
def index_metadata(file_content, json, file_name):
    """Extract metadata from file to be indexed.

    Reads ``file_content["metadata"]`` through
    ``extract_metadata_from_processor`` and copies the available fields into
    ``json`` (mutated in place). If the ``PROCESS_FILE_META`` config is a
    collection of field names, only those fields are written; otherwise every
    available field is written.

    :param file_content: Parsed file-processor output.
    :param json: Document being indexed; mutated in place.
    :param file_name: Name of the file, used to derive the file extension.
    """
    metadata = extract_metadata_from_processor(file_content.get("metadata"))

    indexable_meta = current_app.config.get("PROCESS_FILE_META")
    # A collection of names restricts indexing to those fields; any other
    # value means "index every available field".
    index_specific_meta = isinstance(indexable_meta, (list, tuple, set, frozenset))

    def should_index(field):
        return not index_specific_meta or field in indexable_meta

    if metadata.get("authors") and should_index(AUTHORS_KEY):
        json[DATA_KEY][AUTHORS_KEY] = metadata["authors"]

    # The processor's content type is stored as the document collection.
    if metadata.get("content_type") and should_index(COLLECTION_KEY):
        json[COLLECTION_KEY] = metadata["content_type"]

    if metadata.get("title") and should_index(NAME_KEY):
        json[DATA_KEY][NAME_KEY] = metadata["title"]

    if metadata.get("keywords") and should_index(KEYWORDS_KEY):
        json[DATA_KEY][KEYWORDS_KEY] = metadata["keywords"]

    if metadata.get("creation_date") and should_index(CREATION_KEY):
        json[CREATION_KEY] = metadata["creation_date"]

    # Extension is the text after the last "."; skip names without one and
    # names ending in "." (which would otherwise index an empty extension).
    _, dot, extension = file_name.rpartition(".")
    if dot and extension and should_index(FILE_FORMAT_KEY):
        json[FILE_FORMAT_KEY] = extension
148
149


Carina Antunes's avatar
Carina Antunes committed
150
151
def check_file_content_limit(file_content, file_name, record_id):
    """Check file content limit and truncate if necessary.

    Mutates ``file_content`` in place. A missing ``"content"`` entry is logged
    and set to ``""``. If the string form of the content is longer than
    ``CONTENT_HARD_LIMIT`` characters, it is replaced by that string cut to
    the limit.

    :param file_content: Parsed file-processor output; mutated in place.
    :param file_name: File name, used only in log messages.
    :param record_id: Record id, used only in log messages.
    """
    if "content" not in file_content:
        current_app.logger.warning("No file content: %s in %s", file_name, record_id)

    content = file_content.get("content", "")
    file_content["content"] = content

    # Length is measured on the string form, so non-string content is only
    # converted when it actually needs truncating.
    content_text = str(content)
    if len(content_text) <= CONTENT_HARD_LIMIT:
        return

    current_app.logger.warning("Truncated file content: %s in %s", file_name, record_id)
    file_content["content"] = content_text[:CONTENT_HARD_LIMIT]