From 6f740552e3dab33c72f78790db0ba63ab7346f4d Mon Sep 17 00:00:00 2001
From: Vakho Tsulaia <vakhtang.tsulaia@cern.ch>
Date: Tue, 3 Nov 2020 01:54:23 +0100
Subject: [PATCH] Thread safe handling of CORAL transactions by RDBAccessSvc

Given that the CORAL transaction context seems to be global, one thread
can commit a transaction that is still active in another thread.
Worked around this issue by starting a read-only transaction when a
session is created, and committing that transaction only when the
corresponding session is closed.

Addresses the issue discussed in ATLASRECTS-5737.

Also removed several redundant include statements.
---
 Database/RDBAccessSvc/src/RDBAccessSvc.cxx    | 108 ++++++------------
 Database/RDBAccessSvc/src/RDBAccessSvc.h      |  11 +-
 Database/RDBAccessSvc/src/RDBQuery.cxx        |  20 +---
 Database/RDBAccessSvc/src/RDBQuery.h          |   2 +-
 Database/RDBAccessSvc/src/RDBRecord.cxx       |   1 -
 Database/RDBAccessSvc/src/RDBRecordset.cxx    |   2 -
 .../RDBAccessSvc/src/RDBVersionAccessor.cxx   |   2 -
 .../RDBAccessSvc/src/RDBVersionAccessor.h     |   6 +-
 Database/RDBAccessSvc/src/SourceCompAlg.h     |   1 -
 9 files changed, 42 insertions(+), 111 deletions(-)

diff --git a/Database/RDBAccessSvc/src/RDBAccessSvc.cxx b/Database/RDBAccessSvc/src/RDBAccessSvc.cxx
index edcfb9e4ce7..5dbb1900770 100755
--- a/Database/RDBAccessSvc/src/RDBAccessSvc.cxx
+++ b/Database/RDBAccessSvc/src/RDBAccessSvc.cxx
@@ -17,17 +17,10 @@
 #include "RDBVersionAccessor.h"
 #include "RDBQuery.h"
 
-#include "RelationalAccess/RelationalServiceException.h"
-#include "RelationalAccess/SchemaException.h"
-#include "RelationalAccess/SessionException.h"
-#include "RelationalAccess/IRelationalService.h"
-#include "RelationalAccess/IRelationalDomain.h"
 #include "RelationalAccess/ISessionProxy.h"
 #include "RelationalAccess/ITransaction.h"
 #include "RelationalAccess/SchemaException.h"
 #include "RelationalAccess/ConnectionService.h"
-#include "RelationalAccess/IConnectionService.h"
-#include "RelationalAccess/AccessMode.h"
 #include "RelationalAccess/ICursor.h"
 #include "RelationalAccess/ITable.h"
 #include "RelationalAccess/ISchema.h"
@@ -36,9 +29,6 @@
 #include "CxxUtils/checker_macros.h"
 
 #include "CoralBase/Exception.h"
-#include "RelationalAccess/AuthenticationServiceException.h"
-
-#include "GaudiKernel/ServiceHandle.h"
 
 #include <thread>
 
@@ -53,6 +43,7 @@ RDBAccessSvc::~RDBAccessSvc()
 
 bool RDBAccessSvc::connect(const std::string& connName)
 {
+  std::lock_guard<std::mutex> guard(m_sessionMutex);
   // Check if it is the first attempt to open a connection connName
   if(m_sessions.find(connName)==m_sessions.end()) {
     ATH_MSG_DEBUG(" Trying to open the connection " << connName << " for the first time");
@@ -76,6 +67,7 @@ bool RDBAccessSvc::connect(const std::string& connName)
   coral::ISessionProxy *proxy = 0;
   try {
     proxy = conSvcH.connect(connName,coral::ReadOnly);
+    proxy->transaction().start(true);
     ATH_MSG_DEBUG("Proxy for connection "  << connName << " obtained");
   }
   catch(std::exception& e) {
@@ -91,28 +83,27 @@ bool RDBAccessSvc::connect(const std::string& connName)
 
 bool RDBAccessSvc::disconnect(const std::string& connName)
 {
-  if(m_openConnections.find(connName)==m_openConnections.end()) {
+  auto connection = m_openConnections.find(connName);
+  if(connection==m_openConnections.end()) {
     ATH_MSG_ERROR("Wrong name for the connection: " << connName);
     return false;
   }
 
-  if(m_openConnections[connName]>0) {
-    m_openConnections[connName]--;
+  std::lock_guard<std::mutex> guard(m_sessionMutex); 
+  if(connection->second>0) {
+    connection->second--;
     
-    ATH_MSG_DEBUG("Connection " << connName << " Sessions = " << m_openConnections[connName]);
-
-    if(m_openConnections[connName]==0) {
-      if(m_sessions.find(connName)!=m_sessions.end()) {
-	delete m_sessions[connName];
-	m_sessions[connName] = 0;
+    ATH_MSG_DEBUG("Connection " << connName << " Sessions = " << connection->second);
+
+    if(connection->second==0) {
+      auto session = m_sessions.find(connName);
+      if(session!=m_sessions.end()) {
+	session->second->transaction().commit();
+	delete session->second;
+	session->second = nullptr;
       }
       
       ATH_MSG_DEBUG(connName << " Disconnected!");
-
-      // clean up all shared recordsets for this connection
-      if(m_recordsetptrs.find(connName)!=m_recordsetptrs.end()) {
-	m_recordsetptrs[connName]->clear();
-      }
     }
   }
   return true;
@@ -123,8 +114,10 @@ bool RDBAccessSvc::shutdown(const std::string& connName)
   if(connName=="*Everything*") {
     for(const auto& ii : m_openConnections) {
       if(ii.second != 0) {
-	ATH_MSG_WARNING("Close everything: Connection: " << ii.first << " with reference count = " << ii.second << " will be closed.");
-	return shutdown_connection(ii.first);
+	ATH_MSG_INFO("Close everything: Connection: " << ii.first << " with reference count = " << ii.second << " will be closed.");
+	if(!shutdown_connection(ii.first)) {
+	  return false;
+	}
       }
     }
     return true;
@@ -135,26 +128,24 @@ bool RDBAccessSvc::shutdown(const std::string& connName)
 
 bool RDBAccessSvc::shutdown_connection(const std::string& connName)
 {
-  if(m_openConnections.find(connName)==m_openConnections.end()) {
+  auto connection = m_openConnections.find(connName);
+  if(connection==m_openConnections.end()) {
     ATH_MSG_ERROR("Wrong name for the connection: " << connName);
     return false;
   }
-
-  m_openConnections[connName]=0;
+  std::lock_guard<std::mutex> guard(m_sessionMutex);
+  connection->second = 0;
   
-  ATH_MSG_DEBUG("Connection " << connName << " Sessions = " << m_openConnections[connName]);
-  if(m_sessions.find(connName)!=m_sessions.end()) {
-    delete m_sessions[connName];
-    m_sessions[connName] = 0;
+  auto session = m_sessions.find(connName);
+  if(session!=m_sessions.end() 
+     && session->second) {
+    session->second->transaction().commit();
+    delete session->second;
+    session->second = nullptr;
   }
   
   ATH_MSG_DEBUG(connName << " Disconnected!");
 
-  // clean up all shared recordsets for this connection
-  if(m_recordsetptrs.find(connName)!=m_recordsetptrs.end()) {
-    m_recordsetptrs[connName]->clear();
-  }
-
   return true;
 }
 
@@ -169,7 +160,7 @@ IRDBRecordset_ptr RDBAccessSvc::getRecordsetPtr(const std::string& node,
 
   ATH_MSG_DEBUG("Getting RecordsetPtr with key " << key);
 
-  std::lock_guard<std::mutex> guard(m_mutex);
+  std::lock_guard<std::mutex> guard(m_recordsetMutex);
   if(!connect(connName)) {
     ATH_MSG_ERROR("Unable to open connection " << connName << ". Returning empty recordset");
     return IRDBRecordset_ptr(new RDBRecordset(this));
@@ -188,9 +179,6 @@ IRDBRecordset_ptr RDBAccessSvc::getRecordsetPtr(const std::string& node,
   coral::ISessionProxy* session = m_sessions[connName];
 
   try {
-    // Start new readonly transaction
-    session->transaction().start(true);
-
     // Check lookup table first
     std::string lookupMapKey = tag + "::" + connName;
     GlobalTagLookupMap::const_iterator lookupmap = m_globalTagLookup.find(lookupMapKey);
@@ -209,9 +197,6 @@ IRDBRecordset_ptr RDBAccessSvc::getRecordsetPtr(const std::string& node,
       versionAccessor.getChildTagData();
       recConcrete->getData(session,versionAccessor.getNodeName(),versionAccessor.getTagName(),versionAccessor.getTagID());
     }
-	
-    // Finish the transaction
-    session->transaction().commit();
   }
   catch(coral::SchemaException& se) {
     ATH_MSG_ERROR("Schema Exception : " << se.what());
@@ -234,7 +219,7 @@ std::unique_ptr<IRDBQuery> RDBAccessSvc::getQuery(const std::string& node,
 						  const std::string& connName)
 {
   ATH_MSG_DEBUG("getQuery (" << node << "," << tag << "," << tag2node << "," << connName << ")");
-  std::lock_guard<std::mutex> guard(m_mutex);
+  std::lock_guard<std::mutex> guard(m_recordsetMutex);
 
   std::unique_ptr<IRDBQuery> query;
 
@@ -257,15 +242,9 @@ std::unique_ptr<IRDBQuery> RDBAccessSvc::getQuery(const std::string& node,
       }
     }
     else {
-      // Start new readonly transaction
-      session->transaction().start(true);
-
       RDBVersionAccessor versionAccessor{node,(tag2node.empty()?node:tag2node),tag,session,msg()};
       versionAccessor.getChildTagData();
       childTagId = versionAccessor.getTagID();
-      
-      // Finish the transaction
-      session->transaction().commit();
     }
 
     if(childTagId.empty()) {
@@ -308,7 +287,7 @@ std::string RDBAccessSvc::getChildTag(const std::string& childNode,
 				      bool force)
 {
   ATH_MSG_DEBUG("getChildTag for " << childNode << " " << parentTag << " " << parentNode);
-  std::lock_guard<std::mutex> guard(m_mutex);
+  std::lock_guard<std::mutex> guard(m_recordsetMutex);
 
   // Check lookup table first
   std::string lookupMapKey = parentTag + "::" + connName;
@@ -333,15 +312,9 @@ std::string RDBAccessSvc::getChildTag(const std::string& childNode,
   std::string childTag("");
   try {
     // We don't have lookup table for given parent tag. Go into slow mode through Version Accessor
-    // Start new readonly transaction
     coral::ISessionProxy* session = m_sessions[connName];
-    session->transaction().start(true);
-
     RDBVersionAccessor versionAccessor(childNode,parentNode,parentTag,session,msg());
     versionAccessor.getChildTagData();
-	      
-    // Finish the transaction
-    session->transaction().commit(); 
 
     childTag = versionAccessor.getTagName();
   }
@@ -365,7 +338,7 @@ void RDBAccessSvc::getTagDetails(RDBTagDetails& tagDetails,
                                  const std::string& connName)
 {
   ATH_MSG_DEBUG("getTagDetails for tag: " << tag);
-  std::lock_guard<std::mutex> guard(m_mutex);
+  std::lock_guard<std::mutex> guard(m_recordsetMutex);
 
   if(!connect(connName)) {
     ATH_MSG_ERROR("Failed to open connection " << connName);
@@ -373,9 +346,6 @@ void RDBAccessSvc::getTagDetails(RDBTagDetails& tagDetails,
 
   coral::ISessionProxy* session = m_sessions[connName];
   try {
-    // Start new readonly transaction
-    session->transaction().start(true);
-	
     coral::ITable& tableTag2Node = session->nominalSchema().tableHandle("HVS_TAG2NODE");
     coral::IQuery *queryTag2Node = tableTag2Node.newQuery();
     queryTag2Node->addToOutputList("LOCKED");
@@ -426,10 +396,6 @@ void RDBAccessSvc::getTagDetails(RDBTagDetails& tagDetails,
 	delete lookup;
       }
     }
-    // Finish the transaction
-    if(session->transaction().isActive()) {
-      session->transaction().commit();
-    }
   }
   catch(coral::SchemaException& se) {
     ATH_MSG_INFO("Schema Exception : " << se.what());
@@ -455,9 +421,6 @@ void RDBAccessSvc::getAllLeafNodes(std::vector<std::string>& list,
 
   coral::ISessionProxy* session = m_sessions[connName];
   try {
-    // Start new readonly transaction
-    session->transaction().start(true);
-
     coral::ITable& tableNode = session->nominalSchema().tableHandle("HVS_NODE");
     coral::IQuery *queryNode = tableNode.newQuery();
     queryNode->addToOutputList("NODE_NAME");
@@ -472,11 +435,6 @@ void RDBAccessSvc::getAllLeafNodes(std::vector<std::string>& list,
     }
     
     delete queryNode;
-
-    // Finish the transaction
-    if(session->transaction().isActive()) {
-      session->transaction().commit();
-    }
   }
   catch(coral::SchemaException& se) {
     ATH_MSG_INFO("Schema Exception : " << se.what());
diff --git a/Database/RDBAccessSvc/src/RDBAccessSvc.h b/Database/RDBAccessSvc/src/RDBAccessSvc.h
index 1c326c0beef..f217b1de6f0 100755
--- a/Database/RDBAccessSvc/src/RDBAccessSvc.h
+++ b/Database/RDBAccessSvc/src/RDBAccessSvc.h
@@ -18,7 +18,6 @@
 #include "RDBAccessSvc/IRDBAccessSvc.h"
 #include "RDBRecordset.h"
 
-#include "Gaudi/Property.h"
 #include "AthenaBaseComps/AthService.h"
 
 #include <string>
@@ -30,7 +29,6 @@ class RDBRecordset;
 namespace coral 
 {
   class ISessionProxy;
-  class IRelationalService;
 }
 
 template <class TYPE> class SvcFactory;
@@ -43,7 +41,7 @@ typedef std::map<std::string, IRDBRecordset_ptr> RecordsetPtrMap;
 typedef std::map<std::string, RecordsetPtrMap*> RecordsetPtrsByConn;
 
 // Session map
-typedef std::map<std::string, coral::ISessionProxy*, std::less<std::string> > SessionMap;
+typedef std::map<std::string, coral::ISessionProxy*> SessionMap;
 
 // Lookup table for global tag contents quick access
 typedef std::pair<std::string, std::string> TagNameId;
@@ -59,8 +57,6 @@ typedef std::map<std::string, TagNameIdByNode*> GlobalTagLookupMap; // Key - <Gl
 
 class RDBAccessSvc final : public AthService, virtual public IRDBAccessSvc 
 {
-  friend class RDBRecordset;
-
  public:
 
   /// Retrieve interface ID
@@ -139,12 +135,13 @@ class RDBAccessSvc final : public AthService, virtual public IRDBAccessSvc
 
 private:
   SessionMap m_sessions;
-  std::map<std::string, unsigned int, std::less<std::string> > m_openConnections;
+  std::map<std::string, unsigned int> m_openConnections;
 
   RecordsetPtrsByConn m_recordsetptrs;  
   GlobalTagLookupMap m_globalTagLookup;
 
-  std::mutex m_mutex;
+  std::mutex m_recordsetMutex;
+  std::mutex m_sessionMutex;
 
   bool shutdown_connection(const std::string& connName);
 };
diff --git a/Database/RDBAccessSvc/src/RDBQuery.cxx b/Database/RDBAccessSvc/src/RDBQuery.cxx
index 008e905ceac..0106dbfa2df 100755
--- a/Database/RDBAccessSvc/src/RDBQuery.cxx
+++ b/Database/RDBAccessSvc/src/RDBQuery.cxx
@@ -5,12 +5,10 @@
 #include "RDBQuery.h"
 #include "RDBAccessSvc.h"
 #include <stdexcept>
-#include <iostream>
 
 #include "RelationalAccess/ICursor.h"
 #include "RelationalAccess/IQuery.h"
 #include "RelationalAccess/ISessionProxy.h"
-#include "RelationalAccess/ITransaction.h"
 #include "RelationalAccess/ISchema.h"
 #include "RelationalAccess/ITable.h"
 #include "RelationalAccess/ITableDescription.h"
@@ -61,9 +59,6 @@ void RDBQuery::execute()
 
   try
   {
-    // ... Start readonly transaction
-    m_session->transaction().start(true); 
-
     // ... Get the node name and change to to Upper Case
     std::string upperName = m_nodeName;
     std::string::iterator it = upperName.begin();
@@ -150,21 +145,8 @@ long RDBQuery::size()
 
 void RDBQuery::finalize()
 {
-  if(m_cursor)
-    m_cursor->close();
+  if(m_cursor) m_cursor->close();
 
-  if(m_session) {
-    try {
-      // finish the transaction
-      m_session->transaction().commit();
-    }
-    catch(std::exception& e) {
-      m_accessSvc->msg() << MSG::WARNING << "QUERY Finalize: Exception : " + std::string(e.what()) << endmsg;
-    }
-    catch(...) {
-      m_accessSvc->msg() << MSG::WARNING << "QUERY Finalize : Exception caught... "  << endmsg;
-    }
-  }
   m_accessSvc->disconnect(m_connName);
 }
 
diff --git a/Database/RDBAccessSvc/src/RDBQuery.h b/Database/RDBAccessSvc/src/RDBQuery.h
index b9c896368af..0520a972d57 100755
--- a/Database/RDBAccessSvc/src/RDBQuery.h
+++ b/Database/RDBAccessSvc/src/RDBQuery.h
@@ -1,5 +1,5 @@
 /*
-  Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration
+  Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
 */
 
 #ifndef RDBACCESSSVC_RDBQUERY_H
diff --git a/Database/RDBAccessSvc/src/RDBRecord.cxx b/Database/RDBAccessSvc/src/RDBRecord.cxx
index 0da53148915..af4b8982c43 100755
--- a/Database/RDBAccessSvc/src/RDBRecord.cxx
+++ b/Database/RDBAccessSvc/src/RDBRecord.cxx
@@ -22,7 +22,6 @@
 
 #include <stdexcept>
 #include <sstream>
-#include <iostream>
 
 RDBRecord::RDBRecord(const coral::AttributeList& attList, 
 		     std::string tableName):
diff --git a/Database/RDBAccessSvc/src/RDBRecordset.cxx b/Database/RDBAccessSvc/src/RDBRecordset.cxx
index e0ab595b145..d132e795dfc 100755
--- a/Database/RDBAccessSvc/src/RDBRecordset.cxx
+++ b/Database/RDBAccessSvc/src/RDBRecordset.cxx
@@ -15,8 +15,6 @@
 #include "RDBAccessSvc.h"
 #include "RDBRecordset.h"
 #include "RDBRecord.h"
-#include "RDBVersionAccessor.h"
-#include "RDBQuery.h"
 
 #include "RelationalAccess/ISessionProxy.h"
 #include "RelationalAccess/ICursor.h"
diff --git a/Database/RDBAccessSvc/src/RDBVersionAccessor.cxx b/Database/RDBAccessSvc/src/RDBVersionAccessor.cxx
index c7e972e9e54..0a163f00d44 100755
--- a/Database/RDBAccessSvc/src/RDBVersionAccessor.cxx
+++ b/Database/RDBAccessSvc/src/RDBVersionAccessor.cxx
@@ -18,7 +18,6 @@
 #include "RelationalAccess/ICursor.h"
 #include "RelationalAccess/ITable.h"
 #include "RelationalAccess/ISchema.h"
-#include "RelationalAccess/ITransaction.h"
 #include "RelationalAccess/IQuery.h"
 #include "RelationalAccess/SchemaException.h"
 
@@ -28,7 +27,6 @@
 #include "CoralBase/AttributeList.h"
 
 #include "GaudiKernel/MsgStream.h"
-#include "GaudiKernel/IMessageSvc.h"
 
 #include <stdexcept>
 #include <sstream>
diff --git a/Database/RDBAccessSvc/src/RDBVersionAccessor.h b/Database/RDBAccessSvc/src/RDBVersionAccessor.h
index 19681e013e4..de499745093 100755
--- a/Database/RDBAccessSvc/src/RDBVersionAccessor.h
+++ b/Database/RDBAccessSvc/src/RDBVersionAccessor.h
@@ -1,5 +1,5 @@
 /*
-  Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration
+  Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
 */
 
 /**
@@ -12,8 +12,8 @@
  * $Id: RDBVersionAccessor.h,v 1.11 2006-05-11 22:34:39 tsulaia Exp $
  */
 
-#ifndef _RDB_VERSIONACCESSOR_H_
-#define _RDB_VERSIONACCESSOR_H_
+#ifndef RDBACCESSSVC_RDBVERSIONACCESSOR_H
+#define RDBACCESSSVC_RDBVERSIONACCESSOR_H
 
 #include <string>
 #include "GaudiKernel/MsgStream.h"
diff --git a/Database/RDBAccessSvc/src/SourceCompAlg.h b/Database/RDBAccessSvc/src/SourceCompAlg.h
index 62df92bbd0f..2382314be51 100644
--- a/Database/RDBAccessSvc/src/SourceCompAlg.h
+++ b/Database/RDBAccessSvc/src/SourceCompAlg.h
@@ -6,7 +6,6 @@
 #define RDBACCESSSVC_SOURCECOMPALG_H
 
 #include "AthenaBaseComps/AthAlgorithm.h"
-#include "RDBAccessSvc/IRDBAccessSvc.h"
 
 #include <string>
 #include <fstream>
-- 
GitLab