From 158620e9e84f8b8dd9c912da9816c541fb90c2c5 Mon Sep 17 00:00:00 2001 From: meschi <emilio.meschi@cern.ch> Date: Thu, 3 Nov 2022 17:10:14 +0100 Subject: [PATCH] use preallocated buffers for output --- src/OutputByOrbit.cc | 11 ++++++++--- src/OutputByOrbit.h | 1 + src/OutputFileHandler.cc | 10 ++++++++++ src/processor.cc | 9 +++++---- src/processor.h | 3 ++- src/scdaq.cc | 3 +-- src/slice.cc | 2 ++ src/slice.h | 1 + 8 files changed, 30 insertions(+), 10 deletions(-) diff --git a/src/OutputByOrbit.cc b/src/OutputByOrbit.cc index 02d046e5..f616bd3c 100644 --- a/src/OutputByOrbit.cc +++ b/src/OutputByOrbit.cc @@ -14,7 +14,8 @@ OutputByOrbitStream::OutputByOrbitStream(ctrl &c, config &conf_) : tbb::filter(serial_in_order), current_file_size(0), totcounts(0), control(c), conf(conf_), output_file_handler_(conf.getOutputFilenameBase(), - conf.getOutputFilenamePrefix()) { + conf.getOutputFilenamePrefix()), + min_buffer_queue_size_(1000) { LOG(TRACE) << "Created output filter at " << static_cast<void *>(this); } @@ -49,7 +50,11 @@ void *OutputByOrbitStream::operator()(void *item) { Slice &out = *static_cast<Slice *>(item); totcounts += out.get_counts(); OutputFixedOrbits(out); - - out.free(); + if (Slice::current_queue_size() < min_buffer_queue_size_) { + LOG(TRACE) << "New minimum queue size is " << Slice::current_queue_size() + << " total counts " << totcounts; + min_buffer_queue_size_ = Slice::current_queue_size(); + } + Slice::giveAllocated(&out); return NULL; } diff --git a/src/OutputByOrbit.h b/src/OutputByOrbit.h index a3135e6c..0801fc83 100644 --- a/src/OutputByOrbit.h +++ b/src/OutputByOrbit.h @@ -26,6 +26,7 @@ private: ctrl &control; config &conf; OutputFileHandler output_file_handler_; + size_t min_buffer_queue_size_; }; #endif diff --git a/src/OutputFileHandler.cc b/src/OutputFileHandler.cc index ac28fd0b..f792c33e 100644 --- a/src/OutputFileHandler.cc +++ b/src/OutputFileHandler.cc @@ -2,7 +2,12 @@ #include "OutputFileHandler.h" #include "log.h" #include "tools.h" +#include <cstdio> +#include <fcntl.h> #include <iomanip> +#include <sys/stat.h> +#include <sys/types.h> +#include <unistd.h> const std::string OutputFileHandler::working_dir_ = "in_progress"; const std::string OutputFileHandler::journal_file_ = "index.journal"; @@ -100,6 +105,11 @@ void OutputFileHandler::close_and_rename::operator()() const { } LOG(TRACE) << "popping file: " << n.first << " for closing, queue size now " << files_to_close_.size(); + struct stat buf; + fstat(fileno(n.second), &buf); + if (posix_fadvise(fileno(n.second), 0, buf.st_size, POSIX_FADV_DONTNEED) != + 0) + LOG(ERROR) << tools::strerror("Page cache release failed"); if (fclose(n.second) != 0) { LOG(ERROR) << tools::strerror("File close failed"); } diff --git a/src/processor.cc b/src/processor.cc index cea7a2d8..845342b4 100644 --- a/src/processor.cc +++ b/src/processor.cc @@ -14,11 +14,11 @@ StreamProcessor::StreamProcessor(size_t max_size_, bool doZS_, ProcessorType processorType_, uint32_t nOrbitsPerDMAPacket_, uint32_t prescaleFactor_, ctrl &control_) - : tbb::filter(parallel), max_size(max_size_), nbPackets(0), doZS(doZS_), + : tbb::filter(parallel), next_slice_(Slice::getAllocated()), + max_size(max_size_), nbPackets(0), doZS(doZS_), processorType(processorType_), nOrbitsPerDMAPacket(nOrbitsPerDMAPacket_), prescaleFactor(prescaleFactor_), control(control_) { LOG(TRACE) << "Created transform filter at " << static_cast<void *>(this); - myfile.open("example.txt"); } BrilHistoQueue<std::array<uint32_t, constants::NBXPerOrbit + @@ -38,7 +38,7 @@ void bit_check(std::vector<unsigned int> *bx_vect, uint32_t word, return; } -StreamProcessor::~StreamProcessor() { myfile.close(); } +StreamProcessor::~StreamProcessor() { Slice::giveAllocated(next_slice_); } // checks that the packet size is an integer multiple of the BX block size, // minus the header/trailers @@ -467,8 +467,9 @@ void StreamProcessor::process(Slice &input, Slice &out) { } void *StreamProcessor::operator()(void *item) { Slice &input = *static_cast<Slice *>(item); - Slice &out = *Slice::allocate(2 * max_size); + Slice &out = *next_slice_; process(input, out); Slice::giveAllocated(&input); + next_slice_ = Slice::getAllocated(); return &out; } diff --git a/src/processor.h b/src/processor.h index 34b75bfc..12bbdd54 100644 --- a/src/processor.h +++ b/src/processor.h @@ -42,7 +42,8 @@ private: fillOrbitMetadata FillOrbitCalo(std::vector<unsigned int> &bx_vect, char *rd_ptr, char *wr_ptr); uint32_t FillBril(char *rd_ptr, char *wr_ptr, char *end_ptr); - std::ofstream myfile; + Slice *next_slice_; + ; size_t max_size; uint64_t nbPackets; bool doZS; diff --git a/src/scdaq.cc b/src/scdaq.cc index c020490b..90e8f510 100644 --- a/src/scdaq.cc +++ b/src/scdaq.cc @@ -179,6 +179,7 @@ int main(int argc, char *argv[]) { int nbThreads = conf.getNumThreads(); retval = 0; + try { tbb::task_scheduler_init init(nbThreads); if (!run_pipeline(nbThreads, control, conf)) { @@ -200,7 +201,6 @@ int main(int argc, char *argv[]) { LOG(INFO) << "Successfully reset the board."; } } - LOG(DEBUG) << "Terminating the internal TCP server."; io_service.stop(); if (!t.try_join_for(boost::chrono::milliseconds(2000))) { @@ -209,7 +209,6 @@ int main(int argc, char *argv[]) { } else { LOG(DEBUG) << "Internal TCP server is terminated."; } - // utility::report_elapsed_time((tbb::tick_count::now() - // mainStartTime).seconds()); return retval; diff --git a/src/slice.cc b/src/slice.cc index 0fb94ce1..b10cf601 100644 --- a/src/slice.cc +++ b/src/slice.cc @@ -37,3 +37,5 @@ void Slice::giveAllocated(Slice *t) { t->set_end(t->begin()); Slice::free_slices.push(t); } + +size_t Slice::current_queue_size() { return free_slices.size(); } diff --git a/src/slice.h b/src/slice.h index 656c8e4b..72770a9c 100644 --- a/src/slice.h +++ b/src/slice.h @@ -37,6 +37,7 @@ public: static void shutDown(); static Slice *getAllocated(); static void giveAllocated(Slice *); + static size_t current_queue_size(); Slice *clone() const { return new Slice(*this); } //! Free a Slice object -- GitLab