Skip to content
Snippets Groups Projects

Resolve "Initial Publish uptime of CS instance to IS"

1 file
+ 2
2
Compare changes
  • Side-by-side
  • Inline
+ 132
0
#!/usr/bin/env tdaq_python
import threading
import psutil
from datetime import datetime
#partition libraries
from ipc import IPCPartition #pylint: disable=no-name-in-module
import ispy
from cs.Tools.utils import thread_id_string
from cs.Tools.LogConfig import enable_file_logging, make_tdaq_app_name
class InfoServiceThread(threading.Thread):
def __init__(self,conf,event):
# get partition to publish to
self.ipc_partition = IPCPartition(conf.IS_partition)
# IS Server to publish to
# Errors comes up with: IS repository, not server
self.is_server = conf.IS_server
# Publication Period (seconds between updates)
self.is_publication_period = conf.IS_publication_period
# state fields
self.tdaq_app_name = make_tdaq_app_name(conf)
self.uptime_seconds = self.get_process_uptime_in_seconds()
threading.Thread.__init__(self, name="InfoServiceThread")
self.event = event
self.exit_flag = 0
# get logger to files, (Will also log to ERS)
self.logger = enable_file_logging("InfoServiceThread","InfoServiceLog.out", conf.LogDir, conf.LogLevel)
# end def __init__()
def run(self):
self.logger.info(thread_id_string())
while not self.exit_flag:
#update parameters
self.uptime_seconds = self.get_process_uptime_in_seconds()
#send to IS
if(self.ipc_partition.isValid()):
self.logger.debug("Provided Partition %s is Valid", self.ipc_partition.name())
self.send_update()
## For manual testing.
#self.send_wrong_info_update()
#self.cleanup_unwanted_stuff("fake", "Double")
#self.cleanup_unwanted_stuff("CastorScript29", "CastorScriptState")
else:
self.logger.warning("Provided Partition not Valid, IS Service disabled")
#wait for next update
self.event.wait(self.is_publication_period)
# end while
self.logger.info('InfoServiceThread exited')
# end def run()
def info_service_exit(self):
self.exit_flag = 1
self.logger.info('Exit signal received')
def get_process_uptime_in_seconds(self):
time_created = psutil.Process().create_time()
return (datetime.utcnow() - datetime.utcfromtimestamp(time_created)).total_seconds()
def send_update(self):
try:
is_data_type = "CastorScriptState"
is_data_identifier = "{}.{}.State".format(self.is_server,self.tdaq_app_name)
self.logger.debug("Retrieving: %s, type: %s, from IS", is_data_identifier,is_data_type)
is_data = ispy.ISObject(self.ipc_partition, is_data_identifier, is_data_type)
if is_data.exists():
self.logger.debug("found: %s", is_data)
else:
self.logger.debug("%s does not exist", is_data.name())
is_data.uptime_seconds = self.uptime_seconds
is_data.checkin()
#self.logger.debug("Update sent: %s", str(is_data))
except Exception as ex:
self.logger.error("Error occured in send_update(): %s", str(ex))
# ##### TESTING FUNCTIONS #####
# ## These functions are made for messing about in IS for development.
# ## potentially their functionality can be used for migrations between "CastorScriptState" OKS schema definitions
# ## When we change it, we need to make sure that the old version is not on the server, before we can publish the new version
# ## otherwise an error occurs.
# def send_wrong_info_update(self, name_of_unwanted_IS_entry, type_of_unwanted_IS_entry):
# try:
# is_data_type = type_of_unwanted_IS_entry
# is_data_identifier = "{}.{}.State".format(self.is_server,name_of_unwanted_IS_entry)
# self.logger.debug("Retrieving: %s, type: %s, from IS", is_data_identifier,is_data_type)
# is_data = ispy.ISObject(self.ipc_partition, is_data_identifier, is_data_type)
# if is_data.exists():
# self.logger.debug("found: %s", is_data)
# else:
# self.logger.debug("%s does not exist", is_data.name())
# #is_data.uptime_seconds = self.uptime_seconds
# #is_data.value = self.uptime_seconds
# #send data entry
# is_data.checkin()
# except Exception as ex:
# self.logger.error("Error occured in send_wrong_info_update(). Error was: %s", str(ex))
# def cleanup_unwanted_stuff(self, name_of_unwanted_IS_entry, type_of_unwanted_IS_entry):
# '''For deleting old castorscript information in case of change of format'''
# it = ispy.ISInfoIterator(self.ipc_partition, self.is_server, ispy.ISCriteria('.*')) #pylint: disable=no-member
# is_info_dictionary = ispy.ISInfoDictionary(self.ipc_partition) #pylint: disable=no-member
# self.logger.debug("Removing old objects of with old type %s, with name %s in the %s server:",type_of_unwanted_IS_entry,name_of_unwanted_IS_entry, self.is_server)
# while it.next():
# if str(it.name()).find(name_of_unwanted_IS_entry) is not -1 and str(it.type()).find(type_of_unwanted_IS_entry) is not -1:
# is_info_dictionary.remove(it.name())
# #it.name(), it.type().name(), it.time(), it.tag()
\ No newline at end of file
Loading