From 1a58b547e2143b790ccf4299dbee70018f829c35 Mon Sep 17 00:00:00 2001
From: Fabrice Le Goff <fabrice.le.goff@cern.ch>
Date: Mon, 24 Oct 2022 16:49:28 +0200
Subject: [PATCH] added: CPU and storage devices monitoring published to IS
 (configurable)

---
 Configs/configparsertest_modifiedvalues.cfg |   4 +-
 Script/cs/Threads/InfoServiceThread.py      | 191 ++++++++++++++++----
 Script/cs/Tools/ConfigParser.py             |   8 +
 Script/cs/Tools/LogConfig.py                |   3 -
 UnitTests/ConfigParser_Test.py              |   4 +
 UnitTests/InfoServiceThread_Test.py         |   2 +
 6 files changed, 168 insertions(+), 44 deletions(-)

diff --git a/Configs/configparsertest_modifiedvalues.cfg b/Configs/configparsertest_modifiedvalues.cfg
index c3b0b01..b01eafa 100644
--- a/Configs/configparsertest_modifiedvalues.cfg
+++ b/Configs/configparsertest_modifiedvalues.cfg
@@ -32,8 +32,10 @@ ISEnabled: True
 ISPartition: 'ispartition'
 ISServer: 'isserver'
 ISPublicationPeriod: 1
-ISFileSystemInfoEnabled: True
 ISWaitAfterParitionValid: 1
+ISCpuStatsEnabled: True
+ISFileSystemInfoEnabled: True
+ISStorageIoStatsDevices: ['abc', 'def']
 DBURL: 'basededonnees'
 DBFileTable: 'filetable'
 DBLBTable: 'lbtable'
diff --git a/Script/cs/Threads/InfoServiceThread.py b/Script/cs/Threads/InfoServiceThread.py
index c1d574c..2a81687 100644
--- a/Script/cs/Threads/InfoServiceThread.py
+++ b/Script/cs/Threads/InfoServiceThread.py
@@ -1,24 +1,24 @@
 #!/usr/bin/env tdaq_python
 import threading
-import os
+import psutil
 from datetime import datetime
 import ispy
 
 from cs.Tools.utils import thread_id_string
 from cs.Tools.utils import getmountpoint
-from cs.Tools.LogConfig import enable_file_logging, make_tdaq_app_name, make_fs_data_identifier, make_is_data_identifier
+from cs.Tools.LogConfig import enable_file_logging, make_tdaq_app_name, make_is_data_identifier
 
 # --------------------------------
-# The XML schema associated with CastorScript's IS classes, defining
-# CastorScriptState and FileSystemInfo, is part of the SFOng package, because
-# the schema needs to be deployed in to the TDAQ release and CastorScript is not
-# a TDAQ project.
+# The XML schema associated with CastorScript's IS classes, is part of the SFOng
+# package, because the schema needs to be deployed in to the TDAQ release and
+# CastorScript is not a TDAQ project:
+# https://gitlab.cern.ch/atlas-tdaq-software/SFOng/-/blob/master/schema/CastorScript.IS.schema.xml
 # --------------------------------
 
 class InfoServiceThread(threading.Thread):
 
     def __init__(self, conf, exitevent, copy_thread, delete_thread, manager_thread):
-        threading.Thread.__init__(self, name="InfoServiceThread")
+        threading.Thread.__init__(self, name="ISThread")
         self.exitevent = exitevent
         self.conf = conf
         self.time_created = datetime.utcnow()
@@ -31,11 +31,13 @@ class InfoServiceThread(threading.Thread):
         self.ipc_partition = ispy.IPCPartition(conf.ISPartition) #pylint: disable=no-member
         self.partition_connected = True
 
-        # IS Server to publish to
-        # Errors comes up with: IS repository, not server
-        self.is_server = conf.ISServer
-
         self.tdaq_app_name = make_tdaq_app_name(conf)
+
+        self.cpustats_is_id = f'{self.conf.ISServer}.{self.tdaq_app_name}.cpustats'
+        self.update_cpu_stats_first = True
+
+        self.iostatscache = {}
+
         self.logger = enable_file_logging("isthread", "isthread.log", conf)
 
 
@@ -45,7 +47,7 @@ class InfoServiceThread(threading.Thread):
         if self.ipc_partition.isValid():
             self.logger.info('partition %s is valid: IS publication enabled',
                     self.ipc_partition.name())
-
+            
         # variable to handle the period between the partition becomes valid
         # and the first publication attempt
         # None means that there is nothing to do
@@ -59,7 +61,8 @@ class InfoServiceThread(threading.Thread):
                 # warning instead of info when publication is restored to be
                 # able to keep track of periods where publication was disabled
                 self.logger.warning('partition %s is now valid: IS publications enabled'
-                        ' (in %d seconds)', self.ipc_partition.name(), self.conf.ISWaitAfterParitionValid)
+                        ' (in %d seconds)', self.ipc_partition.name(),
+                        self.conf.ISWaitAfterParitionValid)
                 partition_valid_at = datetime.utcnow()
             elif self.partition_connected and not self.ipc_partition.isValid():
                 self.partition_connected = False
@@ -89,37 +92,16 @@ class InfoServiceThread(threading.Thread):
         self.update_cs_info()
         if self.conf.ISFileSystemInfoEnabled:
             self.update_fs_info()
-
-    def update_fs_info(self):
-        fs_data_type = "FileSystemInfo"
-
-        for srcdirs_item in self.conf.SrcDirs:
-            mountpoint = getmountpoint(srcdirs_item)
-            try:
-                fs = os.statvfs(mountpoint)
-            except OSError as e:
-                self.logger.error('Cannot stat %s: %s', e.filename, e.strerror)
-                continue
-
-            fs_data_identifier = make_fs_data_identifier(self.is_server, self.tdaq_app_name, mountpoint)
-
-            try:
-                is_data = ispy.ISObject(self.ipc_partition, fs_data_identifier, fs_data_type)
-
-                is_data.total_MB = (fs.f_blocks * fs.f_bsize) // 1000000
-                is_data.available_MB = (fs.f_bavail * fs.f_bsize) // 1000000
-                is_data.occupancy_percent = 100.*(1-float(fs.f_bavail)/float(fs.f_blocks))
-
-                is_data.checkin()
-                self.logger.debug("FileSystemInfo sent: %s", str(is_data))
-            except Exception as ex:
-                self.logger.error("Error occured in update_fs_info(): %s", str(ex))
-
+        if self.conf.ISCpuStatsEnabled:
+            self.update_cpu_stats()
+        if self.conf.ISStorageIoStatsDevices:
+            self.update_storage_io_stats()
 
 
     def update_cs_info(self):
         is_data_type = "CastorScriptState"
-        is_data_identifier = make_is_data_identifier(self.is_server, self.tdaq_app_name)
+        is_data_identifier = make_is_data_identifier(self.conf.ISServer,
+                self.tdaq_app_name)
 
         try:
             is_data = ispy.ISObject(self.ipc_partition, is_data_identifier, is_data_type)
@@ -158,3 +140,132 @@ class InfoServiceThread(threading.Thread):
             return 0
         else:
             return (datetime.utcnow() - self.time_created).total_seconds()
+
+
+    def update_fs_info(self):
+        for srcdirs_item in self.conf.SrcDirs:
+            mountpoint = getmountpoint(srcdirs_item)
+            try:
+                du = psutil.disk_usage(mountpoint)
+            except Exception as e:
+                self.logger.error('error getting file system usage: %s', e)
+                continue
+
+            fsinfo_id = "{}.{}.fsinfo-{}".format(self.conf.ISServer,
+                    self.tdaq_app_name, mountpoint)
+
+            try:
+                is_data = ispy.ISObject(self.ipc_partition, fsinfo_id,
+                        "FileSystemInfo")
+
+                is_data.total_MB = du.total // 1000000
+                is_data.available_MB = du.free // 1000000
+                is_data.occupancy_percent = du.percent
+
+                is_data.checkin()
+                self.logger.debug("fsinfo published: %s", str(is_data))
+            except Exception as ex:
+                self.logger.error("error publishing fsinfo: %s", str(ex))
+
+
+    def update_cpu_stats(self):
+        # get utilisation fraction since last call
+        c = psutil.cpu_times_percent()
+
+        if self.update_cpu_stats_first:
+            # documentation says first value is to be ignored
+            self.update_cpu_stats_first = False
+            return
+
+        try:
+            is_data = ispy.ISObject(self.ipc_partition,
+                    self.cpustats_is_id, "CpuStats")
+
+            is_data.user = c.user
+            is_data.nice = c.nice
+            is_data.system = c.system
+            is_data.idle = c.idle
+            is_data.iowait = c.iowait
+            is_data.hwirq = c.irq
+            is_data.swirq = c.softirq
+
+            is_data.checkin()
+            self.logger.debug("cpustats published: %s", str(is_data))
+        except Exception as ex:
+            self.logger.error("error publishing cpustats: %s", str(ex))
+
+    def update_storage_io_stats(self):
+        now = datetime.now()
+
+        if not self.iostatscache:
+            # cache initialization
+            d = psutil.disk_io_counters(perdisk=True, nowrap=True)
+
+            for sd in self.conf.ISStorageIoStatsDevices:
+                if sd not in d:
+                    self.logger.warning('no stats for device %s', sd)
+                    continue
+                self.iostatscache[sd] = StorageDeviceIoStats(sd)
+                stats = d[sd]
+                self.iostatscache[sd].update(now, stats.read_count,
+                        stats.write_count, stats.read_bytes, stats.write_bytes,
+                        stats.busy_time)
+            return
+        
+        d = psutil.disk_io_counters(perdisk=True, nowrap=True)
+        for sd in d:
+            if not sd in self.conf.ISStorageIoStatsDevices:
+                continue
+            stats = d[sd]
+            (rr, wr, rt, wt, up) = self.iostatscache[sd].update(now,
+                    stats.read_count, stats.write_count, stats.read_bytes,
+                    stats.write_bytes, stats.busy_time)
+
+            obj_id = "{}.{}.iostats-{}".format(self.conf.ISServer,
+                    self.tdaq_app_name, sd)
+
+            try:
+                is_data = ispy.ISObject(self.ipc_partition, obj_id, "StorageIoStats")
+
+                is_data.read_req_rate = rr
+                is_data.write_req_rate = wr
+                is_data.read_tput = rt
+                is_data.write_tput = wt
+                is_data.util = up
+
+                is_data.checkin()
+                self.logger.debug("cpustats published: %s", str(is_data))
+            except Exception as ex:
+                self.logger.error("error publishing cpustats: %s", str(ex))
+
+
+class StorageDeviceIoStats:
+    def __init__(self, name):
+        self.name = name
+        self.rd_rq = None
+        self.wr_rq = None
+        self.rd_by = None
+        self.wr_by = None
+        self.busy_time_ms = None
+        self.ts = None
+
+    def update(self, ts, rd_rq, wr_rq, rd_by, wr_by, busy_time_ms):
+        if self.ts is None:
+            rrr = wrr = rtput = wtput = utilp = None
+        else:
+            dur = ts.timestamp() - self.ts
+            rrr = int((rd_rq - self.rd_rq) / dur)
+            wrr = int((wr_rq - self.wr_rq) / dur)
+            rtput = int((rd_by - self.rd_by) / dur)
+            wtput = int((wr_by - self.wr_by) / dur)
+            utilp = round((busy_time_ms - self.busy_time_ms) / (dur * 10), 2)
+
+
+        self.rd_rq = rd_rq
+        self.wr_rq = wr_rq
+        self.rd_by = rd_by
+        self.wr_by = wr_by
+        self.busy_time_ms = busy_time_ms
+        self.ts = ts.timestamp()
+
+        return (rrr, wrr, rtput, wtput, utilp)
\ No newline at end of file
diff --git a/Script/cs/Tools/ConfigParser.py b/Script/cs/Tools/ConfigParser.py
index dc280de..53371a4 100644
--- a/Script/cs/Tools/ConfigParser.py
+++ b/Script/cs/Tools/ConfigParser.py
@@ -148,6 +148,8 @@ class ConfigHolder(object):
         self.ISPublicationPeriod = 5
         self.ISFileSystemInfoEnabled = False
         self.ISWaitAfterParitionValid = 60
+        self.ISCpuStatsEnabled = False
+        self.ISStorageIoStatsDevices = []
 
         try:
             self.ISEnabled = cfg.ISEnabled
@@ -171,6 +173,12 @@ class ConfigHolder(object):
                 # before trying to publish (otherwise unrecoverable errors)
                 self.ISWaitAfterParitionValid = cfg.ISWaitAfterParitionValid
             except AttributeError: pass
+            try: # Publish global CPU stats
+                self.ISCpuStatsEnabled = cfg.ISCpuStatsEnabled
+            except AttributeError: pass
+            try: # Publish storage devices I/O stats for ISStorageIoStatsDevices
+                self.ISStorageIoStatsDevices = [d for d in cfg.ISStorageIoStatsDevices]
+            except AttributeError: pass
 
 
         # METADATA DATABASE
diff --git a/Script/cs/Tools/LogConfig.py b/Script/cs/Tools/LogConfig.py
index 13d6eae..cc7183e 100644
--- a/Script/cs/Tools/LogConfig.py
+++ b/Script/cs/Tools/LogConfig.py
@@ -6,9 +6,6 @@ import mailinglogger
 def make_is_data_identifier(is_server,tdaq_app_name):
     return "{}.{}.State".format(is_server,tdaq_app_name)
 
-def make_fs_data_identifier(is_server,tdaq_app_name,fs_name):
-    return "{}.{}.fsinfo-{}".format(is_server,tdaq_app_name,fs_name)
-
 def make_tdaq_app_name(config):
     hostname = socket.gethostname()
     cleanFileName = config.FileName.replace("Config", "",1).replace("Conf","",1).replace(".cfg","",1).replace("-","").replace("_","")
diff --git a/UnitTests/ConfigParser_Test.py b/UnitTests/ConfigParser_Test.py
index 38e818b..1a6e496 100644
--- a/UnitTests/ConfigParser_Test.py
+++ b/UnitTests/ConfigParser_Test.py
@@ -49,6 +49,8 @@ class TestConfigHolder(unittest.TestCase):
         self.assert_equal_del('ISPublicationPeriod', 5)
         self.assert_equal_del('ISFileSystemInfoEnabled', False)
         self.assert_equal_del('ISWaitAfterParitionValid', 60)
+        self.assert_equal_del('ISCpuStatsEnabled', False)
+        self.assert_equal_del('ISStorageIoStatsDevices', [])
         self.assert_equal_del('DBURL', None)
         self.assert_equal_del('DBFileTable', '')
         self.assert_equal_del('DBLBTable', '')
@@ -154,6 +156,8 @@ class TestConfigHolder(unittest.TestCase):
         self.assert_equal_del('ISPublicationPeriod', 1)
         self.assert_equal_del('ISFileSystemInfoEnabled', True)
         self.assert_equal_del('ISWaitAfterParitionValid', 1)
+        self.assert_equal_del('ISCpuStatsEnabled', True)
+        self.assert_equal_del('ISStorageIoStatsDevices', ['abc', 'def'])
         self.assert_equal_del('DBURL', 'basededonnees')
         self.assert_equal_del('DBFileTable', 'filetable')
         self.assert_equal_del('DBLBTable', 'lbtable')
diff --git a/UnitTests/InfoServiceThread_Test.py b/UnitTests/InfoServiceThread_Test.py
index 8b322c0..78d8df2 100644
--- a/UnitTests/InfoServiceThread_Test.py
+++ b/UnitTests/InfoServiceThread_Test.py
@@ -63,6 +63,8 @@ class TestInfoServiceThread(unittest.TestCase):
             ISServer="RunParams",
             ISPublicationPeriod="1",
             ISFileSystemInfoEnabled=True,
+            ISCpuStatsEnabled=True,
+            ISStorageIoStatsDevices=['abc'],
             ISWaitAfterParitionValid=60)
 
         ## we're not interested in any logging for this test
-- 
GitLab