Commit 39e6cd84 authored by Zoltan Mathe

Merge branch 'finalRemovalOfAnalyseLogF' into 'devel'

final removal of AnalyseLogFile

See merge request !357
parents 692e363c 5890dfb5
""" Utilities to check the application log files (for production jobs)
"""
### This module is no longer used anywhere (for workflows created after January 2017).
### It is kept only because old workflows created before then may still reference it.
import re
import os
from DIRAC import gLogger
__RCSID__ = "$Id$"
class LogError( Exception ):
""" custom exception
"""
def __init__( self, message = "" ):
self.message = message
Exception.__init__( self, message )
def __str__( self ):
return "LogError:" + repr( self.message )
################################################################################
class ProductionLog( object ):
""" Encapsulate production log info
"""
def __init__( self, fileName, applicationName = '', log = None ):
    # Well-known application errors
self.__APPLICATION_ERRORS__ = {'Terminating event processing loop due to errors' : 'Event Loop Not Terminated'}
    # Well-known Gaudi errors
self.__GAUDI_ERRORS__ = {'Could not connect' : 'CASTOR error connection',
'SysError in <TDCacheFile::ReadBuffer>: error reading from file' : 'DCACHE connection error',
'Failed to resolve' : 'IODataManager error',
'Error: connectDataIO' : 'connectDataIO error',
'Error:connectDataIO' : 'connectDataIO error',
' glibc ' : 'Problem with glibc',
'segmentation violation' : 'segmentation violation',
'GaussTape failed' : 'GaussTape failed',
'Writer failed' : 'Writer failed',
'Bus error' : 'Bus error',
'Standard std::exception is caught' : 'Exception caught',
'User defined signal 1' : 'User defined signal 1',
'Not found DLL' : 'Not found DLL',
'std::bad_alloc' : 'FATAL Bad alloc'}
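    # NB: the keys of both dictionaries above are passed to re.findall() as
    # patterns, so they are matched as regular expressions, not plain substrings.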
if not log:
self.log = gLogger.getSubLogger( 'ProductionLogs' )
else:
self.log = log
self.fileName = fileName
self.log.info( "Attempting to open log file: %s" % fileName )
if not os.path.exists( fileName ):
raise LogError( 'Log File Not Available' )
    if os.stat( fileName ).st_size == 0:
      raise LogError( 'Log File Is Empty' )
    with open( fileName, 'r' ) as fopen:
      self.fileString = fopen.read()
self.applicationName = applicationName
if not self.applicationName:
self.__guessAppName()
self.log.info( 'Guessed application name is "%s"' % ( self.applicationName ) )
self.prodName = None
self.jobName = None
self.stepName = None
################################################################################
def analyse( self ):
""" analyse the log
"""
try:
self.__checkErrors()
self.__checkFinish()
self.__checkLogEnd()
self.log.info( 'Logs OK' )
return True
    except LogError as e:
self.log.error( 'LogError', "Found error in " + self.fileName + ": " + str( e ) )
return False
################################################################################
def __checkErrors( self ):
    for errString in self.__GAUDI_ERRORS__:
found = re.findall( errString, self.fileString )
if found:
raise LogError( errString )
################################################################################
def __checkFinish( self ):
    for errString, description in self.__APPLICATION_ERRORS__.items():
      # self.log.info( 'Checking for "%s" meaning job would fail with "%s"' % ( errString, description ) )
found = re.findall( errString, self.fileString )
if found:
raise LogError( errString )
################################################################################
def __checkLogEnd( self ):
""" This method uses Gaudi strings that are well known and determines
success / failure based on conventions.
"""
# Check if the application finish successfully
toFind = 'Application Manager Finalized successfully'
if self.applicationName.lower() == 'moore':
# toFind = 'Service finalized successfully'
toFind = ''
if toFind:
okay = re.findall( toFind, self.fileString )
if not okay:
raise LogError( '"%s" was not found in the log' % toFind )
################################################################################
def __guessAppName( self ):
""" Given a log file (in a string), look for the application
"""
for line in self.fileString.split( '\n' ):
      if re.search( 'Welcome to', line ) and ( re.search( 'version', line ) or re.search( 'Revision', line ) ):
self.applicationName = line.split()[2]
if self.applicationName == 'ApplicationMgr':
self.applicationName = 'LHCb'
if not self.applicationName:
raise LogError( "Could not guess the app name" )
return self.applicationName
################################################################################
def analyseLogFile( fileName, applicationName = '', log = None, lf_o = None ):
""" Analyse a log file
"""
if not lf_o:
lf_o = ProductionLog( fileName, applicationName, log = log )
return lf_o.analyse()
# EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#
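A minimal usage sketch of the module above (the log file name is hypothetical; a
missing or empty log makes the ProductionLog constructor raise LogError, while
analyse() itself returns True or False):

from LHCbDIRAC.Core.Utilities.ProductionLogs import analyseLogFile, LogError

try:
  if analyseLogFile( 'Gauss_00012345_00006789_1.log', applicationName = 'Gauss' ):
    print( 'Logs OK' )
  else:
    print( 'Log reports ERROR' )
except LogError as e:
  print( 'Could not analyse the log: %s' % e )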
""" Analyse log file(s) module
"""
### This module is no longer used anywhere (for workflows created after January 2017).
### It is kept only because old workflows created before then may still reference it.
import os
import re
import glob
from DIRAC import S_OK, S_ERROR, gLogger
from DIRAC.Resources.Catalog.PoolXMLFile import getGUID
from DIRAC.FrameworkSystem.Client.NotificationClient import NotificationClient
from DIRAC.DataManagementSystem.Client.DataManager import DataManager
from LHCbDIRAC.Core.Utilities.ProductionData import constructProductionLFNs
from LHCbDIRAC.Workflow.Modules.ModuleBase import ModuleBase
from LHCbDIRAC.Core.Utilities.ProductionLogs import analyseLogFile
__RCSID__ = "$Id$"
class AnalyseLogFile( ModuleBase ):
""" Analyse not only the XML summary, also the log file is inspected
"""
def __init__( self, bkClient = None, dm = None ):
"""Module initialization.
"""
self.log = gLogger.getSubLogger( 'AnalyseLogFile' )
super( AnalyseLogFile, self ).__init__( self.log, bkClientIn = bkClient, dm = dm )
self.version = __RCSID__
self.logFilePath = ''
self.coreFile = ''
self.nc = NotificationClient()
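    # kept as an instance attribute so that tests can replace it with a mock
    # (the unit tests below do exactly that)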
self.logAnalyser = analyseLogFile
def _resolveInputVariables( self ):
""" By convention any workflow parameters are resolved here.
"""
super( AnalyseLogFile, self )._resolveInputVariables()
super( AnalyseLogFile, self )._resolveInputStep()
def execute( self, production_id = None, prod_job_id = None, wms_job_id = None,
workflowStatus = None, stepStatus = None,
wf_commons = None, step_commons = None,
step_number = None, step_id = None ):
""" Main execution method.
"""
try:
super( AnalyseLogFile, self ).execute( self.version,
production_id, prod_job_id, wms_job_id,
workflowStatus, stepStatus,
wf_commons, step_commons,
step_number, step_id )
self._resolveInputVariables()
      if 'AnalyseLogFilePreviouslyFinalized' in self.workflow_commons:
        self.log.info( "AnalyseLogFile has already run for this workflow and finalized by sending an error email" )
return S_OK()
self.log.info( "Performing log file analysis for %s" % ( self.applicationLog ) )
# Resolve the step and job input data
analyseLogResult = self.logAnalyser( fileName = self.applicationLog,
applicationName = self.applicationName,
log = self.log )
if not analyseLogResult:
self._finalizeWithErrors( "Log reports ERROR" )
# return S_OK if the Step already failed to avoid overwriting the error
if not self.stepStatus['OK']:
return S_OK()
self.setApplicationStatus( "Log reports ERROR" )
return S_ERROR( "Log reports ERROR" )
# if the log looks ok but the step already failed, preserve the previous error
elif not self.stepStatus['OK']:
return S_OK()
else:
        # If the job was successful, update the status of the files to processed
self.log.info( "Log file %s OK" % self.applicationLog )
self.setApplicationStatus( "%s Step OK" % self.applicationName )
return S_OK()
except Exception as e: #pylint:disable=broad-except
self.log.exception( "Failure in AnalyseLogFile execute module", lException = e )
self.setApplicationStatus( repr(e) )
return S_ERROR( str(e) )
finally:
super( AnalyseLogFile, self ).finalize( self.version )
################################################################################
  # AUXILIARY FUNCTIONS
################################################################################
def _finalizeWithErrors( self, subj ):
""" Method that sends an email and uploads intermediate job outputs.
"""
self.workflow_commons['AnalyseLogFilePreviouslyFinalized'] = True
    # We must check that the output list is defined in the workflow commons. This is
    # normally done by the first BK report module that executes at the end of a step,
    # but in this case the current step's 'listoutput' must be added as well.
    if 'outputList' in self.workflow_commons:
for outputItem in self.step_commons['listoutput']:
if outputItem not in self.workflow_commons['outputList']:
self.workflow_commons['outputList'].append( outputItem )
else:
self.workflow_commons['outputList'] = self.step_commons['listoutput']
result = constructProductionLFNs( self.workflow_commons, self.bkClient )
if not result['OK']:
self.log.error( "Could not create production LFNs with message '%s'" % ( result['Message'] ) )
raise Exception( result['Message'] )
    if 'DebugLFNs' not in result['Value']:
      self.log.error( "No debug LFNs found after creating production LFNs, result was: %s" % result )
      raise Exception( "DebugLFNs Not Found" )
debugLFNs = result['Value']['DebugLFNs']
subject = '[' + self.siteName + '][' + self.applicationName + '] ' + self.applicationVersion + \
": " + subj + ' ' + self.production_id + '_' + self.prod_job_id + ' JobID=' + str( self.jobID )
msg = 'The Application ' + self.applicationName + ' ' + self.applicationVersion + ' had a problem \n'
msg = msg + 'at site ' + self.siteName + '\n'
msg = msg + 'JobID is ' + str( self.jobID ) + '\n'
msg = msg + 'JobName is ' + self.production_id + '_' + self.prod_job_id + '\n'
if self.coreFile:
self.log.info( 'Will attempt to upload core dump file: %s' % self.coreFile )
msg += '\n\nCore file found:\n'
coreLFN = ''
for lfn in debugLFNs:
if re.search( '[0-9]+_core', os.path.basename( lfn ) ):
coreLFN = lfn
if self._WMSJob():
if coreLFN:
self.log.info( "Attempting putAndRegister('%s','%s','%s') on master catalog" % ( coreLFN, self.coreFile,
self.debugSE ) )
result = DataManager( masterCatalogOnly = True ).putAndRegister( coreLFN, self.coreFile, self.debugSE )
self.log.info( result )
if not result['OK']:
self.log.error( 'Could not save core dump file', result['Message'] )
msg += 'Could not save dump file with message "%s"\n' % result['Message']
else:
msg += coreLFN + '\n'
else:
self.log.info( "JOBID is null, would have attempted to upload: LFN:%s, file %s to %s" % ( coreLFN,
self.coreFile,
self.debugSE ) )
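    # Upload any locally present files whose basenames match the debug LFNs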
toUpload = {}
for lfn in debugLFNs:
if os.path.exists( os.path.basename( lfn ) ):
toUpload[os.path.basename( lfn )] = lfn
if toUpload:
msg += '\n\nIntermediate job data files:\n'
for fname, lfn in toUpload.items():
guidResult = getGUID( fname )
guidInput = ''
if not guidResult['OK']:
self.log.error( "Could not find GUID for %s with message" % ( fname ), guidResult['Message'] )
elif guidResult['generated']:
self.log.info( 'PoolXMLFile generated GUID(s) for the following files ', ', '.join( guidResult['generated'] ) )
guidInput = guidResult['Value'][fname]
else:
guidInput = guidResult['Value'][fname]
if self._WMSJob():
          self.log.info( 'Attempting putAndRegister("%s","%s","%s","%s") on master catalog' % ( lfn, fname,
                                                                                                self.debugSE,
                                                                                                guidInput ) )
result = DataManager( masterCatalogOnly = True ).putAndRegister( lfn, fname, self.debugSE, guidInput )
self.log.info( result )
if not result['OK']:
self.log.error( 'Could not save INPUT data file with result', str( result ) )
msg += 'Could not save intermediate data file %s with result\n%s\n' % ( fname, result )
else:
msg = msg + lfn + '\n' + str( result ) + '\n'
else:
self.log.info( 'JOBID is null, would have attempted to upload: LFN:%s, file %s, GUID %s to %s'
% ( lfn, fname, guidInput, self.debugSE ) )
if self.applicationLog:
logurl = 'http://lhcb-logs.cern.ch/storage' + self.logFilePath
msg = msg + '\n\nLog Files directory for the job:\n'
msg = msg + logurl + '/\n'
msg = msg + '\n\nLog File for the problematic step:\n'
msg = msg + logurl + '/' + self.applicationLog + '\n'
msg = msg + '\n\nJob StdOut:\n'
msg = msg + logurl + '/std.out\n'
msg = msg + '\n\nJob StdErr:\n'
msg = msg + logurl + '/std.err\n'
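    # Append the contents of any local core dump log(s) to the notification mail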
    globList = glob.glob( '*coredump.log' )
    for check in globList:
      if os.path.isfile( check ):
        self.log.verbose( 'Found locally existing core dump file: %s' % ( check ) )
        with open( check ) as fd:
          msg = msg + '\n\nCore dump:\n\n' + fd.read()
if not self._WMSJob():
self.log.info( "JOBID is null, *NOT* sending mail, for information the mail was:\n====>Start\n%s\n<====End"
% ( msg ) )
else:
mailAddress = self.opsH.getValue( 'EMail/JobFailures', 'lhcb-datacrash@cern.ch' )
self.log.info( 'Sending crash mail for job to %s' % ( mailAddress ) )
for mA in mailAddress.replace( ' ', '' ).split( ',' ):
res = self.nc.sendMail( mA, subject, msg, 'joel.closier@cern.ch', localAttempt = False )
if not res['OK']:
self.log.warn( "The mail could not be sent" )
################################################################################
  # END AUXILIARY FUNCTIONS
################################################################################
# EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF
@@ -77,10 +77,6 @@ class AnalyseXMLSummary( ModuleBase ):
failTheJob = True
if failTheJob:
        if 'AnalyseLogFilePreviouslyFinalized' in self.workflow_commons:
          self.log.info( "AnalyseLogFile has already run for this workflow and finalized by sending an error email" )
return S_OK()
self._finalizeWithErrors( "XMLSummary reports error" )
self.setApplicationStatus( "XMLSummary reports error" )
@@ -141,9 +137,6 @@ class AnalyseXMLSummary( ModuleBase ):
def _finalizeWithErrors( self, subj ):
""" Method that sends an email and uploads intermediate job outputs.
"""
# FIXME: refactoring needed, this is very similar to what is in AnalyseLogFile
self.workflow_commons['AnalyseLogFilePreviouslyFinalized'] = True
    # We must check that the output list is defined in the workflow commons. This is
    # normally done by the first BK report module that executes at the end of a step,
    # but in this case the current step's 'listoutput' must be added as well.
@@ -33,7 +33,6 @@ from LHCbDIRAC.BookkeepingSystem.Client.test.mock_BookkeepingClient import bkc_mock
# sut
from LHCbDIRAC.Workflow.Modules.AnalyseXMLSummary import AnalyseXMLSummary
from LHCbDIRAC.Workflow.Modules.AnalyseLogFile import AnalyseLogFile
from LHCbDIRAC.Workflow.Modules.BookkeepingReport import BookkeepingReport
from LHCbDIRAC.Workflow.Modules.FailoverRequest import FailoverRequest
from LHCbDIRAC.Workflow.Modules.RemoveInputData import RemoveInputData
@@ -163,8 +162,6 @@ class AnalyseXMLSummarySuccess( ModulesTestCase ):
for wf_cs in copy.deepcopy( wf_commons ):
for s_cs in step_commons:
        if 'AnalyseLogFilePreviouslyFinalized' in wf_cs:
continue
self.assertTrue( axlf.execute( prod_id, prod_job_id, wms_job_id,
workflowStatus, stepStatus,
wf_cs, s_cs,
@@ -246,68 +243,6 @@ class AnalyseXMLSummarySuccess( ModulesTestCase ):
self.assertTrue( res )
self.assertEqual( axlf.fileReport.statusDict, {'aa/1.txt': 'Problematic'} )
#############################################################################
# AnalyseLogFile.py
#############################################################################
@patch( "LHCbDIRAC.Workflow.Modules.ModuleBase.RequestValidator", side_effect = MagicMock() )
class AnalyseLogFileSuccess( ModulesTestCase ):
#################################################
def test_execute( self, _patch ):
alf = AnalyseLogFile( bkClient = bkc_mock, dm = dm_mock )
alf.stepInputData = ['some.sdst', '00012345_00006789_1.sdst']
alf.jobType = 'merge'
alf.nc = self.nc_mock
logAnalyser = MagicMock()
logAnalyser.return_value = True
alf.logAnalyser = logAnalyser
# no errors, no input data
for wf_cs in copy.deepcopy( wf_commons ):
for s_cs in step_commons:
self.assertTrue( alf.execute( prod_id, prod_job_id, wms_job_id,
workflowStatus, stepStatus,
wf_cs, s_cs,
step_number, step_id )['OK'] )
alf.jobType = 'reco'
# logAnalyser gives errors
logAnalyser.return_value = False
alf.logAnalyser = logAnalyser
for wf_cs in copy.deepcopy( wf_commons ):
for s_cs in copy.deepcopy( step_commons ):
        if 'AnalyseLogFilePreviouslyFinalized' in wf_cs:
continue
self.assertFalse( alf.execute( prod_id, prod_job_id, wms_job_id,
workflowStatus, stepStatus,
wf_cs, s_cs,
step_number, step_id )['OK'] )
# there's a core dump
logAnalyser.return_value = False
alf.logAnalyser = logAnalyser
open( 'ErrorLogging_Step1_coredump.log', 'w' ).close()
for wf_cs in copy.deepcopy( wf_commons ):
for s_cs in step_commons:
        if 'AnalyseLogFilePreviouslyFinalized' not in wf_cs:
self.assertFalse( alf.execute( prod_id, prod_job_id, wms_job_id,
workflowStatus, stepStatus,
wf_cs, s_cs,
step_number, step_id )['OK'] )
else:
self.assertTrue( alf.execute( prod_id, prod_job_id, wms_job_id,
workflowStatus, stepStatus,
wf_cs, s_cs,
step_number, step_id )['OK'] )
#############################################################################
# FailoverRequest.py
#############################################################################