diff --git a/test/pipeline_serial_in_order_test/Makefile b/test/pipeline_serial_in_order_test/Makefile new file mode 100644 index 0000000000000000000000000000000000000000..31963895032a3eef957a2e39efc72d52fc26c61f --- /dev/null +++ b/test/pipeline_serial_in_order_test/Makefile @@ -0,0 +1,56 @@ +# example makefile +# +# Notes: +# - the indents must be tabs +# - if you need to specify a non-standard compiler, then set CXX +# (it defaults to g++) +# - if you need to pass flags to the linker, set LDFLAGS rather +# than alter the ${TARGET}: ${OBJECTS} rule +# - to see make's built-in rules use make -p + +# target executable name +TARGET = main + +# source files +SOURCES = main.cc + +# work out names of object files from sources +OBJECTS = $(SOURCES:.cc=.o) +OBJECTS += $(C_SOURCES:.c=.o) + +# compiler flags (do not include -c here as it's dealt with by the +# appropriate rules; CXXFLAGS gets passed as part of command +# invocation for both compilation (where -c is needed) and linking +# (where it's not.) +CXXFLAGS = -std=c++11 -Wall -Wextra -O0 -g -rdynamic -Wconversion + +CFLAGS = $(CXXFLAGS) +LDFLAGS = -ltbb -ltbbmalloc + +CPPFLAGS = -I. + +# default target (to build all) +all: ${TARGET} + +# clean target +clean: + rm ${OBJECTS} ${TARGET} + +# rule to link object files to create target executable +# $@ is the target, here $(TARGET), and $^ is all the +# dependencies separated by spaces (duplicates removed), +# here ${OBJECTS} +${TARGET}: ${OBJECTS} + ${LINK.cc} -o $@ $^ + +# no rule is needed here for compilation as make already +# knows how to do it + +# header dependencies (in more complicated cases, you can +# look into generating these automatically.) + +#product.o : product.h test1.h test2.h + +#test1.o : product.h test1.h test2.h + +#test2.o : product.h test2.h diff --git a/test/pipeline_serial_in_order_test/log.h b/test/pipeline_serial_in_order_test/log.h new file mode 100644 index 0000000000000000000000000000000000000000..fe65710ed9e3b2698aa2c25352fee311b007fd0c --- /dev/null +++ b/test/pipeline_serial_in_order_test/log.h @@ -0,0 +1,123 @@ +#pragma once + +#include <iostream> +#include <sstream> +#include <mutex> +#include <thread> +#include <iomanip> + +/* + * Simple logging macros, replace later with boost when we have a version with logging... + */ + +//#include <boost/log/trivial.hpp> + + +// Uncomment to include a thread ID in the log message +//#define LOG_THREAD_ID + +enum LOG_LEVEL { + TRACE, + DEBUG, + INFO, + WARNING, + ERROR, + FATAL +}; + +// Make a better function name but not so complicated +inline std::string simplify_pretty(const char *fname) +{ + std::string::size_type pos = 0; + std::string sfname(fname); + + // Get the function name, i.e. '...foo(' without '(' + while (true) { + pos = sfname.find('(', pos); + if (pos == std::string::npos) { + break; + } else { + if (pos > 0) { + // CHeck if the previous character is graphics (it may be the end of the function) + if (isgraph( sfname[pos-1] )) { + // We have the function name + break; + } + } + pos++; + } + }; + + if (pos == std::string::npos) { + // Failed to obtain the name, return the full one + return sfname; + } + + // Get the function name + std::string lfname = sfname.substr(0, pos); + + // Clean the function name + pos = lfname.find_last_of(' '); + + if (pos != std::string::npos) { + lfname = lfname.substr(pos+1); + } + + return lfname; +} + +//#define LOG(severity) ( tools::log::debug() << "[0x" << std::hex << std::this_thread::get_id() << std::dec << "] " << severity << " [" TOOLS_DEBUG_INFO ", " << __PRETTY_FUNCTION__ << "]: " ) +//#define LOG(severity) ( tools::log::log() << "[0x" << std::hex << std::this_thread::get_id() << std::dec << "] " << severity << __func__ << ": " ) + +#ifdef LOG_THREAD_ID + #define LOG(severity) ( tools::log::log() << "[0x" << std::hex << std::this_thread::get_id() << std::dec << "] " << severity << simplify_pretty(__PRETTY_FUNCTION__) << ": " ) +#else + #define LOG(severity) ( tools::log::log() << severity << simplify_pretty(__PRETTY_FUNCTION__) << ": " ) +#endif + +namespace tools { +namespace log { + +// inline before moved somewhere else +inline std::ostream& operator<< (std::ostream& os, enum LOG_LEVEL severity) +{ + switch (severity) + { + case TRACE: return os << "TRACE " ; + case DEBUG: return os << "DEBUG "; + case INFO: return os << "INFO "; + case WARNING: return os << "WARNING "; + case ERROR: return os << "ERROR "; + case FATAL: return os << "FATAL "; + }; + return os; +} + + +// From: https://stackoverflow.com/questions/2179623/how-does-qdebug-stuff-add-a-newline-automatically/2179782#2179782 + +static std::mutex lock; + +struct log { + log() { + } + + ~log() { + std::lock_guard<std::mutex> guard(lock); + std::cout << os.str() << std::endl; + } + +public: + // accepts just about anything + template<class T> + log& operator<<(const T& x) { + os << x; + return *this; + } +private: + std::ostringstream os; +}; + + +} // namespace log +} // namespace tools diff --git a/test/pipeline_serial_in_order_test/main.cc b/test/pipeline_serial_in_order_test/main.cc new file mode 100644 index 0000000000000000000000000000000000000000..3d9d8e0cb9fe43dc827b9340dc282761f118414f --- /dev/null +++ b/test/pipeline_serial_in_order_test/main.cc @@ -0,0 +1,104 @@ +/* + * Checking if pipeline output is in order when having parallel processing + * filters in the middle. + */ +#include <iostream> + +#include "tbb/concurrent_queue.h" +#include "tbb/pipeline.h" +#include "tbb/task_scheduler_init.h" +#include "tbb/tbb_allocator.h" +#include "tbb/tick_count.h" + +#define LOG_THREAD_ID +#include "log.h" + +class MyInputFunc : public tbb::filter { + public: + MyInputFunc(uint64_t maxN) + : tbb::filter(tbb::filter::serial_in_order), maxN(maxN) {} + ~MyInputFunc() {} + + void *operator()(void *) { + current++; + if (current > maxN) { + return NULL; + } + LOG(INFO) << " " << current; + return reinterpret_cast<void *>(current); + } + + private: + uint64_t current = 0; + uint64_t maxN; +}; + +class MyTransformFunc : public tbb::filter { + public: + MyTransformFunc(int dummy) : tbb::filter(tbb::filter::parallel) { + // Need a dummy parameter, otherwise it will not compile + (void)(dummy); + } + ~MyTransformFunc() {} + + void *operator()(void *item) { + uint64_t number = reinterpret_cast<uint64_t>(item); + // Lower numbers are delayed more than higher numbers - should force out of + // order processing NOTE: It would be better to use barriers but this is + // simpler and also works + usleep(static_cast<unsigned int>((10 - (number % 10)) * 100000)); + LOG(INFO) << number; + return item; + } +}; + +class MyOutputFunc : public tbb::filter { + public: + MyOutputFunc(int dummy) : tbb::filter(tbb::filter::serial_in_order) { + // Need a dummy parameter, otherwise it will not compile + (void)(dummy); + } + ~MyOutputFunc() {} + + void *operator()(void *item) { + uint64_t number = reinterpret_cast<uint64_t>(item); + LOG(INFO) << " " << number; + if (number != last + 1) { + LOG(ERROR) << "Out of order number detected: " << number + << ", but the previous was " << last; + exit(-1); + } + last = number; + return NULL; + } + + uint64_t last = 0; +}; + +void RunPipeline(int ntoken, int maxN) { + tbb::pipeline pipeline; + + MyInputFunc myInputFunc(maxN); + MyTransformFunc myTransformFunc(0); + MyOutputFunc myOutputFunc(0); + + pipeline.add_filter(myInputFunc); + pipeline.add_filter(myTransformFunc); + pipeline.add_filter(myOutputFunc); + + pipeline.run(ntoken); +} + +int main(int argc, char *argv[]) { + (void)(argc); + (void)(argv); + + int threads = 10; + int tokens = 100; + int maxN = 100; + + tbb::task_scheduler_init init(threads); + RunPipeline(tokens, maxN); + + std::cout << "\nGood: Pipeline is maintaining the order.\n"; +}