Commit 98a57ca3 authored by Remi Mommsen's avatar Remi Mommsen
Browse files

references #90: a couple of performance improvements

parent ce90610d
......@@ -159,6 +159,7 @@ TestLibraryDirs = \
UserCCFlags = -O3 -funroll-loops -Werror
#UserCCFlags += -DEVB_DEBUG_CORRUPT_EVENT
#UserCCFlags += -DEVB_USE_LOCS_VECTOR
# These libraries can be platform specific and
# potentially need conditional processing
......
......@@ -7,6 +7,9 @@
#include <ctime>
#include <string>
#ifdef EVB_USE_LOCS_VECTOR
#include "evb/DataLocations.h"
#endif
namespace evb {
......@@ -28,6 +31,9 @@ namespace evb {
const time_t creationTime;
const std::string fileName;
std::vector<EventPtr> events;
#ifdef EVB_USE_LOCS_VECTOR
DataLocations dataLocations;
#endif
FileInfo(const std::string fileName)
: creationTime(time(0)),fileName(fileName) {};
......
......@@ -9,6 +9,7 @@
#include "cgicc/HTMLClasses.h"
#include "evb/OneToOneQueue.h"
#include "evb/bu/FileInfo.h"
#include "evb/bu/ResourceManager.h"
#include "toolbox/lang/Class.h"
#include "toolbox/task/WorkLoop.h"
......@@ -33,6 +34,7 @@ namespace evb {
(
BU*,
DiskWriter*,
std::shared_ptr<ResourceManager>,
const std::string& id
);
......@@ -73,6 +75,7 @@ namespace evb {
BU* bu_;
DiskWriter* diskWriter_;
std::shared_ptr<ResourceManager> resourceManager_;
const std::string id_;
toolbox::task::WorkLoop* writingWL_;
......
......@@ -65,10 +65,7 @@ void evb::bu::DiskWriter::handleEvent(EventPtr&& event)
bool evb::bu::DiskWriter::getNextFileInfo(FileInfoPtr& fileInfo)
{
if ( getReadyFileInfo(fileInfo) )
return true;
else
return populateFileInfo(fileInfo);
return ( getReadyFileInfo(fileInfo) || populateFileInfo(fileInfo) );
}
......@@ -101,10 +98,25 @@ bool evb::bu::DiskWriter::populateFileInfo(FileInfoPtr& fileInfo)
));
pos->second->events.reserve(configuration_->maxEventsPerFile);
pos->second->header.lumiSection = lumiSection;
#ifdef EVB_USE_LOCS_VECTOR
iovec fileHeader;
fileHeader.iov_base = &(pos->second->header);
fileHeader.iov_len = sizeof(FileHeader);
pos->second->dataLocations.emplace_back(std::move(fileHeader));
#endif
}
pos->second->header.eventCount++;
pos->second->header.fileSize += sizeof(EventInfo) + event->getEventInfo()->eventSize();
#ifdef EVB_USE_LOCS_VECTOR
iovec eventHeader;
eventHeader.iov_base = event->getEventInfo().get();
eventHeader.iov_len = sizeof(EventInfo);
pos->second->dataLocations.emplace_back(std::move(eventHeader));
const DataLocations& frags = event->getDataLocations();
pos->second->dataLocations.insert(pos->second->dataLocations.end(),frags.begin(),frags.end());
#endif
pos->second->events.emplace_back(std::move(event));
if ( pos->second->header.eventCount == configuration_->maxEventsPerFile )
......@@ -160,6 +172,7 @@ void evb::bu::DiskWriter::startProcessing(const uint32_t runNumber)
streamHandlers_.emplace_back(std::make_unique<StreamHandler>(
bu_,
this,
resourceManager_,
std::to_string(i)));
}
......@@ -424,10 +437,6 @@ bool evb::bu::DiskWriter::moveFiles()
while ( handler->getNextClosedFileInfo(fileInfo) )
{
workDone = true;
for (auto const& event : fileInfo->events)
resourceManager_->discardEvent(event);
handleRawDataFile(fileInfo);
}
}
......
......@@ -17,10 +17,12 @@ evb::bu::StreamHandler::StreamHandler
(
BU* bu,
DiskWriter* diskWriter,
std::shared_ptr<ResourceManager> resourceManager,
const std::string& id
) :
bu_(bu),
diskWriter_(diskWriter),
resourceManager_(resourceManager),
id_(id),
doProcessing_(true),
eventWriterActive_(false),
......@@ -159,6 +161,34 @@ void evb::bu::StreamHandler::writeEvents()
XCEPT_RAISE(exception::DiskWriting, msg.str());
}
#ifdef EVB_USE_LOCS_VECTOR
// writev accepts only IOV_MAX elements at the time
const uint32_t iov_max = sysconf(_SC_IOV_MAX);
const uint32_t elements = fileInfo->dataLocations.size();
uint32_t pos = 0;
uint64_t bytesWritten = 0;
while ( pos < elements )
{
const uint32_t length = std::min(elements-pos,iov_max);
const ssize_t result = writev(fileDescriptor,&fileInfo->dataLocations[pos],length);
if ( result == -1 )
{
std::ostringstream msg;
msg << "writev failed to write into " << fileInfo->fileName
<< ": " << strerror(errno);
XCEPT_RAISE(exception::DiskWriting, msg.str());
}
bytesWritten += static_cast<uint64_t>(result);
pos += length;
}
#else
ssize_t result = write(fileDescriptor,&(fileInfo->header),sizeof(FileHeader));
if ( result == -1 )
......@@ -199,47 +229,7 @@ void evb::bu::StreamHandler::writeEvents()
bytesWritten += static_cast<uint64_t>(result);
}
// DataLocations locs;
// iovec fileHeader;
// fileHeader.iov_base = &(fileInfo->header);
// fileHeader.iov_len = sizeof(FileHeader);
// locs.emplace_back(std::move(fileHeader));
// for (auto const& event : fileInfo->events)
// {
// iovec eventHeader;
// eventHeader.iov_base = event->getEventInfo().get();
// eventHeader.iov_len = sizeof(EventInfo);
// locs.emplace_back(std::move(eventHeader));
// const DataLocations& frags = event->getDataLocations();
// locs.insert(locs.end(),frags.begin(),frags.end());
// }
// // writev accepts only IOV_MAX elements at the time
// const uint32_t iov_max = sysconf(_SC_IOV_MAX);
// const uint32_t elements = locs.size();
// uint32_t pos = 0;
// uint64_t bytesWritten = 0;
// while ( pos < elements )
// {
// const uint32_t length = std::min(elements-pos,iov_max);
// const ssize_t result = writev(fileDescriptor,&locs[pos],length);
// if ( result == -1 )
// {
// std::ostringstream msg;
// msg << "writev failed to write into " << fileInfo->fileName
// << ": " << strerror(errno);
// XCEPT_RAISE(exception::DiskWriting, msg.str());
// }
// bytesWritten += static_cast<uint64_t>(result);
// pos += length;
// }
#endif
if ( bytesWritten != fileInfo->header.fileSize )
{
......@@ -260,6 +250,10 @@ void evb::bu::StreamHandler::writeEvents()
eventCount_ += fileInfo->header.eventCount;
bytesWritten_ += fileInfo->header.fileSize;
for (auto const& event : fileInfo->events)
resourceManager_->discardEvent(event);
fileInfo->events.clear();
closedFileInfoFIFO_.enqWait(std::move(fileInfo));
}
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment