Skip to content
Snippets Groups Projects
Commit 0c904447 authored by Zoltan Mathe's avatar Zoltan Mathe
Browse files

Merge branch 'master_FIX_RAWIntegrityAgentAddFile' into 'master'

[Master ]RAWIntegrity: addFile and removeFile by bulk

See merge request !371
parents 30455585 1b543b4f
No related branches found
No related tags found
No related merge requests found
......@@ -19,6 +19,7 @@ from DIRAC.FrameworkSystem.Client.MonitoringClient import gMonitor
from DIRAC.Resources.Catalog.FileCatalog import FileCatalog
from DIRAC.Resources.Storage.StorageElement import StorageElement
from LHCbDIRAC.DataManagementSystem.DB.RAWIntegrityDB import RAWIntegrityDB
from DIRAC.Core.Utilities.List import breakListIntoChunks
__RCSID__ = "$Id$"
......@@ -227,17 +228,22 @@ class RAWIntegrityAgent(AgentModule):
# Update copiedFiles to also contain the newly copied files
copiedFiles.update(dict((lfn, allUnmigratedFilesMeta[lfn]) for lfn in filesNewlyCopied))
# Try to register them all
res = self.fileCatalog.addFile(copiedFiles)
successfulRegister = {}
failedRegister = {}
if not res['OK']:
self.log.error("Completely failed to register successfully copied file.", res['Message'])
failedRegister = dict((lfn, res['Message']) for lfn in copiedFiles)
else:
successfulRegister = res['Value']['Successful']
failedRegister = res['Value']['Failed']
# Try to register them by batch
for lfnChunk in breakListIntoChunks(copiedFiles, 100):
# Add the metadata
lfnDictChuck = dict((lfn, copiedFiles[lfn]) for lfn in lfnChunk)
res = self.fileCatalog.addFile(lfnDictChuck)
if not res['OK']:
self.log.error("Completely failed to register some successfully copied file.",
res['Message'])
failedRegister.update(dict((lfn, res['Message']) for lfn in lfnDictChuck))
else:
successfulRegister.update(res['Value']['Successful'])
failedRegister.update(res['Value']['Failed'])
gMonitor.addMark("ErrorRegister", len(failedRegister))
for lfn, reason in failedRegister.iteritems():
......
......@@ -7,6 +7,8 @@ from DIRAC import gLogger, S_OK, S_ERROR
from DIRAC.Resources.Storage.StorageBase import StorageBase
from DIRAC.Resources.Storage.Utilities import checkArgumentFormat
from DIRAC.Core.Utilities.List import breakListIntoChunks
__RCSID__ = "$Id$"
......@@ -74,27 +76,33 @@ class LHCbOnlineStorage(StorageBase):
# Here we are sure of the unicity of the basename since it is for raw data only
filesToUrls = dict((os.path.basename(f), f) for f in urls)
filenames = filesToUrls.keys()
try:
success, errorOrFailed = self.server.endMigratingFileBulk(filenames)
if success:
# in case of success, errorOrFailed contains the files for which it failed
failedFiles = set(errorOrFailed)
for fn in filenames:
for filenameChunck in breakListIntoChunks(filenames, 100):
try:
success, errorOrFailed = self.server.endMigratingFileBulk(filenameChunck)
if success:
# in case of success, errorOrFailed contains the files for which it failed
failedFiles = set(errorOrFailed)
for fn in filenameChunck:
fullUrl = filesToUrls[fn]
if fn in failedFiles:
failed[fullUrl] = "Failed to remove, check datamover logs"
else:
successful[fullUrl] = True
gLogger.info("LHCbOnline.getFile: Successfully issued removal to RunDB for chuck.")
else:
errStr = "LHCbOnline.removeFile: Failed to issue removal for chunck to RunDB %s" % errorOrFailed
for fn in filenameChunck:
fullUrl = filesToUrls[fn]
failed[fullUrl] = errStr
gLogger.error(errStr, urls)
except Exception as x: #pylint: disable=broad-except
errStr = "LHCbOnline.getFile: Exception for chunck while issuing removal to RunDB."
gLogger.exception(errStr, lException = x)
for fn in filenameChunck:
fullUrl = filesToUrls[fn]
if fn in failedFiles:
failed[fullUrl] = "Failed to remove, check datamover logs"
else:
successful[fullUrl] = True
gLogger.info("LHCbOnline.getFile: Successfully issued removal to RunDB.")
else:
errStr = "LHCbOnline.removeFile: Failed to issue removal to RunDB %s" % errorOrFailed
for url in urls:
failed[url] = errStr
gLogger.error(errStr, urls)
except Exception as x: #pylint: disable=broad-except
errStr = "LHCbOnline.getFile: Exception while issuing removal to RunDB."
gLogger.exception(errStr, lException = x)
for url in urls:
failed[url] = errStr
failed[fullUrl] = errStr
resDict = {'Failed': failed, 'Successful': successful}
return S_OK(resDict)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment