diff --git a/src/config.cc b/src/config.cc new file mode 100644 index 0000000000000000000000000000000000000000..a7cd82cfdcce6147a4e85d3257fd34894dc80662 --- /dev/null +++ b/src/config.cc @@ -0,0 +1,19 @@ +#include <iostream> +#include <fstream> +#include "config.h" + +config::config(std::string filename){ + std::ifstream in(filename.c_str(),std::ios_base::in); + std::string item; + while(!std::getline(in,item,':').eof()){ + std::string value; + std::getline(in,value); + vmap[item]=value; + } +} +void config::print()const { + for(std::map<std::string,std::string>::const_iterator it = vmap.begin(); it!=vmap.end(); it++){ + std::cout << "key " << it->first << " value " << it->second << std::endl; + } + +} diff --git a/src/config.h b/src/config.h new file mode 100644 index 0000000000000000000000000000000000000000..12e2917f311e082ccaeaaea18f1ab3d20fc0b027 --- /dev/null +++ b/src/config.h @@ -0,0 +1,70 @@ +#ifndef CONFIG_H +#define CONFIG_H + +#include <string> +#include <stdint.h> +#include "boost/lexical_cast.hpp" +#include <map> +#include <stdexcept> + +class config{ +public: + + + config(std::string filename); + + void print() const; + + std::string getInputFile() const { + std::string filename; + try{ + filename = vmap.at("input_file"); + }catch(const std::out_of_range& oor){ + + } + return filename; + } + std::string getElasticUrl() const + { + return vmap.at("elastic_url"); + } + uint32_t getQualCut() const { + std::string v = vmap.at("quality_cut"); + return boost::lexical_cast<uint32_t>(v.c_str()); + } + uint32_t getPtCut() const { + std::string v = vmap.at("pt_cut"); + return boost::lexical_cast<uint32_t>(v.c_str()); + } + std::string getOutputFilenameBase() const + { + return vmap.at("output_filename_base"); + } + uint64_t getOutputMaxFileSize() const { + std::string v = vmap.at("max_file_size"); + return boost::lexical_cast<uint64_t>(v.c_str()); + } + uint32_t getNumThreads() const { + std::string v = vmap.at("threads"); + return boost::lexical_cast<uint32_t>(v.c_str()); + } + uint32_t getNumInputBuffers() const { + std::string v = vmap.at("input_buffers"); + return boost::lexical_cast<uint32_t>(v.c_str()); + } + uint32_t getBlocksPerInputBuffer() const { + std::string v = vmap.at("blocks_buffer"); + return boost::lexical_cast<uint32_t>(v.c_str()); + } + short getPortNumber() const { + std::string v = vmap.at("port"); + return boost::lexical_cast<short>(v.c_str()); + } + +private: + + std::map<std::string,std::string> vmap; + + +}; +#endif diff --git a/src/controls.h b/src/controls.h new file mode 100644 index 0000000000000000000000000000000000000000..730a695913e8d82faa48119414212b6b7bf4a3cb --- /dev/null +++ b/src/controls.h @@ -0,0 +1,10 @@ +#ifndef CONTROLS_H +#define CONTROLS_H +#include <stdint.h> + +struct ctrl{ + uint32_t run_number; + bool running; + uint64_t max_file_size; +}; +#endif diff --git a/src/elastico.cc b/src/elastico.cc new file mode 100644 index 0000000000000000000000000000000000000000..85b6c8852b69a7cb8caa1dde7a99d97aba82e996 --- /dev/null +++ b/src/elastico.cc @@ -0,0 +1,155 @@ +#include <cstdio> + + +#include "elastico.h" +#include "format.h" +#include "slice.h" +#include "controls.h" + +size_t dummy(char *data, size_t n, size_t l, void *s) { + // std::cout << data << std::endl; +return n*l; } + +ElasticProcessor::ElasticProcessor(size_t max_size_, ctrl *c, std::string requrl, + uint32_t ptcut, uint32_t qualcut) : + tbb::filter(parallel), + max_size(max_size_), + control(c), + request_url(requrl), + c_request_url(""), + pt_cut(ptcut), + qual_cut(qualcut), + handle(0), + headers(NULL) +{ fprintf(stderr,"Created elastico filter at 0x%llx \n",(unsigned long long)this);} +ElasticProcessor::~ElasticProcessor(){ + // fprintf(stderr,"Wrote %d muons \n",totcount); +} + +void ElasticProcessor::makeCreateIndexRequest(unsigned int run){ + if(handle){ + curl_slist_free_all(headers); + curl_easy_cleanup(handle); + headers=NULL; + handle=0; + } + std::ostringstream ost; + ost << run; + c_request_url = request_url+"_"+ost.str(); + std::string index_setting_run = "{\n"; + index_setting_run += "\"settings\" : {\n"; + index_setting_run += " \"number_of_shards\" : 2,\n"; + index_setting_run += " \"number_of_replicas\" : 0,\n"; + index_setting_run += " \"refresh_interval\" : \"2s\"\n"; + index_setting_run += " },\n"; + index_setting_run += "\"mappings\" : {\n"; + index_setting_run += " \"_doc\" : {\n"; + index_setting_run += " \"properties\" : {\n"; + index_setting_run += " \"orbit\" : {\"type\" : \"integer\", \"index\" : \"true\"},\n"; + index_setting_run += " \"bx\" : {\"type\" : \"integer\", \"index\" : \"true\"},\n"; + index_setting_run += " \"eta\" : {\"type\" : \"float\", \"index\" : \"true\"},\n"; + index_setting_run += " \"phi\" : {\"type\" : \"float\", \"index\" : \"true\"},\n"; + index_setting_run += " \"etap\" : {\"type\" : \"float\", \"index\" : \"true\"},\n"; + index_setting_run += " \"phip\" : {\"type\" : \"float\", \"index\" : \"true\"},\n"; + index_setting_run += " \"pt\" : {\"type\" : \"float\", \"index\" : \"true\"},\n"; + index_setting_run += " \"chrg\" : {\"type\" : \"integer\", \"index\" : \"true\"},\n"; + index_setting_run += " \"qual\" : {\"type\" : \"integer\", \"index\" : \"true\"}\n"; + index_setting_run += " }\n"; + index_setting_run += " }\n"; + index_setting_run += " }\n"; + index_setting_run += "}"; + handle = curl_easy_init(); + curl_easy_setopt(handle, CURLOPT_VERBOSE, 0LL); + headers=NULL; /* init to NULL is important */ + headers = curl_slist_append(headers, "Content-Type: application/json"); + + curl_easy_setopt(handle, CURLOPT_HTTPHEADER, headers); + curl_easy_setopt(handle, CURLOPT_URL, c_request_url.c_str()); + curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, "PUT"); /* !!! */ + + curl_easy_setopt(handle, CURLOPT_POSTFIELDS, index_setting_run.c_str()); /* data goes here */ + + int res = curl_easy_perform(handle); + printf("index creation returned %d\n",res); + curl_slist_free_all(headers); + curl_easy_cleanup(handle); + //now set up the handle for bulk population + handle = curl_easy_init(); + std::string moreheaders = "Expect:"; + headers=NULL; /* init to NULL is important */ + headers = curl_slist_append(headers, "Content-Type: application/json"); + headers = curl_slist_append(headers, moreheaders.c_str()); + + curl_easy_setopt(handle, CURLOPT_VERBOSE, 0LL); + curl_easy_setopt(handle, CURLOPT_HEADER, 0); + curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, dummy); + + curl_easy_setopt(handle, CURLOPT_HTTPHEADER, headers); + std::string p_request_url=c_request_url+"/_doc/_bulk"; + curl_easy_setopt(handle, CURLOPT_URL, p_request_url.c_str()); +} + +void ElasticProcessor::makeAppendToBulkRequest(std::ostringstream &particle_data, char*c){ + uint32_t *p = (uint32_t*)c; + uint32_t header = *p++; + int mAcount = (header&header_masks::mAcount)>>header_shifts::mAcount; + int mBcount = (header&header_masks::mBcount)>>header_shifts::mBcount; + + uint32_t bx=*p++; + uint32_t orbit=*p++; + for(unsigned int i = 0; i < mAcount; i++){ + uint32_t mf = *p++; + uint32_t ms = *p++; + uint32_t ipt = (mf >> shifts::pt) & masks::pt; + if(ipt<pt_cut)continue; + uint32_t qual = (mf >> shifts::qual) & masks::qual; + if(qual<qual_cut)continue; + float pt = (ipt-1)*gmt_scales::pt_scale; + float phiext = ((mf >> shifts::phiext) & masks::phiext)*gmt_scales::phi_scale; + uint32_t ietaext = ((mf >> shifts::etaext) & masks::etaextv); + if(((mf >> shifts::etaext) & masks::etaexts)!=0) ietaext -= 512; + float etaext = ietaext*gmt_scales::eta_scale; + uint32_t iso = (ms >> shifts::iso) & masks::iso; + uint32_t chrg = (ms >> shifts::chrg) & masks::chrg; + uint32_t chrgv = (ms >> shifts::chrgv) & masks::chrgv; + uint32_t index = (ms >> shifts::index) & masks::index; + float phi = ((ms >> shifts::phi) & masks::phi)*gmt_scales::phi_scale; + uint32_t ieta = (ms >> shifts::eta) & masks::etav; + if(((mf >> shifts::eta) & masks::etas)!=0) ieta -= 512; + float eta = ieta*gmt_scales::eta_scale; + particle_data << "{\"index\" : {}}\n" + << "{\"orbit\": " << orbit << ',' + << "\"bx\": " << bx << ',' + << "\"eta\": " << eta << ',' + << "\"phi\": " << phi << ',' + << "\"etap\": " << etaext << ',' + << "\"phip\": " << phiext << ',' + << "\"pt\": " << pt << ',' + << "\"chrg\": " << chrg << ',' + << "\"qual\": " << qual + << "}\n"; + + } + +} + + + +void* ElasticProcessor::operator()( void* item ){ + Slice& input = *static_cast<Slice*>(item); + std::ostringstream particle_data; + char* p = input.begin(); + if(control->running){ + if(c_request_url.empty()) makeCreateIndexRequest(control->run_number); + while(p!=input.end()){ + makeAppendToBulkRequest(particle_data,p); + } + curl_easy_setopt(handle, CURLOPT_POSTFIELDSIZE,particle_data.str().length()); + curl_easy_setopt(handle, CURLOPT_COPYPOSTFIELDS, particle_data.str().c_str()); /* data goes here */ + int res = curl_easy_perform(handle); + } + if(!control->running && !c_request_url.empty()){ + c_request_url.clear(); + } + return &input; +} diff --git a/src/elastico.h b/src/elastico.h new file mode 100644 index 0000000000000000000000000000000000000000..cb74faee08256aae092996a01c5ee9a77085dd21 --- /dev/null +++ b/src/elastico.h @@ -0,0 +1,33 @@ +#ifndef ELASTICO_H +#define ELASTICO_H + +#include <cstdint> +#include <string> +#include <sstream> +#include <iostream> +#include "curl/curl.h" +#include "tbb/pipeline.h" + +class ctrl; + +//reformatter + +class ElasticProcessor: public tbb::filter { +public: + ElasticProcessor(size_t, ctrl *, std::string, uint32_t, uint32_t); + void* operator()( void* item )/*override*/; + ~ElasticProcessor(); +private: + void makeCreateIndexRequest(unsigned int); + void makeAppendToBulkRequest(std::ostringstream &, char *); + size_t max_size; + ctrl *control; + std::string request_url; + std::string c_request_url; + uint32_t pt_cut; + uint32_t qual_cut; + CURL *handle; + struct curl_slist *headers; +}; + +#endif diff --git a/src/file_input.cc b/src/file_input.cc new file mode 100644 index 0000000000000000000000000000000000000000..a5e155bc4a46a3b21e8e6c8298941858a8641934 --- /dev/null +++ b/src/file_input.cc @@ -0,0 +1,59 @@ +#include <cstdio> +#include "file_input.h" +#include "slice.h" +#include "utility.h" + +FileInputFilter::FileInputFilter( FILE* input_file_ , size_t max_size_, + size_t nslices_) : + + filter(serial_in_order), + input_file(input_file_), + next_slice(Slice::preAllocate( max_size_,nslices_) ), + counts(0), + ncalls(0), + lastStartTime(tbb::tick_count::now()), + last_count(0) +{ + fprintf(stderr,"Created input filter and allocated at 0x%llx \n",(unsigned long long)next_slice); +} + +FileInputFilter::~FileInputFilter() { + fprintf(stderr,"Destroy input filter and delete at 0x%llx \n",(unsigned long long)next_slice); + Slice::giveAllocated(next_slice); + fprintf(stderr,"input operator total %u read \n",counts); +} + +void* FileInputFilter::operator()(void*) { + + ncalls++; + size_t m = next_slice->avail(); + size_t n = fread( next_slice->begin(), 1, m, input_file ); + + if(n<m){ + // fprintf(stderr,"input operator read less %d \n",n); + + } + counts+=n/192; + + if(ncalls%100000==0){ + printf("input bandwidth %f\n",double((counts-last_count)*192/1024./1024.)/ + double((tbb::tick_count::now() - lastStartTime).seconds())); + lastStartTime=tbb::tick_count::now(); + last_count=counts; + } + if( n==0 ) { + fseek(input_file,0,SEEK_SET); + void *a; + return this->operator()(a); + } else { + // Have more data to process. + Slice& t = *next_slice; + next_slice = Slice::getAllocated(); + + char* p = t.end()+n; + t.set_end(p); + + return &t; + } + +} diff --git a/src/file_input.h b/src/file_input.h new file mode 100644 index 0000000000000000000000000000000000000000..b7c5c1332107ced3d56398a1b72d83460553b721 --- /dev/null +++ b/src/file_input.h @@ -0,0 +1,23 @@ +#ifndef INPUT_H +#define INPUT_H + +#include "tbb/pipeline.h" +#include "tbb/tick_count.h" + +class Slice; + +class FileInputFilter: public tbb::filter { + public: + FileInputFilter( FILE*, size_t, size_t); + ~FileInputFilter(); + private: + FILE* input_file; + Slice* next_slice; + void* operator()(void*) /*override*/; + uint64_t counts; + uint64_t ncalls; + tbb::tick_count lastStartTime; + uint64_t last_count; +}; + +#endif diff --git a/src/format.h b/src/format.h new file mode 100644 index 0000000000000000000000000000000000000000..959db1b4a7d6a20e1182a88cd8c1c21e1c24b02c --- /dev/null +++ b/src/format.h @@ -0,0 +1,69 @@ +#ifndef FORMAT_H +#define FORMAT_H + +#include <stdint.h> +#include <math.h> + +struct block1{ + uint32_t bx[8]; + uint32_t orbit[8]; + uint32_t mu1f[8]; + uint32_t mu1s[8]; + uint32_t mu2f[8]; + uint32_t mu2s[8]; +}; + +struct masks{ + static const uint32_t phiext = 0x1ff; + static const uint32_t pt = 0x1ff; + static const uint32_t qual = 0xf; + static const uint32_t etaext = 0x1ff; + static const uint32_t etaextv = 0xff; + static const uint32_t etaexts = 0x100; + static const uint32_t iso = 0x3; + static const uint32_t chrg = 0x1; + static const uint32_t chrgv = 0x1; + static const uint32_t index = 0x7f; + static const uint32_t phi = 0x3ff; + static const uint32_t eta = 0x1ff; + static const uint32_t etav = 0xff; + static const uint32_t etas = 0x100; + static const uint32_t rsv = 0x3; +}; + +struct shifts{ + static const uint32_t phiext = 0; + static const uint32_t pt = 10; + static const uint32_t qual = 19; + static const uint32_t etaext = 23; + static const uint32_t iso = 0; + static const uint32_t chrg = 2; + static const uint32_t chrgv = 3; + static const uint32_t index = 4; + static const uint32_t phi = 11; + static const uint32_t eta = 21; + static const uint32_t rsv = 30; +}; + +struct header_shifts{ + static const uint32_t bxmatch=24; + static const uint32_t mAcount=16; + static const uint32_t orbitmatch=8; + static const uint32_t mBcount=0; +}; + + +struct header_masks{ + static const uint32_t bxmatch = 0xff<<header_shifts::bxmatch; + static const uint32_t mAcount = 0xf<<header_shifts::mAcount; + static const uint32_t orbitmatch = 0xff<<header_shifts::orbitmatch; + static const uint32_t mBcount = 0xf; +}; + +struct gmt_scales{ + static const float pt_scale = 0.5; + static const float phi_scale = 2.*M_PI/576.; + static const float eta_scale = 0.0870/8; //9th MS bit is sign + static const float phi_range = M_PI; +}; +#endif diff --git a/src/input.cc b/src/input.cc new file mode 100644 index 0000000000000000000000000000000000000000..2bf2cb87a27bf9e4e06400893a1baf12baa64e84 --- /dev/null +++ b/src/input.cc @@ -0,0 +1,46 @@ +#include <cstdio> +#include "input.h" +#include "slice.h" + +InputFilter::InputFilter( FILE* input_file_ , size_t max_size_, + size_t nslices_) : + + filter(serial_in_order), + input_file(input_file_), + next_slice(Slice::preAllocate( max_size_,nslices_) ), + counts(0) +{ + fprintf(stderr,"Created input filter and allocated at 0x%llx \n",(unsigned long long)next_slice); +} + +InputFilter::~InputFilter() { + fprintf(stderr,"Destroy input filter and delete at 0x%llx \n",(unsigned long long)next_slice); + Slice::giveAllocated(next_slice); + fprintf(stderr,"input operator total %u read \n",counts); +} + +void* InputFilter::operator()(void*) { + + size_t m = next_slice->avail(); + size_t n = fread( next_slice->begin(), 1, m, input_file ); + + if(n<m){ + fprintf(stderr,"input operator read less %d \n",n); + + } + counts+=n/192; + + if( n==0 ) { + return NULL; + } else { + // Have more data to process. + Slice& t = *next_slice; + next_slice = Slice::getAllocated(); + + char* p = t.end()+n; + t.set_end(p); + + return &t; + } + +} diff --git a/src/input.h b/src/input.h new file mode 100644 index 0000000000000000000000000000000000000000..a9c0f82403e19ad879eeb35e76358f45b7044752 --- /dev/null +++ b/src/input.h @@ -0,0 +1,19 @@ +#ifndef INPUT_H +#define INPUT_H + +#include "tbb/pipeline.h" + +class Slice; + +class InputFilter: public tbb::filter { + public: + InputFilter( FILE*, size_t, size_t); + ~InputFilter(); + private: + FILE* input_file; + Slice* next_slice; + void* operator()(void*) /*override*/; + uint32_t counts; +}; + +#endif diff --git a/src/output.cc b/src/output.cc new file mode 100644 index 0000000000000000000000000000000000000000..b382a8f22d153c5e71476fa8f15b1c279e608541 --- /dev/null +++ b/src/output.cc @@ -0,0 +1,45 @@ +#include "output.h" +#include "slice.h" + + +OutputStream::OutputStream( const char* output_file_base, ctrl *c) : + tbb::filter(serial_in_order), + my_output_file_base(output_file_base), + totcounts(0), + current_file_size(0), + file_count(0), + control(c), + current_file(0) +{ + fprintf(stderr,"Created output filter at 0x%llx \n",(unsigned long long)this); +} + +void* OutputStream::operator()( void* item ) { + Slice& out = *static_cast<Slice*>(item); + totcounts += out.get_counts(); + if(control->running){ + if(current_file==0 || current_file_size > control->max_file_size) open_next_file(); + + size_t n = fwrite( out.begin(), 1, out.size(), current_file ); + current_file_size+=n; + if( n!=out.size() ) { + fprintf(stderr,"Can't write into output file \n"); + } + } + if(!control->running && current_file!=0){ + fclose(current_file); + current_file=0; + file_count = 0; + } + out.free(); + return NULL; +} + +void OutputStream::open_next_file(){ + if(current_file){fclose(current_file); current_file_size=0; file_count += 1;} + char run_order_stem[18]; + sprintf(run_order_stem,"_%06d_%06d.dat",control->run_number,file_count); + std::string file_end(run_order_stem); + std::string filename = my_output_file_base+file_end; + current_file = fopen( filename.c_str(), "w" ); +} diff --git a/src/output.h b/src/output.h new file mode 100644 index 0000000000000000000000000000000000000000..829bcad4fda93e991977b8a80e2665807ce930b5 --- /dev/null +++ b/src/output.h @@ -0,0 +1,29 @@ +#ifndef OUTPUT_H +#define OUTPUT_H + +#include <cstdio> +#include <stdint.h> +#include <string> +#include "tbb/pipeline.h" + +#include "controls.h" + +//! Filter that writes each buffer to a file. +class OutputStream: public tbb::filter { + + +public: + OutputStream( const char*, ctrl *c ); + void* operator()( void* item ) /*override*/; + +private: + void open_next_file(); + std::string my_output_file_base; + uint32_t totcounts; + uint64_t current_file_size; + uint32_t file_count; + ctrl *control; + FILE *current_file; +}; + +#endif diff --git a/src/processor.cc b/src/processor.cc new file mode 100644 index 0000000000000000000000000000000000000000..ab5c67b36d8abbe90ed45834c07537e67d47fbbe --- /dev/null +++ b/src/processor.cc @@ -0,0 +1,120 @@ +#include <cstdio> +#include "processor.h" +#include "format.h" +#include "slice.h" +#include <iostream> + +StreamProcessor::StreamProcessor(size_t max_size_) : + max_size(max_size_), + tbb::filter(parallel) +{ fprintf(stderr,"Created transform filter at 0x%llx \n",(unsigned long long)this);} +StreamProcessor::~StreamProcessor(){ + // fprintf(stderr,"Wrote %d muons \n",totcount); +} + +void* StreamProcessor::operator()( void* item ){ + Slice& input = *static_cast<Slice*>(item); + + if(input.size()<max_size){ + // fprintf(stderr,"transform operator gets slice of %d size\n",input.size()); + + + } + // (unsigned long long)input.end()); + char* p = input.begin(); + Slice& out = *Slice::allocate( 2*max_size); + char* q = out.begin(); + uint32_t counts = 0; + int bsize = sizeof(block1); + + while(p!=input.end()){ + + block1 *bl = (block1*)p; + int mAcount = 0; + int mBcount = 0; + + uint32_t bxmatch=0; + uint32_t orbitmatch=0; + bool AblocksOn[8]; + bool BblocksOn[8]; + for(unsigned int i = 0; i < 8; i++){ + uint32_t bx = bl->bx[i]; + bxmatch += (bx==bl->bx[0])<<i; + uint32_t orbit = bl->orbit[i]; + orbitmatch += (orbit==bl->orbit[0])<<i; + + // uint32_t phiext = (bl->mu1f[i] >> shifts::phiext) & masks::phiext; + uint32_t pt = (bl->mu1f[i] >> shifts::pt) & masks::pt; + // uint32_t qual = (bl->mu1f[i] >> shifts::qual) & masks::qual; + // uint32_t etaext = (bl->mu1f[i] >> shifts::etaext) & masks::etaext; + // uint32_t iso = (bl->mu1s[i] >> shifts::iso) & masks::iso; + // uint32_t chrg = (bl->mu1s[i] >> shifts::chrg) & masks::chrg; + // uint32_t chrgv = (bl->mu1s[i] >> shifts::chrgv) & masks::chrgv; + // uint32_t index = (bl->mu1s[i] >> shifts::index) & masks::index; + // uint32_t phi = (bl->mu1s[i] >> shifts::phi) & masks::phi; + // uint32_t eta = (bl->mu1s[i] >> shifts::eta) & masks::eta; + // uint32_t rsv = (bl->mu1s[i] >> shifts::rsv) & masks::rsv; + AblocksOn[i]=(pt>0); + if(pt>0){ + mAcount++; + } + // phiext = (bl->mu2f[i] >> shifts::phiext) & masks::phiext; + pt = (bl->mu2f[i] >> shifts::pt) & masks::pt; + // qual = (bl->mu2f[i] >> shifts::qual) & masks::qual; + // etaext = (bl->mu2f[i] >> shifts::etaext) & masks::etaext; + // iso = (bl->mu2s[i] >> shifts::iso) & masks::iso; + // chrg = (bl->mu2s[i] >> shifts::chrg) & masks::chrg; + // chrgv = (bl->mu2s[i] >> shifts::chrgv) & masks::chrgv; + // index = (bl->mu2s[i] >> shifts::index) & masks::index; + // phi = (bl->mu2s[i] >> shifts::phi) & masks::phi; + // eta = (bl->mu2s[i] >> shifts::eta) & masks::eta; + // rsv = (bl->mu2s[i] >> shifts::rsv) & masks::rsv; + BblocksOn[i]=(pt>0); + if(pt>0)mBcount++; + } + + uint32_t bxcount = std::max(mAcount,mBcount); + if(bxcount == 0) { + p+=sizeof(block1); + continue; + } + + uint32_t header = (bxmatch<<24)+(mAcount << 16) + (orbitmatch<<8) + mBcount; + + counts += mAcount; + counts += mBcount; + memcpy(q,(char*)&header,4); q+=4; + memcpy(q,(char*)&bl->bx[0],4); q+=4; + memcpy(q,(char*)&bl->orbit[0],4); q+=4; + for(unsigned int i = 0; i < 8; i++){ + if(AblocksOn[i]){ + memcpy(q,(char*)&bl->mu1f[i],4); q+=4; + memcpy(q,(char*)&bl->mu1s[i],4); q+=4; + } + } + for(unsigned int i = 0; i < 8; i++){ + if(BblocksOn[i]){ + memcpy(q,(char*)&bl->mu2f[i],4); q+=4; + memcpy(q,(char*)&bl->mu2s[i],4); q+=4; + } + } + p+=sizeof(block1); + + } + + + //here do the processing + // for(;;) { + // if( p==input.end() ) + // break; + // // Note: no overflow checking is needed here, as we have twice the + // // input string length, but the square of a non-negative integer n + // // cannot have more than twice as many digits as n. + // m(q,"%ld",y); + // q = strchr(q,0); + // } + out.set_end(q); + Slice::giveAllocated(&input); + out.set_counts(counts); + return &out; +} diff --git a/src/processor.h b/src/processor.h new file mode 100644 index 0000000000000000000000000000000000000000..3c3a75de1db457e980a246fdf9f898db2948deaa --- /dev/null +++ b/src/processor.h @@ -0,0 +1,17 @@ +#ifndef PROCESSOR_H +#define PROCESSOR_H + +#include "tbb/pipeline.h" + +//reformatter + +class StreamProcessor: public tbb::filter { +public: + StreamProcessor(size_t); + void* operator()( void* item )/*override*/; + ~StreamProcessor(); +private: + size_t max_size; +}; + +#endif diff --git a/src/scdaq.cc b/src/scdaq.cc new file mode 100644 index 0000000000000000000000000000000000000000..1ff1422402ddbe747cfe626e9bf65be1dff2e9bc --- /dev/null +++ b/src/scdaq.cc @@ -0,0 +1,126 @@ +#include "tbb/pipeline.h" +#include "tbb/tick_count.h" +#include "tbb/task_scheduler_init.h" +#include "tbb/tbb_allocator.h" +#include "tbb/concurrent_queue.h" +#include <cstring> +#include <cstdlib> +#include <cstdio> +#include <cctype> +#include <string> +#include <iostream> + +#include <boost/bind.hpp> +#include <boost/asio.hpp> +#include <boost/thread.hpp> + + + +#include "file_input.h" +#include "processor.h" +#include "elastico.h" +#include "output.h" +#include "format.h" +#include "server.h" +#include "controls.h" +#include "config.h" + +using namespace std; + + + + + + + +bool silent = false; + +int run_pipeline( int nthreads, ctrl *control, config *conf) +{ + + size_t MAX_BYTES_PER_INPUT_SLICE = 192*conf->getBlocksPerInputBuffer(); + std::string InputFileName = conf->getInputFile(); + FILE* input_file = fopen( InputFileName.c_str(), "r" ); + if( !input_file ) { + throw std::invalid_argument( ("Invalid input file name: "+InputFileName).c_str() ); + return 0; + } + size_t TOTAL_SLICES = conf->getNumInputBuffers(); + + std::string output_file_base = conf->getOutputFilenameBase(); + + // Create the pipeline + tbb::pipeline pipeline; + + // Create file-reading writing stage and add it to the pipeline + FileInputFilter input_filter( input_file,MAX_BYTES_PER_INPUT_SLICE, + TOTAL_SLICES); + pipeline.add_filter( input_filter ); + + // Create reformatter and add it to the pipeline + StreamProcessor stream_processor(MAX_BYTES_PER_INPUT_SLICE); + pipeline.add_filter( stream_processor ); + + // Create elastic populator (if requested) + std::string url = conf->getElasticUrl(); + if(url.compare(0,2,"no")!=0){ + ElasticProcessor elastic_processor(MAX_BYTES_PER_INPUT_SLICE, + control, + url, + conf->getPtCut(), + conf->getQualCut()); + pipeline.add_filter(elastic_processor); + } + + // Create file-writing stage and add it to the pipeline + OutputStream output_stream( output_file_base.c_str() , control); + pipeline.add_filter( output_stream ); + + // Run the pipeline + tbb::tick_count t0 = tbb::tick_count::now(); + // Need more than one token in flight per thread to keep all threads + // busy; 2-4 works + pipeline.run( nthreads*4 ); + tbb::tick_count t1 = tbb::tick_count::now(); + + // fclose( output_file ); + fclose( input_file ); + + if ( !silent ) printf("time = %g\n", (t1-t0).seconds()); + + return 1; +} + + + +int main( int argc, char* argv[] ) { + printf("here 0\n"); + try { + config conf("scdaq.conf"); + conf.print(); + printf("here 1\n"); + ctrl control; + // tbb::tick_count mainStartTime = tbb::tick_count::now(); + + + control.running = false; + control.run_number = 0; + control.max_file_size = conf.getOutputMaxFileSize();//in Bytes + + boost::asio::io_service io_service; + server s(io_service, conf.getPortNumber(), &control); + boost::thread t(boost::bind(&boost::asio::io_service::run, &io_service)); + + int p = conf.getNumThreads(); + tbb::task_scheduler_init init(p); + if(!run_pipeline (p,&control, &conf)) + return 1; + + // utility::report_elapsed_time((tbb::tick_count::now() - mainStartTime).seconds()); + + return 0; + } catch(std::exception& e) { + std::cerr<<"error occurred. error text is :\"" <<e.what()<<"\"\n"; + return 1; + } +} diff --git a/src/scdaq.conf b/src/scdaq.conf new file mode 100644 index 0000000000000000000000000000000000000000..4ba23e22f27dd3c0eed7e0b6386c12b1072ec32c --- /dev/null +++ b/src/scdaq.conf @@ -0,0 +1,10 @@ +input_file:/dev/shm/testdata.bin +output_filename_base:/tmp/scdaq +max_file_size:1073741824 +threads:8 +input_buffers:10 +blocks_buffer:1000 +port:8000 +elastic_url:no +pt_cut:7 +quality_cut:12 \ No newline at end of file diff --git a/src/server.h b/src/server.h new file mode 100644 index 0000000000000000000000000000000000000000..015090c7a8b94cef1bddd6fe05eb98ec15bc54aa --- /dev/null +++ b/src/server.h @@ -0,0 +1,49 @@ +#ifndef SERVER_H +#define SERVER_H +#include <boost/bind.hpp> +#include <boost/asio.hpp> +#include "session.h" +#include "controls.h" + +class server +{ +public: + server(boost::asio::io_service& io_service, short port, ctrl *c) + : io_service_(io_service), + acceptor_(io_service, tcp::endpoint(tcp::v4(), port)), + control(c) + { + start_accept(control); + } + +private: + void start_accept(ctrl *c) + { + session* new_session = new session(io_service_,c); + acceptor_.async_accept(new_session->socket(), + boost::bind(&server::handle_accept, this, new_session, + boost::asio::placeholders::error)); + } + + void handle_accept(session* new_session, + const boost::system::error_code& error) + { + if (!error) + { + new_session->start(); + } + else + { + delete new_session; + } + + start_accept(control); + } + + boost::asio::io_service& io_service_; + tcp::acceptor acceptor_; + ctrl *control; +}; + + +#endif diff --git a/src/session.cc b/src/session.cc new file mode 100644 index 0000000000000000000000000000000000000000..c6729971ccef7304dde7f4c0d15086f2da0c271a --- /dev/null +++ b/src/session.cc @@ -0,0 +1,6 @@ +#include <string> + +#include "session.h" + +const std::string session::reply_success="ok "; +const std::string session::reply_failure="error"; diff --git a/src/session.h b/src/session.h new file mode 100644 index 0000000000000000000000000000000000000000..5d7765fc112499799e5fa7f8ec6feae8b90fae3b --- /dev/null +++ b/src/session.h @@ -0,0 +1,97 @@ +#ifndef SESSION_H +#define SESSION_H +#include <cstdlib> +#include <iostream> +#include <boost/bind.hpp> +#include <boost/asio.hpp> +#include <boost/tokenizer.hpp> +#include <string> +#include "controls.h" + +using boost::asio::ip::tcp; + + + +class session +{ +public: + session(boost::asio::io_service& io_service, ctrl *c) + : socket_(io_service), + control(c) + { + } + + tcp::socket& socket() + { + return socket_; + } + + void start() + { + socket_.async_read_some(boost::asio::buffer(data_, max_length), + boost::bind(&session::handle_read, this, + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred)); + } + +private: + void handle_read(const boost::system::error_code& error, + size_t bytes_transferred) + { + if (!error) + { + printf("handle read\n",data_); + std::string s=data_; + + boost::tokenizer<> tok(s); + boost::tokenizer<>::iterator i = tok.begin(); + std::string command = *i; + std::string runno = *(++i); + if(command=="start" && !control->running){ + control->run_number = atoi(runno.c_str()); + control->running = true; + } + else if(command=="stop" && control->running){ + control->running = false; + } + sprintf(data_,"ok "); + bytes_transferred = reply_size; + boost::asio::async_write(socket_, + boost::asio::buffer(data_, bytes_transferred), + boost::bind(&session::handle_write, this, + boost::asio::placeholders::error)); + + } + else + { + delete this; + } + } + + void handle_write(const boost::system::error_code& error) + { + if (!error) + { + printf("handle write\n",data_); + socket_.async_read_some(boost::asio::buffer(data_, max_length), + boost::bind(&session::handle_read, this, + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred)); + printf("what got %s\n",data_); + } + else + { + delete this; + } + } + + tcp::socket socket_; + enum { max_length = 1024 }; + char data_[max_length]; + ctrl *control; + static const std::string reply_success; + static const std::string reply_failure; + static const size_t reply_size = 5; +}; + +#endif diff --git a/src/slice.cc b/src/slice.cc new file mode 100644 index 0000000000000000000000000000000000000000..36190f0cee0dc51fe910a352b0705b9fdb24ef45 --- /dev/null +++ b/src/slice.cc @@ -0,0 +1,37 @@ +#include "slice.h" + +tbb::concurrent_bounded_queue<Slice*> Slice::free_slices = tbb::concurrent_bounded_queue<Slice*>(); + +Slice *Slice::preAllocate(size_t max_size, size_t nslices){ + if(Slice::free_slices.empty()){ + for(unsigned int i = 0; i < nslices; i++){ + Slice* t = Slice::allocate(max_size); + Slice::free_slices.push(t); + } + } + Slice *t; + Slice::free_slices.pop(t); + return t; +} + +void Slice::shutDown(){ + while(!Slice::free_slices.empty()){ + Slice *t; + Slice::free_slices.pop(t); + t->free(); + } +} + +Slice *Slice::getAllocated(){ + Slice *t; + // fprintf(stderr,"getAllocated called, current size %d\n",free_slices.size()); + Slice::free_slices.pop(t); + // fprintf(stderr,"successfully popped, current size %d\n",free_slices.size()); + return t; +} + +void Slice::giveAllocated(Slice *t){ + t->set_end(t->begin()); + Slice::free_slices.push(t); +} + diff --git a/src/slice.h b/src/slice.h new file mode 100644 index 0000000000000000000000000000000000000000..5c1c72acc1e969276b72774ca7b9be255b07ed10 --- /dev/null +++ b/src/slice.h @@ -0,0 +1,52 @@ +#ifndef SLICE_H +#define SLICE_H + +#include <stdint.h> +#include "tbb/tbb_allocator.h" +#include "tbb/concurrent_queue.h" + +//! Holds a slice of data. +class Slice { + //! Pointer to one past last filled byte in sequence + char* logical_end; + //! Pointer to one past last available byte in sequence. + char* physical_end; + uint32_t counts; + bool output; + static tbb::concurrent_bounded_queue<Slice*> free_slices; + +public: + //! Allocate a Slice object that can hold up to max_size bytes + static Slice* allocate( size_t max_size ) { + Slice* t = (Slice*)tbb::zero_allocator<char>().allocate( sizeof(Slice)+max_size ); + t->logical_end = t->begin(); + t->physical_end = t->begin()+max_size; + t->counts = 0; + t->output = false; + return t; + } + static Slice *preAllocate(size_t, size_t); + static void shutDown(); + static Slice *getAllocated(); + static void giveAllocated(Slice *); + + //! Free a Slice object + void free() { + // fprintf(stderr,"slice free at 0x%llx \n",(unsigned long long) this); + tbb::tbb_allocator<char>().deallocate((char*)this,sizeof(Slice)+(physical_end-begin())+1); + } + //! Pointer to beginning of sequence + char* begin() {return (char*)(this+sizeof(Slice));} + //! Pointer to one past last character in sequence + char* end() {return logical_end;} + //! Length of sequence + size_t size() const {return logical_end-(char*)(this+sizeof(Slice));} + //! Maximum number of characters that can be appended to sequence + size_t avail() const {return physical_end-logical_end;} + //! Set end() to given value. + void set_end( char* p ) {logical_end=p;} + void set_output(bool o) {output=o;} + void set_counts(uint32_t c){counts=c;} + uint32_t get_counts() const {return counts;} +}; +#endif diff --git a/src/utility.h b/src/utility.h new file mode 100644 index 0000000000000000000000000000000000000000..e4306fead034e278d85feb2649d8ba2b6b3c0ad4 --- /dev/null +++ b/src/utility.h @@ -0,0 +1,521 @@ +/* + ============================================================== + + SAMPLE SOURCE CODE - SUBJECT TO THE TERMS OF SAMPLE CODE LICENSE AGREEMENT, + http://software.intel.com/en-us/articles/intel-sample-source-code-license-agreement/ + + Copyright 2005-2018 Intel Corporation + + THIS FILE IS PROVIDED "AS IS" WITH NO WARRANTIES, EXPRESS OR IMPLIED, INCLUDING BUT + NOT LIMITED TO ANY IMPLIED WARRANTY OF MERCHANTABILITY, FITNESS FOR A PARTICULAR + PURPOSE, NON-INFRINGEMENT OF INTELLECTUAL PROPERTY RIGHTS. + + ============================================================= +*/ + +#ifndef UTILITY_H_ +#define UTILITY_H_ + +#if __TBB_MIC_OFFLOAD +#pragma offload_attribute (push,target(mic)) +#include <exception> +#include <cstdio> +#pragma offload_attribute (pop) +#endif // __TBB_MIC_OFFLOAD + +#include <utility> +#include <string> +#include <cstring> +#include <vector> +#include <map> +#include <set> +#include <algorithm> +#include <sstream> +#include <numeric> +#include <stdexcept> +#include <memory> +#include <cassert> +#include <iostream> +#include <cstdlib> +// TBB headers should not be used, as some examples may need to be built without TBB. + +namespace utility{ + namespace internal{ + +#if (_MSC_VER >= 1600 || __cplusplus >= 201103L || __GXX_EXPERIMENTAL_CXX0X__) \ + && (_CPPLIB_VER || _LIBCPP_VERSION || __GLIBCXX__ && _UNIQUE_PTR_H ) \ + && (!__INTEL_COMPILER || __INTEL_COMPILER >= 1200 ) + // std::unique_ptr is available, and compiler can use it + #define smart_ptr std::unique_ptr + using std::swap; +#else + #if __INTEL_COMPILER && __GXX_EXPERIMENTAL_CXX0X__ + // std::unique_ptr is unavailable, so suppress std::auto_prt<> deprecation warning + #pragma warning(disable: 1478) + #endif + #define smart_ptr std::auto_ptr + // in some C++ libraries, std::swap does not work with std::auto_ptr + template<typename T> + void swap( std::auto_ptr<T>& ptr1, std::auto_ptr<T>& ptr2 ) { + std::auto_ptr<T> tmp; tmp = ptr2; ptr2 = ptr1; ptr1 = tmp; + } +#endif + + //TODO: add tcs + template<class dest_type> + dest_type& string_to(std::string const& s, dest_type& result){ + std::stringstream stream(s); + stream>>result; + if ((!stream)||(stream.fail())){ + throw std::invalid_argument("error converting string '"+std::string(s)+"'"); + } + return result; + } + + template<class dest_type> + dest_type string_to(std::string const& s){ + dest_type result; + return string_to(s,result); + } + + template<typename> + struct is_bool { static bool value(){return false;}}; + template<> + struct is_bool<bool> { static bool value(){return true;}}; + + class type_base { + type_base& operator=(const type_base&); + public: + const std::string name; + const std::string description; + + type_base (std::string a_name, std::string a_description) : name(a_name), description(a_description) {} + virtual void parse_and_store(const std::string & s) = 0; + virtual std::string value() const = 0; + virtual smart_ptr<type_base> clone() const = 0; + virtual ~type_base(){} + }; + template <typename type> + class type_impl : public type_base { + private: + type_impl& operator=(const type_impl&); + typedef bool(*validating_function_type)(const type&); + private: + type & target; + validating_function_type validating_function; + public: + type_impl(std::string a_name, std::string a_description, type & a_target, validating_function_type a_validating_function = NULL) + : type_base (a_name,a_description), target(a_target),validating_function(a_validating_function) + {}; + void parse_and_store (const std::string & s) /*override*/ { + try{ + const bool is_bool = internal::is_bool<type>::value(); + if (is_bool && s.empty()){ + //to avoid directly assigning true + //(as it will impose additional layer of indirection) + //so, simply pass it as string + internal::string_to("1",target); + }else { + internal::string_to(s,target); + } + }catch(std::invalid_argument& e){ + std::stringstream str; + str <<"'"<<s<<"' is incorrect input for argument '"<<name<<"'" + <<" ("<<e.what()<<")"; + throw std::invalid_argument(str.str()); + } + if (validating_function){ + if (!((validating_function)(target))){ + std::stringstream str; + str <<"'"<<target<<"' is invalid value for argument '"<<name<<"'"; + throw std::invalid_argument(str.str()); + } + } + } + template <typename t> + static bool is_null_c_str(t&){return false;} + static bool is_null_c_str(char* s){return s==NULL;} + std::string value() const /*override*/ { + std::stringstream str; + if (!is_null_c_str(target)) + str<<target; + return str.str(); + } + smart_ptr<type_base> clone() const /*override*/ { + return smart_ptr<type_base>(new type_impl(*this)); + } + }; + + class argument{ + private: + smart_ptr<type_base> p_type; + bool matched_; + public: + argument(argument const& other) + : p_type(other.p_type.get() ? (other.p_type->clone()).release() : NULL) + ,matched_(other.matched_) + {} + argument& operator=(argument a){ + this->swap(a); + return *this; + } + void swap(argument& other){ + internal::swap(p_type, other.p_type); + std::swap(matched_,other.matched_); + } + template<class type> + argument(std::string a_name, std::string a_description, type& dest, bool(*a_validating_function)(const type&)= NULL) + :p_type(new type_impl<type>(a_name,a_description,dest,a_validating_function)) + ,matched_(false) + {} + std::string value()const{ + return p_type->value(); + } + std::string name()const{ + return p_type->name; + } + std::string description() const{ + return p_type->description; + } + void parse_and_store(const std::string & s){ + p_type->parse_and_store(s); + matched_=true; + } + bool is_matched() const{return matched_;} + }; + } // namespace internal + + class cli_argument_pack{ + typedef std::map<std::string,internal::argument> args_map_type; + typedef std::vector<std::string> args_display_order_type; + typedef std::vector<std::string> positional_arg_names_type; + private: + args_map_type args_map; + args_display_order_type args_display_order; + positional_arg_names_type positional_arg_names; + std::set<std::string> bool_args_names; + private: + void add_arg(internal::argument const& a){ + std::pair<args_map_type::iterator, bool> result = args_map.insert(std::make_pair(a.name(),a)); + if (!result.second){ + throw std::invalid_argument("argument with name: '"+a.name()+"' already registered"); + } + args_display_order.push_back(a.name()); + } + public: + template<typename type> + cli_argument_pack& arg(type& dest,std::string const& name, std::string const& description, bool(*validate)(const type &)= NULL){ + internal::argument a(name,description,dest,validate); + add_arg(a); + if (internal::is_bool<type>::value()){ + bool_args_names.insert(name); + } + return *this; + } + + //Positional means that argument name can be omitted in actual CL + //only key to match values for parameters with + template<typename type> + cli_argument_pack& positional_arg(type& dest,std::string const& name, std::string const& description, bool(*validate)(const type &)= NULL){ + internal::argument a(name,description,dest,validate); + add_arg(a); + if (internal::is_bool<type>::value()){ + bool_args_names.insert(name); + } + positional_arg_names.push_back(name); + return *this; + } + + void parse(std::size_t argc, char const* argv[]){ + { + std::size_t current_positional_index=0; + for (std::size_t j=1;j<argc;j++){ + internal::argument* pa = NULL; + std::string argument_value; + + const char * const begin=argv[j]; + const char * const end=begin+std::strlen(argv[j]); + + const char * const assign_sign = std::find(begin,end,'='); + + struct throw_unknown_parameter{ static void _(std::string const& location){ + throw std::invalid_argument(std::string("unknown parameter starting at:'")+location+"'"); + }}; + //first try to interpret it like parameter=value string + if (assign_sign!=end){ + std::string name_found = std::string(begin,assign_sign); + args_map_type::iterator it = args_map.find(name_found ); + + if(it!=args_map.end()){ + pa= &((*it).second); + argument_value = std::string(assign_sign+1,end); + }else { + throw_unknown_parameter::_(argv[j]); + } + } + //then see is it a named flag + else{ + args_map_type::iterator it = args_map.find(argv[j] ); + if(it!=args_map.end()){ + pa= &((*it).second); + argument_value = ""; + } + //then try it as positional argument without name specified + else if (current_positional_index < positional_arg_names.size()){ + std::stringstream str(argv[j]); + args_map_type::iterator found_positional_arg = args_map.find(positional_arg_names.at(current_positional_index)); + //TODO: probably use of smarter assert would help here + assert(found_positional_arg!=args_map.end()/*&&"positional_arg_names and args_map are out of sync"*/); + if (found_positional_arg==args_map.end()){ + throw std::logic_error("positional_arg_names and args_map are out of sync"); + } + pa= &((*found_positional_arg).second); + argument_value = argv[j]; + + current_positional_index++; + }else { + //TODO: add tc to check + throw_unknown_parameter::_(argv[j]); + } + } + assert(pa); + if (pa->is_matched()){ + throw std::invalid_argument(std::string("several values specified for: '")+pa->name()+"' argument"); + } + pa->parse_and_store(argument_value); + } + } + } + std::string usage_string(const std::string& binary_name)const{ + std::string command_line_params; + std::string summary_description; + + for (args_display_order_type::const_iterator it = args_display_order.begin();it!=args_display_order.end();++it){ + const bool is_bool = (0!=bool_args_names.count((*it))); + args_map_type::const_iterator argument_it = args_map.find(*it); + //TODO: probably use of smarter assert would help here + assert(argument_it!=args_map.end()/*&&"args_display_order and args_map are out of sync"*/); + if (argument_it==args_map.end()){ + throw std::logic_error("args_display_order and args_map are out of sync"); + } + const internal::argument & a = (*argument_it).second; + command_line_params +=" [" + a.name() + (is_bool ?"":"=value")+ "]"; + summary_description +=" " + a.name() + " - " + a.description() +" ("+a.value() +")" + "\n"; + } + + std::string positional_arg_cl; + for (positional_arg_names_type::const_iterator it = positional_arg_names.begin();it!=positional_arg_names.end();++it){ + positional_arg_cl +=" ["+(*it); + } + for (std::size_t i=0;i<positional_arg_names.size();++i){ + positional_arg_cl+="]"; + } + command_line_params+=positional_arg_cl; + std::stringstream str; + using std::endl; + str << " Program usage is:" << endl + << " " << binary_name << command_line_params + << endl << endl + << " where:" << endl + << summary_description + ; + return str.str(); + } + }; // class cli_argument_pack + + namespace internal { + template<typename T> + bool is_power_of_2( T val ) { + size_t intval = size_t(val); + return (intval&(intval-1)) == size_t(0); + } + int step_function_plus(int previous, double step){ + return static_cast<int>(previous+step); + } + int step_function_multiply(int previous, double multiply){ + return static_cast<int>(previous*multiply); + } + // "Power-of-2 ladder": nsteps is the desired number of steps between any subsequent powers of 2. + // The actual step is the quotient of the nearest smaller power of 2 divided by that number (but at least 1). + // E.g., '1:32:#4' means 1,2,3,4,5,6,7,8,10,12,14,16,20,24,28,32 + int step_function_power2_ladder(int previous, double nsteps){ + int steps = int(nsteps); + assert( is_power_of_2(steps) ); // must be a power of 2 + // The actual step is 1 until the value is twice as big as nsteps + if( previous < 2*steps ) + return previous+1; + // calculate the previous power of 2 + int prev_power2 = previous/2; // start with half the given value + int rshift = 1; // and with the shift of 1; + while( int shifted = prev_power2>>rshift ) { // shift the value right; while the result is non-zero, + prev_power2 |= shifted; // add the bits set in 'shifted'; + rshift <<= 1; // double the shift, as twice as many top bits are set; + } // repeat. + ++prev_power2; // all low bits set; now it's just one less than the desired power of 2 + assert( is_power_of_2(prev_power2) ); + assert( (prev_power2<=previous)&&(2*prev_power2>previous) ); + // The actual step value is the previous power of 2 divided by steps + return previous + (prev_power2/steps); + } + typedef int (* step_function_ptr_type)(int,double); + + struct step_function_descriptor { + char mnemonic; + step_function_ptr_type function; + public: + step_function_descriptor(char a_mnemonic, step_function_ptr_type a_function) : mnemonic(a_mnemonic), function(a_function) {} + private: + void operator=(step_function_descriptor const&); + }; + step_function_descriptor step_function_descriptors[] = { + step_function_descriptor('*',step_function_multiply), + step_function_descriptor('+',step_function_plus), + step_function_descriptor('#',step_function_power2_ladder) + }; + + template<typename T, size_t N> + inline size_t array_length(const T(&)[N]) + { + return N; + } + + struct thread_range_step { + step_function_ptr_type step_function; + double step_function_argument; + + thread_range_step ( step_function_ptr_type step_function_, double step_function_argument_) + :step_function(step_function_),step_function_argument(step_function_argument_) + { + if (!step_function_) + throw std::invalid_argument("step_function for thread range step should not be NULL"); + } + int operator()(int previous)const { + assert(0<=previous); // test 0<=first and loop discipline + const int ret = step_function(previous,step_function_argument); + assert(previous<ret); + return ret; + } + friend std::istream& operator>>(std::istream& input_stream, thread_range_step& step){ + char function_char; + double function_argument; + input_stream >> function_char >> function_argument; + size_t i = 0; + while ((i<array_length(step_function_descriptors)) && (step_function_descriptors[i].mnemonic!=function_char)) ++i; + if (i >= array_length(step_function_descriptors)){ + throw std::invalid_argument("unknown step function mnemonic: "+std::string(1,function_char)); + } else if ((function_char=='#') && !is_power_of_2(function_argument)) { + throw std::invalid_argument("the argument of # should be a power of 2"); + } + step.step_function = step_function_descriptors[i].function; + step.step_function_argument = function_argument; + return input_stream; + } + }; + } // namespace internal + + struct thread_number_range{ + int (*auto_number_of_threads)(); + int first; // 0<=first (0 can be used as a special value) + int last; // first<=last + + internal::thread_range_step step; + + thread_number_range( int (*auto_number_of_threads_)(),int low_=1, int high_=-1 + , internal::thread_range_step step_ = internal::thread_range_step(internal::step_function_power2_ladder,4) + ) + : auto_number_of_threads(auto_number_of_threads_), first(low_), last((high_>-1) ? high_ : auto_number_of_threads_()) + ,step(step_) + { + if (first<0) { + throw std::invalid_argument("negative value not allowed"); + } + if (first>last) { + throw std::invalid_argument("decreasing sequence not allowed"); + } + } + friend std::istream& operator>>(std::istream& i, thread_number_range& range){ + try{ + std::string s; + i>>s; + struct string_to_number_of_threads{ + int auto_value; + string_to_number_of_threads(int auto_value_):auto_value(auto_value_){} + int operator()(const std::string & value)const{ + return (value=="auto")? auto_value : internal::string_to<int>(value); + } + }; + string_to_number_of_threads string_to_number_of_threads(range.auto_number_of_threads()); + int low, high; + std::size_t colon = s.find(':'); + if ( colon == std::string::npos ){ + low = high = string_to_number_of_threads(s); + } else { + //it is a range + std::size_t second_colon = s.find(':',colon+1); + + low = string_to_number_of_threads(std::string(s, 0, colon)); //not copying the colon + high = string_to_number_of_threads(std::string(s, colon+1, second_colon - (colon+1))); //not copying the colons + if (second_colon != std::string::npos){ + internal::string_to(std::string(s,second_colon + 1),range.step); + } + } + range = thread_number_range(range.auto_number_of_threads,low,high,range.step); + }catch(std::invalid_argument&){ + i.setstate(std::ios::failbit); + throw; + } + return i; + } + friend std::ostream& operator<<(std::ostream& o, thread_number_range const& range){ + using namespace internal; + size_t i = 0; + for (; i < array_length(step_function_descriptors) && step_function_descriptors[i].function != range.step.step_function; ++i ) {} + if (i >= array_length(step_function_descriptors)){ + throw std::invalid_argument("unknown step function for thread range"); + } + o<<range.first<<":"<<range.last<<":"<<step_function_descriptors[i].mnemonic<<range.step.step_function_argument; + return o; + } + }; // struct thread_number_range + //TODO: fix unused warning here + //TODO: update the thread range description in the .html files + static const char* thread_number_range_desc="number of threads to use; a range of the form low[:high[:(+|*|#)step]]," + "\n\twhere low and optional high are non-negative integers or 'auto' for the default choice," + "\n\tand optional step expression specifies how thread numbers are chosen within the range." + "\n\tSee examples/common/index.html for detailed description." + ; + + inline void report_elapsed_time(double seconds){ + std::cout<<"elapsed time : "<<seconds<<" seconds"<<std::endl; + } + + inline void report_skipped(){ + std::cout<<"skip"<<std::endl; + } + + inline void parse_cli_arguments(int argc, const char* argv[], utility::cli_argument_pack cli_pack){ + bool show_help = false; + cli_pack.arg(show_help,"-h","show this message"); + + bool invalid_input=false; + try { + cli_pack.parse(argc,argv); + }catch(std::exception& e){ + std::cerr + <<"error occurred while parsing command line."<<std::endl + <<"error text: "<<e.what()<<std::endl + <<std::flush; + invalid_input =true; + } + if (show_help || invalid_input){ + std::cout<<cli_pack.usage_string(argv[0])<<std::flush; + std::exit(0); + } + + } + inline void parse_cli_arguments(int argc, char* argv[], utility::cli_argument_pack cli_pack){ + parse_cli_arguments(argc, const_cast<const char**>(argv), cli_pack); + } +} + +#endif /* UTILITY_H_ */