From b1832bc749d1560d5e8d203cd9e6fee7b6c9ad07 Mon Sep 17 00:00:00 2001 From: Peter Onyisi <ponyisi@utexas.edu> Date: Fri, 30 Oct 2020 16:30:38 +0100 Subject: [PATCH] Update DQ scripts to stomp.py 6.1.0 API --- .../python/messaging_listen.py | 62 +++---------------- DataQuality/DataQualityUtils/python/panic.py | 6 +- .../DataQualityUtils/python/stompconfig.py | 2 +- .../scripts/DQM_Tier0Wrapper_tf.py | 7 +-- .../scripts/DQM_Tier0Wrapper_trf.py | 10 +-- 5 files changed, 17 insertions(+), 70 deletions(-) diff --git a/DataQuality/DataQualityUtils/python/messaging_listen.py b/DataQuality/DataQualityUtils/python/messaging_listen.py index 6b1080ef905..214098897db 100644 --- a/DataQuality/DataQualityUtils/python/messaging_listen.py +++ b/DataQuality/DataQualityUtils/python/messaging_listen.py @@ -18,12 +18,13 @@ class ATLASDQMListener(object): self.selector = selector def __enter__(self): - if stomp.__version__ >= (4,1,11): - return self.__enter41__() + if stomp.__version__ >= (6,1,0): + return self.__enter61__() else: - return self.__enter31__() + logging.critical("Unable to find stomp.py >= 6.1.0, can't proceed") + raise ValueError("Version of stomp.py is too old") - def __enter31__(self): + def __enter61__(self): serverlist=[_[4] for _ in socket.getaddrinfo(MSGSERVER, MSGPORT, socket.AF_INET, socket.SOCK_STREAM)] @@ -33,62 +34,16 @@ class ATLASDQMListener(object): if hasattr(self.listener, 'conn'): self.listener.conn=[] for svr in serverlist: - #print 'set up', svr - cfg = stompconfig.config() + auth = stompconfig.config() + cfg = {} cfg['heartbeats'] = (0,0) cfg['reconnect_attempts_max'] = 3 cfg['version'] = 1.1 conn=stomp.Connection([svr], **cfg) - #print('set up Connection') conn.set_listener('somename',self.listener) if hasattr(self.listener, 'conn'): self.listener.conn.append(conn) - #print('Set up listener') - conn.start() - - #print('started connection') - - conn.connect(wait=True) - #print('connected') - hdr = {} - if self.selector is not None: hdr['selector'] = self.selector - if hasattr(self.listener, 'ack_mode'): - ack_mode=self.listener.ack_mode - else: - ack_mode='auto' - conn.subscribe(destination=self.dest, ack=ack_mode, headers = hdr, id=len(self.conns)) - #print('subscribed') - self.conns.append(conn) - return self - - def __enter41__(self): - serverlist=[_[4] for _ in socket.getaddrinfo(MSGSERVER, MSGPORT, - socket.AF_INET, - socket.SOCK_STREAM)] - - from . import stompconfig - self.conns = [] - if hasattr(self.listener, 'conn'): - self.listener.conn=[] - for svr in serverlist: - #print 'set up', svr - cfg = stompconfig.config() - #cfg['heart-beat'] = (5000,5000) - cfg['reconnect_attempts_max'] = 3 - cfg['version'] = 1.1 - cfg['login'] = cfg['user'] - conn=stomp.Connection([svr], heartbeats=(180000,180000)) - #print('set up Connection') - conn.set_listener('somename',self.listener) - if hasattr(self.listener, 'conn'): - self.listener.conn.append(conn) - #print('Set up listener') - conn.start() - - #print('started connection') - - conn.connect(wait=True, **cfg) - #print('connected') + conn.connect(wait=True, **auth) hdr = {} if self.selector is not None: hdr['selector'] = self.selector if hasattr(self.listener, 'ack_mode'): @@ -96,7 +51,6 @@ class ATLASDQMListener(object): else: ack_mode='auto' conn.subscribe(destination=self.dest, ack=ack_mode, headers = hdr, id=len(self.conns)) - #print('subscribed') self.conns.append(conn) return self diff --git a/DataQuality/DataQualityUtils/python/panic.py b/DataQuality/DataQualityUtils/python/panic.py index b0efb85a371..7df3dfa65ca 100644 --- a/DataQuality/DataQualityUtils/python/panic.py +++ b/DataQuality/DataQualityUtils/python/panic.py @@ -10,11 +10,10 @@ def panic(msg): import traceback import time - conn=stomp.Connection([('atlas-mb.cern.ch', 61013)], **stompconfig.config()) - conn.start() + conn=stomp.Connection([('atlas-mb.cern.ch', 61013)]) print('panic: started connection') - conn.connect(wait=True) + conn.connect(wait=True, **stompconfig.config()) print('panic: connected') header={'MsgClass':'DQ', @@ -32,7 +31,6 @@ def panic(msg): 'time': time.time(), 'usrtime': time.strftime('%Y-%m-%d %H:%H:%M %Z', time.localtime()), }) - #print(msg) conn.send(body, **header) print('panic: sent message') print(header) diff --git a/DataQuality/DataQualityUtils/python/stompconfig.py b/DataQuality/DataQualityUtils/python/stompconfig.py index 71b3f9a9cc3..c8f389fbd99 100644 --- a/DataQuality/DataQualityUtils/python/stompconfig.py +++ b/DataQuality/DataQualityUtils/python/stompconfig.py @@ -3,6 +3,6 @@ def config(): with open('/afs/cern.ch/user/a/atlasdqm/atlas/mqinfo') as f: - return {'user': 'atlasdqm', + return {'username': 'atlasdqm', 'passcode': f.read().strip()} raise RuntimeError('Unable to read STOMP connection info') diff --git a/DataQuality/DataQualityUtils/scripts/DQM_Tier0Wrapper_tf.py b/DataQuality/DataQualityUtils/scripts/DQM_Tier0Wrapper_tf.py index e2dc385808a..b0e756d36da 100755 --- a/DataQuality/DataQualityUtils/scripts/DQM_Tier0Wrapper_tf.py +++ b/DataQuality/DataQualityUtils/scripts/DQM_Tier0Wrapper_tf.py @@ -87,9 +87,8 @@ def publish_success_to_mq(run, ptag, stream, incr, ami, procpass, hcfg, isprod): import stomp, json, ssl from DataQualityUtils import stompconfig dest='/topic/atlas.dqm.progress' - conn=stomp.Connection([('atlas-mb.cern.ch', 61013)], **stompconfig.config()) - conn.start() - conn.connect(wait=True) + conn=stomp.Connection([('atlas-mb.cern.ch', 61013)]) + conn.connect(wait=True, **stompconfig.config()) body = { 'run': run, @@ -107,7 +106,7 @@ def publish_success_to_mq(run, ptag, stream, incr, ami, procpass, hcfg, isprod): 'persistent': 'true', 'destination': dest, } - conn.send(message=json.dumps(body), destination=dest,headers=headers,ack='auto') + conn.send(body=json.dumps(body), destination=dest,headers=headers,ack='auto') conn.disconnect() ######################################################################### diff --git a/DataQuality/DataQualityUtils/scripts/DQM_Tier0Wrapper_trf.py b/DataQuality/DataQualityUtils/scripts/DQM_Tier0Wrapper_trf.py index b10788862ca..fb15b787a84 100755 --- a/DataQuality/DataQualityUtils/scripts/DQM_Tier0Wrapper_trf.py +++ b/DataQuality/DataQualityUtils/scripts/DQM_Tier0Wrapper_trf.py @@ -83,12 +83,8 @@ def publish_success_to_mq(run, ptag, stream, incr, ami, procpass, hcfg, isprod): import stomp, json, ssl from DataQualityUtils import stompconfig dest='/topic/atlas.dqm.progress' - #conn=stomp.Connection([('atlas-mb.cern.ch', 61023)], use_ssl=True, - # ssl_cert_file=os.environ['X509_USER_PROXY'], - # ssl_version=ssl.PROTOCOL_TLSv1) - conn=stomp.Connection([('atlas-mb.cern.ch', 61013)], **stompconfig.config()) - conn.start() - conn.connect(wait=True) + conn=stomp.Connection([('atlas-mb.cern.ch', 61013)]) + conn.connect(wait=True, **stompconfig.config()) body = { 'run': run, @@ -106,7 +102,7 @@ def publish_success_to_mq(run, ptag, stream, incr, ami, procpass, hcfg, isprod): 'persistent': 'true', 'destination': dest, } - conn.send(message=json.dumps(body), destination=dest,headers=headers,ack='auto') + conn.send(body=json.dumps(body), destination=dest,headers=headers,ack='auto') conn.disconnect() ######################################################################### -- GitLab