diff --git a/CMakeLists.txt b/CMakeLists.txt index fe47a7e64f9e2c51898d8eb3328b26ef505ae4dc..a0cb9bfac8356411844bcbd90b0ec0a7c501e34f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -64,6 +64,10 @@ target_link_libraries(netio-bus-unbuffered-publish felix-tag felix-def felix-int tdaq_add_executable(netio-bus-buffered-publish examples/netio_bus_buffered_publish.cpp) target_link_libraries(netio-bus-buffered-publish felix-tag felix-def felix-interface netio-lib felix-bus-server-lib docopt) +tdaq_add_executable(felix-l0-source examples/felix_l0_source.cpp) +target_link_libraries(felix-l0-source felix-tag felix-def felix-interface netio-lib felix-bus-server-lib docopt) +install(TARGETS felix-l0-source) + tdaq_add_executable(felix-client-subscribe examples/felix_client_subscribe.cpp) target_link_libraries(felix-client-subscribe felix-tag felix-client-lib netio-lib docopt) diff --git a/examples/felix_l0_source.cpp b/examples/felix_l0_source.cpp new file mode 100644 index 0000000000000000000000000000000000000000..332b69f80883dadf0ff09e542e9fd625ea06f137 --- /dev/null +++ b/examples/felix_l0_source.cpp @@ -0,0 +1,392 @@ +#include <cerrno> +#include <chrono> +#include <cstdio> +#include <iterator> +#include <string> +#include <ostream> +#include <sstream> +#include <iostream> +#include <thread> +#include <utility> +#include <map> +#include <atomic> + +#include "docopt/docopt.h" +#include "felixbus/felixbus.h" +#include "felix/felix_fid.h" +#include "netio/netio.h" +#include "felixtag.h" + +static const char USAGE[] = +R"(felix-l0-source - Publish TTC to host messages on the TTC virtual elink and inform felix-bus. + + Usage: + felix-l0-source [options] <local_ip> <local_port> + + Options: + -h --help Show this help. + --version Show version. + --period=<period in µs> Network message period [default: 30]. + 0 means as fast as possible. + --ppm=<nb of TTC packets> Number of TTC packets per network message [default: 30]. + TTC message rate = ppm/period + --verbose-bus Show bus information + --bus-dir=<bus-directory> Set bus directory [default: bus] + --bus-group-name=<group-name> Set group-name for bus to use [default: FELIX] + --netio-pages=<nb netio pages> Number of netio pages assigned to the socket [default: 512] +)"; + +static netio_tag_t const TTC_FID = 0x1000000806000000; +static uint8_t TTC_MESSAGE[] = { + /* First byte is the chunk status (set by felix-tohost): 0x00 = no problem + * Payload: TTC to host packet format + * https://atlas-project-felix.web.cern.ch/atlas-project-felix/user/felix-doc/felix-user-manual/5.x/C_datastructures.html + * 0 1 2 3 + * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + * | version | A | BCID | B | + * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + * | L1ID | ext.L1ID (ECR)| + * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + * | orbit | + * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + * | trigger type | reserved | + * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + * | L0ID (currently same as extended L1ID) | + * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + * | L1A counter: LSB (monotonic counter for L1 trigger signals) | + * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + * | L1A counter: MSB |C| unknown | + * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + * A: length of this packet, 20 or 26 + * B: reserved + * C: full: indication of overflow in TTC to host FIFO + * + * Example: 00031bbb0800000000320b4551000000000000000000000000000000 + * 0008bb1b03 + * 00: status OK + * 0: reserved + * 8bb: BCID + * 1b: packet length = 27 + * 03: version 3 + * 010000f5: extended L1ID + * 320b4551: orbit + * 00000000: reserved | trigger type + * 010000f5: L0ID = extended L1ID + * 00000000000000: L1A (disabled in this example), not full + */ + 0x00, // status byte + 0x03, 0x1b, 0xbb, 0x08, // version, length, bcid, reserved + 0x00, 0x00, 0x00, 0x00, // l1id + 0x32, 0x0b, 0x45, 0x51, // orbit + 0x00, 0x00, 0x00, 0x00, // trigger type, reserved + 0x00, 0x00, 0x00, 0x00, // l0id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // l1a, full + 0x00 // unknown +}; +size_t constexpr TTC_MESSAGE_LENGTH = std::size(TTC_MESSAGE); + +void dump_ttc_packet(uint8_t* msg) +{ + // offset are + 1 because of the leading status byte + uint8_t const length = *(msg+2); + std::cout + << "version: " << static_cast<uint32_t>(*(msg+1)) + << " length: " << static_cast<uint32_t>(length) + << " bcid: " << (*reinterpret_cast<uint16_t*>(msg+3) & 0x0FFF) + << " l1id: " << *reinterpret_cast<uint32_t*>(msg+5) + << " orbit: " << *reinterpret_cast<uint32_t*>(msg+9) + << " trigger type: " << *reinterpret_cast<uint16_t*>(msg+13) + << " l0id: " << *reinterpret_cast<uint32_t*>(msg+17); + if (length == 27) { + uint64_t const l1a = *reinterpret_cast<uint32_t*>(msg+21) + + (static_cast<uint64_t>(*reinterpret_cast<uint16_t*>(msg+21) & 0xEFFF) << 32); + std::cout + << " l1a: " << l1a + << " full: " << (*(msg+26) & 0x1) + << " unknown: " << static_cast<uint32_t>(*(msg+27)); + + } + std::cout << std::endl; +} + +struct { + std::string hostname; + uint port; + uint period; + uint stats_period; + uint ppm; + std::string bus_directory; + std::string bus_group_name; + bool verbose_bus; + bool no_bus; + unsigned netio_pages; + + struct netio_context ctx; + struct netio_publish_socket socket; + + struct netio_timer stats_timer; + + struct netio_signal send_signal; + std::thread publish_thread; + std::atomic<bool> running; +} config; + +struct { + uint64_t sent; + uint64_t recent_sent; + std::chrono::high_resolution_clock::time_point start_ts; + std::chrono::high_resolution_clock::time_point recent_ts; + bool backpressure; +} stats; + +void on_subscribe(struct netio_publish_socket* _socket, netio_tag_t tag, + void* _addr, size_t _addrlen) +{ + std::cout << "client subscribed to tag 0x" << std::hex << tag << std::dec << std::endl; +} + +void on_connection_established(struct netio_publish_socket* _socket) +{ + std::cout <<"connection to subscriber established" << std::endl; + netio_signal_fire(&config.send_signal); +} + +void on_connection_closed(struct netio_publish_socket* _socket) +{ + std::cout << "connection to subscriber closed" << std::endl; +} + +void on_bind_refused(struct netio_listen_socket*) +{ + std::cerr << "could not bind given endpoint: exiting" << std::endl; + netio_terminate(&config.ctx.evloop); +} + +void send_ttc_packets(void*) { + static int flags = 0; + unsigned sent = 0; + while (sent < config.ppm) { + int const rv = netio_buffered_publish(&config.socket, TTC_FID, TTC_MESSAGE, + TTC_MESSAGE_LENGTH, flags, NULL); + if (rv == NETIO_STATUS_OK) { + flags = 0; + sent += 1; + // increment L1ID + *reinterpret_cast<uint32_t*>(TTC_MESSAGE + 5) += 1; + *reinterpret_cast<uint32_t*>(TTC_MESSAGE + 17) += 1; + } else if (rv == NETIO_STATUS_AGAIN) { + stats.backpressure = true; + flags |= NETIO_REENTRY; + break; + } else if (rv == NETIO_STATUS_OK_NOSUB) { + break; + } else { + std::cerr << "send error: " << rv << std::endl; + break; + } + } + + // send network messages + if (sent != 0) { + netio_buffered_publish_flush(&config.socket, 0, NULL); + + stats.sent += sent; + stats.recent_sent += sent; + } + //dump_ttc_packet(TTC_MESSAGE); +} + +void flood_ttc_packets(void*) { + static int flags = 0; + static unsigned sent_ttc_msg = 0; + + while (true) { + int const rv = netio_buffered_publish(&config.socket, TTC_FID, TTC_MESSAGE, + TTC_MESSAGE_LENGTH, flags, NULL); + if (rv == NETIO_STATUS_OK) { + flags = 0; + sent_ttc_msg += 1; + // increment L1ID + *reinterpret_cast<uint32_t*>(TTC_MESSAGE + 5) += 1; + *reinterpret_cast<uint32_t*>(TTC_MESSAGE + 17) += 1; + + if (sent_ttc_msg >= config.ppm) { + netio_buffered_publish_flush(&config.socket, 0, NULL); + stats.sent += sent_ttc_msg; + stats.recent_sent += sent_ttc_msg; + sent_ttc_msg = 0; + } + } else if (rv == NETIO_STATUS_AGAIN) { + flags |= NETIO_REENTRY; + break; + } else if (rv == NETIO_STATUS_OK_NOSUB) { + netio_signal_fire(&config.send_signal); + break; + } else { + std::cerr << "send error: " << rv << std::endl; + break; + } + } +} + +void on_buffer_available(netio_publish_socket*) +{ + flood_ttc_packets(nullptr); +} + +void publish_thread() +{ + while (config.running) { + netio_signal_fire(&config.send_signal); + auto deadline = std::chrono::high_resolution_clock::now() + + std::chrono::microseconds(config.period); + while (std::chrono::high_resolution_clock::now() < deadline) {} + } +} + +void on_stats(void*) { + auto const now = std::chrono::high_resolution_clock::now(); + std::cout << "current rate: " << 1e9 * stats.recent_sent / + std::chrono::duration_cast<std::chrono::nanoseconds>(now - stats.recent_ts).count() + << " Hz" + << (stats.backpressure ? " backpressure" : "") + << ", overall avg rate: " << 1e9 * stats.sent / + std::chrono::duration_cast<std::chrono::nanoseconds>(now - stats.start_ts).count() + << " Hz" << std::endl; + + stats.recent_ts = std::chrono::high_resolution_clock::now(); + stats.recent_sent = 0; + stats.backpressure = false; +} + +void on_init(void* data) { + struct netio_buffered_socket_attr attr; + attr.num_pages = config.netio_pages; + attr.pagesize = 1536; + // packet send is controlled "manually" (cf. send_ttc_packets) + attr.watermark = attr.pagesize + 1; // disable packet send on watermark + attr.timeout_ms = 0; // disable packet send on timeout + + std::cout << "opening publish socket on " << config.hostname.c_str() + << ":" << config.port << std::endl; + netio_publish_socket_init(&config.socket, &config.ctx, + config.hostname.c_str(), config.port, &attr); + + config.socket.cb_subscribe = on_subscribe; + config.socket.cb_connection_established = on_connection_established; + config.socket.cb_connection_closed = on_connection_closed; + config.socket.lsocket.cb_error_bind_refused = on_bind_refused; + if (config.period == 0) { + config.socket.cb_buffer_available = on_buffer_available; + } + + // FelixBus + struct felix_bus_info info; + info.ip = netio_hostname(config.hostname.c_str()); + info.raw_tcp = netio_tcp_mode(config.hostname.c_str()); + info.port = config.port; + info.unbuffered = false; + info.pubsub = true; + info.netio_pages = attr.num_pages; + info.netio_pagesize = attr.pagesize; + info.stream = false; + + uint8_t const vid = get_vid(TTC_FID); + uint8_t const did = get_did(TTC_FID); + uint32_t const cid = get_cid(TTC_FID); + // std::cout << "vid 0x" << std::hex << static_cast<uint32_t>(vid) << std::dec + // << ", did 0x" << std::hex << static_cast<uint32_t>(did) << std::dec + // << ", cid 0x" << std::hex << cid << std::dec << std::endl; + char* bus_path = felix_bus_path(config.bus_directory.c_str(), + config.bus_group_name.c_str(), vid, did, cid, "dma-0"); + if (bus_path == NULL) { + std::cerr << "error: cannot create bus_path" << std::endl; + exit(-1); + } + std::cout << "felix bus path: " << bus_path << std::endl; + + felix_bus bus = felix_bus_open(bus_path); + if (!bus) { + std::cerr << "error: cannot create felix bus: errno=" << errno + << " str=" << strerror(errno) << std::endl; + exit(-1); + } + + std::cout << "publishing FID 0x" << std::hex << TTC_FID << std::dec << std::endl; + if (felix_bus_write(bus, TTC_FID, &info) < 0) { + std::cerr << "error felix_bus_write: errno=" << errno + << " str=" << strerror(errno) << std::endl; + exit(-1); + } + + if (felix_bus_close(bus) < 0) { + std::cerr << "error felix_bus_close: errno=" << errno + << " str=" << strerror(errno) << std::endl; + } + + std::cout << "starting publication every " << config.period << " µs" + << ", avg L0 rate: " << static_cast<float>(config.ppm) * 1e6 / config.period + << " Hz" << std::endl; + + netio_timer_start_us(&config.stats_timer, config.stats_period); + + stats.start_ts = std::chrono::high_resolution_clock::now(); + stats.recent_ts = std::chrono::high_resolution_clock::now(); + stats.backpressure = false; + + if (config.period == 0) { + // netio_signal_fire(&config.send_signal); + } else { + config.running = true; + config.publish_thread = std::thread(publish_thread); + } +} + +int main(int argc, char** argv) +{ + std::map<std::string, docopt::value> args = docopt::docopt(USAGE, + { argv + 1, argv + argc }, + true, // show help if requested + (std::string(argv[0]) + " " + FELIX_TAG).c_str()); // version string + + try { + config.hostname = args["<local_ip>"].asString(); + config.port = args["<local_port>"].asLong(); + config.period = args["--period"].asLong(); + config.ppm = args["--ppm"].asLong(); + config.bus_directory = args["--bus-dir"].asString(); + config.bus_group_name = args["--bus-group-name"].asString(); + config.verbose_bus = args["--verbose-bus"].asBool(); + config.stats_period = 1000000; + config.netio_pages = args["--netio-pages"].asLong(); + } catch (std::invalid_argument const& error) { + std::cerr << "error: argument or option of wrong type" << std::endl; + std::cout << USAGE << std::endl; + return -1; + } + + config.running = true; + + netio_init(&config.ctx); + config.ctx.evloop.cb_init = on_init; + + netio_signal_init(&config.ctx.evloop, &config.send_signal); + if (config.period != 0) { + config.send_signal.cb = send_ttc_packets; + } else { + config.send_signal.cb = flood_ttc_packets; + } + + netio_timer_init(&config.ctx.evloop, &config.stats_timer); + config.stats_timer.cb = on_stats; + + netio_run(&config.ctx.evloop); + + if (config.period != 0) { + config.running = false; + config.publish_thread.join(); + } + + return 0; +}