Commit ace0562c authored by Walter Lampl's avatar Walter Lampl
Browse files

Merge branch 'CondProxyProvider' into 'master'

Thread-safe CondProxyProvider and EventSelectorAthenaPool.

See merge request !32683
parents 793c99d6 68dcc7d3
......@@ -31,8 +31,8 @@
CondProxyProvider::CondProxyProvider(const std::string& name, ISvcLocator* pSvcLocator) :
::AthService(name, pSvcLocator),
m_athenaPoolCnvSvc("AthenaPoolCnvSvc", name),
m_poolCollectionConverter(0),
m_headerIterator(0) {
m_poolCollectionConverter(0)
{
}
//________________________________________________________________________________
CondProxyProvider::~CondProxyProvider() {
......@@ -70,7 +70,6 @@ StatusCode CondProxyProvider::initialize() {
}
//________________________________________________________________________________
StatusCode CondProxyProvider::finalize() {
m_headerIterator = 0;
if (m_poolCollectionConverter != 0) {
m_poolCollectionConverter->disconnectDb().ignore();
delete m_poolCollectionConverter; m_poolCollectionConverter = 0;
......@@ -103,9 +102,9 @@ StatusCode CondProxyProvider::preLoadAddresses(StoreID::type storeID,
}
// Create DataHeader iterators
m_headerIterator = &m_poolCollectionConverter->executeQuery();
pool::ICollectionCursor* headerIterator = &m_poolCollectionConverter->executeQuery();
for (int verNumber = 0; verNumber < 100; verNumber++) {
if (!m_headerIterator->next()) {
if (!headerIterator->next()) {
m_poolCollectionConverter->disconnectDb().ignore();
delete m_poolCollectionConverter; m_poolCollectionConverter = 0;
m_inputCollectionsIterator++;
......@@ -116,8 +115,8 @@ StatusCode CondProxyProvider::preLoadAddresses(StoreID::type storeID,
return(StatusCode::FAILURE);
}
// Get DataHeader iterator
m_headerIterator = &m_poolCollectionConverter->executeQuery();
if (!m_headerIterator->next()) {
headerIterator = &m_poolCollectionConverter->executeQuery();
if (!headerIterator->next()) {
return(StatusCode::FAILURE);
}
} else {
......@@ -126,7 +125,7 @@ StatusCode CondProxyProvider::preLoadAddresses(StoreID::type storeID,
}
SG::VersionedKey myVersKey(name(), verNumber);
Token* token = new Token;
token->fromString(m_headerIterator->eventRef().toString());
token->fromString(headerIterator->eventRef().toString());
TokenAddress* tokenAddr = new TokenAddress(POOL_StorageType, ClassID_traits<DataHeader>::ID(), "", myVersKey, IPoolSvc::kInputStream, token);
if (!detectorStoreSvc->recordAddress(tokenAddr).isSuccess()) {
ATH_MSG_ERROR("Cannot record DataHeader.");
......
......@@ -27,7 +27,7 @@ class IAthenaPoolCnvSvc;
/** @class CondProxyProvider
* @brief This class is the AddressProvider for conditions data.
**/
class CondProxyProvider : public ::AthService, virtual public IAddressProvider {
class ATLAS_CHECK_THREAD_SAFETY CondProxyProvider : public ::AthService, virtual public IAddressProvider {
public: // Constructor and Destructor
/// Standard Service Constructor
CondProxyProvider(const std::string& name, ISvcLocator* pSvcLocator);
......@@ -58,18 +58,18 @@ public: // Constructor and Destructor
private: // data
ServiceHandle<IAthenaPoolCnvSvc> m_athenaPoolCnvSvc;
mutable PoolCollectionConverter* m_poolCollectionConverter;
mutable pool::ICollectionCursor* m_headerIterator;
mutable PoolCollectionConverter* m_poolCollectionConverter ATLAS_THREAD_SAFE;
private: // properties
/// InputCollections, vector with names of the input collections.
StringArrayProperty m_inputCollectionsProp
{ this, "InputCollections", {}, "Files to read", "OrderedSet<std::string>" };
mutable std::vector<std::string>::const_iterator m_inputCollectionsIterator;
mutable std::vector<std::string>::const_iterator m_inputCollectionsIterator ATLAS_THREAD_SAFE;
private: // internal helper functions
/// Return pointer to new PoolCollectionConverter
PoolCollectionConverter* getCollectionCnv();
};
#endif
......@@ -341,8 +341,6 @@ StatusCode EventSelectorAthenaPool::start() {
} else {
m_headerIterator = &m_poolCollectionConverter->executeQuery(/*m_query.value()*/);
}
delete m_beginIter; m_beginIter = nullptr;
m_beginIter = new EventContextAthenaPool(this);
delete m_endIter; m_endIter = nullptr;
m_endIter = new EventContextAthenaPool(nullptr);
return(StatusCode::SUCCESS);
......@@ -388,7 +386,6 @@ StatusCode EventSelectorAthenaPool::finalize() {
}
}
}
delete m_beginIter; m_beginIter = nullptr;
delete m_endIter; m_endIter = nullptr;
m_headerIterator = nullptr;
if (m_poolCollectionConverter != nullptr) {
......@@ -753,9 +750,8 @@ StatusCode EventSelectorAthenaPool::seek(Context& /*ctxt*/, int evtNum) const {
}
// Create DataHeader iterators
m_headerIterator = &m_poolCollectionConverter->executeQuery();
delete m_beginIter; m_beginIter = nullptr;
m_beginIter = new EventContextAthenaPool(this);
next(*m_beginIter).ignore();
EventContextAthenaPool* beginIter = new EventContextAthenaPool(this);
next(*beginIter).ignore();
ATH_MSG_DEBUG("Token " << m_headerIterator->eventRef().toString());
} catch (std::exception &e) {
m_headerIterator = nullptr;
......
......@@ -26,6 +26,7 @@
#include "AthenaBaseComps/AthService.h"
#include <map>
#include <atomic>
// Forward declarations
class IIncidentSvc;
......@@ -165,15 +166,14 @@ private: // internal member functions
void fireEndFileIncidents(bool isLastFile) const;
private: // data
mutable EventContextAthenaPool* m_beginIter{};
EventContextAthenaPool* m_endIter{};
ServiceHandle<ActiveStoreSvc> m_activeStoreSvc{this, "ActiveStoreSvc", "ActiveStoreSvc", ""};
mutable PoolCollectionConverter* m_poolCollectionConverter{};
mutable pool::ICollectionCursor* m_headerIterator{};
mutable Guid m_guid{};
mutable std::map<SG::SourceID, int> m_activeEventsPerSource;
mutable PoolCollectionConverter* m_poolCollectionConverter ATLAS_THREAD_SAFE {};
mutable pool::ICollectionCursor* m_headerIterator ATLAS_THREAD_SAFE {};
mutable Guid m_guid ATLAS_THREAD_SAFE {};
mutable std::map<SG::SourceID, int> m_activeEventsPerSource ATLAS_THREAD_SAFE;
ServiceHandle<IAthenaPoolCnvSvc> m_athenaPoolCnvSvc{this, "AthenaPoolCnvSvc", "AthenaPoolCnvSvc", ""};
ServiceHandle<IIncidentSvc> m_incidentSvc{this, "IncidentSvc", "IncidentSvc", ""};
......@@ -196,7 +196,7 @@ private: // properties
Gaudi::Property<std::string> m_attrListKey{this, "AttributeListKey", "Input", ""};
/// InputCollections, vector with names of the input collections.
Gaudi::Property<std::vector<std::string>> m_inputCollectionsProp{this, "InputCollections", {}, ""};
mutable std::vector<std::string>::const_iterator m_inputCollectionsIterator;
mutable std::vector<std::string>::const_iterator m_inputCollectionsIterator ATLAS_THREAD_SAFE;
void inputCollectionsHandler(Property&);
/// Query, query string.
Gaudi::Property<std::string> m_query{this, "Query", "", ""};
......@@ -226,17 +226,17 @@ private: // properties
Gaudi::CheckedProperty<int> m_initTimeStamp{this, "InitialTimeStamp", 0, ""};
Gaudi::Property<int> m_timeStampInterval{this, "TimeStampInterval", 0, ""};
mutable long m_curCollection{};
mutable std::vector<int> m_numEvt;
mutable std::vector<int> m_firstEvt;
mutable std::atomic_long m_curCollection{};
mutable std::vector<int> m_numEvt ATLAS_THREAD_SAFE;
mutable std::vector<int> m_firstEvt ATLAS_THREAD_SAFE;
/// SkipEvents, numbers of events to skip: default = 0.
Gaudi::Property<int> m_skipEvents{this, "SkipEvents", 0, ""};
Gaudi::Property<std::vector<long>> m_skipEventSequenceProp{this, "SkipEventSequence", {}, ""};
mutable std::vector<long> m_skipEventSequence;
mutable std::vector<long> m_skipEventSequence ATLAS_THREAD_SAFE;
mutable int m_evtCount{}; // internal count of events
mutable bool m_firedIncident{};
mutable std::atomic_int m_evtCount{}; // internal count of events
mutable std::atomic_bool m_firedIncident{};
typedef std::mutex CallMutex;
mutable CallMutex m_callLock;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment