From 84fc672b5c82ac9660df1be393d46f78cc5f7274 Mon Sep 17 00:00:00 2001
From: rupozzi <ruben.pozzi@cern.ch>
Date: Mon, 11 Jul 2022 18:17:35 +0200
Subject: [PATCH 1/4] feat: New DB class to monitor application parameters

---
 .../DB/ESApplicationParametersDB.py           | 172 ++++++++++++++++++
 .../Workflow/Modules/BookkeepingReport.py     |  53 +++++-
 2 files changed, 222 insertions(+), 3 deletions(-)
 create mode 100644 src/LHCbDIRAC/ProductionManagementSystem/DB/ESApplicationParametersDB.py

diff --git a/src/LHCbDIRAC/ProductionManagementSystem/DB/ESApplicationParametersDB.py b/src/LHCbDIRAC/ProductionManagementSystem/DB/ESApplicationParametersDB.py
new file mode 100644
index 0000000000..57b17912d0
--- /dev/null
+++ b/src/LHCbDIRAC/ProductionManagementSystem/DB/ESApplicationParametersDB.py
@@ -0,0 +1,172 @@
+###############################################################################
+# (c) Copyright 2022 CERN for the benefit of the LHCb Collaboration           #
+#                                                                             #
+# This software is distributed under the terms of the GNU General Public      #
+# Licence version 3 (GPL Version 3), copied verbatim in the file "LICENSE".   #
+#                                                                             #
+# In applying this licence, CERN does not waive the privileges and immunities #
+# granted to it by virtue of its status as an Intergovernmental Organization  #
+# or submit itself to any jurisdiction.                                       #
+###############################################################################
+""" Module containing a front-end to the ElasticSearch-based ESApplicationSummaryDB.
+"""
+
+from DIRAC import S_OK, S_ERROR, gConfig
+from DIRAC.Core.Utilities import TimeUtilities
+from DIRAC.ConfigurationSystem.Client.PathFinder import getDatabaseSection
+from DIRAC.ConfigurationSystem.Client.Helpers import CSGlobals
+from DIRAC.Core.Base.ElasticDB import ElasticDB
+
+name = "ESApplicationSummaryDB"
+
+mapping = {
+    "properties": {
+        "wmsID": {"type": "long"},
+        "ProductionID": {"type": "integer"},
+        "JobID": {"type": "integer"},
+        # TODO
+    }
+}
+
+
+class ESApplicationSummaryDB(ElasticDB):
+    def __init__(self, parentLogger=None):
+        """Standard Constructor"""
+
+        try:
+            section = getDatabaseSection("ProductionManagement/ESApplicationParametersDB")
+            indexPrefix = gConfig.getValue(f"{section}/IndexPrefix", CSGlobals.getSetup()).lower()
+
+            # Connecting to the ES cluster
+            super().__init__(
+                name, "ProductionManagement/ESApplicationParametersDB", indexPrefix, parentLogger=parentLogger
+            )
+        except Exception as ex:
+            self.log.error("Can't connect to ESApplicationParametersDB", repr(ex))
+            raise RuntimeError("Can't connect to ESApplicationParametersDB")
+
+        self.oldIndexName = f"{self.getIndexPrefix()}_{name.lower()}"
+        self.indexName = f"{self.getIndexPrefix()}_ES-jobapplicationparameters_index"
+        # Verifying if the index is there, and if not create it
+        res = self.existingIndex(self.indexName)
+        if not res["OK"] or not res["Value"]:
+            result = self.createIndex(self.indexName, mapping, period=None)
+            if not result["OK"]:
+                self.log.error(result["Message"])
+                raise RuntimeError(result["Message"])
+            self.log.always("Index created:", self.indexName)
+
+        self.dslSearch = self._Search(self.oldIndexName)
+        self.dslSearch.extra(track_total_hits=True)
+
+    def getJobParameters(self, jobID: int, paramList=None) -> dict:
+        """Get Job Parameters defined for jobID.
+        Returns a dictionary with the Job Parameters.
+        If paramList is empty - all the parameters are returned.
+
+
+        :param self: self reference
+        :param jobID: DiracJob ID
+        :param paramList: List or string of parameters to return
+        :return: dict with Job Parameter values
+        """
+        if isinstance(paramList, str):
+            paramList = paramList.replace(" ", "").split(",")
+        self.log.debug(f"Getting Parameters for job {jobID}")
+        resultDict = {}
+        self.log.debug(f"The searched parameters with JobID {jobID} exists in the new index {self.indexName}")
+        res = self.getDoc(self.indexName, str(jobID))
+        if res["OK"]:
+            if paramList:
+                for par in paramList:
+                    try:
+                        resultDict[par] = res["Value"][par]
+                    except Exception as ex:
+                        self.log.error("Could not find the searched parameters")
+            else:
+                # if parameters are not specified return all of them
+                resultDict = res["Value"]
+        return S_OK({jobID: resultDict})
+
+    def setJobParameter(self, jobID: int, key: str, value: str) -> dict:
+        """
+        Inserts data into ESApplicationJobParametersDB index
+
+        :param self: self reference
+        :param jobID: Job ID
+        :param key: parameter key
+        :param value: parameter value
+        :returns: S_OK/S_ERROR as result of indexing
+        """
+        data = {"JobID": jobID, key: value, "timestamp": TimeUtilities.toEpochMilliSeconds()}
+        self.log.debug("Inserting data in {self.indexName}:{data}")
+
+        # If a record with this jobID update and add parameter, otherwise create a new record
+        if self.existsDoc(self.indexName, id=str(jobID)):
+            self.log.debug("A document for this job already exists, it will now be updated")
+            result = self.updateDoc(index=self.indexName, id=str(jobID), body={"doc": data})
+        else:
+            self.log.debug("No document has this job id, creating a new document for this job")
+            result = self.index(self.indexName, body=data, docID=str(jobID))
+        if not result["OK"]:
+            self.log.error("Couldn't insert or update data", result["Message"])
+        return result
+
+    def setJobParameters(self, jobID: int, parameters: list) -> dict:
+        """
+        Inserts data into ESApplicationJobParametersDB index using bulk indexing
+
+        :param self: self reference
+        :param jobID: Job ID
+        :param parameters: list of tuples (name, value) pairs
+        :returns: S_OK/S_ERROR as result of indexing
+        """
+        self.log.debug(f"Inserting parameters", "in {self.indexName}: for job {jobID}: {parameters}")
+        if isinstance(list, parametersDict):
+            parametersDict = dict(parameters)
+
+        parametersDict["JobID"] = jobID
+        parametersDict["timestamp"] = int(TimeUtilities.toEpochMilliSeconds())
+
+        if self.existsDoc(self.indexName, id=str(jobID)):
+            self.log.debug("A document for this job already exists, it will now be updated")
+            result = self.updateDoc(index=self.indexName, id=str(jobID), body={"doc": parametersDict})
+        else:
+            self.log.debug("Creating a new document for this job")
+            result = self.index(self.indexName, body=parametersDict, docID=str(jobID))
+        if not result["OK"]:
+            self.log.error("Couldn't insert or update data", result["Message"])
+        return result
+
+    def deleteJobParameters(self, jobID: int, paramList=None) -> dict:
+        """Deletes Job Parameters defined for jobID.
+          Returns a dictionary with the Job Parameters.
+          If paramList is empty - all the parameters for the job are removed
+
+        :param self: self reference
+        :param jobID: Job ID
+        :param paramList: list of parameters to be returned (also a string is treated)
+        :return: S_OK()/S_ERROR()
+        """
+        if isinstance(paramList, str):
+            paramList = paramList.replace(" ", "").split(",")
+        self.log.debug(f"Deleting parameters with JobID {jobID} from the  index {self.indexName}")
+        if not paramList:
+            # Deleting the whole record
+            self.log.debug("Deleting record of job {jobID}")
+            result = self.deleteDoc(self.indexName, id=str(jobID))
+            if not result["OK"]:
+                self.log.error("Could not delete the record")
+                return S_ERROR(result)
+        else:
+            # Deleting the specific parameters
+            self.log.debug(f"Deleting Parameters {paramList} for job {jobID}")
+            for paramName in paramList:
+                result = self.updateDoc(
+                    index=self.indexName, id=str(jobID), body={"script": "ctx._source.remove('" + paramName + "')"}
+                )
+                if not result["OK"]:
+                    self.log.error("Could not delete the prameters")
+                    return S_ERROR(result)
+        self.log.debug("Parameters successfully deleted.")
+        return S_OK()
diff --git a/src/LHCbDIRAC/Workflow/Modules/BookkeepingReport.py b/src/LHCbDIRAC/Workflow/Modules/BookkeepingReport.py
index 3c8a641023..24d1a9a1c9 100644
--- a/src/LHCbDIRAC/Workflow/Modules/BookkeepingReport.py
+++ b/src/LHCbDIRAC/Workflow/Modules/BookkeepingReport.py
@@ -24,6 +24,8 @@ from DIRAC import gLogger, S_OK, S_ERROR, gConfig
 from DIRAC.Core.Utilities.Subprocess import systemCall
 from DIRAC.Resources.Catalog.PoolXMLFile import getGUID
 from DIRAC.Workflow.Utilities.Utils import getStepCPUTimes
+from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
+from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
 
 import LHCbDIRAC
 from LHCbDIRAC.Resources.Catalog.PoolXMLFile import getOutputType
@@ -31,6 +33,7 @@ from LHCbDIRAC.Workflow.Modules.ModuleBase import ModuleBase
 from LHCbDIRAC.Core.Utilities.ProductionData import constructProductionLFNs
 from LHCbDIRAC.Core.Utilities.XMLSummaries import XMLSummary, XMLSummaryError
 from LHCbDIRAC.Core.Utilities.XMLTreeParser import addChildNode
+from LHCbDIRAC.ProductionManagementSystem.DB.ESApplicationParametersDB import ESApplicationSummaryDB
 
 
 class BookkeepingReport(ModuleBase):
@@ -43,6 +46,8 @@ class BookkeepingReport(ModuleBase):
 
         super(BookkeepingReport, self).__init__(self.log, bkClientIn=bkClient, dm=dm)
 
+        self.cpu = None
+        self.fileSize = None
         self.simDescription = "NoSimConditions"
         self.eventType = ""
         self.poolXMLCatName = ""
@@ -75,7 +80,18 @@ class BookkeepingReport(ModuleBase):
     ):
         """Usual executor."""
         try:
-
+            self.elasticAppParametersDB = None
+            useESForJobParametersFlag = Operations().getValue("/Defaults/useESForJobParametersFlag", False)
+            if useESForJobParametersFlag:
+                try:
+                    result = ObjectLoader().loadObject(
+                        "ProductionManagement.DB.ESApplicationParametersDB", "ESApplicationParametersDB"
+                    )
+                    if not result["OK"]:
+                        return result
+                    self.elasticAppParametersDB = result["Value"]()
+                except RuntimeError as excp:
+                    return S_ERROR("Can't connect to DB: %s" % excp)
             super(BookkeepingReport, self).execute(
                 production_id,
                 prod_job_id,
@@ -94,7 +110,7 @@ class BookkeepingReport(ModuleBase):
             bkLFNs, logFilePath = self._resolveInputVariables()
 
             doc = self.__makeBookkeepingXML(bkLFNs, logFilePath)
-
+            self._sendParamsToES()
             if saveOnFile:
                 bfilename = "bookkeeping_" + self.step_id + ".xml"
                 with open(bfilename, "wb") as bfile:
@@ -313,6 +329,7 @@ class BookkeepingReport(ModuleBase):
 
         typedParams.append(("WNMODEL", nodeInfo["ModelName"]))
         typedParams.append(("WNCPUPOWER", nodeInfo["CPU(MHz)"]))
+        self.cpu = nodeInfo["CPU(MHz)"]
         typedParams.append(("WNCACHE", nodeInfo["CacheSize(kB)"]))
 
         host = os.environ.get("HOSTNAME", os.environ.get("HOST"))
@@ -533,7 +550,7 @@ class BookkeepingReport(ModuleBase):
                     oFile = addChildNode(oFile, "Parameter", 0, ("EventStat", fileStats))
 
             oFile = addChildNode(oFile, "Parameter", 0, ("FileSize", outputsize))
-
+            self.fileSize = outputsize
             oFile = addChildNode(
                 oFile, "Parameter", 0, ("CreationDate", time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(time.time())))
             )
@@ -628,3 +645,33 @@ class BookkeepingReport(ModuleBase):
             return S_ERROR("Failed to obtain system information")
 
         return S_OK(result)
+
+    ################################################################################
+
+    def _sendParamsToES(self):
+        """
+        Sends a dict of parameters to ES
+        """
+        params = {}
+        exectime, cputime = getStepCPUTimes(self.step_commons)
+        params["CPUTIME"] = cputime
+        params["ExecTime"] = exectime
+        params["NumberOfProcessors"] = self.numberOfProcessors
+        params["Production"] = self.production_id
+        params["DiracJobID"] = str(self.jobID)
+        params["ProgramName"] = self.applicationName
+        params["ProgramVersion"] = self.applicationVersion
+        if self.cpu:
+            params["WNCPUPOWER"] = self.cpu
+        if self.fileSize:
+            params["FileSize"] = self.fileSize
+        if self.eventType is not None:
+            params["EventType"] = self.eventType
+        else:
+            params["EventType"] = "Unknown"
+        params["SimDescription"] = self.simDescription
+
+        res = self.elasticAppParametersDB.setJobParameters(str(self.jobID), params)
+        if not res["OK"]:
+            return res["Message"]
+        return res
-- 
GitLab


From 883e5f74da7a41593d4203638095e0c10a815859 Mon Sep 17 00:00:00 2001
From: rupozzi <ruben.pozzi@cern.ch>
Date: Mon, 11 Jul 2022 18:47:10 +0200
Subject: [PATCH 2/4] feat: New DB class to monitor application parameters

---
 src/LHCbDIRAC/Workflow/Modules/BookkeepingReport.py | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/src/LHCbDIRAC/Workflow/Modules/BookkeepingReport.py b/src/LHCbDIRAC/Workflow/Modules/BookkeepingReport.py
index 24d1a9a1c9..fb65dca036 100644
--- a/src/LHCbDIRAC/Workflow/Modules/BookkeepingReport.py
+++ b/src/LHCbDIRAC/Workflow/Modules/BookkeepingReport.py
@@ -81,8 +81,8 @@ class BookkeepingReport(ModuleBase):
         """Usual executor."""
         try:
             self.elasticAppParametersDB = None
-            useESForJobParametersFlag = Operations().getValue("/Defaults/useESForJobParametersFlag", False)
-            if useESForJobParametersFlag:
+            useESForAppParametersFlag = Operations().getValue("/Defaults/useESForAppParametersFlag", False)
+            if useESForAppParametersFlag:
                 try:
                     result = ObjectLoader().loadObject(
                         "ProductionManagement.DB.ESApplicationParametersDB", "ESApplicationParametersDB"
@@ -110,7 +110,7 @@ class BookkeepingReport(ModuleBase):
             bkLFNs, logFilePath = self._resolveInputVariables()
 
             doc = self.__makeBookkeepingXML(bkLFNs, logFilePath)
-            self._sendParamsToES()
+
             if saveOnFile:
                 bfilename = "bookkeeping_" + self.step_id + ".xml"
                 with open(bfilename, "wb") as bfile:
@@ -118,6 +118,9 @@ class BookkeepingReport(ModuleBase):
             else:
                 print(doc)
 
+            if useESForAppParametersFlag:
+                self._sendParamsToES()
+
             return S_OK()
 
         except Exception as e:  # pylint:disable=broad-except
-- 
GitLab


From 1b1bf4eaa6325ffdf4507fa6617f5b8b5804cccb Mon Sep 17 00:00:00 2001
From: rupozzi <ruben.pozzi@cern.ch>
Date: Tue, 12 Jul 2022 12:18:20 +0200
Subject: [PATCH 3/4] fix: Syntax error in method

---
 .../ProductionManagementSystem/DB/ESApplicationParametersDB.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/LHCbDIRAC/ProductionManagementSystem/DB/ESApplicationParametersDB.py b/src/LHCbDIRAC/ProductionManagementSystem/DB/ESApplicationParametersDB.py
index 57b17912d0..d19c9631ad 100644
--- a/src/LHCbDIRAC/ProductionManagementSystem/DB/ESApplicationParametersDB.py
+++ b/src/LHCbDIRAC/ProductionManagementSystem/DB/ESApplicationParametersDB.py
@@ -121,8 +121,9 @@ class ESApplicationSummaryDB(ElasticDB):
         :param parameters: list of tuples (name, value) pairs
         :returns: S_OK/S_ERROR as result of indexing
         """
+        parametersDict = {}
         self.log.debug(f"Inserting parameters", "in {self.indexName}: for job {jobID}: {parameters}")
-        if isinstance(list, parametersDict):
+        if isinstance(list, parameters):
             parametersDict = dict(parameters)
 
         parametersDict["JobID"] = jobID
-- 
GitLab


From ca6dfa33d12767fb48a8afc8098fefd847ce54e9 Mon Sep 17 00:00:00 2001
From: rupozzi <ruben.pozzi@cern.ch>
Date: Tue, 12 Jul 2022 16:20:26 +0200
Subject: [PATCH 4/4] fix: wrong sending to ES

---
 .../Service/BookkeepingManagerHandler.py      | 120 +++++++++---------
 .../XMLReader/XMLFilesReaderManager.py        |   2 +
 2 files changed, 62 insertions(+), 60 deletions(-)

diff --git a/src/LHCbDIRAC/BookkeepingSystem/Service/BookkeepingManagerHandler.py b/src/LHCbDIRAC/BookkeepingSystem/Service/BookkeepingManagerHandler.py
index 4a51066cec..af04e0e44a 100644
--- a/src/LHCbDIRAC/BookkeepingSystem/Service/BookkeepingManagerHandler.py
+++ b/src/LHCbDIRAC/BookkeepingSystem/Service/BookkeepingManagerHandler.py
@@ -9,6 +9,7 @@
 # or submit itself to any jurisdiction.                                       #
 ###############################################################################
 """BookkeepingManager service is the front-end to the Bookkeeping database."""
+import six
 
 from DIRAC import gLogger, S_OK, S_ERROR
 from DIRAC.Core.DISET.RequestHandler import RequestHandler
@@ -56,7 +57,7 @@ class BookkeepingManagerHandler(RequestHandler):
         return S_OK()
 
     #############################################################################
-    types_sendXMLBookkeepingReport = [str]
+    types_sendXMLBookkeepingReport = [six.string_types]
 
     def export_sendXMLBookkeepingReport(self, xml):
         """This method is used to upload an xml report which is produced after when
@@ -66,6 +67,7 @@ class BookkeepingManagerHandler(RequestHandler):
 
         :param str xml: bookkeeping report
         """
+        # self.elasticAppParametersDB.setJobParameters(str(self.jobID), params)
         retVal = self.xmlReader.readXMLfromString(xml)
         if not retVal["OK"]:
             self.log.error("Issue reading XML", retVal["Message"])
@@ -156,7 +158,7 @@ class BookkeepingManagerHandler(RequestHandler):
         return cls.bkkDB.getAvailableFileTypes()
 
     #############################################################################
-    types_insertFileTypes = [str, str, str]
+    types_insertFileTypes = [six.string_types, six.string_types, six.string_types]
 
     @classmethod
     def export_insertFileTypes(cls, ftype, desc, fileType):
@@ -358,7 +360,7 @@ class BookkeepingManagerHandler(RequestHandler):
         return result
 
     #############################################################################
-    types_getProcessingPass = [dict, str]
+    types_getProcessingPass = [dict, six.string_types]
 
     @classmethod
     def export_getProcessingPass(cls, in_dict, path=None):
@@ -510,7 +512,7 @@ class BookkeepingManagerHandler(RequestHandler):
             gLogger.verbose("DataQualityFlag will be removed. It will changed to DataQuality")
 
         if "RunNumbers" in in_dict:
-            gLogger.verbose("RunNumbers will be removed. It will changed to RunNumber")
+            gLogger.verbose("RunNumbers will be removed. It will changed to RunNumbers")
 
         result = []
         retVal = cls.bkkDB.getFiles(
@@ -858,7 +860,7 @@ class BookkeepingManagerHandler(RequestHandler):
         return cls.bkkDB.getMoreProductionInformations(prodid)
 
     #############################################################################
-    types_getJobInfo = [str]
+    types_getJobInfo = [six.string_types]
 
     @classmethod
     def export_getJobInfo(cls, lfn):
@@ -884,7 +886,7 @@ class BookkeepingManagerHandler(RequestHandler):
         return cls.bkkDB.getJobInformation(in_dict)
 
     #############################################################################
-    types_getRunNumber = [str]
+    types_getRunNumber = [six.string_types]
 
     @classmethod
     def export_getRunNumber(cls, lfn):
@@ -892,7 +894,7 @@ class BookkeepingManagerHandler(RequestHandler):
         return cls.bkkDB.getRunNumber(lfn)
 
     #############################################################################
-    types_getRunNbAndTck = [str]
+    types_getRunNbAndTck = [six.string_types]
 
     @classmethod
     def export_getRunNbAndTck(cls, lfn):
@@ -900,7 +902,7 @@ class BookkeepingManagerHandler(RequestHandler):
         return cls.bkkDB.getRunNbAndTck(lfn)
 
     #############################################################################
-    types_getProductionFiles = [int, str]
+    types_getProductionFiles = [six.integer_types, six.string_types]
 
     @classmethod
     def export_getProductionFiles(cls, prod, fileType, replica=default):
@@ -909,7 +911,7 @@ class BookkeepingManagerHandler(RequestHandler):
         return cls.bkkDB.getProductionFiles(prod, fileType, replica)
 
     #############################################################################
-    types_getProductionFilesBulk = [list, str]
+    types_getProductionFilesBulk = [list, six.string_types]
 
     @classmethod
     def export_getProductionFilesBulk(cls, prods, fileType, replica=default):
@@ -933,7 +935,7 @@ class BookkeepingManagerHandler(RequestHandler):
         return cls.bkkDB.getRunFiles(runid)
 
     #############################################################################
-    types_updateFileMetaData = [str, dict]
+    types_updateFileMetaData = [six.string_types, dict]
 
     @classmethod
     def export_updateFileMetaData(cls, filename, fileAttr):
@@ -945,7 +947,7 @@ class BookkeepingManagerHandler(RequestHandler):
         return cls.bkkDB.updateFileMetaData(filename, fileAttr)
 
     #############################################################################
-    types_renameFile = [str, str]
+    types_renameFile = [six.string_types, six.string_types]
 
     @classmethod
     def export_renameFile(cls, oldLFN, newLFN):
@@ -954,7 +956,7 @@ class BookkeepingManagerHandler(RequestHandler):
         return cls.bkkDB.renameFile(oldLFN, newLFN)
 
     #############################################################################
-    types_getProductionProcessingPassID = [int]
+    types_getProductionProcessingPassID = [six.integer_types]
 
     @classmethod
     def export_getProductionProcessingPassID(cls, prodid):
@@ -962,7 +964,7 @@ class BookkeepingManagerHandler(RequestHandler):
         return cls.bkkDB.getProductionProcessingPassID(prodid)
 
     #############################################################################
-    types_getProductionProcessingPass = [int]
+    types_getProductionProcessingPass = [six.integer_types]
 
     @classmethod
     def export_getProductionProcessingPass(cls, prodid):
@@ -996,7 +998,7 @@ class BookkeepingManagerHandler(RequestHandler):
         return S_OK({"Successfull": successfull, "Faild": faild})
 
     #############################################################################
-    types_setFileDataQuality = [list, str]
+    types_setFileDataQuality = [list, six.string_types]
 
     @classmethod
     def export_setFileDataQuality(cls, lfns, flag):
@@ -1004,7 +1006,7 @@ class BookkeepingManagerHandler(RequestHandler):
         return cls.bkkDB.setFileDataQuality(lfns, flag)
 
     #############################################################################
-    types_setRunAndProcessingPassDataQuality = [(int, str), str, str]
+    types_setRunAndProcessingPassDataQuality = [six.integer_types, six.string_types, six.string_types]
 
     @classmethod
     def export_setRunAndProcessingPassDataQuality(cls, runNB, procpass, flag):
@@ -1015,12 +1017,10 @@ class BookkeepingManagerHandler(RequestHandler):
         used to set the data quality flag to a given run files which
         processed by a given processing pass.
         """
-        if isinstance(runNB, str):
-            runNB = int(runNB)
         return cls.bkkDB.setRunAndProcessingPassDataQuality(runNB, procpass, flag)
 
     #############################################################################
-    types_setRunDataQuality = [int, str]
+    types_setRunDataQuality = [int, six.string_types]
 
     @classmethod
     def export_setRunDataQuality(cls, runNb, flag):
@@ -1031,7 +1031,7 @@ class BookkeepingManagerHandler(RequestHandler):
         return cls.bkkDB.setRunDataQuality(runNb, flag)
 
     #############################################################################
-    types_setQualityRun = [int, str]
+    types_setQualityRun = [int, six.string_types]
 
     @classmethod
     def export_setQualityRun(cls, runNb, flag):
@@ -1039,7 +1039,7 @@ class BookkeepingManagerHandler(RequestHandler):
         return cls.bkkDB.setRunDataQuality(runNb, flag)
 
     #############################################################################
-    types_setProductionDataQuality = [int, str]
+    types_setProductionDataQuality = [int, six.string_types]
 
     @classmethod
     def export_setProductionDataQuality(cls, prod, flag):
@@ -1121,7 +1121,7 @@ class BookkeepingManagerHandler(RequestHandler):
         return self.export_getFileDescendents(lfn, depth, production, checkreplica)
 
     #############################################################################
-    types_checkfile = [str]
+    types_checkfile = [six.string_types]
 
     @classmethod
     def export_checkfile(cls, fileName):
@@ -1129,7 +1129,7 @@ class BookkeepingManagerHandler(RequestHandler):
         return cls.bkkDB.checkfile(fileName)
 
     #############################################################################
-    types_checkFileTypeAndVersion = [str, str]
+    types_checkFileTypeAndVersion = [six.string_types, six.string_types]
 
     @classmethod
     def export_checkFileTypeAndVersion(cls, ftype, version):
@@ -1137,7 +1137,7 @@ class BookkeepingManagerHandler(RequestHandler):
         return cls.bkkDB.checkFileTypeAndVersion(ftype, version)
 
     #############################################################################
-    types_checkEventType = [int]
+    types_checkEventType = [six.integer_types]
 
     @classmethod
     def export_checkEventType(cls, eventTypeId):
@@ -1169,7 +1169,7 @@ class BookkeepingManagerHandler(RequestHandler):
         return cls.bkkDB.getSimConditions()
 
     #############################################################################
-    types_removeReplica = [str]
+    types_removeReplica = [six.string_types]
 
     @classmethod
     def export_removeReplica(cls, fileName):
@@ -1209,7 +1209,7 @@ class BookkeepingManagerHandler(RequestHandler):
         return cls.bkkDB.getFileMetaDataForWeb(lfns)
 
     #############################################################################
-    types_getProductionFilesForUsers = [int, dict, dict, int, int]
+    types_getProductionFilesForUsers = [int, dict, dict, six.integer_types, six.integer_types]
 
     @classmethod
     def export_getProductionFilesForUsers(cls, prod, ftype, sortDict, startItem, maxitems):
@@ -1217,7 +1217,7 @@ class BookkeepingManagerHandler(RequestHandler):
         return cls.bkkDB.getProductionFilesForWeb(prod, ftype, sortDict, startItem, maxitems)
 
     #############################################################################
-    types_getProductionFilesForWeb = [int, dict, dict, int, int]
+    types_getProductionFilesForWeb = [six.integer_types, dict, dict, six.integer_types, six.integer_types]
 
     @classmethod
     def export_getProductionFilesWeb(cls, prod, ftype, sortDict, startItem, maxitems):
@@ -1242,7 +1242,7 @@ class BookkeepingManagerHandler(RequestHandler):
         return cls.bkkDB.addReplica(fileName)
 
     #############################################################################
-    types_getRunInformations = [int]
+    types_getRunInformations = [six.integer_types]
 
     @classmethod
     def export_getRunInformations(cls, runnb):
@@ -1258,7 +1258,7 @@ class BookkeepingManagerHandler(RequestHandler):
         return cls.bkkDB.getRunInformation(runnb)
 
     #############################################################################
-    types_getFileCreationLog = [str]
+    types_getFileCreationLog = [six.string_types]
 
     @classmethod
     def export_getFileCreationLog(cls, lfn):
@@ -1266,7 +1266,7 @@ class BookkeepingManagerHandler(RequestHandler):
         return cls.bkkDB.getFileCreationLog(lfn)
 
     #############################################################################
-    types_getLogfile = [str]
+    types_getLogfile = [six.string_types]
 
     @classmethod
     def export_getLogfile(cls, lfn):
@@ -1274,7 +1274,7 @@ class BookkeepingManagerHandler(RequestHandler):
         return cls.bkkDB.getFileCreationLog(lfn)
 
     #############################################################################
-    types_insertEventType = [int, str, str]
+    types_insertEventType = [six.integer_types, six.string_types, six.string_types]
 
     @classmethod
     def export_insertEventType(cls, evid, desc, primary):
@@ -1288,14 +1288,14 @@ class BookkeepingManagerHandler(RequestHandler):
         return S_OK(str(evid) + " event type exists")
 
     #############################################################################
-    types_addEventType = [int, str, str]
+    types_addEventType = [six.integer_types, six.string_types, six.string_types]
 
     def export_addEventType(self, evid, desc, primary):
         """more info in the BookkeepingClient.py."""
         return self.export_insertEventType(evid, desc, primary)
 
     #############################################################################
-    types_updateEventType = [int, str, str]
+    types_updateEventType = [six.integer_types, six.string_types, six.string_types]
 
     @classmethod
     def export_updateEventType(cls, evid, desc, primary):
@@ -1343,7 +1343,7 @@ class BookkeepingManagerHandler(RequestHandler):
         return cls.bkkDB.getProductionSummary(cName, cVersion, simdesc, pgroup, production, ftype, evttype)
 
     #############################################################################
-    types_getProductionInformation = [int]
+    types_getProductionInformation = [six.integer_types]
 
     def export_getProductionInformation(self, prodid):
         """It returns statistics (data processing phases, number of events, etc.) for a given production"""
@@ -1405,7 +1405,7 @@ class BookkeepingManagerHandler(RequestHandler):
         return S_OK(result)
 
     #############################################################################
-    types_getFileHistory = [str]
+    types_getFileHistory = [six.string_types]
 
     @classmethod
     def export_getFileHistory(cls, lfn):
@@ -1461,7 +1461,7 @@ class BookkeepingManagerHandler(RequestHandler):
         return S_OK(result)
 
     #############################################################################
-    types_getJobsNb = [int]
+    types_getJobsNb = [six.integer_types]
 
     @classmethod
     @deprecated("Use getProductionNbOfJobs")
@@ -1470,7 +1470,7 @@ class BookkeepingManagerHandler(RequestHandler):
         return cls.bkkDB.getProductionNbOfJobs(prodid)
 
     #############################################################################
-    types_getProductionNbOfJobs = [int]
+    types_getProductionNbOfJobs = [six.integer_types]
 
     @classmethod
     def export_getProductionNbOfJobs(cls, prodid):
@@ -1478,7 +1478,7 @@ class BookkeepingManagerHandler(RequestHandler):
         return cls.bkkDB.getProductionNbOfJobs(prodid)
 
     #############################################################################
-    types_getNumberOfEvents = [int]
+    types_getNumberOfEvents = [six.integer_types]
 
     @classmethod
     @deprecated("Use getProductionNbOfEvents")
@@ -1487,7 +1487,7 @@ class BookkeepingManagerHandler(RequestHandler):
         return cls.bkkDB.getProductionNbOfEvents(prodid)
 
     #############################################################################
-    types_getProductionNbOfEvents = [int]
+    types_getProductionNbOfEvents = [six.integer_types]
 
     @classmethod
     def export_getProductionNbOfEvents(cls, prodid):
@@ -1495,7 +1495,7 @@ class BookkeepingManagerHandler(RequestHandler):
         return cls.bkkDB.getProductionNbOfEvents(prodid)
 
     #############################################################################
-    types_getSizeOfFiles = [int]
+    types_getSizeOfFiles = [six.integer_types]
 
     @classmethod
     @deprecated("Use getProductionSizeOfFiles")
@@ -1504,7 +1504,7 @@ class BookkeepingManagerHandler(RequestHandler):
         return cls.bkkDB.getProductionSizeOfFiles(prodid)
 
     #############################################################################
-    types_getProductionSizeOfFiles = [int]
+    types_getProductionSizeOfFiles = [six.integer_types]
 
     @classmethod
     def export_getProductionSizeOfFiles(cls, prodid):
@@ -1512,7 +1512,7 @@ class BookkeepingManagerHandler(RequestHandler):
         return cls.bkkDB.getProductionSizeOfFiles(prodid)
 
     #############################################################################
-    types_getNbOfFiles = [int]
+    types_getNbOfFiles = [six.integer_types]
 
     @classmethod
     @deprecated("Use getProductionNbOfFiles")
@@ -1521,7 +1521,7 @@ class BookkeepingManagerHandler(RequestHandler):
         return cls.bkkDB.getProductionNbOfFiles(prodid)
 
     #############################################################################
-    types_getProductionNbOfFiles = [int]
+    types_getProductionNbOfFiles = [six.integer_types]
 
     @classmethod
     def export_getProductionNbOfFiles(cls, prodid):
@@ -1529,7 +1529,7 @@ class BookkeepingManagerHandler(RequestHandler):
         return cls.bkkDB.getProductionNbOfFiles(prodid)
 
     #############################################################################
-    types_getNbOfJobsBySites = [int]
+    types_getNbOfJobsBySites = [six.integer_types]
 
     @classmethod
     def export_getNbOfJobsBySites(cls, prodid):
@@ -1546,7 +1546,7 @@ class BookkeepingManagerHandler(RequestHandler):
         return cls.bkkDB.getAvailableTags()
 
     #############################################################################
-    types_getProcessedEvents = [int]
+    types_getProcessedEvents = [six.integer_types]
 
     @classmethod
     @deprecated("Use getProductionProcessedEvents")
@@ -1555,7 +1555,7 @@ class BookkeepingManagerHandler(RequestHandler):
         return cls.bkkDB.getProductionProcessedEvents(prodid)
 
     #############################################################################
-    types_getProductionProcessedEvents = [int]
+    types_getProductionProcessedEvents = [six.integer_types]
 
     @classmethod
     def export_getProductionProcessedEvents(cls, prodid):
@@ -1617,7 +1617,7 @@ class BookkeepingManagerHandler(RequestHandler):
         return cls.bkkDB.setFilesVisible(lfns)
 
     #############################################################################
-    types_getRunAndProcessingPassDataQuality = [int, int]
+    types_getRunAndProcessingPassDataQuality = [six.integer_types, six.integer_types]
 
     @classmethod
     def export_getRunAndProcessingPassDataQuality(cls, runnb, processing):
@@ -1633,7 +1633,7 @@ class BookkeepingManagerHandler(RequestHandler):
         return cls.bkkDB.getAvailableConfigurations()
 
     #############################################################################
-    types_getRunProcessingPass = [int]
+    types_getRunProcessingPass = [six.integer_types]
 
     @classmethod
     def export_getRunProcessingPass(cls, runnumber):
@@ -1689,7 +1689,7 @@ class BookkeepingManagerHandler(RequestHandler):
             gLogger.verbose("DataQualityFlag will be removed. It will changed to DataQuality")
 
         if "RunNumbers" in values:
-            gLogger.verbose("RunNumbers will be removed. It will changed to RunNumber")
+            gLogger.verbose("RunNumbers will be removed. It will changed to RunNumbers")
 
         result = []
         retVal = cls.bkkDB.getFiles(
@@ -1766,7 +1766,7 @@ class BookkeepingManagerHandler(RequestHandler):
             gLogger.verbose("DataQualityFlag will be removed. It will changed to DataQuality")
 
         if "RunNumbers" in in_dict:
-            gLogger.verbose("RunNumbers will be removed. It will changed to RunNumber")
+            gLogger.verbose("RunNumbers will be removed. It will changed to RunNumbers")
 
         gLogger.debug("getVisibleFilesWithMetadata->", "%s" % in_dict)
         result = {}
@@ -1983,7 +1983,7 @@ class BookkeepingManagerHandler(RequestHandler):
         return S_ERROR("The Production dictionary key is missing!!!")
 
     #############################################################################
-    types_getRunQuality = [str, str]
+    types_getRunQuality = [six.string_types, six.string_types]
 
     @deprecated("Use getRunWithProcessingPassAndDataQuality")
     def export_getRunQuality(self, procpass, flag=default):
@@ -1992,7 +1992,7 @@ class BookkeepingManagerHandler(RequestHandler):
         return self.export_getRunWithProcessingPassAndDataQuality(procpass, flag)
 
     #############################################################################
-    types_getRunWithProcessingPassAndDataQuality = [str, str]
+    types_getRunWithProcessingPassAndDataQuality = [six.string_types, six.string_types]
 
     @classmethod
     def export_getRunWithProcessingPassAndDataQuality(cls, procpass, flag=default):
@@ -2034,7 +2034,7 @@ class BookkeepingManagerHandler(RequestHandler):
         return S_ERROR("The run number has to be specified!")
 
     #############################################################################
-    types_getProcessingPassId = [str]
+    types_getProcessingPassId = [six.string_types]
 
     @classmethod
     def export_getProcessingPassId(cls, fullpath):
@@ -2125,7 +2125,7 @@ class BookkeepingManagerHandler(RequestHandler):
         return self.export_getTCKs(in_dict)
 
     #############################################################################
-    types_getSteps = [int]
+    types_getSteps = [six.integer_types]
 
     @classmethod
     def export_getSteps(cls, prodID):
@@ -2176,7 +2176,7 @@ class BookkeepingManagerHandler(RequestHandler):
         return cls.bkkDB.getDirectoryMetadata(lfn)
 
     #############################################################################
-    types_getFilesForGUID = [str]
+    types_getFilesForGUID = [six.string_types]
 
     @classmethod
     def export_getFilesForGUID(cls, guid):
@@ -2205,7 +2205,7 @@ class BookkeepingManagerHandler(RequestHandler):
         return cls.bkkDB.getListOfFills(configName, configVersion, conddescription)
 
     #############################################################################
-    types_getRunsForFill = [int]
+    types_getRunsForFill = [six.integer_types]
 
     @classmethod
     def export_getRunsForFill(cls, fillid):
@@ -2252,7 +2252,7 @@ class BookkeepingManagerHandler(RequestHandler):
         return cls.bkkDB.updateSimulationConditions(in_dict)
 
     #############################################################################
-    types_deleteSimulationConditions = [int]
+    types_deleteSimulationConditions = [six.integer_types]
 
     @classmethod
     def export_deleteSimulationConditions(cls, simid):
@@ -2274,14 +2274,14 @@ class BookkeepingManagerHandler(RequestHandler):
         """It returns the input and output files for a given DIRAC jobid."""
         return cls.bkkDB.getJobInputOutputFiles(diracjobids)
 
-    types_setRunOnlineFinished = [int]
+    types_setRunOnlineFinished = [six.integer_types]
 
     @classmethod
     def export_setRunOnlineFinished(cls, runnumber):
         """It is used to set the run finished..."""
         return cls.bkkDB.setRunStatusFinished(runnumber, "Y")
 
-    types_setRunOnlineNotFinished = [int]
+    types_setRunOnlineNotFinished = [six.integer_types]
 
     @classmethod
     def export_setRunOnlineNotFinished(cls, runnumber):
@@ -2309,7 +2309,7 @@ class BookkeepingManagerHandler(RequestHandler):
         return cls.bkkDB.fixRunLuminosity(runnumbers)
 
     #############################################################################
-    types_getProductionProducedEvents = [int]
+    types_getProductionProducedEvents = [six.integer_types]
 
     @classmethod
     def export_getProductionProducedEvents(cls, prodid):
@@ -2357,7 +2357,7 @@ class BookkeepingManagerHandler(RequestHandler):
         return cls.bkkDB.bulkupdateEventType(eventtypes)
 
     #############################################################################
-    types_getRunConfigurationsAndDataTakingCondition = [int]
+    types_getRunConfigurationsAndDataTakingCondition = [six.integer_types]
 
     @classmethod
     def export_getRunConfigurationsAndDataTakingCondition(cls, runnumber):
diff --git a/src/LHCbDIRAC/BookkeepingSystem/Service/XMLReader/XMLFilesReaderManager.py b/src/LHCbDIRAC/BookkeepingSystem/Service/XMLReader/XMLFilesReaderManager.py
index bda5149dea..9e6e029738 100644
--- a/src/LHCbDIRAC/BookkeepingSystem/Service/XMLReader/XMLFilesReaderManager.py
+++ b/src/LHCbDIRAC/BookkeepingSystem/Service/XMLReader/XMLFilesReaderManager.py
@@ -79,6 +79,8 @@ class XMLFilesReaderManager(object):
         """interprets the xml content."""
         self.log.debug("Start Job Processing")
 
+        sendToElasticSearch(job)
+
         # prepare for the insert, check the existence of the input files and retreive the fileid
         inputFiles = [inputFile.name for inputFile in job.inputFiles]
         if inputFiles:
-- 
GitLab