From 84fc672b5c82ac9660df1be393d46f78cc5f7274 Mon Sep 17 00:00:00 2001 From: rupozzi <ruben.pozzi@cern.ch> Date: Mon, 11 Jul 2022 18:17:35 +0200 Subject: [PATCH 1/4] feat: New DB class to monitor application parameters --- .../DB/ESApplicationParametersDB.py | 172 ++++++++++++++++++ .../Workflow/Modules/BookkeepingReport.py | 53 +++++- 2 files changed, 222 insertions(+), 3 deletions(-) create mode 100644 src/LHCbDIRAC/ProductionManagementSystem/DB/ESApplicationParametersDB.py diff --git a/src/LHCbDIRAC/ProductionManagementSystem/DB/ESApplicationParametersDB.py b/src/LHCbDIRAC/ProductionManagementSystem/DB/ESApplicationParametersDB.py new file mode 100644 index 0000000000..57b17912d0 --- /dev/null +++ b/src/LHCbDIRAC/ProductionManagementSystem/DB/ESApplicationParametersDB.py @@ -0,0 +1,172 @@ +############################################################################### +# (c) Copyright 2022 CERN for the benefit of the LHCb Collaboration # +# # +# This software is distributed under the terms of the GNU General Public # +# Licence version 3 (GPL Version 3), copied verbatim in the file "LICENSE". # +# # +# In applying this licence, CERN does not waive the privileges and immunities # +# granted to it by virtue of its status as an Intergovernmental Organization # +# or submit itself to any jurisdiction. # +############################################################################### +""" Module containing a front-end to the ElasticSearch-based ESApplicationSummaryDB. +""" + +from DIRAC import S_OK, S_ERROR, gConfig +from DIRAC.Core.Utilities import TimeUtilities +from DIRAC.ConfigurationSystem.Client.PathFinder import getDatabaseSection +from DIRAC.ConfigurationSystem.Client.Helpers import CSGlobals +from DIRAC.Core.Base.ElasticDB import ElasticDB + +name = "ESApplicationSummaryDB" + +mapping = { + "properties": { + "wmsID": {"type": "long"}, + "ProductionID": {"type": "integer"}, + "JobID": {"type": "integer"}, + # TODO + } +} + + +class ESApplicationSummaryDB(ElasticDB): + def __init__(self, parentLogger=None): + """Standard Constructor""" + + try: + section = getDatabaseSection("ProductionManagement/ESApplicationParametersDB") + indexPrefix = gConfig.getValue(f"{section}/IndexPrefix", CSGlobals.getSetup()).lower() + + # Connecting to the ES cluster + super().__init__( + name, "ProductionManagement/ESApplicationParametersDB", indexPrefix, parentLogger=parentLogger + ) + except Exception as ex: + self.log.error("Can't connect to ESApplicationParametersDB", repr(ex)) + raise RuntimeError("Can't connect to ESApplicationParametersDB") + + self.oldIndexName = f"{self.getIndexPrefix()}_{name.lower()}" + self.indexName = f"{self.getIndexPrefix()}_ES-jobapplicationparameters_index" + # Verifying if the index is there, and if not create it + res = self.existingIndex(self.indexName) + if not res["OK"] or not res["Value"]: + result = self.createIndex(self.indexName, mapping, period=None) + if not result["OK"]: + self.log.error(result["Message"]) + raise RuntimeError(result["Message"]) + self.log.always("Index created:", self.indexName) + + self.dslSearch = self._Search(self.oldIndexName) + self.dslSearch.extra(track_total_hits=True) + + def getJobParameters(self, jobID: int, paramList=None) -> dict: + """Get Job Parameters defined for jobID. + Returns a dictionary with the Job Parameters. + If paramList is empty - all the parameters are returned. + + + :param self: self reference + :param jobID: DiracJob ID + :param paramList: List or string of parameters to return + :return: dict with Job Parameter values + """ + if isinstance(paramList, str): + paramList = paramList.replace(" ", "").split(",") + self.log.debug(f"Getting Parameters for job {jobID}") + resultDict = {} + self.log.debug(f"The searched parameters with JobID {jobID} exists in the new index {self.indexName}") + res = self.getDoc(self.indexName, str(jobID)) + if res["OK"]: + if paramList: + for par in paramList: + try: + resultDict[par] = res["Value"][par] + except Exception as ex: + self.log.error("Could not find the searched parameters") + else: + # if parameters are not specified return all of them + resultDict = res["Value"] + return S_OK({jobID: resultDict}) + + def setJobParameter(self, jobID: int, key: str, value: str) -> dict: + """ + Inserts data into ESApplicationJobParametersDB index + + :param self: self reference + :param jobID: Job ID + :param key: parameter key + :param value: parameter value + :returns: S_OK/S_ERROR as result of indexing + """ + data = {"JobID": jobID, key: value, "timestamp": TimeUtilities.toEpochMilliSeconds()} + self.log.debug("Inserting data in {self.indexName}:{data}") + + # If a record with this jobID update and add parameter, otherwise create a new record + if self.existsDoc(self.indexName, id=str(jobID)): + self.log.debug("A document for this job already exists, it will now be updated") + result = self.updateDoc(index=self.indexName, id=str(jobID), body={"doc": data}) + else: + self.log.debug("No document has this job id, creating a new document for this job") + result = self.index(self.indexName, body=data, docID=str(jobID)) + if not result["OK"]: + self.log.error("Couldn't insert or update data", result["Message"]) + return result + + def setJobParameters(self, jobID: int, parameters: list) -> dict: + """ + Inserts data into ESApplicationJobParametersDB index using bulk indexing + + :param self: self reference + :param jobID: Job ID + :param parameters: list of tuples (name, value) pairs + :returns: S_OK/S_ERROR as result of indexing + """ + self.log.debug(f"Inserting parameters", "in {self.indexName}: for job {jobID}: {parameters}") + if isinstance(list, parametersDict): + parametersDict = dict(parameters) + + parametersDict["JobID"] = jobID + parametersDict["timestamp"] = int(TimeUtilities.toEpochMilliSeconds()) + + if self.existsDoc(self.indexName, id=str(jobID)): + self.log.debug("A document for this job already exists, it will now be updated") + result = self.updateDoc(index=self.indexName, id=str(jobID), body={"doc": parametersDict}) + else: + self.log.debug("Creating a new document for this job") + result = self.index(self.indexName, body=parametersDict, docID=str(jobID)) + if not result["OK"]: + self.log.error("Couldn't insert or update data", result["Message"]) + return result + + def deleteJobParameters(self, jobID: int, paramList=None) -> dict: + """Deletes Job Parameters defined for jobID. + Returns a dictionary with the Job Parameters. + If paramList is empty - all the parameters for the job are removed + + :param self: self reference + :param jobID: Job ID + :param paramList: list of parameters to be returned (also a string is treated) + :return: S_OK()/S_ERROR() + """ + if isinstance(paramList, str): + paramList = paramList.replace(" ", "").split(",") + self.log.debug(f"Deleting parameters with JobID {jobID} from the index {self.indexName}") + if not paramList: + # Deleting the whole record + self.log.debug("Deleting record of job {jobID}") + result = self.deleteDoc(self.indexName, id=str(jobID)) + if not result["OK"]: + self.log.error("Could not delete the record") + return S_ERROR(result) + else: + # Deleting the specific parameters + self.log.debug(f"Deleting Parameters {paramList} for job {jobID}") + for paramName in paramList: + result = self.updateDoc( + index=self.indexName, id=str(jobID), body={"script": "ctx._source.remove('" + paramName + "')"} + ) + if not result["OK"]: + self.log.error("Could not delete the prameters") + return S_ERROR(result) + self.log.debug("Parameters successfully deleted.") + return S_OK() diff --git a/src/LHCbDIRAC/Workflow/Modules/BookkeepingReport.py b/src/LHCbDIRAC/Workflow/Modules/BookkeepingReport.py index 3c8a641023..24d1a9a1c9 100644 --- a/src/LHCbDIRAC/Workflow/Modules/BookkeepingReport.py +++ b/src/LHCbDIRAC/Workflow/Modules/BookkeepingReport.py @@ -24,6 +24,8 @@ from DIRAC import gLogger, S_OK, S_ERROR, gConfig from DIRAC.Core.Utilities.Subprocess import systemCall from DIRAC.Resources.Catalog.PoolXMLFile import getGUID from DIRAC.Workflow.Utilities.Utils import getStepCPUTimes +from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader +from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations import LHCbDIRAC from LHCbDIRAC.Resources.Catalog.PoolXMLFile import getOutputType @@ -31,6 +33,7 @@ from LHCbDIRAC.Workflow.Modules.ModuleBase import ModuleBase from LHCbDIRAC.Core.Utilities.ProductionData import constructProductionLFNs from LHCbDIRAC.Core.Utilities.XMLSummaries import XMLSummary, XMLSummaryError from LHCbDIRAC.Core.Utilities.XMLTreeParser import addChildNode +from LHCbDIRAC.ProductionManagementSystem.DB.ESApplicationParametersDB import ESApplicationSummaryDB class BookkeepingReport(ModuleBase): @@ -43,6 +46,8 @@ class BookkeepingReport(ModuleBase): super(BookkeepingReport, self).__init__(self.log, bkClientIn=bkClient, dm=dm) + self.cpu = None + self.fileSize = None self.simDescription = "NoSimConditions" self.eventType = "" self.poolXMLCatName = "" @@ -75,7 +80,18 @@ class BookkeepingReport(ModuleBase): ): """Usual executor.""" try: - + self.elasticAppParametersDB = None + useESForJobParametersFlag = Operations().getValue("/Defaults/useESForJobParametersFlag", False) + if useESForJobParametersFlag: + try: + result = ObjectLoader().loadObject( + "ProductionManagement.DB.ESApplicationParametersDB", "ESApplicationParametersDB" + ) + if not result["OK"]: + return result + self.elasticAppParametersDB = result["Value"]() + except RuntimeError as excp: + return S_ERROR("Can't connect to DB: %s" % excp) super(BookkeepingReport, self).execute( production_id, prod_job_id, @@ -94,7 +110,7 @@ class BookkeepingReport(ModuleBase): bkLFNs, logFilePath = self._resolveInputVariables() doc = self.__makeBookkeepingXML(bkLFNs, logFilePath) - + self._sendParamsToES() if saveOnFile: bfilename = "bookkeeping_" + self.step_id + ".xml" with open(bfilename, "wb") as bfile: @@ -313,6 +329,7 @@ class BookkeepingReport(ModuleBase): typedParams.append(("WNMODEL", nodeInfo["ModelName"])) typedParams.append(("WNCPUPOWER", nodeInfo["CPU(MHz)"])) + self.cpu = nodeInfo["CPU(MHz)"] typedParams.append(("WNCACHE", nodeInfo["CacheSize(kB)"])) host = os.environ.get("HOSTNAME", os.environ.get("HOST")) @@ -533,7 +550,7 @@ class BookkeepingReport(ModuleBase): oFile = addChildNode(oFile, "Parameter", 0, ("EventStat", fileStats)) oFile = addChildNode(oFile, "Parameter", 0, ("FileSize", outputsize)) - + self.fileSize = outputsize oFile = addChildNode( oFile, "Parameter", 0, ("CreationDate", time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(time.time()))) ) @@ -628,3 +645,33 @@ class BookkeepingReport(ModuleBase): return S_ERROR("Failed to obtain system information") return S_OK(result) + + ################################################################################ + + def _sendParamsToES(self): + """ + Sends a dict of parameters to ES + """ + params = {} + exectime, cputime = getStepCPUTimes(self.step_commons) + params["CPUTIME"] = cputime + params["ExecTime"] = exectime + params["NumberOfProcessors"] = self.numberOfProcessors + params["Production"] = self.production_id + params["DiracJobID"] = str(self.jobID) + params["ProgramName"] = self.applicationName + params["ProgramVersion"] = self.applicationVersion + if self.cpu: + params["WNCPUPOWER"] = self.cpu + if self.fileSize: + params["FileSize"] = self.fileSize + if self.eventType is not None: + params["EventType"] = self.eventType + else: + params["EventType"] = "Unknown" + params["SimDescription"] = self.simDescription + + res = self.elasticAppParametersDB.setJobParameters(str(self.jobID), params) + if not res["OK"]: + return res["Message"] + return res -- GitLab From 883e5f74da7a41593d4203638095e0c10a815859 Mon Sep 17 00:00:00 2001 From: rupozzi <ruben.pozzi@cern.ch> Date: Mon, 11 Jul 2022 18:47:10 +0200 Subject: [PATCH 2/4] feat: New DB class to monitor application parameters --- src/LHCbDIRAC/Workflow/Modules/BookkeepingReport.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/LHCbDIRAC/Workflow/Modules/BookkeepingReport.py b/src/LHCbDIRAC/Workflow/Modules/BookkeepingReport.py index 24d1a9a1c9..fb65dca036 100644 --- a/src/LHCbDIRAC/Workflow/Modules/BookkeepingReport.py +++ b/src/LHCbDIRAC/Workflow/Modules/BookkeepingReport.py @@ -81,8 +81,8 @@ class BookkeepingReport(ModuleBase): """Usual executor.""" try: self.elasticAppParametersDB = None - useESForJobParametersFlag = Operations().getValue("/Defaults/useESForJobParametersFlag", False) - if useESForJobParametersFlag: + useESForAppParametersFlag = Operations().getValue("/Defaults/useESForAppParametersFlag", False) + if useESForAppParametersFlag: try: result = ObjectLoader().loadObject( "ProductionManagement.DB.ESApplicationParametersDB", "ESApplicationParametersDB" @@ -110,7 +110,7 @@ class BookkeepingReport(ModuleBase): bkLFNs, logFilePath = self._resolveInputVariables() doc = self.__makeBookkeepingXML(bkLFNs, logFilePath) - self._sendParamsToES() + if saveOnFile: bfilename = "bookkeeping_" + self.step_id + ".xml" with open(bfilename, "wb") as bfile: @@ -118,6 +118,9 @@ class BookkeepingReport(ModuleBase): else: print(doc) + if useESForAppParametersFlag: + self._sendParamsToES() + return S_OK() except Exception as e: # pylint:disable=broad-except -- GitLab From 1b1bf4eaa6325ffdf4507fa6617f5b8b5804cccb Mon Sep 17 00:00:00 2001 From: rupozzi <ruben.pozzi@cern.ch> Date: Tue, 12 Jul 2022 12:18:20 +0200 Subject: [PATCH 3/4] fix: Syntax error in method --- .../ProductionManagementSystem/DB/ESApplicationParametersDB.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/LHCbDIRAC/ProductionManagementSystem/DB/ESApplicationParametersDB.py b/src/LHCbDIRAC/ProductionManagementSystem/DB/ESApplicationParametersDB.py index 57b17912d0..d19c9631ad 100644 --- a/src/LHCbDIRAC/ProductionManagementSystem/DB/ESApplicationParametersDB.py +++ b/src/LHCbDIRAC/ProductionManagementSystem/DB/ESApplicationParametersDB.py @@ -121,8 +121,9 @@ class ESApplicationSummaryDB(ElasticDB): :param parameters: list of tuples (name, value) pairs :returns: S_OK/S_ERROR as result of indexing """ + parametersDict = {} self.log.debug(f"Inserting parameters", "in {self.indexName}: for job {jobID}: {parameters}") - if isinstance(list, parametersDict): + if isinstance(list, parameters): parametersDict = dict(parameters) parametersDict["JobID"] = jobID -- GitLab From ca6dfa33d12767fb48a8afc8098fefd847ce54e9 Mon Sep 17 00:00:00 2001 From: rupozzi <ruben.pozzi@cern.ch> Date: Tue, 12 Jul 2022 16:20:26 +0200 Subject: [PATCH 4/4] fix: wrong sending to ES --- .../Service/BookkeepingManagerHandler.py | 120 +++++++++--------- .../XMLReader/XMLFilesReaderManager.py | 2 + 2 files changed, 62 insertions(+), 60 deletions(-) diff --git a/src/LHCbDIRAC/BookkeepingSystem/Service/BookkeepingManagerHandler.py b/src/LHCbDIRAC/BookkeepingSystem/Service/BookkeepingManagerHandler.py index 4a51066cec..af04e0e44a 100644 --- a/src/LHCbDIRAC/BookkeepingSystem/Service/BookkeepingManagerHandler.py +++ b/src/LHCbDIRAC/BookkeepingSystem/Service/BookkeepingManagerHandler.py @@ -9,6 +9,7 @@ # or submit itself to any jurisdiction. # ############################################################################### """BookkeepingManager service is the front-end to the Bookkeeping database.""" +import six from DIRAC import gLogger, S_OK, S_ERROR from DIRAC.Core.DISET.RequestHandler import RequestHandler @@ -56,7 +57,7 @@ class BookkeepingManagerHandler(RequestHandler): return S_OK() ############################################################################# - types_sendXMLBookkeepingReport = [str] + types_sendXMLBookkeepingReport = [six.string_types] def export_sendXMLBookkeepingReport(self, xml): """This method is used to upload an xml report which is produced after when @@ -66,6 +67,7 @@ class BookkeepingManagerHandler(RequestHandler): :param str xml: bookkeeping report """ + # self.elasticAppParametersDB.setJobParameters(str(self.jobID), params) retVal = self.xmlReader.readXMLfromString(xml) if not retVal["OK"]: self.log.error("Issue reading XML", retVal["Message"]) @@ -156,7 +158,7 @@ class BookkeepingManagerHandler(RequestHandler): return cls.bkkDB.getAvailableFileTypes() ############################################################################# - types_insertFileTypes = [str, str, str] + types_insertFileTypes = [six.string_types, six.string_types, six.string_types] @classmethod def export_insertFileTypes(cls, ftype, desc, fileType): @@ -358,7 +360,7 @@ class BookkeepingManagerHandler(RequestHandler): return result ############################################################################# - types_getProcessingPass = [dict, str] + types_getProcessingPass = [dict, six.string_types] @classmethod def export_getProcessingPass(cls, in_dict, path=None): @@ -510,7 +512,7 @@ class BookkeepingManagerHandler(RequestHandler): gLogger.verbose("DataQualityFlag will be removed. It will changed to DataQuality") if "RunNumbers" in in_dict: - gLogger.verbose("RunNumbers will be removed. It will changed to RunNumber") + gLogger.verbose("RunNumbers will be removed. It will changed to RunNumbers") result = [] retVal = cls.bkkDB.getFiles( @@ -858,7 +860,7 @@ class BookkeepingManagerHandler(RequestHandler): return cls.bkkDB.getMoreProductionInformations(prodid) ############################################################################# - types_getJobInfo = [str] + types_getJobInfo = [six.string_types] @classmethod def export_getJobInfo(cls, lfn): @@ -884,7 +886,7 @@ class BookkeepingManagerHandler(RequestHandler): return cls.bkkDB.getJobInformation(in_dict) ############################################################################# - types_getRunNumber = [str] + types_getRunNumber = [six.string_types] @classmethod def export_getRunNumber(cls, lfn): @@ -892,7 +894,7 @@ class BookkeepingManagerHandler(RequestHandler): return cls.bkkDB.getRunNumber(lfn) ############################################################################# - types_getRunNbAndTck = [str] + types_getRunNbAndTck = [six.string_types] @classmethod def export_getRunNbAndTck(cls, lfn): @@ -900,7 +902,7 @@ class BookkeepingManagerHandler(RequestHandler): return cls.bkkDB.getRunNbAndTck(lfn) ############################################################################# - types_getProductionFiles = [int, str] + types_getProductionFiles = [six.integer_types, six.string_types] @classmethod def export_getProductionFiles(cls, prod, fileType, replica=default): @@ -909,7 +911,7 @@ class BookkeepingManagerHandler(RequestHandler): return cls.bkkDB.getProductionFiles(prod, fileType, replica) ############################################################################# - types_getProductionFilesBulk = [list, str] + types_getProductionFilesBulk = [list, six.string_types] @classmethod def export_getProductionFilesBulk(cls, prods, fileType, replica=default): @@ -933,7 +935,7 @@ class BookkeepingManagerHandler(RequestHandler): return cls.bkkDB.getRunFiles(runid) ############################################################################# - types_updateFileMetaData = [str, dict] + types_updateFileMetaData = [six.string_types, dict] @classmethod def export_updateFileMetaData(cls, filename, fileAttr): @@ -945,7 +947,7 @@ class BookkeepingManagerHandler(RequestHandler): return cls.bkkDB.updateFileMetaData(filename, fileAttr) ############################################################################# - types_renameFile = [str, str] + types_renameFile = [six.string_types, six.string_types] @classmethod def export_renameFile(cls, oldLFN, newLFN): @@ -954,7 +956,7 @@ class BookkeepingManagerHandler(RequestHandler): return cls.bkkDB.renameFile(oldLFN, newLFN) ############################################################################# - types_getProductionProcessingPassID = [int] + types_getProductionProcessingPassID = [six.integer_types] @classmethod def export_getProductionProcessingPassID(cls, prodid): @@ -962,7 +964,7 @@ class BookkeepingManagerHandler(RequestHandler): return cls.bkkDB.getProductionProcessingPassID(prodid) ############################################################################# - types_getProductionProcessingPass = [int] + types_getProductionProcessingPass = [six.integer_types] @classmethod def export_getProductionProcessingPass(cls, prodid): @@ -996,7 +998,7 @@ class BookkeepingManagerHandler(RequestHandler): return S_OK({"Successfull": successfull, "Faild": faild}) ############################################################################# - types_setFileDataQuality = [list, str] + types_setFileDataQuality = [list, six.string_types] @classmethod def export_setFileDataQuality(cls, lfns, flag): @@ -1004,7 +1006,7 @@ class BookkeepingManagerHandler(RequestHandler): return cls.bkkDB.setFileDataQuality(lfns, flag) ############################################################################# - types_setRunAndProcessingPassDataQuality = [(int, str), str, str] + types_setRunAndProcessingPassDataQuality = [six.integer_types, six.string_types, six.string_types] @classmethod def export_setRunAndProcessingPassDataQuality(cls, runNB, procpass, flag): @@ -1015,12 +1017,10 @@ class BookkeepingManagerHandler(RequestHandler): used to set the data quality flag to a given run files which processed by a given processing pass. """ - if isinstance(runNB, str): - runNB = int(runNB) return cls.bkkDB.setRunAndProcessingPassDataQuality(runNB, procpass, flag) ############################################################################# - types_setRunDataQuality = [int, str] + types_setRunDataQuality = [int, six.string_types] @classmethod def export_setRunDataQuality(cls, runNb, flag): @@ -1031,7 +1031,7 @@ class BookkeepingManagerHandler(RequestHandler): return cls.bkkDB.setRunDataQuality(runNb, flag) ############################################################################# - types_setQualityRun = [int, str] + types_setQualityRun = [int, six.string_types] @classmethod def export_setQualityRun(cls, runNb, flag): @@ -1039,7 +1039,7 @@ class BookkeepingManagerHandler(RequestHandler): return cls.bkkDB.setRunDataQuality(runNb, flag) ############################################################################# - types_setProductionDataQuality = [int, str] + types_setProductionDataQuality = [int, six.string_types] @classmethod def export_setProductionDataQuality(cls, prod, flag): @@ -1121,7 +1121,7 @@ class BookkeepingManagerHandler(RequestHandler): return self.export_getFileDescendents(lfn, depth, production, checkreplica) ############################################################################# - types_checkfile = [str] + types_checkfile = [six.string_types] @classmethod def export_checkfile(cls, fileName): @@ -1129,7 +1129,7 @@ class BookkeepingManagerHandler(RequestHandler): return cls.bkkDB.checkfile(fileName) ############################################################################# - types_checkFileTypeAndVersion = [str, str] + types_checkFileTypeAndVersion = [six.string_types, six.string_types] @classmethod def export_checkFileTypeAndVersion(cls, ftype, version): @@ -1137,7 +1137,7 @@ class BookkeepingManagerHandler(RequestHandler): return cls.bkkDB.checkFileTypeAndVersion(ftype, version) ############################################################################# - types_checkEventType = [int] + types_checkEventType = [six.integer_types] @classmethod def export_checkEventType(cls, eventTypeId): @@ -1169,7 +1169,7 @@ class BookkeepingManagerHandler(RequestHandler): return cls.bkkDB.getSimConditions() ############################################################################# - types_removeReplica = [str] + types_removeReplica = [six.string_types] @classmethod def export_removeReplica(cls, fileName): @@ -1209,7 +1209,7 @@ class BookkeepingManagerHandler(RequestHandler): return cls.bkkDB.getFileMetaDataForWeb(lfns) ############################################################################# - types_getProductionFilesForUsers = [int, dict, dict, int, int] + types_getProductionFilesForUsers = [int, dict, dict, six.integer_types, six.integer_types] @classmethod def export_getProductionFilesForUsers(cls, prod, ftype, sortDict, startItem, maxitems): @@ -1217,7 +1217,7 @@ class BookkeepingManagerHandler(RequestHandler): return cls.bkkDB.getProductionFilesForWeb(prod, ftype, sortDict, startItem, maxitems) ############################################################################# - types_getProductionFilesForWeb = [int, dict, dict, int, int] + types_getProductionFilesForWeb = [six.integer_types, dict, dict, six.integer_types, six.integer_types] @classmethod def export_getProductionFilesWeb(cls, prod, ftype, sortDict, startItem, maxitems): @@ -1242,7 +1242,7 @@ class BookkeepingManagerHandler(RequestHandler): return cls.bkkDB.addReplica(fileName) ############################################################################# - types_getRunInformations = [int] + types_getRunInformations = [six.integer_types] @classmethod def export_getRunInformations(cls, runnb): @@ -1258,7 +1258,7 @@ class BookkeepingManagerHandler(RequestHandler): return cls.bkkDB.getRunInformation(runnb) ############################################################################# - types_getFileCreationLog = [str] + types_getFileCreationLog = [six.string_types] @classmethod def export_getFileCreationLog(cls, lfn): @@ -1266,7 +1266,7 @@ class BookkeepingManagerHandler(RequestHandler): return cls.bkkDB.getFileCreationLog(lfn) ############################################################################# - types_getLogfile = [str] + types_getLogfile = [six.string_types] @classmethod def export_getLogfile(cls, lfn): @@ -1274,7 +1274,7 @@ class BookkeepingManagerHandler(RequestHandler): return cls.bkkDB.getFileCreationLog(lfn) ############################################################################# - types_insertEventType = [int, str, str] + types_insertEventType = [six.integer_types, six.string_types, six.string_types] @classmethod def export_insertEventType(cls, evid, desc, primary): @@ -1288,14 +1288,14 @@ class BookkeepingManagerHandler(RequestHandler): return S_OK(str(evid) + " event type exists") ############################################################################# - types_addEventType = [int, str, str] + types_addEventType = [six.integer_types, six.string_types, six.string_types] def export_addEventType(self, evid, desc, primary): """more info in the BookkeepingClient.py.""" return self.export_insertEventType(evid, desc, primary) ############################################################################# - types_updateEventType = [int, str, str] + types_updateEventType = [six.integer_types, six.string_types, six.string_types] @classmethod def export_updateEventType(cls, evid, desc, primary): @@ -1343,7 +1343,7 @@ class BookkeepingManagerHandler(RequestHandler): return cls.bkkDB.getProductionSummary(cName, cVersion, simdesc, pgroup, production, ftype, evttype) ############################################################################# - types_getProductionInformation = [int] + types_getProductionInformation = [six.integer_types] def export_getProductionInformation(self, prodid): """It returns statistics (data processing phases, number of events, etc.) for a given production""" @@ -1405,7 +1405,7 @@ class BookkeepingManagerHandler(RequestHandler): return S_OK(result) ############################################################################# - types_getFileHistory = [str] + types_getFileHistory = [six.string_types] @classmethod def export_getFileHistory(cls, lfn): @@ -1461,7 +1461,7 @@ class BookkeepingManagerHandler(RequestHandler): return S_OK(result) ############################################################################# - types_getJobsNb = [int] + types_getJobsNb = [six.integer_types] @classmethod @deprecated("Use getProductionNbOfJobs") @@ -1470,7 +1470,7 @@ class BookkeepingManagerHandler(RequestHandler): return cls.bkkDB.getProductionNbOfJobs(prodid) ############################################################################# - types_getProductionNbOfJobs = [int] + types_getProductionNbOfJobs = [six.integer_types] @classmethod def export_getProductionNbOfJobs(cls, prodid): @@ -1478,7 +1478,7 @@ class BookkeepingManagerHandler(RequestHandler): return cls.bkkDB.getProductionNbOfJobs(prodid) ############################################################################# - types_getNumberOfEvents = [int] + types_getNumberOfEvents = [six.integer_types] @classmethod @deprecated("Use getProductionNbOfEvents") @@ -1487,7 +1487,7 @@ class BookkeepingManagerHandler(RequestHandler): return cls.bkkDB.getProductionNbOfEvents(prodid) ############################################################################# - types_getProductionNbOfEvents = [int] + types_getProductionNbOfEvents = [six.integer_types] @classmethod def export_getProductionNbOfEvents(cls, prodid): @@ -1495,7 +1495,7 @@ class BookkeepingManagerHandler(RequestHandler): return cls.bkkDB.getProductionNbOfEvents(prodid) ############################################################################# - types_getSizeOfFiles = [int] + types_getSizeOfFiles = [six.integer_types] @classmethod @deprecated("Use getProductionSizeOfFiles") @@ -1504,7 +1504,7 @@ class BookkeepingManagerHandler(RequestHandler): return cls.bkkDB.getProductionSizeOfFiles(prodid) ############################################################################# - types_getProductionSizeOfFiles = [int] + types_getProductionSizeOfFiles = [six.integer_types] @classmethod def export_getProductionSizeOfFiles(cls, prodid): @@ -1512,7 +1512,7 @@ class BookkeepingManagerHandler(RequestHandler): return cls.bkkDB.getProductionSizeOfFiles(prodid) ############################################################################# - types_getNbOfFiles = [int] + types_getNbOfFiles = [six.integer_types] @classmethod @deprecated("Use getProductionNbOfFiles") @@ -1521,7 +1521,7 @@ class BookkeepingManagerHandler(RequestHandler): return cls.bkkDB.getProductionNbOfFiles(prodid) ############################################################################# - types_getProductionNbOfFiles = [int] + types_getProductionNbOfFiles = [six.integer_types] @classmethod def export_getProductionNbOfFiles(cls, prodid): @@ -1529,7 +1529,7 @@ class BookkeepingManagerHandler(RequestHandler): return cls.bkkDB.getProductionNbOfFiles(prodid) ############################################################################# - types_getNbOfJobsBySites = [int] + types_getNbOfJobsBySites = [six.integer_types] @classmethod def export_getNbOfJobsBySites(cls, prodid): @@ -1546,7 +1546,7 @@ class BookkeepingManagerHandler(RequestHandler): return cls.bkkDB.getAvailableTags() ############################################################################# - types_getProcessedEvents = [int] + types_getProcessedEvents = [six.integer_types] @classmethod @deprecated("Use getProductionProcessedEvents") @@ -1555,7 +1555,7 @@ class BookkeepingManagerHandler(RequestHandler): return cls.bkkDB.getProductionProcessedEvents(prodid) ############################################################################# - types_getProductionProcessedEvents = [int] + types_getProductionProcessedEvents = [six.integer_types] @classmethod def export_getProductionProcessedEvents(cls, prodid): @@ -1617,7 +1617,7 @@ class BookkeepingManagerHandler(RequestHandler): return cls.bkkDB.setFilesVisible(lfns) ############################################################################# - types_getRunAndProcessingPassDataQuality = [int, int] + types_getRunAndProcessingPassDataQuality = [six.integer_types, six.integer_types] @classmethod def export_getRunAndProcessingPassDataQuality(cls, runnb, processing): @@ -1633,7 +1633,7 @@ class BookkeepingManagerHandler(RequestHandler): return cls.bkkDB.getAvailableConfigurations() ############################################################################# - types_getRunProcessingPass = [int] + types_getRunProcessingPass = [six.integer_types] @classmethod def export_getRunProcessingPass(cls, runnumber): @@ -1689,7 +1689,7 @@ class BookkeepingManagerHandler(RequestHandler): gLogger.verbose("DataQualityFlag will be removed. It will changed to DataQuality") if "RunNumbers" in values: - gLogger.verbose("RunNumbers will be removed. It will changed to RunNumber") + gLogger.verbose("RunNumbers will be removed. It will changed to RunNumbers") result = [] retVal = cls.bkkDB.getFiles( @@ -1766,7 +1766,7 @@ class BookkeepingManagerHandler(RequestHandler): gLogger.verbose("DataQualityFlag will be removed. It will changed to DataQuality") if "RunNumbers" in in_dict: - gLogger.verbose("RunNumbers will be removed. It will changed to RunNumber") + gLogger.verbose("RunNumbers will be removed. It will changed to RunNumbers") gLogger.debug("getVisibleFilesWithMetadata->", "%s" % in_dict) result = {} @@ -1983,7 +1983,7 @@ class BookkeepingManagerHandler(RequestHandler): return S_ERROR("The Production dictionary key is missing!!!") ############################################################################# - types_getRunQuality = [str, str] + types_getRunQuality = [six.string_types, six.string_types] @deprecated("Use getRunWithProcessingPassAndDataQuality") def export_getRunQuality(self, procpass, flag=default): @@ -1992,7 +1992,7 @@ class BookkeepingManagerHandler(RequestHandler): return self.export_getRunWithProcessingPassAndDataQuality(procpass, flag) ############################################################################# - types_getRunWithProcessingPassAndDataQuality = [str, str] + types_getRunWithProcessingPassAndDataQuality = [six.string_types, six.string_types] @classmethod def export_getRunWithProcessingPassAndDataQuality(cls, procpass, flag=default): @@ -2034,7 +2034,7 @@ class BookkeepingManagerHandler(RequestHandler): return S_ERROR("The run number has to be specified!") ############################################################################# - types_getProcessingPassId = [str] + types_getProcessingPassId = [six.string_types] @classmethod def export_getProcessingPassId(cls, fullpath): @@ -2125,7 +2125,7 @@ class BookkeepingManagerHandler(RequestHandler): return self.export_getTCKs(in_dict) ############################################################################# - types_getSteps = [int] + types_getSteps = [six.integer_types] @classmethod def export_getSteps(cls, prodID): @@ -2176,7 +2176,7 @@ class BookkeepingManagerHandler(RequestHandler): return cls.bkkDB.getDirectoryMetadata(lfn) ############################################################################# - types_getFilesForGUID = [str] + types_getFilesForGUID = [six.string_types] @classmethod def export_getFilesForGUID(cls, guid): @@ -2205,7 +2205,7 @@ class BookkeepingManagerHandler(RequestHandler): return cls.bkkDB.getListOfFills(configName, configVersion, conddescription) ############################################################################# - types_getRunsForFill = [int] + types_getRunsForFill = [six.integer_types] @classmethod def export_getRunsForFill(cls, fillid): @@ -2252,7 +2252,7 @@ class BookkeepingManagerHandler(RequestHandler): return cls.bkkDB.updateSimulationConditions(in_dict) ############################################################################# - types_deleteSimulationConditions = [int] + types_deleteSimulationConditions = [six.integer_types] @classmethod def export_deleteSimulationConditions(cls, simid): @@ -2274,14 +2274,14 @@ class BookkeepingManagerHandler(RequestHandler): """It returns the input and output files for a given DIRAC jobid.""" return cls.bkkDB.getJobInputOutputFiles(diracjobids) - types_setRunOnlineFinished = [int] + types_setRunOnlineFinished = [six.integer_types] @classmethod def export_setRunOnlineFinished(cls, runnumber): """It is used to set the run finished...""" return cls.bkkDB.setRunStatusFinished(runnumber, "Y") - types_setRunOnlineNotFinished = [int] + types_setRunOnlineNotFinished = [six.integer_types] @classmethod def export_setRunOnlineNotFinished(cls, runnumber): @@ -2309,7 +2309,7 @@ class BookkeepingManagerHandler(RequestHandler): return cls.bkkDB.fixRunLuminosity(runnumbers) ############################################################################# - types_getProductionProducedEvents = [int] + types_getProductionProducedEvents = [six.integer_types] @classmethod def export_getProductionProducedEvents(cls, prodid): @@ -2357,7 +2357,7 @@ class BookkeepingManagerHandler(RequestHandler): return cls.bkkDB.bulkupdateEventType(eventtypes) ############################################################################# - types_getRunConfigurationsAndDataTakingCondition = [int] + types_getRunConfigurationsAndDataTakingCondition = [six.integer_types] @classmethod def export_getRunConfigurationsAndDataTakingCondition(cls, runnumber): diff --git a/src/LHCbDIRAC/BookkeepingSystem/Service/XMLReader/XMLFilesReaderManager.py b/src/LHCbDIRAC/BookkeepingSystem/Service/XMLReader/XMLFilesReaderManager.py index bda5149dea..9e6e029738 100644 --- a/src/LHCbDIRAC/BookkeepingSystem/Service/XMLReader/XMLFilesReaderManager.py +++ b/src/LHCbDIRAC/BookkeepingSystem/Service/XMLReader/XMLFilesReaderManager.py @@ -79,6 +79,8 @@ class XMLFilesReaderManager(object): """interprets the xml content.""" self.log.debug("Start Job Processing") + sendToElasticSearch(job) + # prepare for the insert, check the existence of the input files and retreive the fileid inputFiles = [inputFile.name for inputFile in job.inputFiles] if inputFiles: -- GitLab