#!/usr/bin/env python
##############################################################################
#
# NAME:        srmvometrics.py
#
# FACILITY:    SAM (Service Availability Monitoring)
#
# COPYRIGHT:
#         Copyright (c) 2009, Members of the EGEE Collaboration.
#         http://www.eu-egee.org/partners/
#         Licensed under the Apache License, Version 2.0.
#         http://www.apache.org/licenses/LICENSE-2.0
#         This software is provided "as is", without warranties
#         or conditions of any kind, either express or implied.
#
# DESCRIPTION:
#
#         VO-specific Nagios SRM metrics.
#
# AUTHORS:     Nicolo Magini, CERN
# AUTHORS:     Alessandro Di Girolamo, CERN
# AUTHORS:     Konstantin Skaburskas, CERN
#
# CREATED:     23-Jul-2010
#
# VERSION:     1.7.7
# MODIFIED:    12-Nov-2014
# NOTES:       Using new PhEDEx senames API to get list of SEs to test in
#              CMS GetPFNFromTFC probe
#
##############################################################################

"""
Nagios SRM metrics.

Nicolo Magini, Alessandro Di Girolamo
CERN IT Experiment Support
SAM (Service Availability Monitoring)
"""

import os
import sys
import getopt
import time  # @UnresolvedImport
import commands
import errno
import re
import urllib2
import platform
import simplejson
import pickle
import datetime
import urlparse
import filecmp

try:
    from gridmon import probe
    from gridmon import utils as samutils
    from gridmon import gridutils
    from gridmon.process import signaling
    import gfal2
except ImportError, e:
    summary = "UNKNOWN: Error loading modules : %s" % (e)
    sys.stdout.write(summary+'\n')
    sys.stdout.write(summary+'\nsys.path: %s\n' % str(sys.path))
    sys.exit(3)

# Reasonable defaults for timeouts
LCG_GFAL_BDII_TIMEOUT = 10
LCG_UTIL_TIMEOUT_BDII = LCG_GFAL_BDII_TIMEOUT
LCG_UTIL_TIMEOUT_CONNECT = 10
LCG_UTIL_TIMEOUT_SENDRECEIVE = 120
LCG_UTIL_TIMEOUT_SRM = 180

gfal2.set_verbose(gfal2.verbose_level.debug)


class SRMVOMetrics(probe.MetricGatherer):
    """A Metric Gatherer specific for SRM."""

    # Service version(s)
    svcVers = ['1', '2']  # NOT USED YET
    svcVer = '2'

    # The probe's author name space -- CHANGE it to your ns
    ns = 'org.cms'

    # Timeouts
    _timeouts = {'srm_connect': LCG_UTIL_TIMEOUT_SENDRECEIVE,
                 'ldap_timelimit': LCG_GFAL_BDII_TIMEOUT,
                 'LCG_GFAL_BDII_TIMEOUT': LCG_GFAL_BDII_TIMEOUT,
                 'lcg_util': {'CLI': {'connect-timeout': LCG_UTIL_TIMEOUT_CONNECT,
                                      'sendreceive-timeout': LCG_UTIL_TIMEOUT_SENDRECEIVE,
                                      'bdii-timeout': LCG_UTIL_TIMEOUT_BDII,
                                      'srm-timeout': LCG_UTIL_TIMEOUT_SRM},
                              'API': {'connect-timeout': LCG_UTIL_TIMEOUT_CONNECT}}
                 }

    _ldap_url = "ldap://lcg-bdii.cern.ch:2170"

    probeinfo = {'probeName': ns+'.SRM-Probe',
                 'probeVersion': '1.0',
                 'serviceVersion': '1.*, 2.*'}

    # Metrics' info
    _metrics = {
        'GetPFNFromTFC': {
            'metricDescription': "Get full SRM endpoints and space tokens from PhEDEx DataService TFC module.",
            'cmdLineOptions': ['lfn='],
            'cmdLineOptionsReq': [],
            'metricChildren': ['VOLsDir', 'VOPut', 'VOLs', 'VOGetTURLs', 'VOGet', 'VODel'],
            'critical': 'N'},
        'VOLsDir': {
            'metricDescription': "List content of VO's top level space area(s) in SRM.",
            'cmdLineOptions': ['se-timeout='],
            'cmdLineOptionsReq': [],
            'metricChildren': [],
            'critical': 'N',
            'statusMsgs': {
                'OK': 'OK: Storage Path directory was listed successfully.',
                'WARNING': 'WARNING: Problems listing Storage Path directory.',
                'CRITICAL': 'CRITICAL: Problems listing Storage Path directory.',
                'UNKNOWN': 'UNKNOWN: Problems listing Storage Path directory.'}},
        'VOPut': {
            'metricDescription': "Copy a local file to the SRM into default space area(s).",
            'cmdLineOptions': ['se-timeout='],
            'cmdLineOptionsReq': [],
            'metricChildren': ['VOLs', 'VOGetTURLs', 'VOGet', 'VODel'],
            'critical': 'N'},
        'VOLs': {
            'metricDescription': "List (previously copied) file(s) on the SRM.",
            'cmdLineOptions': ['se-timeout='],
            'cmdLineOptionsReq': [],
            'metricChildren': [],
            'critical': 'N',
            'statusMsgs': {
                'OK': 'OK: File(s) was listed successfully.',
                'WARNING': 'WARNING: Problems listing file(s).',
                'CRITICAL': 'CRITICAL: Problems listing file(s).',
                'UNKNOWN': 'UNKNOWN: Problems listing file(s).'}},
        'VOGetTURLs': {
            'metricDescription': "Get Transport URLs for the file copied to storage.",
            'cmdLineOptions': ['se-timeout=', 'ldap-uri=', 'ldap-timeout='],
            'cmdLineOptionsReq': [],
            'metricChildren': [],
            'critical': 'N'},
        'VOGet': {
            'metricDescription': "Copy given remote file(s) from SRM to a local file.",
            'cmdLineOptions': ['se-timeout='],
            'cmdLineOptionsReq': [],
            'metricChildren': [],
            'critical': 'N'},
        'VODel': {
            'metricDescription': "Delete given file(s) from SRM.",
            'cmdLineOptions': ['se-timeout='],
            'cmdLineOptionsReq': [],
            'metricChildren': [],
            'critical': 'N'},
        'AllCMS': {
            'metricDescription': "Run all CMS metrics.",
            'cmdLineOptions': [],
            'cmdLineOptionsReq': [],
            'metricsOrder': ['GetPFNFromTFC', 'VOLsDir', 'VOPut', 'VOLs',
                             'VOGetTURLs', 'VOGet', 'VODel']},
        }

    def __init__(self, tuples):

        probe.MetricGatherer.__init__(self, tuples, 'SRM')

        self.usage = """    Metrics specific options:

--srmv <1|2>            (Default: %s)
                        %s
--ldap-uri              Format [ldap://]hostname[:port[/]]
                        (Default: %s)
--ldap-timeout          (Default: %i)
                        %s
--se-timeout            (Default: %i)

!!! NOT IMPLEMENTED YET !!!
--sapath                Storage Area Path to be tested on SRM.
                        Comma separated list of Storage Paths to be tested.
"""%(self.svcVer, self.ns+'.SRM-{GetSURLs,GetTURLs}', self._ldap_url, self._timeouts['ldap_timelimit'], self.ns+'.SRM-{LsDir,Put,Ls,GetTURLs,Get,Del}', self._timeouts['srm_connect']) # TODO: move to super class # Need to be parametrized from CLI at runtime self.childTimeout = 120 # timeout # initiate metrics description self.set_metrics(self._metrics) # parse command line parameters self.parse_cmd_args(tuples) # working directory for metrics self.make_workdir() # LDAP self._ldap_fileEndptSAPath = self.workdir_metric+"/EndpointAndPath" # files and patterns self._fileTest = self.workdir_metric+'/testFile.txt' self._fileTestIn = self.workdir_metric+'/testFileIn.txt' self._fileSRMPattern = 'testfile-put-%s-%s-%s.txt' # spacetoken, time, uuid # Dictionary of extra SRM info for VOs, and file to save current version and history of dictionary curhour=datetime.datetime.now().hour self._fileHistoryVoInfoDictionary = self.workdir_metric+"/VOInfoDictionary_%s"%curhour self._fileVoInfoDictionary = self.workdir_metric+"/VOInfoDictionary" #Read dictionary from current cache try: #Clean up stale current cache entries (older than 3 days) try: modtime=os.path.getmtime(self._fileVoInfoDictionary) if (time.time()-modtime>3*86400): self.printd('Stale VO Info cache file, deleting') os.remove(self._fileVoInfoDictionary) except OSError: self.printd('VO Info cache file not found') self._voInfoDictionary = self.readVoInfoDictionary(self._fileVoInfoDictionary) self.printd('Loading VO Info dictionary from cache') except IOError: self._voInfoDictionary = {} self.printd('No cached VO Info dictionary; creating empty dictionary') except KeyError: self._voInfoDictionary = {} os.remove(self._fileVoInfoDictionary) self.printd('Cannot read cached VO Info dictionary; cleaning cache file and creating empty dictionary') # GFAL version self.gfal2_ver = "gfal2 " + gfal2.get_version() # lock file self._fileLock = self.workdir_metric+'/lock' self._fileLock_timelimit = 5*60 'timelimit on working directory lock' def parse_args(self, opts): for o,v in opts: if o in ('--srmv'): if v in self.svcVers: self.svcVer = str(v) else: errstr = '--srmv must be one of '+\ ', '.join([x for x in self.svcVers])+'. '+v+' given.' raise getopt.GetoptError(errstr) elif o in ('--ldap-uri'): [host, port] = samutils.parse_uri(v) if port == None or port == '': port = '2170' self._ldap_url = 'ldap://'+host+':'+port os.environ['LCG_GFAL_INFOSYS'] = host+':'+port elif o in ('--ldap-timeout'): self._timeouts['ldap_timelimit'] = int(v) elif o in ('--se-timeout'): self._timeouts['srm_connect'] = int(v) def __workdir_islocked(self): """Check if working directory is locked within allowed timelimit. """ if not os.path.exists(self._fileLock): return False else: delta = time.time() - os.stat(self._fileLock).st_ctime if delta >= self._fileLock_timelimit: os.unlink(self._fileLock) return False else: return True def __workdir_lock(self): """Lock working directory. """ if self.__workdir_islocked(): raise IOError('Working directory is locked: %s' % self.workdir_metric) file(self._fileLock, 'w') def __workdir_unlock(self): """Unlock working directory. 
""" try: os.unlink(self._fileLock) except OSError: pass def saveVoInfoDictionary(self,filename): fp = open(filename, "w") pickle.dump(self._voInfoDictionary,fp) fp.close() def readVoInfoDictionary(self,filename): fp = open(filename, "r") voInfoDict=pickle.load(fp) fp.close() return voInfoDict def weightEndpointCriticality(self,VOtest): DetailedMsg='' CriticalResult=[] for srmendpt in self._voInfoDictionary.keys(): try: try: criticality=self._voInfoDictionary[srmendpt]['criticality'] except KeyError: criticality=1 if criticality==1: CriticalResult.append(self._voInfoDictionary[srmendpt][VOtest][0]) #DetailedMsg = DetailedMsg + str(self._voInfoDictionary[srmendpt]) DetailedMsg = DetailedMsg + \ str(self._voInfoDictionary[srmendpt]['space_token']) +\ " critical= "+ str(criticality) +\ " "+ str(self._voInfoDictionary[srmendpt][VOtest][1]) +\ " file= " + str(self._voInfoDictionary[srmendpt]['fn'])+\ "\n" #self.printd('VO specific Detailed Output: %s' % str(DetailedMsg)) except IndexError: return ('UNKNOWN', 'No SRM endpoints found in internal dictionary') except KeyError: return ('UNKNOWN', 'No test results found in internal dictionary for SRM endpoint') #print " GLOBAL result \n \n \n \n \n " ## oredering criticality self.printd('VO specific Detailed Output: %s' % str(DetailedMsg)) if 'CRITICAL' in CriticalResult: # it's enough one CRIT #return ('CRITICAL' ,str(DetailedMsg)) return ('CRITICAL') if 'WARNING' in CriticalResult: # #return ('WARNING' ,str(DetailedMsg)) return ('WARNING') if 'UNKNOWN' in CriticalResult: # #return ('UNKNOWN' ,str(DetailedMsg)) return ('UNKNOWN') #return ('OK' ,str(DetailedMsg)) # all OK return ('OK') def metricAllCMS(self): return self.metricAll('AllCMS') def metricAllATLAS(self): return self.metricAll('AllATLAS') def metricAllLHCb(self): return self.metricAll('AllLHCb') def metricGetPFNFromTFC(self,testLFN="/store/unmerged/SAM/testSRM"): """Get full SRM endpoint(s) and storage areas from PhEDEx DataService. """ try: self.__workdir_lock() except IOError, e: self.printd('Failed to lock. %s' % str(e)) return 'UNKNOWN', 'UNKNOWN: Failed to lock working directory.' 
        # URLs for PhEDEx DataService for lfn2pfn
        tfcURL = "https://cmsweb.cern.ch/phedex/datasvc/json/prod/lfn2pfn?node="
        pfnMatchURL = "&lfn="
        pfnProtocolOption = "&protocol=srmv2"
        destinationOption = "&destination="
        custodialOption = "&custodial="

        # URLs for PhEDEx DataService for senames API
        seNamesURL = "https://cmsweb.cern.ch/phedex/datasvc/json/prod/senames?protocol=srmv2&sename=%s"

        nodeName = self.hostName

        opener = urllib2.build_opener()
        header = 'grid-monitoring-probes-org.cms.SRM-GetPFNFromTFC/1.0 (CMS) %s/%s %s/%s (%s)' % (
            urllib2.__name__, urllib2.__version__,
            platform.system(), platform.release(), platform.processor())
        opener.addheaders = [('User-agent', header)]

        # LFN path for file to test transfers
        self.printd('The LFN used for testing will be in: '+testLFN)

        try:
            self.printd("Contacting PhEDEx dataservice to perform SEName-to-PhEDExNodeName at URL:")
            self.printd(seNamesURL % nodeName)
            seNames = opener.open(seNamesURL % nodeName)
            seNamesJSON = simplejson.load(seNames)
            phedexNodeNames = seNamesJSON[u'phedex'][u'senames']
        except (urllib2.URLError, KeyError):
            self.printd('WARNING: Unable to open PhEDEx DataService senames API to perform SEName-to-PhEDExNodeName matching for SEName %s' % nodeName)
            if len(self._voInfoDictionary):
                self.printd("WARNING: using cached PFN")
                # Update timestamp/uuid in cached PFN
                for pfn in self._voInfoDictionary:
                    try:
                        self._voInfoDictionary[pfn]['fn'] = self._voInfoDictionary[pfn]['fntemp'] % (str(int(time.time())), samutils.uuidstr())
                        self.printd("The PFN path used for testing will be:")
                        self.printd(pfn+" : "+str(self._voInfoDictionary[pfn]))
                    except KeyError:
                        self.printd("WARNING: no cached PFN found")
                        return ('WARNING', "WARNING: Unable to open PhEDEx DataService senames API, no cached PFN found")
                return ('OK', "WARNING: Unable to open PhEDEx DataService senames API, using cached PFN")
            else:
                self.printd("WARNING: no cached PFN found")
                return ('WARNING', "WARNING: Unable to open PhEDEx DataService senames API, no cached PFN found")

        outputList = {}
        for pNNLine in phedexNodeNames:
            endpointName = pNNLine[u'sename']
            try:
                siteName = pNNLine[u'node']
            except KeyError:
                continue
            if endpointName == nodeName:
                self.printd(nodeName+" is listed as SRM for Site "+siteName)
                # Contact web service to get PFN and spacetoken for non-custodial transfers
                endpointLFN = "/SAM-%s/lcg-util" % endpointName
                # Testing only non-custodial area - don't want to clutter
                # custodial area with small files at T1s
                custodiality = "n"
                pfnUrl = tfcURL+siteName+pfnMatchURL+testLFN+endpointLFN+pfnProtocolOption+destinationOption+siteName+custodialOption+custodiality
                self.printd("Setting custodiality flag="+custodiality)
                self.printd("Contacting webservice to perform LFN-to-PFN matching at URL:")
                self.printd(pfnUrl)
                try:
                    pfnFile = opener.open(pfnUrl)
                    pfnJSON = simplejson.load(pfnFile)
                    pfn = (((pfnJSON[u'phedex'])[u'mapping'])[0])[u'pfn']
                    spacetoken = (((pfnJSON[u'phedex'])[u'mapping'])[0])[u'space_token']
                except (urllib2.URLError, KeyError):
                    self.printd('WARNING: Unable to open PhEDEx DataService lfn2pfn URL to perform LFN-to-PFN matching for Site %s' % siteName)
                    continue
                if pfn == None:
                    self.printd("ERROR: LFN did not match to any PFN - probably the TFC does not contain any rule for the srmv2 protocol.")
                    continue
                self.printd("LFN was matched to PFN "+pfn)
                if spacetoken:
                    spacetokendesc = spacetoken
                    self.printd("In space token "+spacetoken+" for custodiality="+custodiality)
                else:
                    spacetokendesc = "nospacetoken"
                    self.printd("No space token defined for custodiality="+custodiality)
                if re.compile("^srm://.+srm/managerv2\?SFN=.+$").match(pfn) or \
re.compile("^srm://.+srm/v2/server\?SFN=.+$").match(pfn): pfntonode=re.sub(":.+$","",re.sub("^srm://","",pfn)) if pfntonode!=nodeName : self.printd("WARNING: the resulting PFN matches to SRM "+pfntonode+" instead of SRM "+nodeName) continue else: fntemp = self._fileSRMPattern % (spacetokendesc,'%s','%s') fn = fntemp % (str(int(time.time())), samutils.uuidstr()) outputList[pfn]={'fntemp' : fntemp, 'fn': fn, 'space_token': spacetoken, 'space_token_get': spacetoken, 'userspace' : testLFN} elif pfn.startswith("gsiftp://"): pfntonode=urlparse.urlparse(pfn).hostname fntemp = self._fileSRMPattern % (spacetokendesc,'%s','%s') fn = fntemp % (str(int(time.time())), samutils.uuidstr()) outputList[pfn]={'fntemp' : fntemp, 'fn': fn, 'space_token': spacetoken, 'space_token_get': spacetoken, 'userspace' : testLFN} else: self.printd("WARNING: Invalid matching to srmv2 protocol") self.printd("Note: this test currently supports only PFNs in the known srmv2 full endpoint formats:") self.printd("srm://hostname:port/srm/managerv2?SFN=sitefilename") self.printd("or") self.printd("srm://hostname:port/srm/v2/server?SFN=sitefilename") self.printd("or") self.printd("gsiftp://hostname:port/sitefilename") # Extract a random PFN from the dictionary of PFN matches. It will be used for testing, other PFN matches will be ignored # Print warning if not all PFN matches are the same. if len(outputList)==0: self.printd("WARNING: "+nodeName+" not found in SRM list") self.printd("WARNING: This error usually means that the site is not running PhEDEx agents in the Prod instance,") self.printd("WARNING: or that the TrivialFileCatalog published by the site's PhEDEx agents doesn't have a valid srmv2 protocol rule for "+nodeName) if len(self._voInfoDictionary): self.printd("WARNING: using cached PFN") # Update timestamp/uuid in cached PFN for pfn in self._voInfoDictionary: try: self._voInfoDictionary[pfn]['fn'] = self._voInfoDictionary[pfn]['fntemp'] % (str(int(time.time())), samutils.uuidstr()) self.printd("The PFN path used for testing will be:") self.printd(pfn+" : "+str(self._voInfoDictionary[pfn])) except KeyError: self.printd("WARNING: no cached PFN found") return('WARNING',"WARNING: "+nodeName+" not found in SRM list, no cached PFN found") return('OK',"WARNING: "+nodeName+" not found in SRM list, using cached PFN") else: self.printd("WARNING: no cached PFN found") return('WARNING',"WARNING: "+nodeName+" not found in SRM list, no cached PFN found") else: self._voInfoDictionary=outputList for outputPfns in outputList: self.printd("The PFN path used for testing will be:") self.printd(outputPfns+" : "+str(outputList[outputPfns])) self.printd('Saving endpoints to %s' % self._fileVoInfoDictionary, v=2) self.printd('Test results will be saved to %s' % self._fileHistoryVoInfoDictionary, v=2) try: self.saveVoInfoDictionary(self._fileVoInfoDictionary) except IOError: self.printd('Error saving VO Info Dictionary to file %s' % self._fileVoInfoDictionary) try: self.saveVoInfoDictionary(self._fileHistoryVoInfoDictionary) except IOError: self.printd('Error saving VO Info Dictionary history to file %s' % self._fileHistoryVoInfoDictionary) return ('OK', "Got PFN and Space Token from PhEDEx DataService") def metricGetATLASInfo(self): """Get full SRM endpoint(s) and storage areas from ToACache. 
""" agis_file="/afs/cern.ch/user/d/digirola/public/nagios_atlas/project/src/SRM/org.atlas/src/ToA_srm2_list" agis=open(agis_file, 'r') agis_endpoint_info=[] for entry in agis: if entry.find(self.hostName) != -1: #print entry agis_endpoint_info.append(entry[:-1]) spacetokendesc=entry.split()[1] fn = self._fileSRMPattern % (spacetokendesc,str(int(time.time())), samutils.uuidstr()) if spacetokendesc in ('ATLASDATADISK','ATLASMCDISK','ATLASGROUPDISK'): criticality=1 else: criticality=0 #endpoint (spacetoken) criticality agis_endpoint_details = { 'fn': fn, 'space_token': spacetokendesc, 'criticality': criticality, } self._voInfoDictionary[entry.split()[0]+'SAM']=agis_endpoint_details #print agis_endpoint_info self.printd(str(agis_endpoint_info)) self.printd(str(self._voInfoDictionary)) try: fp = open(self._ldap_fileEndptSAPath, "w") for info in agis_endpoint_info: ep=info.split()[0]+'\n' fp.write(ep) fp.close() except IOError, e: try: os.unlink(self._ldap_fileEndptSAPath) except OSError: pass return ('UNKNOWN', 'IOError: %s' % str(e)) #print self._ldap_fileEndptSAPath return ('OK',"Endpoint informations found in ToA cached file ") def metricGetLHCbInfo(self): """Get full SRM endpoint(s) and storage areas from ToACache. """ dirac_file="/afs/cern.ch/user/s/santinel/public/www/ATP/ToA_srm2_list" dirac=open(dirac_file, 'r') dirac_endpoint_info=[] for entry in dirac: if entry.find(self.hostName) != -1: #print entry dirac_endpoint_info.append(entry[:-1]) spacetokendesc=entry.split()[1] fn = self._fileSRMPattern % (spacetokendesc,str(int(time.time())), samutils.uuidstr()) if spacetokendesc in ('LHCb_USER','LHCb_M-DST','LHCb_RAW'): criticality=1 else: criticality=0 #endpoint (spacetoken) criticality dirac_endpoint_details = { 'fn': fn, 'space_token': spacetokendesc, 'criticality': criticality, } self._voInfoDictionary[entry.split()[0]+'/SAM']=dirac_endpoint_details #print dirac_endpoint_info self.printd(str(dirac_endpoint_info)) self.printd(str(self._voInfoDictionary)) try: fp = open(self._ldap_fileEndptSAPath, "w") for info in dirac_endpoint_info: ep=info.split()[0]+'\n' fp.write(ep) fp.close() except IOError, e: try: os.unlink(self._ldap_fileEndptSAPath) except OSError: pass return ('UNKNOWN', 'IOError: %s' % str(e)) #print self._ldap_fileEndptSAPath return ('OK',"Endpoint informations found in ToA cached file ") def metricVOLsDir(self): "List content of VO's top level space area(s) in SRM using gfal2.listdir()." status = 'OK' summary = '' self.printd(self.gfal2_ver) srms = self._voInfoDictionary.keys() req = {'surls' : srms, 'timeout' : self._timeouts['srm_connect'], } self.printd('Using gfal2 listdir().') self.printd('Parameters:\n%s' % '\n'.join( [' %s: %s' % (x,str(y)) for x,y in req.items()])) # Instantiate gfal2 ctx = gfal2.creat_context() self.print_time() self.printd('Listing storage url(s).') for surl in req['surls']: summary += 'Storage Path[%s]' % surl self.printd('Storage Path[%s]' % surl, cr=False) try: self.print_time() # FIXME: Set timeout? 
                ctx.listdir(surl)
                summary += '-ok;'
                self.printd('-ok;')
            except gfal2.GError, e:
                em = probe.ErrorsMatching(self.errorDBFile, self.errorTopics)
                er = em.match(e.message)
                if er:
                    if status != 'CRITICAL':
                        status = er[0][2]
                    summary += '-%s [ErrDB:%s];' % (status.lower(), str(er))
                else:
                    status = 'CRITICAL'
                    summary += '-%s;' % status.lower()
                self.printd('-%s;\nERROR: %s\n' % (status.lower(), e.message))
            except Exception, e:
                return ('UNKNOWN', 'problem invoking gfal2 listdir(): %s:%s' %
                        (str(e), sys.exc_info()[0]))

        return (status, summary)

    def metricVOPut(self):
        "Copy a local file to the SRM into space area(s) defined by VO."

        def event_callback(event):
            self.printd("[%s] %s %s %s" % (event.timestamp, event.domain,
                                           event.stage, event.description))

        self.printd(self.gfal2_ver)

        # generate source file
        try:
            src_filename = self._fileTest
            fp = open(src_filename, "w")
            for s in "1234567890":
                fp.write(s+'\n')
            fp.close()
            src_file = 'file://' + src_filename
        except IOError, e:
            self.printd('ERROR: %s' % str(e))
            return ('UNKNOWN', 'Error opening local file.')

        # Instantiate gfal2
        ctx = gfal2.creat_context()

        for srmendpt in self._voInfoDictionary.keys():
            # Set transfer parameters
            params = ctx.transfer_parameters()
            params.create_parent = True
            params.timeout = self._timeouts['srm_connect']
            params.event_callback = event_callback
            if self._voInfoDictionary[srmendpt].get('space_token', None):
                params.dst_spacetoken = self._voInfoDictionary[srmendpt]['space_token']

            dest_filename = (self._voInfoDictionary[srmendpt])['fn']
            dest_file = srmendpt+'/'+dest_filename

            self.printd('VOPut: Copy file using gfal.filecopy().')
            self.printd('''Parameters:
 source: %s
 dest: %s
 src_spacetoken: %s
 dst_spacetoken: %s
 timeout: %s''' % (src_file, dest_file, params.src_spacetoken,
                   params.dst_spacetoken, params.timeout))

            start_transfer = datetime.datetime.now()
            self.printd('StartTime of the transfer: %s' % str(start_transfer))

            stMsg = 'File was%s copied to SRM.'
            try:
                ctx.filecopy(params, src_file, dest_file)
                status = 'OK'
                total_transfer = datetime.datetime.now()-start_transfer
                self.printd('Transfer Duration: %s' % str(total_transfer))
                summary = stMsg % '' + " Transfer time: "+str(total_transfer)
            except gfal2.GError, e:
                em = probe.ErrorsMatching(self.errorDBFile, self.errorTopics)
                er = em.match(e.message)
                if er:
                    status = er[0][2]
                    summary = stMsg % (' NOT')+' [ErrDB:%s]' % str(er)
                else:
                    status = 'CRITICAL'
                    summary = stMsg % ' NOT'
                self.printd('ERROR: %s' % str(e))
            except Exception, e:
                status = 'UNKNOWN'
                summary = stMsg % ' NOT'
                self.printd('ERROR: %s:%s' % (str(e), sys.exc_info()[0]))

            (self._voInfoDictionary[srmendpt])['putResult'] = (status, summary)

        try:
            self.saveVoInfoDictionary(self._fileHistoryVoInfoDictionary)
        except IOError:
            self.printd('Error saving VO Info Dictionary to file %s' % self._fileHistoryVoInfoDictionary)

        # weightEndpointCriticality() returns the overall Nagios status
        # weighted by endpoint criticality
        weightedResult = self.weightEndpointCriticality('putResult')
        return weightedResult
        # what if no srmendpt?

    def metricVOLs(self):
        "Stat (previously copied) file(s) on the SRM."
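
        # Build the full SURLs (endpoint + file name stored in the VO info
        # dictionary by VOPut) and stat each of them with gfal2.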
        self.printd(self.gfal2_ver)

        status = 'OK'

        srms = []
        for srmendpt in self._voInfoDictionary.keys():
            dest_filename = (self._voInfoDictionary[srmendpt])['fn']
            dest_file = srmendpt+'/'+dest_filename
            srms.append(dest_file)

        self.print_time()

        req = {'surls': srms,
               'timeout': self._timeouts['srm_connect'],
               }
        self.printd('Using gfal2.stat().')
        self.printd('Parameters:\n%s' % '\n'.join(
            ['  %s: %s' % (x, str(y)) for x, y in req.items()]))

        self.print_time()
        self.printd('Stating file(s).')

        errmsg = ''

        # Instantiate gfal2
        ctx = gfal2.creat_context()

        summary = ''
        for surl in req['surls']:
            # summary += 'listing [%s]' % surl
            self.printd('listing [%s]' % surl)
            try:
                # FIXME: Set timeout?
                statp = ctx.stat(surl)
                self.printd("stat: " + str(statp).replace('\n', ', '))
                summary += '-ok;'
                self.printd('-ok;')
            except gfal2.GError, e:
                em = probe.ErrorsMatching(self.errorDBFile, self.errorTopics)
                er = em.match(e.message)
                if er:
                    if status != 'CRITICAL':
                        status = er[0][2]
                    summary += '-%s [ErrDB:%s];' % (status.lower(), str(er))
                else:
                    status = 'CRITICAL'
                    summary += '-%s;' % status.lower()
                self.printd('-%s;\nERROR: %s\n' % (status.lower(), e.message))
            except Exception, e:
                return ('UNKNOWN', 'problem invoking gfal2 stat(): %s:%s' %
                        (str(e), sys.exc_info()[0]))

        return (status, summary)

    def metricVOGetTURLs(self):
        "Get Transport URLs for the file copied to storage."

        self.printd(self.gfal2_ver)

        # Instantiate gfal2
        ctx = gfal2.creat_context()

        for srmendpt in self._voInfoDictionary.keys():
            self.print_time()

            src_filename = (self._voInfoDictionary[srmendpt])['fn']
            src_file = srmendpt+'/'+src_filename

            status = 'OK'
            summary = ''

            self.printd('=====\nSURL: %s\n-----' % src_file)
            self.print_time()

            protocol = 'gsiftp'
            try:
                if urlparse.urlparse(src_file).scheme == 'gsiftp':
                    # If protocol is gsiftp, it's already a transport URL
                    replicas = src_file
                else:
                    # FIXME: Set timeout?
                    replicas = ctx.getxattr(src_file, 'user.replicas')
                self.printd('proto: %s - OK' % protocol)
                self.printd('replicas: %s' % replicas)
                summary = 'protocol OK-[%s]' % protocol
            except gfal2.GError, e:
                self.printd('proto: %s - FAILED' % protocol)
                self.printd('error: %s' % e.message)
                summary = 'protocol FAILED-[%s]' % protocol
                em = probe.ErrorsMatching(self.errorDBFile, self.errorTopics)
                er = em.match(e.message)
                if er:
                    status = er[0][2]
                else:
                    status = 'CRITICAL'
            except Exception, e:
                status = 'UNKNOWN'
                self.printd('ERROR: %s\n%s' % (str(e), sys.exc_info()[0]))

            self.print_time()
            self.printd('=====')

            (self._voInfoDictionary[srmendpt])['getTURLResult'] = (status, summary)

        try:
            self.saveVoInfoDictionary(self._fileHistoryVoInfoDictionary)
        except IOError:
            self.printd('Error saving VO Info Dictionary to file %s' % self._fileHistoryVoInfoDictionary)

        # Extract an arbitrary item from the dictionary to return results.
        # TODO: replace with weighted calculation based on criticality of paths/endpoints.
        try:
            return (self._voInfoDictionary.values()[0])['getTURLResult']
        except IndexError:
            return ('UNKNOWN', 'No SRM endpoints found in internal dictionary')
        except KeyError:
            return ('UNKNOWN', 'No test results found in internal dictionary for SRM endpoint')

    def metricVOGet(self):
        "Copy given remote file(s) from SRM to a local file."
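
        # The nested callback below logs gfal2 transfer events; after the
        # copy, the downloaded file is compared with the original one via
        # filecmp to validate the content.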
        def event_callback(event):
            self.printd("[%s] %s %s %s" % (event.timestamp, event.domain,
                                           event.stage, event.description))

        self.printd(self.gfal2_ver)

        # Instantiate gfal2
        ctx = gfal2.creat_context()

        for srmendpt in self._voInfoDictionary.keys():
            self.print_time()

            src_filename = (self._voInfoDictionary[srmendpt])['fn']
            src_file = srmendpt+'/'+src_filename
            dest_file = 'file://'+self._fileTestIn

            self.printd('Source: %s' % src_file)
            self.printd('Destination: %s' % dest_file)

            # Set transfer parameters
            params = ctx.transfer_parameters()
            params.timeout = self._timeouts['srm_connect']
            params.event_callback = event_callback
            if self._voInfoDictionary[srmendpt].get('space_token_get', None):
                params.src_spacetoken = self._voInfoDictionary[srmendpt]['space_token_get']
            params.overwrite = True

            self.printd('Get file using gfal.filecopy().')
            self.printd('''Parameters:
 source: %s
 dest: %s
 src_spacetoken: %s
 dst_spacetoken: %s
 timeout: %s''' % (src_file, dest_file, params.src_spacetoken,
                   params.dst_spacetoken, params.timeout))

            stMsg = 'File was%s copied from SRM.'
            errmsg = ''

            start_transfer = datetime.datetime.now()
            self.printd('StartTime of the transfer: %s' % str(start_transfer))

            try:
                ctx.filecopy(params, src_file, dest_file)
                if filecmp.cmp(self._fileTest, self._fileTestIn):
                    # Files match
                    status = 'OK'
                    total_transfer = datetime.datetime.now()-start_transfer
                    self.printd('Transfer Duration: %s' % str(total_transfer))
                    summary = stMsg % ('')+' Diff successful.' + " Transfer time: "+str(total_transfer)
                else:
                    # Files do not match
                    status = 'CRITICAL'
                    summary = stMsg % ('')+' Files differ!'
                    self.printd('Files differ!')
            except gfal2.GError, e:
                em = probe.ErrorsMatching(self.errorDBFile, self.errorTopics)
                er = em.match(e.message)
                if er:
                    status = er[0][2]
                    summary = stMsg % (' NOT')+' [ErrDB:%s]' % str(er)
                else:
                    status = 'CRITICAL'
                    summary = stMsg % ' NOT'
                self.printd('ERROR: %s' % str(e))
            except Exception, e:
                status = 'UNKNOWN'
                summary = stMsg % ' NOT'
                self.printd('ERROR: %s:%s' % (str(e), sys.exc_info()[0]))

            self.print_time()

            (self._voInfoDictionary[srmendpt])['getResult'] = (status, summary)

        try:
            self.saveVoInfoDictionary(self._fileHistoryVoInfoDictionary)
        except IOError:
            self.printd('Error saving VO Info Dictionary to file %s' % self._fileHistoryVoInfoDictionary)

        # Weighted calculation based on criticality of paths/endpoints
        weightedResult = self.weightEndpointCriticality('getResult')
        return weightedResult

    def metricVODel(self):
        "Delete given file(s) from SRM."

        self.printd(self.gfal2_ver)

        # TODO: - cleanup of the metric's working directory
        #         (this may go to metricAll() in the superclass)

        # Instantiate gfal2
        ctx = gfal2.creat_context()

        for srmendpt in self._voInfoDictionary.keys():
            self.print_time()

            src_filename = (self._voInfoDictionary[srmendpt])['fn']
            src_file = srmendpt+'/'+src_filename

            self.printd('Source: %s' % src_file)
            self.printd('Using gfal2.unlink().')

            stMsg = 'File was%s deleted from SRM.'
            errmsg = ''

            self.print_time()
            self.printd('Deleting: %s' % src_file)
            try:
                # FIXME: Set timeout?
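                # gfal2 unlink() removes the SURL; failures raise gfal2.GError,
                # which is matched against the error DB below, as in the other
                # metrics.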
                ctx.unlink(src_file)
                status = 'OK'
                summary = stMsg % ''
            except gfal2.GError, e:
                summary = stMsg % ' NOT'
                em = probe.ErrorsMatching(self.errorDBFile, self.errorTopics)
                er = em.match(e.message)
                if er:
                    status = er[0][2]
                else:
                    status = 'CRITICAL'
                self.printd('ERROR: %s:%s' % (str(e), sys.exc_info()[0]))
            except Exception, e:
                status = 'UNKNOWN'
                summary = stMsg % ' NOT'
                self.printd('ERROR: %s:%s' % (str(e), sys.exc_info()[0]))

            self.print_time()

            (self._voInfoDictionary[srmendpt])['delResult'] = (status, summary)

        try:
            self.saveVoInfoDictionary(self._fileHistoryVoInfoDictionary)
        except IOError:
            self.printd('Error saving VO Info Dictionary to file %s' % self._fileHistoryVoInfoDictionary)

        # Remove lock on workdir
        self.__workdir_unlock()

        # Extract an arbitrary item from the dictionary to return results.
        # TODO: replace with weighted calculation based on criticality of paths/endpoints.
        try:
            return (self._voInfoDictionary.values()[0])['delResult']
        except IndexError:
            return ('UNKNOWN', 'No SRM endpoints found in internal dictionary')
        except KeyError:
            return ('UNKNOWN', 'No test results found in internal dictionary for SRM endpoint')


runner = probe.Runner(SRMVOMetrics, probe.ProbeFormatRenderer())
sys.exit(runner.run(sys.argv))
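
# Note: probe.Runner.run() is expected to return a Nagios-style exit code
# (0 OK, 1 WARNING, 2 CRITICAL, 3 UNKNOWN), which is passed to sys.exit();
# this mirrors the UNKNOWN/exit(3) handling of import errors at the top of
# this module.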