diff --git a/src/DmaInputFilter.cc b/src/DmaInputFilter.cc new file mode 100644 index 0000000000000000000000000000000000000000..5b374ff902244e8e4a95bd01dd3a6f12f0265d31 --- /dev/null +++ b/src/DmaInputFilter.cc @@ -0,0 +1,119 @@ +#include <cassert> +#include <cstdio> +#include <cerrno> +#include <system_error> +#include <iostream> +#include <sstream> + +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> + +#include "DmaInputFilter.h" +#include "log.h" + + + +DmaInputFilter::DmaInputFilter( const std::string& deviceFileName, size_t packetBufferSize, size_t nbPacketBuffers, ctrl& control ) : + InputFilter( packetBufferSize, nbPacketBuffers, control ) +{ + dma_fd = open( deviceFileName.c_str(), O_RDWR | O_NONBLOCK ); + if ( dma_fd < 0 ) { + throw std::system_error(errno, std::system_category(), "Cannot open DMA device: " + deviceFileName); + } + + LOG(TRACE) << "Created DMA input filter"; +} + +DmaInputFilter::~DmaInputFilter() { + close( dma_fd ); + LOG(TRACE) << "Destroyed DMA input filter"; +} + +/* + * man 2 write: + * On Linux, write() (and similar system calls) will transfer at most + * 0x7ffff000 (2,147,479,552) bytes, returning the number of bytes + * actually transferred. (This is true on both 32-bit and 64-bit + * systems.) + */ + +#define RW_MAX_SIZE 0x7ffff000 + +static inline ssize_t read_axi_packet_to_buffer(int fd, char *buffer, uint64_t size) +{ + ssize_t rc; + uint64_t to_read = size; + + if (to_read > RW_MAX_SIZE) { + to_read = RW_MAX_SIZE; + } + + /* read data from file into memory buffer */ + rc = read(fd, buffer, to_read); + + if (rc <= 0) { + return rc; + } + return rc; +} + +ssize_t DmaInputFilter::readPacketFromDMA(char **buffer, size_t bufferSize) +{ + // Read from DMA + + ssize_t bytesRead = 0; + int skip = 0; + + while (true) { + // Read from DMA + bytesRead = read_axi_packet_to_buffer(dma_fd, *buffer, bufferSize); + + if (bytesRead < 0) { + skip++; + // Check for errors and then skip + if (errno == EIO || errno == EMSGSIZE) { + if (errno == EIO) { + stats.nbDmaErrors++; + LOG(ERROR) << "#" << nbReads() << ": DMA I/O ERROR. Skipping packet #" << skip << '.'; + } else { + stats.nbDmaOversizedPackets++; + LOG(ERROR) << "#" << nbReads() << ": DMA read returned oversized packet. DMA returned " << bytesRead << ", buffer size is " << bufferSize << ". Skipping packet #" << skip << '.'; + } + continue; + } + + // Some fatal error occurred + std::ostringstream os; + os << "Iteration: " << nbReads() << " ERROR: DMA read failed."; + throw std::system_error(errno, std::system_category(), os.str() ); + } + + // We have some data + break; + } + + return bytesRead; +} + + +/************************************************************************** + * Entry points are here + * Overriding virtual functions + */ + + +// Print some additional info +void DmaInputFilter::print(std::ostream& out) const +{ + out + << ", DMA errors " << stats.nbDmaErrors + << ", oversized " << stats.nbDmaOversizedPackets; +} + + +// Read a packet from DMA +ssize_t DmaInputFilter::readInput(char **buffer, size_t bufferSize) +{ + return readPacketFromDMA( buffer, bufferSize ); +} diff --git a/src/DmaInputFilter.h b/src/DmaInputFilter.h new file mode 100644 index 0000000000000000000000000000000000000000..0f58ba52f0187010a1a08294e0d2f65640237b63 --- /dev/null +++ b/src/DmaInputFilter.h @@ -0,0 +1,35 @@ +#ifndef DMA_INPUT_H +#define DMA_INPUT_H + +#include <memory> +#include <string> + +#include "tbb/pipeline.h" +#include "tbb/tick_count.h" + +#include "InputFilter.h" + +class DmaInputFilter: public InputFilter { + public: + DmaInputFilter( const std::string& deviceFileName, size_t packetBufferSize, size_t nbPacketBuffers, ctrl& control ); + ~DmaInputFilter(); + +protected: + ssize_t readInput(char **buffer, size_t bufferSize); // Override + void print(std::ostream& out) const; // Override + +private: + int dma_fd; + + ssize_t readPacketFromDMA(char **buffer, size_t bufferSize); + + struct Statistics { + uint64_t nbDmaErrors = 0; + uint64_t nbDmaOversizedPackets = 0; + } stats; +}; + + +typedef std::shared_ptr<DmaInputFilter> DmaInputFilterPtr; + +#endif diff --git a/src/InputFilter.cc b/src/InputFilter.cc index ccf59383f30f9f387be0d33f75e1cea7dc331a25..cb52c77e0fb044ce9e686575c275c3a74d48d32a 100644 --- a/src/InputFilter.cc +++ b/src/InputFilter.cc @@ -19,6 +19,10 @@ InputFilter::InputFilter(size_t packetBufferSize, size_t nbPacketBuffers, ctrl& minBytesRead_ = SSIZE_MAX; maxBytesRead_ = 0; previousNbReads_ = 0; + + LOG(TRACE) << "Configuration translated into:"; + LOG(TRACE) << " MAX_BYTES_PER_INPUT_SLICE: " << packetBufferSize; + LOG(TRACE) << " TOTAL_SLICES: " << nbPacketBuffers; LOG(TRACE) << "Created input filter and allocated at " << static_cast<void*>(nextSlice_); } @@ -30,7 +34,7 @@ InputFilter::~InputFilter() { } -void InputFilter::printStats(std::ostream& out) +void InputFilter::printStats(std::ostream& out, ssize_t lastBytesRead) { // Calculate DMA bandwidth tbb::tick_count now = tbb::tick_count::now(); @@ -56,7 +60,8 @@ void InputFilter::printStats(std::ostream& out) out << "#" << nbReads_ << ": Reading " << std::fixed << std::setprecision(1) << bwd << " MB/sec, " - << nbReadsDiff << " packet(s) min/avg/max " << minBytesRead_ << '/' << avgBytesRead << '/' << maxBytesRead_; + << nbReadsDiff << " packet(s) min/avg/max " << minBytesRead_ << '/' << avgBytesRead << '/' << maxBytesRead_ + << " last " << lastBytesRead; // Restore formatting out.copyfmt(state); @@ -151,7 +156,7 @@ void* InputFilter::operator()(void*) { // Print some statistics if (control_.packets_per_report && (nbReads_ % control_.packets_per_report == 0)) { std::ostringstream log; - printStats( log ); + printStats( log, bytesRead ); // HACK: This function is not supposed to be called from here dumpPacketTrailer( nextSlice_->begin(), bytesRead, log ); LOG(INFO) << log.str(); diff --git a/src/InputFilter.h b/src/InputFilter.h index 442293f3b72e293f3dd6a17b71dac8234744f534..b53096a587367a31e3b49db9c1c0317555b3cc8c 100644 --- a/src/InputFilter.h +++ b/src/InputFilter.h @@ -13,7 +13,7 @@ class Slice; /* * This is an abstract class. - * A dervide class has to implement methods readInput and readComplete + * A derived class has to implement methods readInput and readComplete */ class InputFilter: public tbb::filter { public: @@ -39,7 +39,7 @@ private: // NOTE: This can be moved out of this class into a separate one // and run in a single thread in order to do reporting... - void printStats(std::ostream& out); + void printStats(std::ostream& out, ssize_t lastBytesRead); private: ctrl& control_; @@ -47,10 +47,10 @@ private: Slice* nextSlice_; //uint32_t counts; - // Number of successfull reads + // Number of successful reads uint64_t nbReads_; - // Number of byted read + // Number of bytes read uint64_t nbBytesRead_; // For Performance monitoring diff --git a/src/Makefile b/src/Makefile index fb18e466f9ad5e673781c286c9234b3fd76b31c4..6e04e1d16f37d0f50d1ffa91fb0644f717506767 100644 --- a/src/Makefile +++ b/src/Makefile @@ -12,7 +12,7 @@ TARGET = scdaq # source files -SOURCES = config.cc dma_input.cc elastico.cc FileDmaInputFilter.cc file_input.cc InputFilter.cc output.cc processor.cc scdaq.cc session.cc slice.cc WZDmaInputFilter.cc +SOURCES = config.cc DmaInputFilter.cc elastico.cc FileDmaInputFilter.cc file_input.cc InputFilter.cc output.cc processor.cc scdaq.cc session.cc slice.cc WZDmaInputFilter.cc C_SOURCES = wz_dma.c # work out names of object files from sources @@ -59,7 +59,7 @@ ${TARGET}: ${OBJECTS} scdaq.o: file_input.h processor.h elastico.h output.h format.h server.h controls.h config.h session.h log.h config.o: config.h log.h -dma_input.o: dma_input.h slice.h +DmaInputFilter.o: DmaInputFilter.h slice.h elastico.o: elastico.h format.h slice.h controls.h log.h file_dma_input.o: file_dma_input.h FileDmaInputFilter.o: FileDmaInputFilter.h InputFilter.h log.h diff --git a/src/WZDmaInputFilter.cc b/src/WZDmaInputFilter.cc index de742eadf5c52c12f0ecdf1d39f3276bc483cc55..2619de3c8befc0b256385a5cdcfdee92992ba6f2 100644 --- a/src/WZDmaInputFilter.cc +++ b/src/WZDmaInputFilter.cc @@ -112,7 +112,7 @@ inline ssize_t WZDmaInputFilter::read_packet( char **buffer, size_t bufferSize ) // Oversized packet is usually sign of link problem // Let's try to reset the board - LOG(ERROR) << "Goging to reset the board:"; + LOG(ERROR) << "Going to reset the board:"; if (wz_reset_board() < 0) { LOG(ERROR) << "Reset finished"; } else { @@ -158,14 +158,11 @@ void WZDmaInputFilter::print(std::ostream& out) const // Read a packet from DMA ssize_t WZDmaInputFilter::readInput(char **buffer, size_t bufferSize) { - // We need at least 1MB buffer - assert( bufferSize >= 1024*1024 ); - return read_packet( buffer, bufferSize ); } -// Notifi the DMA that packet was processed +// Notify the DMA that packet was processed void WZDmaInputFilter::readComplete(char *buffer) { (void)(buffer); diff --git a/src/dma_input.cc b/src/dma_input.cc deleted file mode 100644 index a38a5a54bc7ac47567669f41a89bbfb6f9c2f649..0000000000000000000000000000000000000000 --- a/src/dma_input.cc +++ /dev/null @@ -1,134 +0,0 @@ -#include <cassert> -#include <cstdio> -#include <cerrno> -#include <system_error> -#include <iostream> -#include <sstream> - -#include <sys/types.h> -#include <sys/stat.h> -#include <fcntl.h> - -#include "dma_input.h" -#include "slice.h" - - -DmaInputFilter::DmaInputFilter( const std::string& dma_dev_, size_t packet_buffer_size_, - size_t number_of_packet_buffers_) : - - filter(serial_in_order), - next_slice(Slice::preAllocate( packet_buffer_size_, number_of_packet_buffers_) ), - counts(0), - ncalls(0), - lastStartTime(tbb::tick_count::now()), - last_count(0) -{ - dma_fd = open( dma_dev_.c_str(), O_RDWR | O_NONBLOCK ); - if ( dma_fd < 0 ) { - throw std::system_error(errno, std::system_category(), "Cannot open DMA device" + dma_dev_); - } - - fprintf(stderr,"Created input dma filter and allocated at 0x%llx \n",(unsigned long long)next_slice); -} - -DmaInputFilter::~DmaInputFilter() { - fprintf(stderr,"Destroy input dma filter and delete at 0x%llx \n",(unsigned long long)next_slice); - Slice::giveAllocated(next_slice); - fprintf(stderr,"input operator total %lu read \n",counts); - close( dma_fd ); -} - -/* - * man 2 write: - * On Linux, write() (and similar system calls) will transfer at most - * 0x7ffff000 (2,147,479,552) bytes, returning the number of bytes - * actually transferred. (This is true on both 32-bit and 64-bit - * systems.) - */ - -#define RW_MAX_SIZE 0x7ffff000 - -static inline ssize_t read_axi_packet_to_buffer(int fd, char *buffer, uint64_t size) -{ - ssize_t rc; - uint64_t to_read = size; - - if (to_read > RW_MAX_SIZE) { - to_read = RW_MAX_SIZE; - } - - /* read data from file into memory buffer */ - rc = read(fd, buffer, to_read); - if (rc <= 0) { - return rc; - } - return rc; -} - -void* DmaInputFilter::operator()(void*) { - size_t buffer_size = next_slice->avail(); - ssize_t bytes_read = 0; - - // We need at least 1MB buffer - assert( buffer_size >= 1024*1024 ); - - while (true) { - // Count reads - ncalls++; - - // Read from DMA - bytes_read = read_axi_packet_to_buffer(dma_fd, next_slice->begin(), buffer_size); - - if (bytes_read < 0) { - // Check for errors we can skip - if (errno == EIO || errno == EMSGSIZE) { - std::cerr << "Iteration: " << ncalls; - if (errno == EIO) { - std::cerr << " DMA ERROR: I/O ERROR, skipping packet...\n"; - } else { - std::cerr << " DMA ERROR: Packet too long, skipping packet...\n"; - } - continue; - } - - // Some fatal error occured - std::ostringstream os; - os << "Iteration: " << ncalls - << " ERROR: DMA read failed."; - throw std::system_error(errno, std::system_category(), os.str() ); - } - - // We have some data - break; - } - - // This should not happen - if (bytes_read > (ssize_t)buffer_size ){ - std::ostringstream os; - os << "Iteration: " << ncalls - << " ERROR: DMA read returned " << bytes_read - << " > buffer size " << buffer_size; - throw std::runtime_error( os.str() ); - } - - // This should really not happen - assert( bytes_read != 0); - - // Calculate DMA bandwidth - tbb::tick_count now = tbb::tick_count::now(); - double time_diff = (double)((now - lastStartTime).seconds()); - lastStartTime = now; - double bwd = bytes_read / ( time_diff * 1024.0 * 1024.0 ); - - std::cout << "Read returned: " << bytes_read << ", DMA bandwidth " << bwd << "MBytes/sec\n"; - - // Have more data to process. - Slice* this_slice = next_slice; - next_slice = Slice::getAllocated(); - - // Adjust the end of this buffer - this_slice->set_end( this_slice->end() + bytes_read ); - - return this_slice; - -} diff --git a/src/dma_input.h b/src/dma_input.h deleted file mode 100644 index fd9c731cb8786b447bc75ebb79655e3c40b061f1..0000000000000000000000000000000000000000 --- a/src/dma_input.h +++ /dev/null @@ -1,27 +0,0 @@ -#ifndef DMA_INPUT_H -#define DMA_INPUT_H - -#include <memory> -#include <string> -#include "tbb/pipeline.h" -#include "tbb/tick_count.h" - -class Slice; - -class DmaInputFilter: public tbb::filter { - public: - DmaInputFilter( const std::string&, size_t, size_t); - ~DmaInputFilter(); - private: - int dma_fd; - Slice* next_slice; - void* operator()(void*) /*override*/; - uint64_t counts; - uint64_t ncalls; - tbb::tick_count lastStartTime; - uint64_t last_count; -}; - -typedef std::shared_ptr<DmaInputFilter> DmaInputFilterPtr; - -#endif diff --git a/src/scdaq.cc b/src/scdaq.cc index 8207e6f1dd9555809214e33a386864fbbd160ae8..766d1b82ea54feae5dc2b92277e144992f9569ae 100644 --- a/src/scdaq.cc +++ b/src/scdaq.cc @@ -18,7 +18,7 @@ #include "InputFilter.h" #include "FileDmaInputFilter.h" #include "WZDmaInputFilter.h" -#include "dma_input.h" +#include "DmaInputFilter.h" #include "file_input.h" #include "processor.h" #include "elastico.h" @@ -42,11 +42,10 @@ bool silent = false; int run_pipeline( int nbThreads, ctrl& control, config& conf ) { config::InputType input = conf.getInput(); + size_t packetBufferSize = conf.getDmaPacketBufferSize(); + size_t nbPacketBuffers = conf.getNumberOfDmaPacketBuffers(); - size_t MAX_BYTES_PER_INPUT_SLICE = conf.getDmaPacketBufferSize(); - size_t TOTAL_SLICES = conf.getNumberOfDmaPacketBuffers(); - - // Create empty input reader, will assing later when we know what is the data source + // Create empty input reader, will assign later when we know what is the data source std::shared_ptr<InputFilter> input_filter; // Create the pipeline @@ -54,24 +53,23 @@ int run_pipeline( int nbThreads, ctrl& control, config& conf ) if (input == config::InputType::FILE) { // Create file-reading writing stage and add it to the pipeline - MAX_BYTES_PER_INPUT_SLICE = 192*conf.getBlocksPerInputBuffer(); - TOTAL_SLICES = conf.getNumInputBuffers(); + //MAX_BYTES_PER_INPUT_SLICE = 192*conf.getBlocksPerInputBuffer(); + //TOTAL_SLICES = conf.getNumInputBuffers(); //input_filter = std::make_shared<FileInputFilter>( conf.getInputFile(), MAX_BYTES_PER_INPUT_SLICE, TOTAL_SLICES ); - throw std::runtime_error("input type FILE is temporarily not supported"); + throw std::runtime_error("input type FILE is not supported"); } else if (input == config::InputType::DMA) { // Create DMA reader - //input_filter = std::make_shared<DmaInputFilter>( conf.getDmaDevice(), MAX_BYTES_PER_INPUT_SLICE, TOTAL_SLICES ); - throw std::runtime_error("input type DMA is temporarily not supported"); + input_filter = std::make_shared<DmaInputFilter>( conf.getDmaDevice(), packetBufferSize, nbPacketBuffers, control ); } else if (input == config::InputType::FILEDMA) { // Create FILE DMA reader - input_filter = std::make_shared<FileDmaInputFilter>( conf.getInputFile(), MAX_BYTES_PER_INPUT_SLICE, TOTAL_SLICES, control ); + input_filter = std::make_shared<FileDmaInputFilter>( conf.getInputFile(), packetBufferSize, nbPacketBuffers, control ); } else if (input == config::InputType::WZDMA ) { // Create WZ DMA reader - input_filter = std::make_shared<WZDmaInputFilter>( MAX_BYTES_PER_INPUT_SLICE, TOTAL_SLICES, control ); + input_filter = std::make_shared<WZDmaInputFilter>( packetBufferSize, nbPacketBuffers, control ); } else { throw std::invalid_argument("Configuration error: Unknown input type was specified"); @@ -80,13 +78,9 @@ int run_pipeline( int nbThreads, ctrl& control, config& conf ) // Add input reader to a pipeline pipeline.add_filter( *input_filter ); - LOG(INFO) << "Configuration translated into:"; - LOG(INFO) << " MAX_BYTES_PER_INPUT_SLICE: " << MAX_BYTES_PER_INPUT_SLICE; - LOG(INFO) << " TOTAL_SLICES: " << TOTAL_SLICES; - // Create reformatter and add it to the pipeline // TODO: Created here so we are not subject of scoping, fix later... - StreamProcessor stream_processor(MAX_BYTES_PER_INPUT_SLICE, conf.getDoZS()); + StreamProcessor stream_processor(packetBufferSize, conf.getDoZS()); if ( conf.getEnableStreamProcessor() ) { pipeline.add_filter( stream_processor ); } @@ -94,7 +88,7 @@ int run_pipeline( int nbThreads, ctrl& control, config& conf ) // Create elastic populator (if requested) std::string url = conf.getElasticUrl(); // TODO: Created here so we are not subject of scoping, fix later... - ElasticProcessor elastic_processor(MAX_BYTES_PER_INPUT_SLICE, + ElasticProcessor elastic_processor(packetBufferSize, &control, url, conf.getPtCut(), @@ -144,6 +138,12 @@ int main( int argc, char* argv[] ) { control.packets_per_report = conf.getPacketsPerReport(); control.output_force_write = conf.getOutputForceWrite(); + // Firmware needs at least 1MB buffer for DMA + if (conf.getDmaPacketBufferSize() < 1024*1024) { + LOG(ERROR) << "dma_packet_buffer_size must be at least 1048576 bytes (1MB), but " << conf.getDmaPacketBufferSize() << " bytes was given. Check the configuration file."; + return 1; + } + boost::asio::io_service io_service; server s(io_service, conf.getPortNumber(), control); boost::thread t(boost::bind(&boost::asio::io_service::run, &io_service));