diff --git a/src/DmaInputFilter.cc b/src/DmaInputFilter.cc new file mode 100644 index 0000000000000000000000000000000000000000..5b374ff902244e8e4a95bd01dd3a6f12f0265d31 --- /dev/null +++ b/src/DmaInputFilter.cc @@ -0,0 +1,119 @@ +#include <cassert> +#include <cstdio> +#include <cerrno> +#include <system_error> +#include <iostream> +#include <sstream> + +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> + +#include "DmaInputFilter.h" +#include "log.h" + + + +DmaInputFilter::DmaInputFilter( const std::string& deviceFileName, size_t packetBufferSize, size_t nbPacketBuffers, ctrl& control ) : + InputFilter( packetBufferSize, nbPacketBuffers, control ) +{ + dma_fd = open( deviceFileName.c_str(), O_RDWR | O_NONBLOCK ); + if ( dma_fd < 0 ) { + throw std::system_error(errno, std::system_category(), "Cannot open DMA device: " + deviceFileName); + } + + LOG(TRACE) << "Created DMA input filter"; +} + +DmaInputFilter::~DmaInputFilter() { + close( dma_fd ); + LOG(TRACE) << "Destroyed DMA input filter"; +} + +/* + * man 2 write: + * On Linux, write() (and similar system calls) will transfer at most + * 0x7ffff000 (2,147,479,552) bytes, returning the number of bytes + * actually transferred. (This is true on both 32-bit and 64-bit + * systems.) + */ + +#define RW_MAX_SIZE 0x7ffff000 + +static inline ssize_t read_axi_packet_to_buffer(int fd, char *buffer, uint64_t size) +{ + ssize_t rc; + uint64_t to_read = size; + + if (to_read > RW_MAX_SIZE) { + to_read = RW_MAX_SIZE; + } + + /* read data from file into memory buffer */ + rc = read(fd, buffer, to_read); + + if (rc <= 0) { + return rc; + } + return rc; +} + +ssize_t DmaInputFilter::readPacketFromDMA(char **buffer, size_t bufferSize) +{ + // Read from DMA + + ssize_t bytesRead = 0; + int skip = 0; + + while (true) { + // Read from DMA + bytesRead = read_axi_packet_to_buffer(dma_fd, *buffer, bufferSize); + + if (bytesRead < 0) { + skip++; + // Check for errors and then skip + if (errno == EIO || errno == EMSGSIZE) { + if (errno == EIO) { + stats.nbDmaErrors++; + LOG(ERROR) << "#" << nbReads() << ": DMA I/O ERROR. Skipping packet #" << skip << '.'; + } else { + stats.nbDmaOversizedPackets++; + LOG(ERROR) << "#" << nbReads() << ": DMA read returned oversized packet. DMA returned " << bytesRead << ", buffer size is " << bufferSize << ". Skipping packet #" << skip << '.'; + } + continue; + } + + // Some fatal error occurred + std::ostringstream os; + os << "Iteration: " << nbReads() << " ERROR: DMA read failed."; + throw std::system_error(errno, std::system_category(), os.str() ); + } + + // We have some data + break; + } + + return bytesRead; +} + + +/************************************************************************** + * Entry points are here + * Overriding virtual functions + */ + + +// Print some additional info +void DmaInputFilter::print(std::ostream& out) const +{ + out + << ", DMA errors " << stats.nbDmaErrors + << ", oversized " << stats.nbDmaOversizedPackets; +} + + +// Read a packet from DMA +ssize_t DmaInputFilter::readInput(char **buffer, size_t bufferSize) +{ + return readPacketFromDMA( buffer, bufferSize ); +} diff --git a/src/DmaInputFilter.h b/src/DmaInputFilter.h new file mode 100644 index 0000000000000000000000000000000000000000..0f58ba52f0187010a1a08294e0d2f65640237b63 --- /dev/null +++ b/src/DmaInputFilter.h @@ -0,0 +1,35 @@ +#ifndef DMA_INPUT_H +#define DMA_INPUT_H + +#include <memory> +#include <string> + +#include "tbb/pipeline.h" +#include "tbb/tick_count.h" + +#include "InputFilter.h" + +class DmaInputFilter: public InputFilter { + public: + DmaInputFilter( const std::string& deviceFileName, size_t packetBufferSize, size_t nbPacketBuffers, ctrl& control ); + ~DmaInputFilter(); + +protected: + ssize_t readInput(char **buffer, size_t bufferSize); // Override + void print(std::ostream& out) const; // Override + +private: + int dma_fd; + + ssize_t readPacketFromDMA(char **buffer, size_t bufferSize); + + struct Statistics { + uint64_t nbDmaErrors = 0; + uint64_t nbDmaOversizedPackets = 0; + } stats; +}; + + +typedef std::shared_ptr<DmaInputFilter> DmaInputFilterPtr; + +#endif diff --git a/src/InputFilter.cc b/src/InputFilter.cc index ccf59383f30f9f387be0d33f75e1cea7dc331a25..cb52c77e0fb044ce9e686575c275c3a74d48d32a 100644 --- a/src/InputFilter.cc +++ b/src/InputFilter.cc @@ -19,6 +19,10 @@ InputFilter::InputFilter(size_t packetBufferSize, size_t nbPacketBuffers, ctrl& minBytesRead_ = SSIZE_MAX; maxBytesRead_ = 0; previousNbReads_ = 0; + + LOG(TRACE) << "Configuration translated into:"; + LOG(TRACE) << " MAX_BYTES_PER_INPUT_SLICE: " << packetBufferSize; + LOG(TRACE) << " TOTAL_SLICES: " << nbPacketBuffers; LOG(TRACE) << "Created input filter and allocated at " << static_cast<void*>(nextSlice_); } @@ -30,7 +34,7 @@ InputFilter::~InputFilter() { } -void InputFilter::printStats(std::ostream& out) +void InputFilter::printStats(std::ostream& out, ssize_t lastBytesRead) { // Calculate DMA bandwidth tbb::tick_count now = tbb::tick_count::now(); @@ -56,7 +60,8 @@ void InputFilter::printStats(std::ostream& out) out << "#" << nbReads_ << ": Reading " << std::fixed << std::setprecision(1) << bwd << " MB/sec, " - << nbReadsDiff << " packet(s) min/avg/max " << minBytesRead_ << '/' << avgBytesRead << '/' << maxBytesRead_; + << nbReadsDiff << " packet(s) min/avg/max " << minBytesRead_ << '/' << avgBytesRead << '/' << maxBytesRead_ + << " last " << lastBytesRead; // Restore formatting out.copyfmt(state); @@ -151,7 +156,7 @@ void* InputFilter::operator()(void*) { // Print some statistics if (control_.packets_per_report && (nbReads_ % control_.packets_per_report == 0)) { std::ostringstream log; - printStats( log ); + printStats( log, bytesRead ); // HACK: This function is not supposed to be called from here dumpPacketTrailer( nextSlice_->begin(), bytesRead, log ); LOG(INFO) << log.str(); diff --git a/src/InputFilter.h b/src/InputFilter.h index 442293f3b72e293f3dd6a17b71dac8234744f534..b53096a587367a31e3b49db9c1c0317555b3cc8c 100644 --- a/src/InputFilter.h +++ b/src/InputFilter.h @@ -13,7 +13,7 @@ class Slice; /* * This is an abstract class. - * A dervide class has to implement methods readInput and readComplete + * A derived class has to implement methods readInput and readComplete */ class InputFilter: public tbb::filter { public: @@ -39,7 +39,7 @@ private: // NOTE: This can be moved out of this class into a separate one // and run in a single thread in order to do reporting... - void printStats(std::ostream& out); + void printStats(std::ostream& out, ssize_t lastBytesRead); private: ctrl& control_; @@ -47,10 +47,10 @@ private: Slice* nextSlice_; //uint32_t counts; - // Number of successfull reads + // Number of successful reads uint64_t nbReads_; - // Number of byted read + // Number of bytes read uint64_t nbBytesRead_; // For Performance monitoring diff --git a/src/Makefile b/src/Makefile index fb18e466f9ad5e673781c286c9234b3fd76b31c4..ef734ed5a7549e361093027dfc9251640a57a6b3 100644 --- a/src/Makefile +++ b/src/Makefile @@ -12,7 +12,7 @@ TARGET = scdaq # source files -SOURCES = config.cc dma_input.cc elastico.cc FileDmaInputFilter.cc file_input.cc InputFilter.cc output.cc processor.cc scdaq.cc session.cc slice.cc WZDmaInputFilter.cc +SOURCES = config.cc DmaInputFilter.cc elastico.cc FileDmaInputFilter.cc InputFilter.cc output.cc processor.cc scdaq.cc session.cc slice.cc WZDmaInputFilter.cc C_SOURCES = wz_dma.c # work out names of object files from sources @@ -27,7 +27,7 @@ CXXFLAGS = -std=c++11 -Wall -Wextra -O0 -g -rdynamic #CXXFLAGS = -std=c++11 -Wall -Wextra -g -rdynamic CFLAGS = $(CXXFLAGS) -LDFLAGS = -ltbb -lboost_thread -lcurl +LDFLAGS = -ltbb -ltbbmalloc -lboost_thread -lcurl CPPFLAGS = -I. -Iwzdma @@ -57,13 +57,11 @@ ${TARGET}: ${OBJECTS} #test2.o : product.h test2.h -scdaq.o: file_input.h processor.h elastico.h output.h format.h server.h controls.h config.h session.h log.h +scdaq.o: processor.h elastico.h output.h format.h server.h controls.h config.h session.h log.h config.o: config.h log.h -dma_input.o: dma_input.h slice.h +DmaInputFilter.o: DmaInputFilter.h slice.h elastico.o: elastico.h format.h slice.h controls.h log.h -file_dma_input.o: file_dma_input.h FileDmaInputFilter.o: FileDmaInputFilter.h InputFilter.h log.h -file_input.o: file_input.h slice.h utility.h InputFilter.o: InputFilter.h log.h output.o: output.h slice.h log.h processor.o: processor.h slice.h format.h log.h diff --git a/src/WZDmaInputFilter.cc b/src/WZDmaInputFilter.cc index de742eadf5c52c12f0ecdf1d39f3276bc483cc55..2619de3c8befc0b256385a5cdcfdee92992ba6f2 100644 --- a/src/WZDmaInputFilter.cc +++ b/src/WZDmaInputFilter.cc @@ -112,7 +112,7 @@ inline ssize_t WZDmaInputFilter::read_packet( char **buffer, size_t bufferSize ) // Oversized packet is usually sign of link problem // Let's try to reset the board - LOG(ERROR) << "Goging to reset the board:"; + LOG(ERROR) << "Going to reset the board:"; if (wz_reset_board() < 0) { LOG(ERROR) << "Reset finished"; } else { @@ -158,14 +158,11 @@ void WZDmaInputFilter::print(std::ostream& out) const // Read a packet from DMA ssize_t WZDmaInputFilter::readInput(char **buffer, size_t bufferSize) { - // We need at least 1MB buffer - assert( bufferSize >= 1024*1024 ); - return read_packet( buffer, bufferSize ); } -// Notifi the DMA that packet was processed +// Notify the DMA that packet was processed void WZDmaInputFilter::readComplete(char *buffer) { (void)(buffer); diff --git a/src/dma_input.cc b/src/dma_input.cc deleted file mode 100644 index a38a5a54bc7ac47567669f41a89bbfb6f9c2f649..0000000000000000000000000000000000000000 --- a/src/dma_input.cc +++ /dev/null @@ -1,134 +0,0 @@ -#include <cassert> -#include <cstdio> -#include <cerrno> -#include <system_error> -#include <iostream> -#include <sstream> - -#include <sys/types.h> -#include <sys/stat.h> -#include <fcntl.h> - -#include "dma_input.h" -#include "slice.h" - - -DmaInputFilter::DmaInputFilter( const std::string& dma_dev_, size_t packet_buffer_size_, - size_t number_of_packet_buffers_) : - - filter(serial_in_order), - next_slice(Slice::preAllocate( packet_buffer_size_, number_of_packet_buffers_) ), - counts(0), - ncalls(0), - lastStartTime(tbb::tick_count::now()), - last_count(0) -{ - dma_fd = open( dma_dev_.c_str(), O_RDWR | O_NONBLOCK ); - if ( dma_fd < 0 ) { - throw std::system_error(errno, std::system_category(), "Cannot open DMA device" + dma_dev_); - } - - fprintf(stderr,"Created input dma filter and allocated at 0x%llx \n",(unsigned long long)next_slice); -} - -DmaInputFilter::~DmaInputFilter() { - fprintf(stderr,"Destroy input dma filter and delete at 0x%llx \n",(unsigned long long)next_slice); - Slice::giveAllocated(next_slice); - fprintf(stderr,"input operator total %lu read \n",counts); - close( dma_fd ); -} - -/* - * man 2 write: - * On Linux, write() (and similar system calls) will transfer at most - * 0x7ffff000 (2,147,479,552) bytes, returning the number of bytes - * actually transferred. (This is true on both 32-bit and 64-bit - * systems.) - */ - -#define RW_MAX_SIZE 0x7ffff000 - -static inline ssize_t read_axi_packet_to_buffer(int fd, char *buffer, uint64_t size) -{ - ssize_t rc; - uint64_t to_read = size; - - if (to_read > RW_MAX_SIZE) { - to_read = RW_MAX_SIZE; - } - - /* read data from file into memory buffer */ - rc = read(fd, buffer, to_read); - if (rc <= 0) { - return rc; - } - return rc; -} - -void* DmaInputFilter::operator()(void*) { - size_t buffer_size = next_slice->avail(); - ssize_t bytes_read = 0; - - // We need at least 1MB buffer - assert( buffer_size >= 1024*1024 ); - - while (true) { - // Count reads - ncalls++; - - // Read from DMA - bytes_read = read_axi_packet_to_buffer(dma_fd, next_slice->begin(), buffer_size); - - if (bytes_read < 0) { - // Check for errors we can skip - if (errno == EIO || errno == EMSGSIZE) { - std::cerr << "Iteration: " << ncalls; - if (errno == EIO) { - std::cerr << " DMA ERROR: I/O ERROR, skipping packet...\n"; - } else { - std::cerr << " DMA ERROR: Packet too long, skipping packet...\n"; - } - continue; - } - - // Some fatal error occured - std::ostringstream os; - os << "Iteration: " << ncalls - << " ERROR: DMA read failed."; - throw std::system_error(errno, std::system_category(), os.str() ); - } - - // We have some data - break; - } - - // This should not happen - if (bytes_read > (ssize_t)buffer_size ){ - std::ostringstream os; - os << "Iteration: " << ncalls - << " ERROR: DMA read returned " << bytes_read - << " > buffer size " << buffer_size; - throw std::runtime_error( os.str() ); - } - - // This should really not happen - assert( bytes_read != 0); - - // Calculate DMA bandwidth - tbb::tick_count now = tbb::tick_count::now(); - double time_diff = (double)((now - lastStartTime).seconds()); - lastStartTime = now; - double bwd = bytes_read / ( time_diff * 1024.0 * 1024.0 ); - - std::cout << "Read returned: " << bytes_read << ", DMA bandwidth " << bwd << "MBytes/sec\n"; - - // Have more data to process. - Slice* this_slice = next_slice; - next_slice = Slice::getAllocated(); - - // Adjust the end of this buffer - this_slice->set_end( this_slice->end() + bytes_read ); - - return this_slice; - -} diff --git a/src/dma_input.h b/src/dma_input.h deleted file mode 100644 index fd9c731cb8786b447bc75ebb79655e3c40b061f1..0000000000000000000000000000000000000000 --- a/src/dma_input.h +++ /dev/null @@ -1,27 +0,0 @@ -#ifndef DMA_INPUT_H -#define DMA_INPUT_H - -#include <memory> -#include <string> -#include "tbb/pipeline.h" -#include "tbb/tick_count.h" - -class Slice; - -class DmaInputFilter: public tbb::filter { - public: - DmaInputFilter( const std::string&, size_t, size_t); - ~DmaInputFilter(); - private: - int dma_fd; - Slice* next_slice; - void* operator()(void*) /*override*/; - uint64_t counts; - uint64_t ncalls; - tbb::tick_count lastStartTime; - uint64_t last_count; -}; - -typedef std::shared_ptr<DmaInputFilter> DmaInputFilterPtr; - -#endif diff --git a/src/file_input.cc b/src/file_input.cc deleted file mode 100644 index 52d5e7b611ef0633eef83bc83a334d06c3211343..0000000000000000000000000000000000000000 --- a/src/file_input.cc +++ /dev/null @@ -1,63 +0,0 @@ -#include <cstdio> -#include "file_input.h" -#include "slice.h" -#include "utility.h" - -FileInputFilter::FileInputFilter( const std::string& file_name_, size_t max_size_, - size_t nslices_) : - - filter(serial_in_order), - next_slice(Slice::preAllocate( max_size_,nslices_) ), - counts(0), - ncalls(0), - lastStartTime(tbb::tick_count::now()), - last_count(0) -{ - input_file = fopen( file_name_.c_str(), "r" ); - if ( !input_file ) { - throw std::invalid_argument( "Invalid input file name: " + file_name_ ); - } - fprintf(stderr,"Created input filter and allocated at 0x%llx \n",(unsigned long long)next_slice); -} - -FileInputFilter::~FileInputFilter() { - fprintf(stderr,"Destroy input filter and delete at 0x%llx \n",(unsigned long long)next_slice); - Slice::giveAllocated(next_slice); - fprintf(stderr,"input operator total %lu read \n",counts); - // fclose( output_file ); - fclose( input_file ); -} - -void* FileInputFilter::operator()(void*) { - - ncalls++; - size_t m = next_slice->avail(); - size_t n = fread( next_slice->begin(), 1, m, input_file ); - - if(n<m){ - // fprintf(stderr,"input operator read less %d \n",n); - - } - counts+=n/192; - - if(ncalls%100000==0){ - printf("input bandwidth %f\n",double((counts-last_count)*192/1024./1024.)/ - double((tbb::tick_count::now() - lastStartTime).seconds())); - lastStartTime=tbb::tick_count::now(); - last_count=counts; - } - if( n==0 ) { - fseek(input_file,0,SEEK_SET); - return this->operator()(nullptr); - } else { - // Have more data to process. - Slice& t = *next_slice; - next_slice = Slice::getAllocated(); - - char* p = t.end()+n; - t.set_end(p); - - return &t; - } - -} diff --git a/src/file_input.h b/src/file_input.h deleted file mode 100644 index 613b82deae0f6255b126183447dbfb77ec6a50e4..0000000000000000000000000000000000000000 --- a/src/file_input.h +++ /dev/null @@ -1,27 +0,0 @@ -#ifndef FILE_INPUT_H -#define FILE_INPUT_H - -#include <memory> -#include <string> -#include "tbb/pipeline.h" -#include "tbb/tick_count.h" - -class Slice; - -class FileInputFilter: public tbb::filter { - public: - FileInputFilter( const std::string&, size_t, size_t); - ~FileInputFilter(); - private: - FILE* input_file; - Slice* next_slice; - void* operator()(void*) /*override*/; - uint64_t counts; - uint64_t ncalls; - tbb::tick_count lastStartTime; - uint64_t last_count; -}; - -typedef std::shared_ptr<FileInputFilter> FileInputFilterPtr; - -#endif diff --git a/src/processor.cc b/src/processor.cc index cc1d0ee9b5c23cd1aa57322231afcab9db115c90..e2726f2d4b446ec93574f69eda94d8a696332a9e 100644 --- a/src/processor.cc +++ b/src/processor.cc @@ -62,7 +62,7 @@ Slice* StreamProcessor::process(Slice& input, Slice& out) } bool brill_enabled = 0; - if( ( brill_enabled) && ((bl->orbit[i] == 0xFF) ||( bl->bx[i] == 0xFF) ||( bl->mu1f[i] == 0xFF) || + if(( brill_enabled) && ((bl->orbit[i] == 0xFF) ||( bl->bx[i] == 0xFF) ||( bl->mu1f[i] == 0xFF) || (bl->mu1s[i] == 0xFF) ||( bl->mu2f[i] == 0xFF) ||( bl->mu2s[i] == 0xFF))){ brill_word = true; } diff --git a/src/scdaq.cc b/src/scdaq.cc index 3079984d6bdb3ac1d9c1729a15e3c6898b50c1ae..5d1289c8dfd5fadff16ebec95c8a64772bdc2975 100644 --- a/src/scdaq.cc +++ b/src/scdaq.cc @@ -18,8 +18,7 @@ #include "InputFilter.h" #include "FileDmaInputFilter.h" #include "WZDmaInputFilter.h" -#include "dma_input.h" -#include "file_input.h" +#include "DmaInputFilter.h" #include "processor.h" #include "elastico.h" #include "output.h" @@ -32,46 +31,31 @@ using namespace std; - - - - - bool silent = false; int run_pipeline( int nbThreads, ctrl& control, config& conf ) { config::InputType input = conf.getInput(); + size_t packetBufferSize = conf.getDmaPacketBufferSize(); + size_t nbPacketBuffers = conf.getNumberOfDmaPacketBuffers(); - size_t MAX_BYTES_PER_INPUT_SLICE = conf.getDmaPacketBufferSize(); - size_t TOTAL_SLICES = conf.getNumberOfDmaPacketBuffers(); - - // Create empty input reader, will assing later when we know what is the data source + // Create empty input reader, will assign later when we know what is the data source std::shared_ptr<InputFilter> input_filter; // Create the pipeline tbb::pipeline pipeline; - if (input == config::InputType::FILE) { - // Create file-reading writing stage and add it to the pipeline - MAX_BYTES_PER_INPUT_SLICE = 192*conf.getBlocksPerInputBuffer(); - TOTAL_SLICES = conf.getNumInputBuffers(); - - //input_filter = std::make_shared<FileInputFilter>( conf.getInputFile(), MAX_BYTES_PER_INPUT_SLICE, TOTAL_SLICES ); - throw std::runtime_error("input type FILE is temporarily not supported"); - - } else if (input == config::InputType::DMA) { + if (input == config::InputType::DMA) { // Create DMA reader - //input_filter = std::make_shared<DmaInputFilter>( conf.getDmaDevice(), MAX_BYTES_PER_INPUT_SLICE, TOTAL_SLICES ); - throw std::runtime_error("input type DMA is temporarily not supported"); + input_filter = std::make_shared<DmaInputFilter>( conf.getDmaDevice(), packetBufferSize, nbPacketBuffers, control ); } else if (input == config::InputType::FILEDMA) { // Create FILE DMA reader - input_filter = std::make_shared<FileDmaInputFilter>( conf.getInputFile(), MAX_BYTES_PER_INPUT_SLICE, TOTAL_SLICES, control ); + input_filter = std::make_shared<FileDmaInputFilter>( conf.getInputFile(), packetBufferSize, nbPacketBuffers, control ); } else if (input == config::InputType::WZDMA ) { // Create WZ DMA reader - input_filter = std::make_shared<WZDmaInputFilter>( MAX_BYTES_PER_INPUT_SLICE, TOTAL_SLICES, control ); + input_filter = std::make_shared<WZDmaInputFilter>( packetBufferSize, nbPacketBuffers, control ); } else { throw std::invalid_argument("Configuration error: Unknown input type was specified"); @@ -80,13 +64,9 @@ int run_pipeline( int nbThreads, ctrl& control, config& conf ) // Add input reader to a pipeline pipeline.add_filter( *input_filter ); - LOG(INFO) << "Configuration translated into:"; - LOG(INFO) << " MAX_BYTES_PER_INPUT_SLICE: " << MAX_BYTES_PER_INPUT_SLICE; - LOG(INFO) << " TOTAL_SLICES: " << TOTAL_SLICES; - // Create reformatter and add it to the pipeline // TODO: Created here so we are not subject of scoping, fix later... - StreamProcessor stream_processor(MAX_BYTES_PER_INPUT_SLICE, conf.getDoZS(), conf.getSystemName()); + StreamProcessor stream_processor(packetBufferSize, conf.getDoZS(), conf.getSystemName()); if ( conf.getEnableStreamProcessor() ) { pipeline.add_filter( stream_processor ); } @@ -94,7 +74,7 @@ int run_pipeline( int nbThreads, ctrl& control, config& conf ) // Create elastic populator (if requested) std::string url = conf.getElasticUrl(); // TODO: Created here so we are not subject of scoping, fix later... - ElasticProcessor elastic_processor(MAX_BYTES_PER_INPUT_SLICE, + ElasticProcessor elastic_processor(packetBufferSize, &control, url, conf.getPtCut(), @@ -124,7 +104,6 @@ int run_pipeline( int nbThreads, ctrl& control, config& conf ) } - int main( int argc, char* argv[] ) { (void)(argc); (void)(argv); @@ -145,6 +124,12 @@ int main( int argc, char* argv[] ) { control.packets_per_report = conf.getPacketsPerReport(); control.output_force_write = conf.getOutputForceWrite(); + // Firmware needs at least 1MB buffer for DMA + if (conf.getDmaPacketBufferSize() < 1024*1024) { + LOG(ERROR) << "dma_packet_buffer_size must be at least 1048576 bytes (1MB), but " << conf.getDmaPacketBufferSize() << " bytes was given. Check the configuration file."; + return 1; + } + boost::asio::io_service io_service; server s(io_service, conf.getPortNumber(), control); boost::thread t(boost::bind(&boost::asio::io_service::run, &io_service)); diff --git a/src/scdaq.conf b/src/scdaq.conf index a333f36f65e79669f4cdf4d69b265904a3c82327..a6a3e95d94a5c99cff8c54f7db35fe0a775f8314 100644 --- a/src/scdaq.conf +++ b/src/scdaq.conf @@ -2,7 +2,6 @@ # "wzdma" for DMA driver from Wojciech M. Zabolotny # "dma" for XILINX DMA driver # "filedma" for reading from file and simulating DMA -# "file" for reading from file input:wzdma #input:filedma @@ -25,7 +24,7 @@ packets_per_report:200000 system_name:DMX #system_name:GMT -## Settings for file input +## Settings for file(dma) input #input_file:/dev/shm/testdata.bin input_file:testdata.bin #input_file:../dumps/dump-empty-run.bin diff --git a/src/slice.h b/src/slice.h index 18b5f640196bf3a290aa18a98267b6b5c24100c1..1c762ed1dbdb3dfedd87f866d305a0b03654413f 100644 --- a/src/slice.h +++ b/src/slice.h @@ -2,7 +2,8 @@ #define SLICE_H #include <stdint.h> -#include "tbb/tbb_allocator.h" +//#include "tbb/tbb_allocator.h" +#include "tbb/scalable_allocator.h" #include "tbb/concurrent_queue.h" //! Holds a slice of data. @@ -18,7 +19,12 @@ class Slice { public: //! Allocate a Slice object that can hold up to max_size bytes static Slice* allocate( size_t max_size ) { - Slice* t = (Slice*)tbb::zero_allocator<char>().allocate( sizeof(Slice)+max_size ); + //Slice* t = (Slice*)tbb::zero_allocator<char>().allocate( sizeof(Slice)+max_size ); + + // Replacing tbb::zero_allocator with aligned allocator. + // Alignment to 32 bytes (256 bits) is required by MicronDMA. + Slice* t = (Slice*) scalable_aligned_malloc( sizeof(Slice)+max_size, 32); + t->logical_end = t->begin(); t->physical_end = t->begin()+max_size; t->counts = 0; @@ -34,7 +40,9 @@ public: //! Free a Slice object void free() { // fprintf(stderr,"slice free at 0x%llx \n",(unsigned long long) this); - tbb::tbb_allocator<char>().deallocate((char*)this,sizeof(Slice)+(physical_end-begin())+1); + //tbb::tbb_allocator<char>().deallocate((char*)this,sizeof(Slice)+(physical_end-begin())+1); + + scalable_aligned_free( this ); } //! Pointer to beginning of sequence char* begin() {return (char*)(this+sizeof(Slice));}