From c4a0692ced8a91cf76792215f484ce333ae94108 Mon Sep 17 00:00:00 2001
From: Fabrice Le Goff <fabrice.le.goff@cern.ch>
Date: Tue, 12 Sep 2023 10:57:11 +0200
Subject: [PATCH] use SFO_TZ_RUN:tzstate to mark streams transferring/ed

---
 Script/cs/Tools/Libraries/Database.py | 118 +++++++++++++++-----------
 1 file changed, 70 insertions(+), 48 deletions(-)

diff --git a/Script/cs/Tools/Libraries/Database.py b/Script/cs/Tools/Libraries/Database.py
index d04f591..2a3d2b8 100644
--- a/Script/cs/Tools/Libraries/Database.py
+++ b/Script/cs/Tools/Libraries/Database.py
@@ -53,7 +53,7 @@ class OraDB(object):
 
 class Database():
 
-    def __init__(self,conf,logger,parser):
+    def __init__(self, conf, logger, parser):
         self.conf = conf
         self.parser = parser
 
@@ -85,15 +85,14 @@ class Database():
             else:
                 return (True, False)
         except (cx_Oracle.InterfaceError,cx_Oracle.DatabaseError) as ex: #pylint: disable=no-member
-            self.logger.error(str(ex))
-            self.logger.error(str(self.db.lastOp()))
-            self.logger.error(str(self.db.getLastKeys()))
+            errstr = 'database error: {}, request: "{}", request keys:"{}"'.format(
+                str(ex), str(self.db.lastOp()), str(self.db.getLastKeys()))
+            self.logger.error(errstr)
             return (False, False)
         except:
-            ex_info = sys.exc_info()
-            self.logger.error(str(ex_info))
-            self.logger.error(str(self.db.lastOp()))
-            self.logger.error(str(self.db.getLastKeys()))
+            errstr = 'error: {}, request: "{}", request keys:"{}"'.format(
+                str(sys.exc_info()), str(self.db.lastOp()), str(self.db.getLastKeys()))
+            self.logger.error(errstr)
             return (False, False)
 
 
@@ -114,14 +113,13 @@ class Database():
                 return (None, None, None)
             result = row[:3]
         except (cx_Oracle.InterfaceError,cx_Oracle.DatabaseError) as ex: #pylint: disable=no-member
-            self.logger.error(str(ex))
-            self.logger.error(str(self.db.lastOp()))
-            self.logger.error(str(self.db.getLastKeys()))
+            errstr = 'database error: {}, request: "{}", request keys:"{}"'.format(
+                str(ex), str(self.db.lastOp()), str(self.db.getLastKeys()))
+            self.logger.error(errstr)
         except:
-            ex_info = sys.exc_info()
-            self.logger.error(str(ex_info))
-            self.logger.error(str(self.db.lastOp()))
-            self.logger.error(str(self.db.getLastKeys()))
+            errstr = 'error: {}, request: "{}", request keys:"{}"'.format(
+                str(sys.exc_info()), str(self.db.lastOp()), str(self.db.getLastKeys()))
+            self.logger.error(errstr)
 
         return result
 
@@ -130,15 +128,14 @@ class Database():
             self.filedeletion(sfofile)
             return True
         except (cx_Oracle.InterfaceError,cx_Oracle.DatabaseError) as ex: #pylint: disable=no-member
-            self.logger.error(str(ex))
-            self.logger.error(str(self.db.lastOp()))
-            self.logger.error(str(self.db.getLastKeys()))
+            errstr = 'database error: {}, request: "{}", request keys:"{}"'.format(
+                str(ex), str(self.db.lastOp()), str(self.db.getLastKeys()))
+            self.logger.error(errstr)
             return False
         except:
-            ex_info = sys.exc_info()
-            self.logger.error(str(ex_info))
-            self.logger.error(str(self.db.lastOp()))
-            self.logger.error(str(self.db.getLastKeys()))
+            errstr = 'error: {}, request: "{}", request keys:"{}"'.format(
+                str(sys.exc_info()), str(self.db.lastOp()), str(self.db.getLastKeys()))
+            self.logger.error(errstr)
             return True
 
     def Transfer(self, sfofile, remotefile):
@@ -150,9 +147,9 @@ class Database():
                 self.filetransfer(sfofile, remotefile)
             except (cx_Oracle.InterfaceError, cx_Oracle.DatabaseError) as ex: #pylint: disable=no-member
                 ex_info = str(ex)
-                self.logger.error(ex_info)
-                self.logger.error(str(self.db.lastOp()))
-                self.logger.error(str(self.db.getLastKeys()))
+                errstr = 'database error: {}, request: "{}", request keys:"{}"'.format(
+                    str(ex), str(self.db.lastOp()), str(self.db.getLastKeys()))
+                self.logger.error(errstr)
 
                 if 'ORA-20199' in ex_info and 'NEXTVAL' in ex_info:
                     retry = True
@@ -160,16 +157,18 @@ class Database():
                     return False
 
             except:
-                ex_info = sys.exc_info()
-                self.logger.error(str(ex_info))
-                self.logger.error(str(self.db.lastOp()))
-                self.logger.error(str(self.db.getLastKeys()))
+                errstr = 'error: {}, request: "{}", request keys:"{}"'.format(
+                    str(sys.exc_info()), str(self.db.lastOp()), str(self.db.getLastKeys()))
+                self.logger.error(errstr)
                 return True
 
 
         try:
             parsed=self.parser(sfofile)
 
+            # mark the corresponding SFO_TZ_RUN entry as TRANSFERRING for Tier 0
+            self.markStreamTransferring(parsed)
+
             if self.notTransFiles(parsed)==0:
                 self.lbtransfer(parsed)
 
@@ -177,19 +176,20 @@ class Database():
                 self.runtransfer(parsed)
 
             return True
+
         except (cx_Oracle.InterfaceError, cx_Oracle.DatabaseError) as ex: #pylint: disable=no-member
-            self.logger.error(str(ex))
-            self.logger.error(str(self.db.lastOp()))
-            self.logger.error(str(self.db.getLastKeys()))
+            errstr = 'database error: {}, request: "{}", request keys:"{}"'.format(
+                str(ex), str(self.db.lastOp()), str(self.db.getLastKeys()))
+            self.logger.error(errstr)
             return False
 
         except:
-            ex_info = sys.exc_info()
-            self.logger.error(str(ex_info))
-            self.logger.error(str(self.db.lastOp()))
-            self.logger.error(str(self.db.getLastKeys()))
+            errstr = 'error: {}, request: "{}", request keys:"{}"'.format(
+                str(sys.exc_info()), str(self.db.lastOp()), str(self.db.getLastKeys()))
+            self.logger.error(errstr)
             return True
 
+
     def UpdateTruncated(self, sfofile, size, checksum):
 
         sql="update " + self.file_table + " "
@@ -206,14 +206,14 @@ class Database():
             self.db.execute(args,keys)
             return True
         except (cx_Oracle.InterfaceError,cx_Oracle.DatabaseError) as ex: #pylint: disable=no-member
-            self.logger.error(str(ex))
-            self.logger.error(str(self.db.lastOp()))
-            self.logger.error(str(self.db.getLastKeys()))
+            errstr = 'database error: {}, request: "{}", request keys:"{}"'.format(
+                str(ex), str(self.db.lastOp()), str(self.db.getLastKeys())
+            )
+            self.logger.error(errstr)
         except:
-            ex_info = sys.exc_info()
-            self.logger.error(str(ex_info))
-            self.logger.error(str(self.db.lastOp()))
-            self.logger.error(str(self.db.getLastKeys()))
+            errstr = 'error: {}, request: "{}", request keys:"{}"'.format(
+                str(sys.exc_info()), str(self.db.lastOp()), str(self.db.getLastKeys()))
+            self.logger.error(errstr)
 
         return False
 
@@ -303,7 +303,6 @@ class Database():
                                   ', stream name ' + str(parsed.StreamName()))
 
     def runtransfer(self,parsed):
-
         keys={}
         keys['ssfo']=parsed.AppId()
         keys['srunnr']=parsed.RunNr()
@@ -320,17 +319,17 @@ class Database():
 
         if state == 'CLOSED':
 
-            sql="update "+ self.run_table + " "
-            sql+="set state='TRANSFERRED', t_stamp = :newdate "
-            sql+="where sfoid = :ssfo and runnr = :srunnr "
+            sql="update "+ self.run_table
+            sql+=" set state='TRANSFERRED', tzstate='TRANSFERRED', t_stamp = :newdate"
+            sql+=" where sfoid = :ssfo and runnr = :srunnr "
             sql+=" and streamtype = :sstreamt and stream = :sstreamn "
             sql+=" and state = 'CLOSED'"
 
-
             keys['newdate']=datetime.datetime.utcfromtimestamp(time.time())
 
             args = [sql,]
             nrow = self.db.execute(args,keys)
+
             if nrow != 1:
                 self.logger.error('Not exactly one entry updated'
                                   ' in run table '+
@@ -347,6 +346,29 @@ class Database():
                                   ', stream name ' + str(parsed.StreamName()))
 
 
+    def markStreamTransferring(self, parsed):
+        keys={}
+        keys['ssfo']=parsed.AppId()
+        keys['srunnr']=parsed.RunNr()
+        keys['sstreamt']=parsed.StreamType()
+        keys['sstreamn']=parsed.StreamName()
+
+        sql="update "+ self.run_table
+        sql+=" set tzstate='TRANSFERRING', t_stamp = :newdate"
+        sql+=" where sfoid = :ssfo and runnr = :srunnr"
+        sql+=" and streamtype = :sstreamt and stream = :sstreamn"
+        sql+=" and tzstate != 'TRANSFERRING'" # avoid unnecessary update and overwriting of t_stamp
+
+        keys['newdate'] = datetime.datetime.utcfromtimestamp(time.time())
+
+        args = [sql,]
+        nrow = self.db.execute(args,keys)
+        self.logger.debug(
+            'tzstate update for SFO %s, run %s, stream type %s, stream name %s: %s row updated'
+            , str(parsed.AppId()), str(parsed.RunNr()), str(parsed.StreamType())
+            , str(parsed.StreamName()), str(nrow))
+
+
     def notTransFiles(self,parsed):
         sql="select count(lfn) from " + self.file_table + " "
         sql+="where sfoid = :ssfo and runnr = :srunnr and lumiblocknr = :slbnr "
-- 
GitLab