Skip to content
Snippets Groups Projects
Commit c99f9e2d authored by Edward Moyse's avatar Edward Moyse
Browse files

Merge branch 'AthenaSharedIO_ShortMultiStream' into 'master'

For ROOT-based SharedWriter, handle short jobs with multiple files.

See merge request !42663
parents d3812fb4 286d1d20
No related branches found
No related tags found
5 merge requests!58791DataQualityConfigurations: Modify L1Calo config for web display,!46784MuonCondInterface: Enable thread-safety checking.,!46776Updated LArMonitoring config file for WD to match new files produced using MT,!45405updated ART test cron job,!42663For ROOT-based SharedWriter, handle short jobs with multiple files.
......@@ -20,8 +20,12 @@
#include "TMonitor.h"
#include "TServerSocket.h"
#include "TSocket.h"
#include "TString.h"
#include "TTree.h"
#include <set>
#include <map>
/// Definiton of a branch descriptor from RootTreeContainer
struct BranchDesc {
public:
......@@ -190,7 +194,10 @@ StatusCode AthenaRootSharedWriterSvc::initialize() {
StatusCode AthenaRootSharedWriterSvc::share(int numClients) {
ATH_MSG_VERBOSE("Start commitOutput loop");
StatusCode sc = m_cnvSvc->commitOutput("", false);
while (m_rootClientCount > 0 || (m_rootClientIndex < numClients && (sc.isSuccess() || sc.isRecoverable()))) {
int workerCounter = 0;
std::set<int> rootClientSet;
std::map<TString, int> workerCount;
while (m_rootClientCount > 0 || (workerCounter < numClients && (sc.isSuccess() || sc.isRecoverable()))) {
if (sc.isSuccess()) {
ATH_MSG_VERBOSE("Success in commitOutput loop");
} else if (m_rootMonitor != nullptr) {
......@@ -228,6 +235,11 @@ StatusCode AthenaRootSharedWriterSvc::share(int numClients) {
message->ReadInt(clientId);
message->ReadTString(filename);
message->ReadLong64(length);
if (rootClientSet.find(clientId) == rootClientSet.end()) {
rootClientSet.insert(clientId);
workerCount[filename]++;
if (workerCount[filename] > workerCounter) workerCounter = workerCount[filename];
}
ATH_MSG_INFO("ROOT Monitor client: " << socket << ", " << clientId << ": " << filename << ", " << length);
std::unique_ptr<TMemFile> transient(new TMemFile(filename, message->Buffer() + message->Length(), length, "UPDATE"));
message->SetBufferOffset(message->Length() + length);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment