From 2de5481b4a7769395fa32b1d667f2fbfa7efdf2d Mon Sep 17 00:00:00 2001 From: Petr Zejdl <petr.zejdl@cern.ch> Date: Wed, 31 Oct 2018 14:12:52 +0100 Subject: [PATCH] Simplified and unified data input, moved to camelCase --- src/FileDmaInputFilter.cc | 37 ++++++- src/FileDmaInputFilter.h | 9 +- src/InputFilter.cc | 71 ++++++------- src/InputFilter.h | 23 ++-- src/Makefile | 7 +- src/WZDmaInputFilter.cc | 166 +++++++++++++++++++++++++++++ src/WZDmaInputFilter.h | 40 +++++++ src/scdaq.cc | 29 ++--- src/scdaq.conf | 8 +- src/tools.h | 53 ++++++++++ src/wzdma_input.cc | 217 -------------------------------------- src/wzdma_input.h | 37 ------- 12 files changed, 359 insertions(+), 338 deletions(-) create mode 100644 src/WZDmaInputFilter.cc create mode 100644 src/WZDmaInputFilter.h create mode 100644 src/tools.h delete mode 100644 src/wzdma_input.cc delete mode 100644 src/wzdma_input.h diff --git a/src/FileDmaInputFilter.cc b/src/FileDmaInputFilter.cc index 8e1e7606..a30933cb 100644 --- a/src/FileDmaInputFilter.cc +++ b/src/FileDmaInputFilter.cc @@ -84,8 +84,43 @@ static inline ssize_t read_dma_packet_from_file(FILE *inputFile, char *buffer, u } +inline ssize_t FileDmaInputFilter::readPacket(char **buffer, size_t bufferSize) +{ + // Read from DMA + int skip = 0; + ssize_t bytesRead = read_dma_packet_from_file(inputFile, *buffer, bufferSize, nbReads() ); + + // If large packet returned, skip and read again + while ( bytesRead > (ssize_t)bufferSize ) { + stats.nbOversizedPackets++; + skip++; + std::cerr + << "#" << nbReads() << ": ERROR: Read returned " << bytesRead << " > buffer size " << bufferSize + << ". Skipping packet #" << skip << ".\n"; + if (skip >= 100) { + throw std::runtime_error("FATAL: Read is still returning large packets."); + } + bytesRead = read_dma_packet_from_file(inputFile, *buffer, bufferSize, nbReads() ); + } + + return bytesRead; +} + + +/************************************************************************** + * Entry points are here + * Overriding virtual functions + */ + +// Print some additional info +void FileDmaInputFilter::print(std::ostream& out) const +{ + out << ", oversized packets " << stats.nbOversizedPackets; +} + ssize_t FileDmaInputFilter::readInput(char **buffer, size_t bufferSize) { - return read_dma_packet_from_file(inputFile, *buffer, bufferSize, nbReads() ); + return readPacket( buffer, bufferSize ); } + diff --git a/src/FileDmaInputFilter.h b/src/FileDmaInputFilter.h index 18395e1f..db01ead4 100644 --- a/src/FileDmaInputFilter.h +++ b/src/FileDmaInputFilter.h @@ -16,12 +16,17 @@ public: protected: ssize_t readInput(char **buffer, size_t bufferSize); // Override + void print(std::ostream& out) const; // Override - // When reading from file this method does nothing - void readComplete(char *buffer) { (void)(buffer); } +private: + ssize_t readPacket(char **buffer, size_t bufferSize); private: FILE* inputFile; + + struct Statistics { + uint64_t nbOversizedPackets = 0; + } stats; }; typedef std::shared_ptr<FileDmaInputFilter> FileDmaInputFilterPtr; diff --git a/src/InputFilter.cc b/src/InputFilter.cc index 26e4d975..82923d10 100644 --- a/src/InputFilter.cc +++ b/src/InputFilter.cc @@ -1,5 +1,6 @@ #include <cassert> #include <iostream> +#include <iomanip> #include <system_error> #include "InputFilter.h" @@ -12,8 +13,6 @@ InputFilter::InputFilter(size_t packetBufferSize, size_t nbPacketBuffers, ctrl& nextSlice_(Slice::preAllocate( packetBufferSize, nbPacketBuffers )), nbReads_(0), nbBytesRead_(0), - nbErrors_(0), - nbOversized_(0), previousNbBytesRead_(0), previousStartTime_( tbb::tick_count::now() ) { @@ -26,29 +25,37 @@ InputFilter::~InputFilter() { Slice::giveAllocated(nextSlice_); std::cerr << "Input operator performed " << nbReads_ << " read\n"; } - -inline ssize_t InputFilter::readHelper(char **buffer, size_t bufferSize) + + +void InputFilter::printStats(std::ostream& out) { - // Read from DMA - int skip = 0; - ssize_t bytesRead = readInput( buffer, bufferSize ); - - // If large packet returned, skip and read again - while ( bytesRead > (ssize_t)bufferSize ) { - nbOversized_++; - skip++; - std::cerr - << "#" << nbReads_ << ": ERROR: Read returned " << bytesRead << " > buffer size " << bufferSize - << ". Skipping packet #" << skip << ".\n"; - if (skip >= 100) { - throw std::runtime_error("FATAL: Read is still returning large packets."); - } - bytesRead = readInput( buffer, bufferSize ); - } + // Calculate DMA bandwidth + tbb::tick_count now = tbb::tick_count::now(); + double time_diff = (double)((now - previousStartTime_).seconds()); + previousStartTime_ = now; + + uint64_t nbBytesReadDiff = nbBytesRead_ - previousNbBytesRead_; + previousNbBytesRead_ = nbBytesRead_; + + double bwd = nbBytesReadDiff / ( time_diff * 1024.0 * 1024.0 ); + + // Save formatting + std::ios state(nullptr); + state.copyfmt(out); + + out + << "#" << nbReads_ << ": Read(s) returned: " << nbBytesReadDiff + << ", bandwidth " << std::fixed << std::setprecision(1) << bwd << " MBytes/sec"; - return bytesRead; + // Restore formatting + out.copyfmt(state); + + // Print additional info + print( out ); + out << '\n'; } + void* InputFilter::operator()(void*) { // Prepare destination buffer char *buffer = nextSlice_->begin(); @@ -56,13 +63,10 @@ void* InputFilter::operator()(void*) { size_t bufferSize = nextSlice_->avail(); ssize_t bytesRead = 0; - // We need at least 1MB buffer - assert( bufferSize >= 1024*1024 ); - nbReads_++; // It is optional to use the provided buffer - bytesRead = readHelper( &buffer, bufferSize ); + bytesRead = readInput( &buffer, bufferSize ); // This should really not happen assert( bytesRead != 0); @@ -78,22 +82,9 @@ void* InputFilter::operator()(void*) { nbBytesRead_ += bytesRead; - // TODO: Make this configurable + // Calculate some statistics if (control_.packets_per_report && (nbReads_ % control_.packets_per_report == 0)) { - // Calculate DMA bandwidth - tbb::tick_count now = tbb::tick_count::now(); - double time_diff = (double)((now - previousStartTime_).seconds()); - previousStartTime_ = now; - - uint64_t nbBytesReadDiff = nbBytesRead_ - previousNbBytesRead_; - previousNbBytesRead_ = nbBytesRead_; - - double bwd = nbBytesReadDiff / ( time_diff * 1024.0 * 1024.0 ); - - std::cout - << "#" << nbReads_ << ": Read(s) returned: " << nbBytesReadDiff - << ", bandwidth " << bwd << "MBytes/sec, read errors " << nbErrors_ - << ", read returned oversized packet " << nbOversized_ << ".\n"; + printStats( std::cout ); } // Have more data to process. diff --git a/src/InputFilter.h b/src/InputFilter.h index e0a21ed8..763feae6 100644 --- a/src/InputFilter.h +++ b/src/InputFilter.h @@ -2,6 +2,7 @@ #define INPUT_FILTER_H #include <cstddef> +#include <iostream> #include "tbb/pipeline.h" #include "tbb/tick_count.h" @@ -16,10 +17,10 @@ class Slice; */ class InputFilter: public tbb::filter { public: - //InputFilter( FILE*, size_t, size_t); InputFilter(size_t packet_buffer_size, size_t number_of_packet_buffers, ctrl& control); virtual ~InputFilter(); + // Return the number of read calls uint64_t nbReads() { return nbReads_; } protected: @@ -27,12 +28,18 @@ protected: virtual ssize_t readInput(char **buffer, size_t bufferSize) = 0; // Notify the read that the buffer returned by readInput is processed (can be freed or reused) - virtual void readComplete(char *buffer) = 0; + // The default implementation does nothing, since this function is required only in special cases like e.g. zero copy read + virtual void readComplete(char *buffer) { (void)(buffer); } + + // Allow overridden function to print some additional info + virtual void print(std::ostream& out) const = 0; private: - void* operator()(void* item); // Override + void* operator()(void* item); - inline ssize_t readHelper(char **buffer, size_t buffer_size); + // NOTE: This can be moved out of this class into a separate one + // and run in a single thread in order to do reporting... + void printStats(std::ostream& out); private: ctrl& control_; @@ -46,13 +53,7 @@ private: // Number of byted read uint64_t nbBytesRead_; - // Number of read errors detected - uint64_t nbErrors_; - - // Number of oversized packets returned () - uint64_t nbOversized_; - - // Performance monitoring + // For Performance monitoring // Snapshot of nbBytesRead_ uint64_t previousNbBytesRead_; diff --git a/src/Makefile b/src/Makefile index 45fb35ea..6dea3d2d 100644 --- a/src/Makefile +++ b/src/Makefile @@ -12,7 +12,7 @@ TARGET = scdaq # source files -SOURCES = config.cc dma_input.cc elastico.cc FileDmaInputFilter.cc file_input.cc InputFilter.cc output.cc processor.cc scdaq.cc session.cc slice.cc wzdma_input.cc +SOURCES = config.cc dma_input.cc elastico.cc FileDmaInputFilter.cc file_input.cc InputFilter.cc output.cc processor.cc scdaq.cc session.cc slice.cc WZDmaInputFilter.cc C_SOURCES = wz_dma.c # work out names of object files from sources @@ -62,13 +62,12 @@ config.o: config.h dma_input.o: dma_input.h slice.h elastico.o: elastico.h format.h slice.h controls.h file_dma_input.o: file_dma_input.h -FileDmaInputFilter.o: FileDmaInputFilter.h +FileDmaInputFilter.o: FileDmaInputFilter.h InputFilter.h file_input.o: file_input.h slice.h utility.h InputFilter.o: InputFilter.h output.o: output.h slice.h processor.o: processor.h slice.h format.h session.o: session.h slice.o: slice.h -wzdma_input.o: wzdma_input.h slice.h +WZDmaInputFilter.o: WZDmaInputFilter.h InputFilter.h tools.h wz_dma.o: wz_dma.h - diff --git a/src/WZDmaInputFilter.cc b/src/WZDmaInputFilter.cc new file mode 100644 index 00000000..a7e79dda --- /dev/null +++ b/src/WZDmaInputFilter.cc @@ -0,0 +1,166 @@ +#include <cassert> +#include <cerrno> +#include <system_error> +#include <iostream> + +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> + +#include "tools.h" +#include "WZDmaInputFilter.h" + + +WZDmaInputFilter::WZDmaInputFilter( size_t packetBufferSize, size_t nbPacketBuffers, ctrl& control ) : + InputFilter( packetBufferSize, nbPacketBuffers, control ) +{ + // Initialize the DMA subsystem + if ( wz_init( &dma_ ) < 0 ) { + throw std::system_error(errno, std::system_category(), "Cannot initialize WZ DMA device"); + } + + // Start the DMA + if ( wz_start_dma( &dma_ ) < 0) { + throw std::system_error(errno, std::system_category(), "Cannot start WZ DMA"); + } + + std::cerr << "Created WZ DMA input filter\n"; +} + +WZDmaInputFilter::~WZDmaInputFilter() { + wz_stop_dma( &dma_ ); + wz_close( &dma_ ); + std::cerr << "Destroyed WZ DMA input filter\n"; +} + + +inline ssize_t WZDmaInputFilter::read_packet_from_dma(char **buffer) +{ + int tries = 1; + ssize_t bytes_read; + + while (1) { + bytes_read = wz_read_start( &dma_, buffer ); + + if (bytes_read < 0) { + stats.nbDmaErrors++; + tools::perror("#" + std::to_string( nbReads() ) + ": Read failed"); + + if (errno == EIO) { + std::cerr << "#" << nbReads() << ": Trying to restart DMA (attempt #" << tries << "): "; + wz_stop_dma( &dma_ ); + wz_close( &dma_ ); + + // Initialize the DMA subsystem + if ( wz_init( &dma_ ) < 0 ) { + throw std::system_error(errno, std::system_category(), "Cannot initialize WZ DMA device"); + } + + if (wz_start_dma( &dma_ ) < 0) { + throw std::system_error(errno, std::system_category(), "Cannot start WZ DMA device"); + } + + std::cerr << "Success.\n"; + tries++; + + if (tries == 10) { + throw std::runtime_error("FATAL: Did not manage to restart DMA."); + } + continue; + } + throw std::system_error(errno, std::system_category(), "FATAL: Unrecoverable DMA error detected."); + } + break; + } + + // Should not happen + assert(bytes_read > 0); + + return bytes_read; +} + + +inline ssize_t WZDmaInputFilter::read_packet( char **buffer, size_t bufferSize ) +{ + ssize_t bytesRead; + + // Read from DMA + int skip = 0; + int reset = 0; + bytesRead = read_packet_from_dma( buffer ); + + // If large packet returned, skip and read again + while ( bytesRead > (ssize_t)bufferSize ) { + stats.nbDmaOversizedPackets++; + skip++; + std::cerr + << "#" << nbReads() << ": ERROR: DMA read returned " << bytesRead << " > buffer size " << bufferSize + << ". Skipping packet #" << skip << ".\n"; + if (skip > 100) { + reset++; + stats.nbBoardResets++; + + if (reset > 10) { + std::cerr << "Resets didn't help!\n"; + throw std::runtime_error("FATAL: DMA is still returning large packets."); + } + + // Oversized packet is usually sign of link problem + // Let's try to reset the board + std::cerr << "Goging to reset the board: \n"; + if (wz_reset_board() < 0) { + std::cerr << "Reset finished\n"; + } else { + std::cerr << "Reset succesfull\n"; + } + + std::cerr << "Waiting for 30 seconds to clear any collected crap: "; + // Sleep for 30 seconds (TCDS may be paused) + for (int i=0; i<30; i++) { + std::cout << '.' << std::flush; + usleep(1000000); + } + std::cerr << " OK\n"; + } + bytesRead = read_packet_from_dma( buffer ); + } + + return bytesRead; +} + + +/************************************************************************** + * Entry points are here + * Overriding virtual functions + */ + + +// Print some additional info +void WZDmaInputFilter::print(std::ostream& out) const +{ + out + << ", DMA errors " << stats.nbDmaErrors + << ", DMA oversized packets " << stats.nbDmaOversizedPackets + << ", board resets " << stats.nbBoardResets; +} + + +// Read a packet from DMA +ssize_t WZDmaInputFilter::readInput(char **buffer, size_t bufferSize) +{ + // We need at least 1MB buffer + assert( bufferSize >= 1024*1024 ); + + return read_packet( buffer, bufferSize ); +} + + +// Notifi the DMA that packet was processed +void WZDmaInputFilter::readComplete(char *buffer) { + (void)(buffer); + + // Free the DMA buffer + if ( wz_read_complete( &dma_ ) < 0 ) { + throw std::system_error(errno, std::system_category(), "Cannot complete WZ DMA read"); + } +} \ No newline at end of file diff --git a/src/WZDmaInputFilter.h b/src/WZDmaInputFilter.h new file mode 100644 index 00000000..1460702c --- /dev/null +++ b/src/WZDmaInputFilter.h @@ -0,0 +1,40 @@ +#ifndef WZDMA_INPUT_FILTER_H +#define WZDMA_INPUT_FILTER_H + +#include <iostream> +#include <memory> + +#include "tbb/pipeline.h" +#include "tbb/tick_count.h" + +#include "InputFilter.h" +#include "wz_dma.h" + + +class WZDmaInputFilter: public InputFilter { + public: + WZDmaInputFilter( size_t packetBufferSize, size_t nbPacketBuffers, ctrl& control ); + virtual ~WZDmaInputFilter(); + +protected: + ssize_t readInput(char **buffer, size_t bufferSize); // Override + void readComplete(char *buffer); // Override + void print(std::ostream& out) const; // Override + +private: + ssize_t read_packet_from_dma(char **buffer); + ssize_t read_packet( char **buffer, size_t bufferSize ); + + struct Statistics { + uint64_t nbDmaErrors = 0; + uint64_t nbDmaOversizedPackets = 0; + uint64_t nbBoardResets = 0; + } stats; + + struct wz_private dma_; +}; + +typedef std::shared_ptr<WZDmaInputFilter> WZDmaInputFilterPtr; + + +#endif // WZDMA_INPUT_FILTER_H diff --git a/src/scdaq.cc b/src/scdaq.cc index 8b5f195a..dd7a5b59 100644 --- a/src/scdaq.cc +++ b/src/scdaq.cc @@ -17,7 +17,7 @@ #include "InputFilter.h" #include "FileDmaInputFilter.h" -#include "wzdma_input.h" +#include "WZDmaInputFilter.h" #include "dma_input.h" #include "file_input.h" #include "processor.h" @@ -42,8 +42,8 @@ int run_pipeline( int nthreads, ctrl& control, config *conf) { config::InputType input = conf->getInput(); - size_t MAX_BYTES_PER_INPUT_SLICE = 0; - size_t TOTAL_SLICES = 0; + size_t MAX_BYTES_PER_INPUT_SLICE = conf->getDmaPacketBufferSize(); + size_t TOTAL_SLICES = conf->getNumberOfDmaPacketBuffers(); // Create empty input reader, will assing later when we know what is the data source std::shared_ptr<InputFilter> input_filter; @@ -52,40 +52,25 @@ int run_pipeline( int nthreads, ctrl& control, config *conf) tbb::pipeline pipeline; if (input == config::InputType::FILE) { - // Prepare reading from FILE + // Create file-reading writing stage and add it to the pipeline MAX_BYTES_PER_INPUT_SLICE = 192*conf->getBlocksPerInputBuffer(); TOTAL_SLICES = conf->getNumInputBuffers(); - // Create file-reading writing stage and add it to the pipeline //input_filter = std::make_shared<FileInputFilter>( conf->getInputFile(), MAX_BYTES_PER_INPUT_SLICE, TOTAL_SLICES ); - throw std::runtime_error("input FILE is not supported"); + throw std::runtime_error("input type FILE is temporarily not supported"); } else if (input == config::InputType::DMA) { - // Prepare reading from DMA - MAX_BYTES_PER_INPUT_SLICE = conf->getDmaPacketBufferSize(); - TOTAL_SLICES = conf->getNumberOfDmaPacketBuffers(); - // Create DMA reader //input_filter = std::make_shared<DmaInputFilter>( conf->getDmaDevice(), MAX_BYTES_PER_INPUT_SLICE, TOTAL_SLICES ); - throw std::runtime_error("input DMA is not supported"); - + throw std::runtime_error("input type DMA is temporarily not supported"); } else if (input == config::InputType::FILEDMA) { - // Prepare reading from FILE and simulating DMA - MAX_BYTES_PER_INPUT_SLICE = conf->getDmaPacketBufferSize(); - TOTAL_SLICES = conf->getNumberOfDmaPacketBuffers(); - // Create FILE DMA reader input_filter = std::make_shared<FileDmaInputFilter>( conf->getInputFile(), MAX_BYTES_PER_INPUT_SLICE, TOTAL_SLICES, control ); } else if (input == config::InputType::WZDMA ) { - // Prepare reading from WZ DMA - MAX_BYTES_PER_INPUT_SLICE = conf->getDmaPacketBufferSize(); - TOTAL_SLICES = conf->getNumberOfDmaPacketBuffers(); - // Create WZ DMA reader - //input_filter = std::make_shared<WZDmaInputFilter>( MAX_BYTES_PER_INPUT_SLICE, TOTAL_SLICES, &control ); - throw std::runtime_error("input WZDMA is not supported"); + input_filter = std::make_shared<WZDmaInputFilter>( MAX_BYTES_PER_INPUT_SLICE, TOTAL_SLICES, control ); } else { throw std::invalid_argument("Configuration error: Unknown input type was specified"); diff --git a/src/scdaq.conf b/src/scdaq.conf index 613161e9..180986f8 100644 --- a/src/scdaq.conf +++ b/src/scdaq.conf @@ -3,8 +3,8 @@ # "dma" for XILINX DMA driver # "filedma" for reading from file and simulating DMA # "file" for reading from file -#input:wzdma -input:filedma +input:wzdma +#input:filedma ## Settings for DMA input @@ -19,8 +19,8 @@ dma_packet_buffer_size:1048576 dma_number_of_packet_buffers:1000 # Print report each N packets, use 0 to disable -#packets_per_report:5000 -packets_per_report:1 +packets_per_report:5000 +#packets_per_report:10 ## Settings for file input #input_file:/dev/shm/testdata.bin diff --git a/src/tools.h b/src/tools.h new file mode 100644 index 00000000..c170d1e8 --- /dev/null +++ b/src/tools.h @@ -0,0 +1,53 @@ +#ifndef TOOLS_H +#define TOOLS_H + +/* + * Some useful functions + */ + +#include <cstring> +#include <iostream> + +namespace tools { + + +/* + * A thread-safe version of strerror: Returns a C++ std::string describing error_code + */ +const std::string strerror(int error_code) +{ + char local_buf[128]; + char *buf = local_buf; + // NOTE: The returned buf may not be the same as the original buf + buf = strerror_r(error_code, buf, sizeof(local_buf)); + // Make a proper C++ string out of the returned buffer + std::string str(buf); + return str; +} + +/* + * A thread-safe version of strerror: Returns a C++ std::string describing ERRNO + */ +const std::string strerror() +{ + return tools::strerror(errno); +} + +const std::string strerror(const std::string& msg) +{ + return msg + ": " + tools::strerror(); +} + +/* + * A thread-safe version of perror + */ +void perror(const std::string& msg) +{ + std::cerr << tools::strerror(msg) << std::endl; +} + + +} // namespace tools + + +#endif // TOOLS_H diff --git a/src/wzdma_input.cc b/src/wzdma_input.cc deleted file mode 100644 index 69fd1d88..00000000 --- a/src/wzdma_input.cc +++ /dev/null @@ -1,217 +0,0 @@ -#include <cassert> -#include <cstdio> -#include <cerrno> -#include <system_error> -#include <iostream> - -#include <sys/types.h> -#include <sys/stat.h> -#include <fcntl.h> - -#include "wzdma_input.h" -#include "slice.h" - - -WZDmaInputFilter::WZDmaInputFilter(size_t packet_buffer_size_, size_t number_of_packet_buffers_, ctrl* control_) : - - filter(serial_in_order), - next_slice(Slice::preAllocate( packet_buffer_size_, number_of_packet_buffers_) ), - counts(0), - ncalls(0), - lastStartTime(tbb::tick_count::now()), - last_count(0), - dma_errors(0), - dma_oversized(0), - board_resets(0), - control(control_) -{ - // Initialize the DMA subsystem - if ( wz_init( &dma ) < 0 ) { - throw std::system_error(errno, std::system_category(), "Cannot initialize WZ DMA device"); - } - - // Start the DMA - if ( wz_start_dma( &dma ) < 0) { - throw std::system_error(errno, std::system_category(), "Cannot start WZ DMA"); - } - - fprintf(stderr,"Created input wzdma filter and allocated at 0x%llx \n",(unsigned long long)next_slice); -} - -WZDmaInputFilter::~WZDmaInputFilter() { - fprintf(stderr,"Destroy input dma filter and delete at 0x%llx \n",(unsigned long long)next_slice); - Slice::giveAllocated(next_slice); - fprintf(stderr,"input operator total %lu read \n", counts); - - wz_stop_dma( &dma ); - wz_close( &dma ); -} - - -namespace tools { -// Returns a C++ std::string describing ERRNO -const std::string strerror(int error_code) -{ - // TODO: Prealocat std::String and use directly its buffer - char local_buf[128]; - char *buf = local_buf; - buf = strerror_r(error_code, buf, sizeof(local_buf)); - std::string str(buf); - return str; -} - -const std::string strerror() -{ - return tools::strerror(errno); -} - -const std::string strerror(const std::string& msg) -{ - return msg + ": " + tools::strerror(); -} - -void perror(const std::string& msg) -{ - std::cerr << tools::strerror(msg) << std::endl; -} -} // namespace tools - - -inline ssize_t WZDmaInputFilter::read_packet(char **buffer) -{ - int tries = 1; - ssize_t bytes_read; - - while (1) { - bytes_read = wz_read_start( &dma, buffer ); - - if (bytes_read < 0) { - dma_errors++; - tools::perror("#" + std::to_string(ncalls) + ": Read failed"); - - if (errno == EIO) { - std::cerr << "#" << ncalls << ": Trying to restart DMA (attempt #" << tries << "): "; - wz_stop_dma( &dma ); - wz_close( &dma ); - - // Initialize the DMA subsystem - if ( wz_init( &dma ) < 0 ) { - throw std::system_error(errno, std::system_category(), "Cannot initialize WZ DMA device"); - } - - if (wz_start_dma( &dma ) < 0) { - throw std::system_error(errno, std::system_category(), "Cannot start WZ DMA device"); - } - - std::cerr << "Success.\n"; - tries++; - - if (tries == 10) { - throw std::runtime_error("FATAL: Did not manage to restart DMA."); - } - continue; - } - throw std::system_error(errno, std::system_category(), "FATAL: Unrecoverable DMA error detected."); - } - break; - } - - // Should not happen - assert(bytes_read > 0); - - return bytes_read; -} - - -void* WZDmaInputFilter::operator()(void*) { - size_t buffer_size = next_slice->avail(); - ssize_t bytes_read = 0; - - char *buffer = NULL; - - // We need at least 1MB buffer - assert( buffer_size >= 1024*1024 ); - - ncalls++; - - // Read from DMA - int skip = 0; - int reset = 0; - bytes_read = read_packet( &buffer ); - - // If large packet returned, skip and read again - while ( bytes_read > (ssize_t)buffer_size ) { - dma_oversized++; - skip++; - std::cerr - << "#" << ncalls << ": ERROR: DMA read returned " << bytes_read << " > buffer size " << buffer_size - << ". Skipping packet #" << skip << ".\n"; - if (skip > 100) { - reset++; - board_resets++; - - if (reset > 10) { - std::cerr << "Resets didn't help!\n"; - throw std::runtime_error("FATAL: DMA is still returning large packets."); - } - - // Oversized packet is usually sign of link problem - // Let's try to reset the board - std::cerr << "Goging to reset the board: \n"; - if (wz_reset_board() < 0) { - std::cerr << "Reset finished\n"; - } else { - std::cerr << "Reset succesfull\n"; - } - - std::cerr << "Waiting for 30 seconds to clear any collected crap: "; - // Sleep for 30 seconds (TCDS may be paused) - for (int i=0; i<30; i++) { - std::cout << '.'; - usleep(1000000); - } - std::cerr << " OK\n"; - } - bytes_read = read_packet( &buffer ); - } - - // This should really not happen - assert( bytes_read != 0); - - // We copy data from the DMA buffer - // NOTE: This is stupid, we should use zero copy approach - memcpy( next_slice->begin(), buffer, bytes_read ); - - // Free DMA buffer - if ( wz_read_complete( &dma ) < 0 ) { - throw std::system_error(errno, std::system_category(), "Cannot complete WZ DMA read"); - } - - counts += bytes_read; - - // TODO: Make this configurable - if (control->packets_per_report && (ncalls % control->packets_per_report == 0)) { - // Calculate DMA bandwidth - tbb::tick_count now = tbb::tick_count::now(); - double time_diff = (double)((now - lastStartTime).seconds()); - lastStartTime = now; - double bwd = counts / ( time_diff * 1024.0 * 1024.0 ); - - std::cout - << "#" << ncalls << ": Read(s) returned: " << counts - << ", DMA bandwidth " << bwd << "MBytes/sec, DMA errors " << dma_errors - << ", DMA oversized packets " << dma_oversized - << ", board resets " << board_resets << ".\n"; - counts = 0; - } - - // Have more data to process. - Slice* this_slice = next_slice; - next_slice = Slice::getAllocated(); - - // Adjust the end of this buffer - this_slice->set_end( this_slice->end() + bytes_read ); - - return this_slice; - -} diff --git a/src/wzdma_input.h b/src/wzdma_input.h deleted file mode 100644 index f9d4615c..00000000 --- a/src/wzdma_input.h +++ /dev/null @@ -1,37 +0,0 @@ -#ifndef WZDMA_INPUT_H -#define WZDMA_INPUT_H - -#include <memory> -#include <string> - -#include "tbb/pipeline.h" -#include "tbb/tick_count.h" - -#include "controls.h" -#include "wz_dma.h" - -class Slice; - -class WZDmaInputFilter: public tbb::filter { - public: - WZDmaInputFilter(size_t, size_t, ctrl*); - ~WZDmaInputFilter(); - - private: - ssize_t read_packet(char **buffer); - struct wz_private dma; - Slice* next_slice; - void* operator()(void*) /*override*/; - uint64_t counts; - uint64_t ncalls; - tbb::tick_count lastStartTime; - uint64_t last_count; - uint64_t dma_errors; - uint64_t dma_oversized; - uint64_t board_resets; - ctrl* control; -}; - -typedef std::shared_ptr<WZDmaInputFilter> WZDmaInputFilterPtr; - -#endif -- GitLab