Skip to content
Snippets Groups Projects
Commit 25a71eab authored by Edward Moyse's avatar Edward Moyse
Browse files

Merge branch 'morememoryfences' into 'master'

IDC: Memory fence info

See merge request !38867
parents 3f5f6646 5620c618
No related branches found
No related tags found
6 merge requests!58791DataQualityConfigurations: Modify L1Calo config for web display,!46784MuonCondInterface: Enable thread-safety checking.,!46776Updated LArMonitoring config file for WD to match new files produced using MT,!45405updated ART test cron job,!42417Draft: DIRE and VINCIA Base Fragments for Pythia 8.3,!38867IDC: Memory fence info
......@@ -16,10 +16,7 @@
#include <mutex>
#include <atomic>
#include "EventContainers/IDC_WriteHandleBase.h"
//abarton
//Enabling the ability to remove collections to help compatibility with old code.
//This may be removed to improve thread-safety.
#define IdentifiableCacheBaseRemove
namespace EventContainers {
......@@ -35,7 +32,6 @@ static constexpr uintptr_t ABORTEDflag = UINTPTR_MAX-1;
static constexpr size_t s_defaultBucketSize =2;
typedef std::true_type thread_safe;
typedef std::set<IdentifierHash> idset_t;
#include "EventContainers/deleter.h"
......@@ -85,11 +81,6 @@ typedef std::set<IdentifierHash> idset_t;
///Create a set of hashes, updates an IDC mask as appropriate
void createSet (const std::vector<IdentifierHash>& hashes, std::vector<bool> &mask);
#ifdef IdentifiableCacheBaseRemove
///Use to remove a collection from the cache, this is for backward compatibility
/// and should not be used in a concurrent environment
bool remove (IdentifierHash hash);
#endif
size_t fullSize() const { return m_vec.size(); }
///In a concurrent situation this number isn't necessarily perfectly synchronised with ids().size()
size_t numberOfHashes();
......
......@@ -190,11 +190,9 @@ public:
virtual StatusCode naughtyRetrieve ATLAS_NOT_THREAD_SAFE (IdentifierHash hashId, T* &collToRetrieve) const override final;
#ifdef IdentifiableCacheBaseRemove
/// remove collection from container for id hash, returning it
/// (and ownership) to client
T* removeCollection(IdentifierHash hashId);
#endif
/// reset m_hashids and call IdentifiableCache's cleanup
virtual void cleanup() override final;
......@@ -251,14 +249,12 @@ public:
}
};
#ifdef IdentifiableCacheBaseRemove
template < class T>
T* //Please don't do this we want to get rid of this
IdentifiableContainerMT<T>::removeCollection( IdentifierHash hashId )
{
return reinterpret_cast<T*>(m_link->removeCollection(hashId));
}
#endif
......
......@@ -24,7 +24,7 @@ void IDC_WriteHandleBase::ReleaseLock(){
//Running code
assert(m_atomic->load() != ABORTstate);
lockguard lk(m_mut->mutex);
m_atomic->compare_exchange_strong(waitstate, ABORTstate);
m_atomic->compare_exchange_strong(waitstate, ABORTstate, std::memory_order_relaxed, std::memory_order_relaxed);
m_mut->condition.notify_all();
m_atomic = nullptr;
}
......
......@@ -39,16 +39,13 @@ IdentifiableCacheBase::IdentifiableCacheBase (IdentifierHash maxHash,
}
IdentifiableCacheBase::~IdentifiableCacheBase()
{
}
IdentifiableCacheBase::~IdentifiableCacheBase()=default;
int IdentifiableCacheBase::tryLock(IdentifierHash hash, IDC_WriteHandleBase &lock, std::vector<IdentifierHash> &wait){
assert(m_NMutexes > 0);
const void *ptr1 =nullptr;
if(m_vec[hash].compare_exchange_strong(ptr1, INVALID)){//atomic swap (replaces ptr1 with value)
if(m_vec[hash].compare_exchange_strong(ptr1, INVALID, std::memory_order_relaxed, std::memory_order_relaxed)){//atomic swap (replaces ptr1 with value)
//First call
size_t slot = hash % m_NMutexes;
auto &mutexpair = m_HoldingMutexes[slot];
......@@ -71,7 +68,8 @@ void IdentifiableCacheBase::clear (deleter_f* deleter)
size_t s = m_vec.size();
if(0 != m_currentHashes.load(std::memory_order_relaxed)){
for (size_t i=0; i<s ;i++) {
const void* ptr = m_vec[i].exchange(nullptr);
const void* ptr = m_vec[i].load(std::memory_order_relaxed);
m_vec[i].store(nullptr, std::memory_order_relaxed);
if (ptr && ptr < ABORTED){
deleter (ptr);
}
......@@ -86,23 +84,24 @@ void IdentifiableCacheBase::clear (deleter_f* deleter)
//Does not lock or clear atomics to allow faster destruction
void IdentifiableCacheBase::cleanUp (deleter_f* deleter)
{
std::atomic_thread_fence(std::memory_order_acquire);
if(0 != m_currentHashes.load(std::memory_order_relaxed)){ //Reduce overhead if cache was unused
size_t s = m_vec.size();
for (size_t i=0; i<s ;i++) {
const void* p = m_vec[i];
const void* p = m_vec[i].load(std::memory_order_relaxed);
if(p && p < ABORTED) deleter (p);
}
}
}
int IdentifiableCacheBase::itemAborted (IdentifierHash hash){
const void* p = m_vec[hash].load();
const void* p = m_vec[hash].load(std::memory_order_relaxed); //Relaxed because it is not returning a pointer to anything
return (p == ABORTED);
}
int IdentifiableCacheBase::itemInProgress (IdentifierHash hash){
const void* p = m_vec[hash].load();
const void* p = m_vec[hash].load(std::memory_order_relaxed); //Relaxed because it is not returning a pointer to anything
return (p == INVALID);
}
......@@ -110,24 +109,27 @@ int IdentifiableCacheBase::itemInProgress (IdentifierHash hash){
const void* IdentifiableCacheBase::find (IdentifierHash hash) noexcept
{
if (ATH_UNLIKELY(hash >= m_vec.size())) return nullptr;
const void* p = m_vec[hash].load();
const void* p = m_vec[hash].load(std::memory_order_relaxed);
if (p >= ABORTED)
return nullptr;
//Now that we know it is a real pointer, we can ensure the data is synced
std::atomic_thread_fence(std::memory_order_acquire);
return p;
}
const void* IdentifiableCacheBase::waitFor(IdentifierHash hash)
{
const void* item = m_vec[hash].load();
const void* item = m_vec[hash].load(std::memory_order_acquire);
if(m_NMutexes ==0) return item;
size_t slot = hash % m_NMutexes;
if(item == INVALID){
mutexPair &mutpair = m_HoldingMutexes[slot];
uniqueLock lk(mutpair.mutex);
while( (item =m_vec[hash].load()) == INVALID){
while( (item =m_vec[hash].load(std::memory_order_relaxed)) == INVALID){
mutpair.condition.wait(lk);
}
}
std::atomic_thread_fence(std::memory_order_acquire);
return item;
}
......@@ -195,30 +197,19 @@ void IdentifiableCacheBase::createSet (const std::vector<IdentifierHash>& hashes
}
}
#ifdef IdentifiableCacheBaseRemove
bool IdentifiableCacheBase::remove (IdentifierHash hash)
{
if (hash >= m_vec.size()) return false;
if(m_vec[hash]){
m_vec[hash] = nullptr;
m_currentHashes--;
return true;
}
return false;
}
#endif
size_t IdentifiableCacheBase::numberOfHashes()
{
return m_currentHashes.load();
return m_currentHashes.load(std::memory_order_relaxed); //Not to be used for syncing
}
std::vector<IdentifierHash> IdentifiableCacheBase::ids()
{
std::vector<IdentifierHash> ret;
ret.reserve (m_currentHashes);
ret.reserve (m_currentHashes.load(std::memory_order_relaxed));
size_t s = m_vec.size();
for (size_t i =0; i<s; i++) {
const void* p = m_vec[i].load();
const void* p = m_vec[i].load(std::memory_order_relaxed);
if (p && p < ABORTED)
ret.push_back (i);
}
......@@ -231,13 +222,13 @@ std::pair<bool, const void*> IdentifiableCacheBase::add (IdentifierHash hash, co
if (ATH_UNLIKELY(hash >= m_vec.size())) return std::make_pair(false, nullptr);
if(p==nullptr) return std::make_pair(false, nullptr);
const void* nul=nullptr;
if(m_vec[hash].compare_exchange_strong(nul, p)){
m_currentHashes++;
if(m_vec[hash].compare_exchange_strong(nul, p, std::memory_order_release, std::memory_order_relaxed)){
m_currentHashes.fetch_add(1, std::memory_order_relaxed);
return std::make_pair(true, p);
}
const void* invalid = INVALID;
if(m_vec[hash].compare_exchange_strong(invalid, p)){
m_currentHashes++;
if(m_vec[hash].compare_exchange_strong(invalid, p, std::memory_order_release, std::memory_order_acquire)){
m_currentHashes.fetch_add(1, std::memory_order_relaxed);
return std::make_pair(true, p);
}
return std::make_pair(false, invalid);
......@@ -250,13 +241,13 @@ std::pair<bool, const void*> IdentifiableCacheBase::addLock (IdentifierHash hash
assert(hash < m_vec.size());
if(p==nullptr) return std::make_pair(false, nullptr);
const void* invalid = INVALID;
if(m_vec[hash].compare_exchange_strong(invalid, p)){
m_currentHashes++;
if(m_vec[hash].compare_exchange_strong(invalid, p, std::memory_order_release, std::memory_order_relaxed)){
m_currentHashes.fetch_add(1, std::memory_order_relaxed);
return std::make_pair(true, p);
}
const void* nul=nullptr;
if(m_vec[hash].compare_exchange_strong(nul, p)){
m_currentHashes++;
if(m_vec[hash].compare_exchange_strong(nul, p, std::memory_order_release, std::memory_order_acquire)){
m_currentHashes.fetch_add(1, std::memory_order_relaxed);
return std::make_pair(true, p);
}
return std::make_pair(false, nul);
......
......@@ -23,17 +23,17 @@ bool InternalOfflineFast::tryAddFromCache(IdentifierHash hash)
void InternalOfflineFast::wait() const {
std::scoped_lock lock (m_waitMutex);
if(m_needsupdate == false) return;
if(m_needsupdate.load(std::memory_order_acquire) == false) return;
m_map.clear();
for(size_t i=0 ;i < m_fullMap.size(); ++i){
if(m_fullMap[i]) m_map.emplace_back(i, m_fullMap[i]);
}
m_map.shrink_to_fit();
m_needsupdate.store(false);
m_needsupdate.store(false, std::memory_order_release);
}
std::vector<IdentifierHash> InternalOfflineFast::getAllCurrentHashes() const {
if(m_needsupdate) wait();
if(m_needsupdate.load(std::memory_order_acquire)) wait();
std::vector<IdentifierHash> ids;
ids.reserve(m_map.size());
for(auto &x : m_map) {
......@@ -44,35 +44,35 @@ std::vector<IdentifierHash> InternalOfflineFast::getAllCurrentHashes() const {
InternalConstItr
InternalOfflineFast::cend() const {
if(m_needsupdate) wait();
if(m_needsupdate.load(std::memory_order_acquire)) wait();
return m_map.cend();
}
const std::vector < I_InternalIDC::hashPair >& InternalOfflineFast::getAllHashPtrPair() const{
if(m_needsupdate) wait();
if(m_needsupdate.load(std::memory_order_acquire)) wait();
return m_map;
}
InternalConstItr
InternalOfflineFast::cbegin() const {
if(m_needsupdate) wait();
if(m_needsupdate.load(std::memory_order_acquire)) wait();
return m_map.cbegin();
}
InternalConstItr InternalOfflineFast::indexFind( IdentifierHash hashId ) const{
if(m_needsupdate) wait();
if(m_needsupdate.load(std::memory_order_acquire)) wait();
auto itr = std::lower_bound( m_map.cbegin(), m_map.cend(), hashId.value(), [](const hashPair &lhs, IdentifierHash::value_type rhs) -> bool { return lhs.first < rhs; } );
if(itr!= m_map.cend() && itr->first==hashId) return itr;
return m_map.cend();
}
size_t InternalOfflineFast::numberOfCollections() const {
if(m_needsupdate) wait();
if(m_needsupdate.load(std::memory_order_acquire)) wait();
return m_map.size();
}
void InternalOfflineFast::cleanUp(deleter_f* deleter) noexcept {
if(!m_needsupdate) {
if(!m_needsupdate.load(std::memory_order_acquire)) {
for(const auto& x : m_map) { deleter(x.second); m_fullMap[x.first] = nullptr; }
if(!m_map.empty()) m_needsupdate.store(true, std::memory_order_relaxed);
}
......@@ -120,7 +120,7 @@ StatusCode InternalOfflineFast::fetchOrCreate(const std::vector<IdentifierHash>&
}
void InternalOfflineFast::destructor(deleter_f* deleter) noexcept {
if(!m_needsupdate) for(const auto& x : m_map) deleter(x.second);
if(!m_needsupdate.load(std::memory_order_acquire)) for(const auto& x : m_map) deleter(x.second);
else {
for(size_t i=0 ;i < m_fullMap.size(); ++i){
if(m_fullMap[i]) deleter(m_fullMap[i]);
......
......@@ -23,20 +23,20 @@ bool InternalOfflineMap::tryAddFromCache(IdentifierHash hash)
void InternalOfflineMap::wait() const {
std::scoped_lock lock (m_waitMutex);
if(m_needsupdate == false) return;
if(m_needsupdate.load(std::memory_order_acquire) == false) return;
m_map.clear();
m_map.reserve(m_fullMap.size());
for(const auto &pair : m_fullMap){
m_map.emplace_back(pair.first, pair.second);
}
std::sort(m_map.begin(), m_map.end());
m_needsupdate.store(false);
m_needsupdate.store(false, std::memory_order_release);
}
std::vector<IdentifierHash> InternalOfflineMap::getAllCurrentHashes() const {
std::vector<IdentifierHash> ids;
ids.reserve(m_fullMap.size());
if(m_needsupdate == true){
if(m_needsupdate.load(std::memory_order_acquire) == true){
for(const auto &pair : m_fullMap){
ids.emplace_back(pair.first);
}
......@@ -51,23 +51,23 @@ std::vector<IdentifierHash> InternalOfflineMap::getAllCurrentHashes() const {
InternalConstItr
InternalOfflineMap::cend() const {
if(m_needsupdate) wait();
if(m_needsupdate.load(std::memory_order_acquire)) wait();
return m_map.cend();
}
const std::vector < I_InternalIDC::hashPair >& InternalOfflineMap::getAllHashPtrPair() const{
if(m_needsupdate) wait();
if(m_needsupdate.load(std::memory_order_acquire)) wait();
return m_map;
}
InternalConstItr
InternalOfflineMap::cbegin() const {
if(m_needsupdate) wait();
if(m_needsupdate.load(std::memory_order_acquire)) wait();
return m_map.cbegin();
}
InternalConstItr InternalOfflineMap::indexFind( IdentifierHash hashId ) const{
if(m_needsupdate) wait();
if(m_needsupdate.load(std::memory_order_acquire)) wait();
auto itr = std::lower_bound( m_map.cbegin(), m_map.cend(), hashId.value(), [](const hashPair &lhs, IdentifierHash::value_type rhs) -> bool { return lhs.first < rhs; } );
if(itr!= m_map.cend() && itr->first==hashId) return itr;
return m_map.cend();
......@@ -81,7 +81,7 @@ void InternalOfflineMap::cleanUp(deleter_f* deleter) noexcept {
destructor(deleter);
m_map.clear();
m_fullMap.clear();
m_needsupdate.store(false, std::memory_order_relaxed);
m_needsupdate.store(false, std::memory_order_release);
}
bool InternalOfflineMap::insert(IdentifierHash hashId, const void* ptr) {
......@@ -124,7 +124,7 @@ StatusCode InternalOfflineMap::fetchOrCreate(const std::vector<IdentifierHash>&)
}
void InternalOfflineMap::destructor(deleter_f* deleter) noexcept {
if(!m_needsupdate) for(const auto& x : m_map) deleter(x.second);
if(!m_needsupdate.load(std::memory_order_acquire)) for(const auto& x : m_map) deleter(x.second);
else {
for(const auto &pair : m_fullMap) { deleter(pair.second); }
}
......
......@@ -16,25 +16,25 @@ InternalOnline::InternalOnline(EventContainers::IdentifiableCacheBase *cache) :
m_mask(cache->fullSize(), false), m_waitNeeded(false) {}
const std::vector < I_InternalIDC::hashPair >& InternalOnline::getAllHashPtrPair() const{
if(m_waitNeeded) wait();
if(m_waitNeeded.load(std::memory_order_acquire)) wait();
return m_map;
}
InternalConstItr
InternalOnline::cend() const {
if(m_waitNeeded) wait();
if(m_waitNeeded.load(std::memory_order_acquire)) wait();
return m_map.cend();
}
InternalConstItr
InternalOnline::cbegin() const {
if(m_waitNeeded) wait();
if(m_waitNeeded.load(std::memory_order_acquire)) wait();
return m_map.cbegin();
}
InternalConstItr InternalOnline::indexFind( IdentifierHash hashId ) const{
if(m_waitNeeded) wait();
if(m_waitNeeded.load(std::memory_order_acquire)) wait();
auto itr = std::lower_bound( m_map.begin(), m_map.end(), hashId.value(), [](hashPair &lhs, IdentifierHash::value_type rhs) -> bool { return lhs.first < rhs; } );
if(itr!= m_map.end() && itr->first==hashId) return itr;
return m_map.end();
......@@ -43,7 +43,7 @@ InternalConstItr InternalOnline::indexFind( IdentifierHash hashId ) const{
void InternalOnline::wait() const {
//lockguard to protect m_waitlist from multiple wait calls
std::scoped_lock lock (m_waitMutex);
if(m_waitNeeded == false) return;
if(m_waitNeeded.load(std::memory_order_acquire) == false) return;
using namespace EventContainers;
const void* ABORTstate = reinterpret_cast<const void*>(IdentifiableCacheBase::ABORTEDflag);
while(!m_waitlist.empty()) {
......@@ -56,9 +56,9 @@ void InternalOnline::wait() const {
}
m_map.clear();
for(size_t i =0;i<m_mask.size();i++){
if(m_mask[i]) m_map.emplace_back(i, m_cacheLink->m_vec[i].load());
if(m_mask[i]) m_map.emplace_back(i, m_cacheLink->m_vec[i].load(std::memory_order_relaxed));//acquire sync is done by m_waitNeeded
}
m_waitNeeded.store(false);
m_waitNeeded.store(false, std::memory_order_release);
}
bool InternalOnline::tryAddFromCache(IdentifierHash hashId, EventContainers::IDC_WriteHandleBase &lock) {
......@@ -87,7 +87,7 @@ bool InternalOnline::tryAddFromCache(IdentifierHash hashId)
}
std::vector<IdentifierHash> InternalOnline::getAllCurrentHashes() const {
if(m_waitNeeded) wait();
if(m_waitNeeded.load(std::memory_order_acquire)) wait();
std::vector<IdentifierHash> ids;
ids.reserve(m_map.size());
for(auto &x : m_map) {
......@@ -97,12 +97,12 @@ std::vector<IdentifierHash> InternalOnline::getAllCurrentHashes() const {
}
size_t InternalOnline::numberOfCollections() const {
if(m_waitNeeded) wait();
if(m_waitNeeded.load(std::memory_order_acquire)) wait();
return m_map.size();
}
void InternalOnline::resetMask() {
if(m_waitNeeded) wait();
if(m_waitNeeded.load(std::memory_order_relaxed)) wait();
m_mask.assign(m_cacheLink->fullSize(), false);
m_map.clear();
m_waitNeeded.store(true, std::memory_order_relaxed);
......
......@@ -377,7 +377,6 @@ int ID_ContainerTest::execute(EventContainers::Mode){
std::cout<<" Number of Collections from Random Access "
<< nColl<< std::endl;
#ifdef IdentifiableCacheBaseRemove
// Test removal of collections
unsigned int collsRemoved = 0;
for (int coll =0; coll <hfmax; coll += 3) {
......@@ -397,7 +396,6 @@ int ID_ContainerTest::execute(EventContainers::Mode){
<< vCollRem.size() << " removed collections found again " << collsFound
<< " tested " << collsFound1
<< std::endl;
#endif
m_container->cleanup();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment