diff --git a/Configs/configparsertest_modifiedvalues.cfg b/Configs/configparsertest_modifiedvalues.cfg
index c3b0b019151ef4705d1f6953aef64dce8c5a30a1..b01eafa270d2c41a79103327d92832f3832bfe1c 100644
--- a/Configs/configparsertest_modifiedvalues.cfg
+++ b/Configs/configparsertest_modifiedvalues.cfg
@@ -32,8 +32,10 @@ ISEnabled: True
 ISPartition: 'ispartition'
 ISServer: 'isserver'
 ISPublicationPeriod: 1
-ISFileSystemInfoEnabled: True
 ISWaitAfterParitionValid: 1
+ISCpuStatsEnabled: True
+ISFileSystemInfoEnabled: True
+ISStorageIoStatsDevices: ['abc', 'def']
 DBURL: 'basededonnees'
 DBFileTable: 'filetable'
 DBLBTable: 'lbtable'
diff --git a/Script/cs/Threads/InfoServiceThread.py b/Script/cs/Threads/InfoServiceThread.py
index c1d574cac31d37f12bb6f9b1a7eba27bf43850c0..2a81687da77b5e54461e8f48c7ec8aa00822a6a8 100644
--- a/Script/cs/Threads/InfoServiceThread.py
+++ b/Script/cs/Threads/InfoServiceThread.py
@@ -1,24 +1,24 @@
 #!/usr/bin/env tdaq_python
 import threading
-import os
+import psutil
 from datetime import datetime

 import ispy
 from cs.Tools.utils import thread_id_string
 from cs.Tools.utils import getmountpoint
-from cs.Tools.LogConfig import enable_file_logging, make_tdaq_app_name, make_fs_data_identifier, make_is_data_identifier
+from cs.Tools.LogConfig import enable_file_logging, make_tdaq_app_name, make_is_data_identifier

 # --------------------------------
-# The XML schema associated with CastorScript's IS classes, defining
-# CastorScriptState and FileSystemInfo, is part of the SFOng package, because
-# the schema needs to be deployed in to the TDAQ release and CastorScript is not
-# a TDAQ project.
+# The XML schema associated with CastorScript's IS classes is part of the SFOng
+# package, because the schema needs to be deployed into the TDAQ release and
+# CastorScript is not a TDAQ project:
+# https://gitlab.cern.ch/atlas-tdaq-software/SFOng/-/blob/master/schema/CastorScript.IS.schema.xml
 # --------------------------------

 class InfoServiceThread(threading.Thread):

     def __init__(self, conf, exitevent, copy_thread, delete_thread, manager_thread):
-        threading.Thread.__init__(self, name="InfoServiceThread")
+        threading.Thread.__init__(self, name="ISThread")
         self.exitevent = exitevent
         self.conf = conf
         self.time_created = datetime.utcnow()
@@ -31,11 +31,13 @@ class InfoServiceThread(threading.Thread):

         self.ipc_partition = ispy.IPCPartition(conf.ISPartition) #pylint: disable=no-member
         self.partition_connected = True

-        # IS Server to publish to
-        # Errors comes up with: IS repository, not server
-        self.is_server = conf.ISServer
-
         self.tdaq_app_name = make_tdaq_app_name(conf)
+
+        self.cpustats_is_id = f'{self.conf.ISServer}.{self.tdaq_app_name}.cpustats'
+        self.update_cpu_stats_first = True
+
+        self.iostatscache = {}
+
         self.logger = enable_file_logging("isthread", "isthread.log", conf)

@@ -45,7 +47,7 @@
         if self.ipc_partition.isValid():
             self.logger.info('partition %s is valid: IS publication enabled',
                 self.ipc_partition.name())
-            
+
             # variable to handle the period between the partition becomes valid
             # and the first publication attempt
             # None means that there is nothing to do
@@ -59,7 +61,8 @@
                 # warning instead of info when publication is restored to be
                 # able to keep track of periods where publication was disabled
                 self.logger.warning('partition %s is now valid: IS publications enabled'
-                    ' (in %d seconds)', self.ipc_partition.name(), self.conf.ISWaitAfterParitionValid)
+                    ' (in %d seconds)', self.ipc_partition.name(),
+                    self.conf.ISWaitAfterParitionValid)
                 partition_valid_at = datetime.utcnow()
             elif self.partition_connected and not self.ipc_partition.isValid():
                 self.partition_connected = False
@@ -89,37 +92,16 @@
             self.update_cs_info()
             if self.conf.ISFileSystemInfoEnabled:
                 self.update_fs_info()
-
-    def update_fs_info(self):
-        fs_data_type = "FileSystemInfo"
-
-        for srcdirs_item in self.conf.SrcDirs:
-            mountpoint = getmountpoint(srcdirs_item)
-            try:
-                fs = os.statvfs(mountpoint)
-            except OSError as e:
-                self.logger.error('Cannot stat %s: %s', e.filename, e.strerror)
-                continue
-
-            fs_data_identifier = make_fs_data_identifier(self.is_server, self.tdaq_app_name, mountpoint)
-
-            try:
-                is_data = ispy.ISObject(self.ipc_partition, fs_data_identifier, fs_data_type)
-
-                is_data.total_MB = (fs.f_blocks * fs.f_bsize) // 1000000
-                is_data.available_MB = (fs.f_bavail * fs.f_bsize) // 1000000
-                is_data.occupancy_percent = 100.*(1-float(fs.f_bavail)/float(fs.f_blocks))
-
-                is_data.checkin()
-                self.logger.debug("FileSystemInfo sent: %s", str(is_data))
-            except Exception as ex:
-                self.logger.error("Error occured in update_fs_info(): %s", str(ex))
-
+            if self.conf.ISCpuStatsEnabled:
+                self.update_cpu_stats()
+            if self.conf.ISStorageIoStatsDevices:
+                self.update_storage_io_stats()


     def update_cs_info(self):
         is_data_type = "CastorScriptState"
-        is_data_identifier = make_is_data_identifier(self.is_server, self.tdaq_app_name)
+        is_data_identifier = make_is_data_identifier(self.conf.ISServer,
+                                                     self.tdaq_app_name)

         try:
             is_data = ispy.ISObject(self.ipc_partition, is_data_identifier, is_data_type)
@@ -158,3 +140,132 @@
             return 0
         else:
             return (datetime.utcnow() - self.time_created).total_seconds()
+
+
+    def update_fs_info(self):
+        for srcdirs_item in self.conf.SrcDirs:
+            mountpoint = getmountpoint(srcdirs_item)
+            try:
+                du = psutil.disk_usage(mountpoint)
+            except Exception as e:
+                self.logger.error('error getting file system usage: %s', e)
+                continue
+
+            fsinfo_id = "{}.{}.fsinfo-{}".format(self.conf.ISServer,
+                                                 self.tdaq_app_name, mountpoint)
+
+            try:
+                is_data = ispy.ISObject(self.ipc_partition, fsinfo_id,
+                                        "FileSystemInfo")
+
+                is_data.total_MB = du.total // 1000000
+                is_data.available_MB = du.free // 1000000
+                is_data.occupancy_percent = du.percent
+
+                is_data.checkin()
+                self.logger.debug("fsinfo published: %s", str(is_data))
+            except Exception as ex:
+                self.logger.error("error publishing fsinfo: %s", str(ex))
+
+
+    def update_cpu_stats(self):
+        # get CPU utilisation percentages since the previous call
+        c = psutil.cpu_times_percent()
+
+        if self.update_cpu_stats_first:
+            # per psutil documentation, the first value is meaningless and must be ignored
+            self.update_cpu_stats_first = False
+            return
+
+        try:
+            is_data = ispy.ISObject(self.ipc_partition,
+                                    self.cpustats_is_id, "CpuStats")
+
+            is_data.user = c.user
+            is_data.nice = c.nice
+            is_data.system = c.system
+            is_data.idle = c.idle
+            is_data.iowait = c.iowait
+            is_data.hwirq = c.irq
+            is_data.swirq = c.softirq
+
+            is_data.checkin()
+            self.logger.debug("cpustats published: %s", str(is_data))
+        except Exception as ex:
+            self.logger.error("error publishing cpustats: %s", str(ex))
+
+    def update_storage_io_stats(self):
+        now = datetime.now()
+
+        if not self.iostatscache:
+            # cache initialization
+            d = psutil.disk_io_counters(perdisk=True, nowrap=True)
+
+            for sd in self.conf.ISStorageIoStatsDevices:
+                if sd not in d:
+                    self.logger.warning('no stats for device %s', sd)
+                    continue
+                self.iostatscache[sd] = StorageDeviceIoStats(sd)
+                stats = d[sd]
+                self.iostatscache[sd].update(now, stats.read_count,
+                    stats.write_count, stats.read_bytes, stats.write_bytes,
+                    stats.busy_time)
+            return
+
+        d = psutil.disk_io_counters(perdisk=True, nowrap=True)
+        for sd in d:
+            if sd not in self.conf.ISStorageIoStatsDevices:
+                continue
+            stats = d[sd]
+            (rr, wr, rt, wt, up) = self.iostatscache[sd].update(now,
+                stats.read_count, stats.write_count, stats.read_bytes,
+                stats.write_bytes, stats.busy_time)
+
+            obj_id = "{}.{}.iostats-{}".format(self.conf.ISServer,
+                                               self.tdaq_app_name, sd)
+
+            try:
+                is_data = ispy.ISObject(self.ipc_partition, obj_id, "StorageIoStats")
+
+                is_data.read_req_rate = rr
+                is_data.write_req_rate = wr
+                is_data.read_tput = rt
+                is_data.write_tput = wt
+                is_data.util = up
+
+                is_data.checkin()
+                self.logger.debug("iostats published: %s", str(is_data))
+            except Exception as ex:
+                self.logger.error("error publishing iostats: %s", str(ex))
+
+
+class StorageDeviceIoStats:
+    def __init__(self, name):
+        self.name = name
+        self.rd_rq = None
+        self.wr_rq = None
+        self.rd_by = None
+        self.wr_by = None
+        self.busy_time_ms = None
+        self.ts = None
+
+    def update(self, ts, rd_rq, wr_rq, rd_by, wr_by, busy_time_ms):
+        if self.ts is None:
+            rrr = wrr = rtput = wtput = utilp = None
+        else:
+            dur = ts.timestamp() - self.ts
+            rrr = int((rd_rq - self.rd_rq) / dur)
+            wrr = int((wr_rq - self.wr_rq) / dur)
+            rtput = int((rd_by - self.rd_by) / dur)
+            wtput = int((wr_by - self.wr_by) / dur)
+            utilp = round((busy_time_ms - self.busy_time_ms) / (dur * 10), 2)
+
+
+        self.rd_rq = rd_rq
+        self.wr_rq = wr_rq
+        self.rd_by = rd_by
+        self.wr_by = wr_by
+        self.busy_time_ms = busy_time_ms
+        self.ts = ts.timestamp()
+
+        return (rrr, wrr, rtput, wtput, utilp)
\ No newline at end of file
diff --git a/Script/cs/Tools/ConfigParser.py b/Script/cs/Tools/ConfigParser.py
index dc280deb495c1c0f20dcd194b3614ed29694d86b..53371a4ba0aec74e1553a7ef1420809c29de86e1 100644
--- a/Script/cs/Tools/ConfigParser.py
+++ b/Script/cs/Tools/ConfigParser.py
@@ -148,6 +148,8 @@ class ConfigHolder(object):
         self.ISPublicationPeriod = 5
         self.ISFileSystemInfoEnabled = False
         self.ISWaitAfterParitionValid = 60
+        self.ISCpuStatsEnabled = False
+        self.ISStorageIoStatsDevices = []

         try:
             self.ISEnabled = cfg.ISEnabled
@@ -171,6 +173,12 @@ class ConfigHolder(object):
             # before trying to publish (otherwise unrecoverable errors)
             self.ISWaitAfterParitionValid = cfg.ISWaitAfterParitionValid
         except AttributeError: pass
+        try: # Publish global CPU stats
+            self.ISCpuStatsEnabled = cfg.ISCpuStatsEnabled
+        except AttributeError: pass
+        try: # Publish I/O stats for the storage devices listed in ISStorageIoStatsDevices
+            self.ISStorageIoStatsDevices = [d for d in cfg.ISStorageIoStatsDevices]
+        except AttributeError: pass


         # METADATA DATABASE
diff --git a/Script/cs/Tools/LogConfig.py b/Script/cs/Tools/LogConfig.py
index 13d6eae92c85d111bbbeea62a18af844f2fe5f71..cc7183ef1348267a1836205ef62e6783e893225d 100644
--- a/Script/cs/Tools/LogConfig.py
+++ b/Script/cs/Tools/LogConfig.py
@@ -6,9 +6,6 @@ import mailinglogger
 def make_is_data_identifier(is_server,tdaq_app_name):
     return "{}.{}.State".format(is_server,tdaq_app_name)

-def make_fs_data_identifier(is_server,tdaq_app_name,fs_name):
-    return "{}.{}.fsinfo-{}".format(is_server,tdaq_app_name,fs_name)
-
 def make_tdaq_app_name(config):
     hostname = socket.gethostname()
     cleanFileName = config.FileName.replace("Config", "",1).replace("Conf","",1).replace(".cfg","",1).replace("-","").replace("_","")
diff --git a/UnitTests/ConfigParser_Test.py b/UnitTests/ConfigParser_Test.py
index 38e818b1e1450444aade1757e58657e5e5df95a9..1a6e496bbe1bee8ef021dd3dd81898c18f57f408 100644
--- a/UnitTests/ConfigParser_Test.py
+++ b/UnitTests/ConfigParser_Test.py
@@ -49,6 +49,8 @@ class TestConfigHolder(unittest.TestCase):
         self.assert_equal_del('ISPublicationPeriod', 5)
         self.assert_equal_del('ISFileSystemInfoEnabled', False)
         self.assert_equal_del('ISWaitAfterParitionValid', 60)
+        self.assert_equal_del('ISCpuStatsEnabled', False)
+        self.assert_equal_del('ISStorageIoStatsDevices', [])
         self.assert_equal_del('DBURL', None)
         self.assert_equal_del('DBFileTable', '')
         self.assert_equal_del('DBLBTable', '')
@@ -154,6 +156,8 @@ class TestConfigHolder(unittest.TestCase):
         self.assert_equal_del('ISPublicationPeriod', 1)
         self.assert_equal_del('ISFileSystemInfoEnabled', True)
         self.assert_equal_del('ISWaitAfterParitionValid', 1)
+        self.assert_equal_del('ISCpuStatsEnabled', True)
+        self.assert_equal_del('ISStorageIoStatsDevices', ['abc', 'def'])
         self.assert_equal_del('DBURL', 'basededonnees')
         self.assert_equal_del('DBFileTable', 'filetable')
         self.assert_equal_del('DBLBTable', 'lbtable')
diff --git a/UnitTests/InfoServiceThread_Test.py b/UnitTests/InfoServiceThread_Test.py
index 8b322c0a21c4c04e09bd5056ab6f75ae462fd467..78d8df2a3543ef11d7fee2ca8cd0403ce534643e 100644
--- a/UnitTests/InfoServiceThread_Test.py
+++ b/UnitTests/InfoServiceThread_Test.py
@@ -63,6 +63,8 @@ class TestInfoServiceThread(unittest.TestCase):
                                ISServer="RunParams",
                                ISPublicationPeriod="1",
                                ISFileSystemInfoEnabled=True,
+                               ISCpuStatsEnabled=True,
+                               ISStorageIoStatsDevices=['abc'],
                                ISWaitAfterParitionValid=60)

         ## we're not interested in any logging for this test