Skip to content
Snippets Groups Projects
Unverified Commit c7914549 authored by Petr Zejdl's avatar Petr Zejdl Committed by GitHub
Browse files

Merge pull request #25 from pzejdl/dma_input_refactoring

(Xilinx) DMA Input FIlter is resurrected because it is needed for VCU128 and refactored to match the object oriented style of other filters.
parents 78c2f659 3d0fef90
No related branches found
No related tags found
1 merge request!59CMSSW json file
#include <cassert>
#include <cstdio>
#include <cerrno>
#include <system_error>
#include <iostream>
#include <sstream>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include "DmaInputFilter.h"
#include "log.h"
DmaInputFilter::DmaInputFilter( const std::string& deviceFileName, size_t packetBufferSize, size_t nbPacketBuffers, ctrl& control ) :
InputFilter( packetBufferSize, nbPacketBuffers, control )
{
dma_fd = open( deviceFileName.c_str(), O_RDWR | O_NONBLOCK );
if ( dma_fd < 0 ) {
throw std::system_error(errno, std::system_category(), "Cannot open DMA device: " + deviceFileName);
}
LOG(TRACE) << "Created DMA input filter";
}
DmaInputFilter::~DmaInputFilter() {
close( dma_fd );
LOG(TRACE) << "Destroyed DMA input filter";
}
/*
* man 2 write:
* On Linux, write() (and similar system calls) will transfer at most
* 0x7ffff000 (2,147,479,552) bytes, returning the number of bytes
* actually transferred. (This is true on both 32-bit and 64-bit
* systems.)
*/
#define RW_MAX_SIZE 0x7ffff000
static inline ssize_t read_axi_packet_to_buffer(int fd, char *buffer, uint64_t size)
{
ssize_t rc;
uint64_t to_read = size;
if (to_read > RW_MAX_SIZE) {
to_read = RW_MAX_SIZE;
}
/* read data from file into memory buffer */
rc = read(fd, buffer, to_read);
if (rc <= 0) {
return rc;
}
return rc;
}
ssize_t DmaInputFilter::readPacketFromDMA(char **buffer, size_t bufferSize)
{
// Read from DMA
ssize_t bytesRead = 0;
int skip = 0;
while (true) {
// Read from DMA
bytesRead = read_axi_packet_to_buffer(dma_fd, *buffer, bufferSize);
if (bytesRead < 0) {
skip++;
// Check for errors and then skip
if (errno == EIO || errno == EMSGSIZE) {
if (errno == EIO) {
stats.nbDmaErrors++;
LOG(ERROR) << "#" << nbReads() << ": DMA I/O ERROR. Skipping packet #" << skip << '.';
} else {
stats.nbDmaOversizedPackets++;
LOG(ERROR) << "#" << nbReads() << ": DMA read returned oversized packet. DMA returned " << bytesRead << ", buffer size is " << bufferSize << ". Skipping packet #" << skip << '.';
}
continue;
}
// Some fatal error occurred
std::ostringstream os;
os << "Iteration: " << nbReads() << " ERROR: DMA read failed.";
throw std::system_error(errno, std::system_category(), os.str() );
}
// We have some data
break;
}
return bytesRead;
}
/**************************************************************************
* Entry points are here
* Overriding virtual functions
*/
// Print some additional info
void DmaInputFilter::print(std::ostream& out) const
{
out
<< ", DMA errors " << stats.nbDmaErrors
<< ", oversized " << stats.nbDmaOversizedPackets;
}
// Read a packet from DMA
ssize_t DmaInputFilter::readInput(char **buffer, size_t bufferSize)
{
return readPacketFromDMA( buffer, bufferSize );
}
...@@ -3,25 +3,33 @@ ...@@ -3,25 +3,33 @@
#include <memory> #include <memory>
#include <string> #include <string>
#include "tbb/pipeline.h" #include "tbb/pipeline.h"
#include "tbb/tick_count.h" #include "tbb/tick_count.h"
class Slice; #include "InputFilter.h"
class DmaInputFilter: public tbb::filter { class DmaInputFilter: public InputFilter {
public: public:
DmaInputFilter( const std::string&, size_t, size_t); DmaInputFilter( const std::string& deviceFileName, size_t packetBufferSize, size_t nbPacketBuffers, ctrl& control );
~DmaInputFilter(); ~DmaInputFilter();
private:
protected:
ssize_t readInput(char **buffer, size_t bufferSize); // Override
void print(std::ostream& out) const; // Override
private:
int dma_fd; int dma_fd;
Slice* next_slice;
void* operator()(void*) /*override*/; ssize_t readPacketFromDMA(char **buffer, size_t bufferSize);
uint64_t counts;
uint64_t ncalls; struct Statistics {
tbb::tick_count lastStartTime; uint64_t nbDmaErrors = 0;
uint64_t last_count; uint64_t nbDmaOversizedPackets = 0;
} stats;
}; };
typedef std::shared_ptr<DmaInputFilter> DmaInputFilterPtr; typedef std::shared_ptr<DmaInputFilter> DmaInputFilterPtr;
#endif #endif
...@@ -19,6 +19,10 @@ InputFilter::InputFilter(size_t packetBufferSize, size_t nbPacketBuffers, ctrl& ...@@ -19,6 +19,10 @@ InputFilter::InputFilter(size_t packetBufferSize, size_t nbPacketBuffers, ctrl&
minBytesRead_ = SSIZE_MAX; minBytesRead_ = SSIZE_MAX;
maxBytesRead_ = 0; maxBytesRead_ = 0;
previousNbReads_ = 0; previousNbReads_ = 0;
LOG(TRACE) << "Configuration translated into:";
LOG(TRACE) << " MAX_BYTES_PER_INPUT_SLICE: " << packetBufferSize;
LOG(TRACE) << " TOTAL_SLICES: " << nbPacketBuffers;
LOG(TRACE) << "Created input filter and allocated at " << static_cast<void*>(nextSlice_); LOG(TRACE) << "Created input filter and allocated at " << static_cast<void*>(nextSlice_);
} }
...@@ -30,7 +34,7 @@ InputFilter::~InputFilter() { ...@@ -30,7 +34,7 @@ InputFilter::~InputFilter() {
} }
void InputFilter::printStats(std::ostream& out) void InputFilter::printStats(std::ostream& out, ssize_t lastBytesRead)
{ {
// Calculate DMA bandwidth // Calculate DMA bandwidth
tbb::tick_count now = tbb::tick_count::now(); tbb::tick_count now = tbb::tick_count::now();
...@@ -56,7 +60,8 @@ void InputFilter::printStats(std::ostream& out) ...@@ -56,7 +60,8 @@ void InputFilter::printStats(std::ostream& out)
out out
<< "#" << nbReads_ << ": Reading " << std::fixed << std::setprecision(1) << bwd << " MB/sec, " << "#" << nbReads_ << ": Reading " << std::fixed << std::setprecision(1) << bwd << " MB/sec, "
<< nbReadsDiff << " packet(s) min/avg/max " << minBytesRead_ << '/' << avgBytesRead << '/' << maxBytesRead_; << nbReadsDiff << " packet(s) min/avg/max " << minBytesRead_ << '/' << avgBytesRead << '/' << maxBytesRead_
<< " last " << lastBytesRead;
// Restore formatting // Restore formatting
out.copyfmt(state); out.copyfmt(state);
...@@ -151,7 +156,7 @@ void* InputFilter::operator()(void*) { ...@@ -151,7 +156,7 @@ void* InputFilter::operator()(void*) {
// Print some statistics // Print some statistics
if (control_.packets_per_report && (nbReads_ % control_.packets_per_report == 0)) { if (control_.packets_per_report && (nbReads_ % control_.packets_per_report == 0)) {
std::ostringstream log; std::ostringstream log;
printStats( log ); printStats( log, bytesRead );
// HACK: This function is not supposed to be called from here // HACK: This function is not supposed to be called from here
dumpPacketTrailer( nextSlice_->begin(), bytesRead, log ); dumpPacketTrailer( nextSlice_->begin(), bytesRead, log );
LOG(INFO) << log.str(); LOG(INFO) << log.str();
......
...@@ -13,7 +13,7 @@ class Slice; ...@@ -13,7 +13,7 @@ class Slice;
/* /*
* This is an abstract class. * This is an abstract class.
* A dervide class has to implement methods readInput and readComplete * A derived class has to implement methods readInput and readComplete
*/ */
class InputFilter: public tbb::filter { class InputFilter: public tbb::filter {
public: public:
...@@ -39,7 +39,7 @@ private: ...@@ -39,7 +39,7 @@ private:
// NOTE: This can be moved out of this class into a separate one // NOTE: This can be moved out of this class into a separate one
// and run in a single thread in order to do reporting... // and run in a single thread in order to do reporting...
void printStats(std::ostream& out); void printStats(std::ostream& out, ssize_t lastBytesRead);
private: private:
ctrl& control_; ctrl& control_;
...@@ -47,10 +47,10 @@ private: ...@@ -47,10 +47,10 @@ private:
Slice* nextSlice_; Slice* nextSlice_;
//uint32_t counts; //uint32_t counts;
// Number of successfull reads // Number of successful reads
uint64_t nbReads_; uint64_t nbReads_;
// Number of byted read // Number of bytes read
uint64_t nbBytesRead_; uint64_t nbBytesRead_;
// For Performance monitoring // For Performance monitoring
......
...@@ -12,7 +12,7 @@ ...@@ -12,7 +12,7 @@
TARGET = scdaq TARGET = scdaq
# source files # source files
SOURCES = config.cc dma_input.cc elastico.cc FileDmaInputFilter.cc file_input.cc InputFilter.cc output.cc processor.cc scdaq.cc session.cc slice.cc WZDmaInputFilter.cc SOURCES = config.cc DmaInputFilter.cc elastico.cc FileDmaInputFilter.cc file_input.cc InputFilter.cc output.cc processor.cc scdaq.cc session.cc slice.cc WZDmaInputFilter.cc
C_SOURCES = wz_dma.c C_SOURCES = wz_dma.c
# work out names of object files from sources # work out names of object files from sources
...@@ -59,7 +59,7 @@ ${TARGET}: ${OBJECTS} ...@@ -59,7 +59,7 @@ ${TARGET}: ${OBJECTS}
scdaq.o: file_input.h processor.h elastico.h output.h format.h server.h controls.h config.h session.h log.h scdaq.o: file_input.h processor.h elastico.h output.h format.h server.h controls.h config.h session.h log.h
config.o: config.h log.h config.o: config.h log.h
dma_input.o: dma_input.h slice.h DmaInputFilter.o: DmaInputFilter.h slice.h
elastico.o: elastico.h format.h slice.h controls.h log.h elastico.o: elastico.h format.h slice.h controls.h log.h
file_dma_input.o: file_dma_input.h file_dma_input.o: file_dma_input.h
FileDmaInputFilter.o: FileDmaInputFilter.h InputFilter.h log.h FileDmaInputFilter.o: FileDmaInputFilter.h InputFilter.h log.h
......
...@@ -112,7 +112,7 @@ inline ssize_t WZDmaInputFilter::read_packet( char **buffer, size_t bufferSize ) ...@@ -112,7 +112,7 @@ inline ssize_t WZDmaInputFilter::read_packet( char **buffer, size_t bufferSize )
// Oversized packet is usually sign of link problem // Oversized packet is usually sign of link problem
// Let's try to reset the board // Let's try to reset the board
LOG(ERROR) << "Goging to reset the board:"; LOG(ERROR) << "Going to reset the board:";
if (wz_reset_board() < 0) { if (wz_reset_board() < 0) {
LOG(ERROR) << "Reset finished"; LOG(ERROR) << "Reset finished";
} else { } else {
...@@ -158,14 +158,11 @@ void WZDmaInputFilter::print(std::ostream& out) const ...@@ -158,14 +158,11 @@ void WZDmaInputFilter::print(std::ostream& out) const
// Read a packet from DMA // Read a packet from DMA
ssize_t WZDmaInputFilter::readInput(char **buffer, size_t bufferSize) ssize_t WZDmaInputFilter::readInput(char **buffer, size_t bufferSize)
{ {
// We need at least 1MB buffer
assert( bufferSize >= 1024*1024 );
return read_packet( buffer, bufferSize ); return read_packet( buffer, bufferSize );
} }
// Notifi the DMA that packet was processed // Notify the DMA that packet was processed
void WZDmaInputFilter::readComplete(char *buffer) { void WZDmaInputFilter::readComplete(char *buffer) {
(void)(buffer); (void)(buffer);
......
#include <cassert>
#include <cstdio>
#include <cerrno>
#include <system_error>
#include <iostream>
#include <sstream>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include "dma_input.h"
#include "slice.h"
DmaInputFilter::DmaInputFilter( const std::string& dma_dev_, size_t packet_buffer_size_,
size_t number_of_packet_buffers_) :
filter(serial_in_order),
next_slice(Slice::preAllocate( packet_buffer_size_, number_of_packet_buffers_) ),
counts(0),
ncalls(0),
lastStartTime(tbb::tick_count::now()),
last_count(0)
{
dma_fd = open( dma_dev_.c_str(), O_RDWR | O_NONBLOCK );
if ( dma_fd < 0 ) {
throw std::system_error(errno, std::system_category(), "Cannot open DMA device" + dma_dev_);
}
fprintf(stderr,"Created input dma filter and allocated at 0x%llx \n",(unsigned long long)next_slice);
}
DmaInputFilter::~DmaInputFilter() {
fprintf(stderr,"Destroy input dma filter and delete at 0x%llx \n",(unsigned long long)next_slice);
Slice::giveAllocated(next_slice);
fprintf(stderr,"input operator total %lu read \n",counts);
close( dma_fd );
}
/*
* man 2 write:
* On Linux, write() (and similar system calls) will transfer at most
* 0x7ffff000 (2,147,479,552) bytes, returning the number of bytes
* actually transferred. (This is true on both 32-bit and 64-bit
* systems.)
*/
#define RW_MAX_SIZE 0x7ffff000
static inline ssize_t read_axi_packet_to_buffer(int fd, char *buffer, uint64_t size)
{
ssize_t rc;
uint64_t to_read = size;
if (to_read > RW_MAX_SIZE) {
to_read = RW_MAX_SIZE;
}
/* read data from file into memory buffer */
rc = read(fd, buffer, to_read);
if (rc <= 0) {
return rc;
}
return rc;
}
void* DmaInputFilter::operator()(void*) {
size_t buffer_size = next_slice->avail();
ssize_t bytes_read = 0;
// We need at least 1MB buffer
assert( buffer_size >= 1024*1024 );
while (true) {
// Count reads
ncalls++;
// Read from DMA
bytes_read = read_axi_packet_to_buffer(dma_fd, next_slice->begin(), buffer_size);
if (bytes_read < 0) {
// Check for errors we can skip
if (errno == EIO || errno == EMSGSIZE) {
std::cerr << "Iteration: " << ncalls;
if (errno == EIO) {
std::cerr << " DMA ERROR: I/O ERROR, skipping packet...\n";
} else {
std::cerr << " DMA ERROR: Packet too long, skipping packet...\n";
}
continue;
}
// Some fatal error occured
std::ostringstream os;
os << "Iteration: " << ncalls
<< " ERROR: DMA read failed.";
throw std::system_error(errno, std::system_category(), os.str() );
}
// We have some data
break;
}
// This should not happen
if (bytes_read > (ssize_t)buffer_size ){
std::ostringstream os;
os << "Iteration: " << ncalls
<< " ERROR: DMA read returned " << bytes_read
<< " > buffer size " << buffer_size;
throw std::runtime_error( os.str() );
}
// This should really not happen
assert( bytes_read != 0);
// Calculate DMA bandwidth
tbb::tick_count now = tbb::tick_count::now();
double time_diff = (double)((now - lastStartTime).seconds());
lastStartTime = now;
double bwd = bytes_read / ( time_diff * 1024.0 * 1024.0 );
std::cout << "Read returned: " << bytes_read << ", DMA bandwidth " << bwd << "MBytes/sec\n";
// Have more data to process.
Slice* this_slice = next_slice;
next_slice = Slice::getAllocated();
// Adjust the end of this buffer
this_slice->set_end( this_slice->end() + bytes_read );
return this_slice;
}
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
#include "InputFilter.h" #include "InputFilter.h"
#include "FileDmaInputFilter.h" #include "FileDmaInputFilter.h"
#include "WZDmaInputFilter.h" #include "WZDmaInputFilter.h"
#include "dma_input.h" #include "DmaInputFilter.h"
#include "file_input.h" #include "file_input.h"
#include "processor.h" #include "processor.h"
#include "elastico.h" #include "elastico.h"
...@@ -42,11 +42,10 @@ bool silent = false; ...@@ -42,11 +42,10 @@ bool silent = false;
int run_pipeline( int nbThreads, ctrl& control, config& conf ) int run_pipeline( int nbThreads, ctrl& control, config& conf )
{ {
config::InputType input = conf.getInput(); config::InputType input = conf.getInput();
size_t packetBufferSize = conf.getDmaPacketBufferSize();
size_t nbPacketBuffers = conf.getNumberOfDmaPacketBuffers();
size_t MAX_BYTES_PER_INPUT_SLICE = conf.getDmaPacketBufferSize(); // Create empty input reader, will assign later when we know what is the data source
size_t TOTAL_SLICES = conf.getNumberOfDmaPacketBuffers();
// Create empty input reader, will assing later when we know what is the data source
std::shared_ptr<InputFilter> input_filter; std::shared_ptr<InputFilter> input_filter;
// Create the pipeline // Create the pipeline
...@@ -54,24 +53,23 @@ int run_pipeline( int nbThreads, ctrl& control, config& conf ) ...@@ -54,24 +53,23 @@ int run_pipeline( int nbThreads, ctrl& control, config& conf )
if (input == config::InputType::FILE) { if (input == config::InputType::FILE) {
// Create file-reading writing stage and add it to the pipeline // Create file-reading writing stage and add it to the pipeline
MAX_BYTES_PER_INPUT_SLICE = 192*conf.getBlocksPerInputBuffer(); //MAX_BYTES_PER_INPUT_SLICE = 192*conf.getBlocksPerInputBuffer();
TOTAL_SLICES = conf.getNumInputBuffers(); //TOTAL_SLICES = conf.getNumInputBuffers();
//input_filter = std::make_shared<FileInputFilter>( conf.getInputFile(), MAX_BYTES_PER_INPUT_SLICE, TOTAL_SLICES ); //input_filter = std::make_shared<FileInputFilter>( conf.getInputFile(), MAX_BYTES_PER_INPUT_SLICE, TOTAL_SLICES );
throw std::runtime_error("input type FILE is temporarily not supported"); throw std::runtime_error("input type FILE is not supported");
} else if (input == config::InputType::DMA) { } else if (input == config::InputType::DMA) {
// Create DMA reader // Create DMA reader
//input_filter = std::make_shared<DmaInputFilter>( conf.getDmaDevice(), MAX_BYTES_PER_INPUT_SLICE, TOTAL_SLICES ); input_filter = std::make_shared<DmaInputFilter>( conf.getDmaDevice(), packetBufferSize, nbPacketBuffers, control );
throw std::runtime_error("input type DMA is temporarily not supported");
} else if (input == config::InputType::FILEDMA) { } else if (input == config::InputType::FILEDMA) {
// Create FILE DMA reader // Create FILE DMA reader
input_filter = std::make_shared<FileDmaInputFilter>( conf.getInputFile(), MAX_BYTES_PER_INPUT_SLICE, TOTAL_SLICES, control ); input_filter = std::make_shared<FileDmaInputFilter>( conf.getInputFile(), packetBufferSize, nbPacketBuffers, control );
} else if (input == config::InputType::WZDMA ) { } else if (input == config::InputType::WZDMA ) {
// Create WZ DMA reader // Create WZ DMA reader
input_filter = std::make_shared<WZDmaInputFilter>( MAX_BYTES_PER_INPUT_SLICE, TOTAL_SLICES, control ); input_filter = std::make_shared<WZDmaInputFilter>( packetBufferSize, nbPacketBuffers, control );
} else { } else {
throw std::invalid_argument("Configuration error: Unknown input type was specified"); throw std::invalid_argument("Configuration error: Unknown input type was specified");
...@@ -80,13 +78,9 @@ int run_pipeline( int nbThreads, ctrl& control, config& conf ) ...@@ -80,13 +78,9 @@ int run_pipeline( int nbThreads, ctrl& control, config& conf )
// Add input reader to a pipeline // Add input reader to a pipeline
pipeline.add_filter( *input_filter ); pipeline.add_filter( *input_filter );
LOG(INFO) << "Configuration translated into:";
LOG(INFO) << " MAX_BYTES_PER_INPUT_SLICE: " << MAX_BYTES_PER_INPUT_SLICE;
LOG(INFO) << " TOTAL_SLICES: " << TOTAL_SLICES;
// Create reformatter and add it to the pipeline // Create reformatter and add it to the pipeline
// TODO: Created here so we are not subject of scoping, fix later... // TODO: Created here so we are not subject of scoping, fix later...
StreamProcessor stream_processor(MAX_BYTES_PER_INPUT_SLICE, conf.getDoZS()); StreamProcessor stream_processor(packetBufferSize, conf.getDoZS());
if ( conf.getEnableStreamProcessor() ) { if ( conf.getEnableStreamProcessor() ) {
pipeline.add_filter( stream_processor ); pipeline.add_filter( stream_processor );
} }
...@@ -94,7 +88,7 @@ int run_pipeline( int nbThreads, ctrl& control, config& conf ) ...@@ -94,7 +88,7 @@ int run_pipeline( int nbThreads, ctrl& control, config& conf )
// Create elastic populator (if requested) // Create elastic populator (if requested)
std::string url = conf.getElasticUrl(); std::string url = conf.getElasticUrl();
// TODO: Created here so we are not subject of scoping, fix later... // TODO: Created here so we are not subject of scoping, fix later...
ElasticProcessor elastic_processor(MAX_BYTES_PER_INPUT_SLICE, ElasticProcessor elastic_processor(packetBufferSize,
&control, &control,
url, url,
conf.getPtCut(), conf.getPtCut(),
...@@ -144,6 +138,12 @@ int main( int argc, char* argv[] ) { ...@@ -144,6 +138,12 @@ int main( int argc, char* argv[] ) {
control.packets_per_report = conf.getPacketsPerReport(); control.packets_per_report = conf.getPacketsPerReport();
control.output_force_write = conf.getOutputForceWrite(); control.output_force_write = conf.getOutputForceWrite();
// Firmware needs at least 1MB buffer for DMA
if (conf.getDmaPacketBufferSize() < 1024*1024) {
LOG(ERROR) << "dma_packet_buffer_size must be at least 1048576 bytes (1MB), but " << conf.getDmaPacketBufferSize() << " bytes was given. Check the configuration file.";
return 1;
}
boost::asio::io_service io_service; boost::asio::io_service io_service;
server s(io_service, conf.getPortNumber(), control); server s(io_service, conf.getPortNumber(), control);
boost::thread t(boost::bind(&boost::asio::io_service::run, &io_service)); boost::thread t(boost::bind(&boost::asio::io_service::run, &io_service));
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment