Commit 3da097c4 authored by Serguei Kolos's avatar Serguei Kolos
Browse files

Improve synchronisation between raw data and TTC packets

parent 599b06d4
......@@ -109,7 +109,7 @@ public:
NetioPublisher(uint32_t cooperated, uint32_t l1id_offset, swrod::test::Barrier & barrier,
const std::string & hostname, uint32_t port,
uint32_t page_size, uint32_t buffer_pages,
bool zero_copy, uint32_t message_size, uint32_t ecr_interval,
bool zero_copy, uint32_t message_size, uint32_t ecr_interval, uint32_t synch_interval,
netio_tag_t first_channel, uint32_t channels_number, netio_tag_t l1a_channel,
int32_t cpu) :
m_barrier(barrier), m_l1a_channel(l1a_channel),
......@@ -117,7 +117,9 @@ public:
m_last_channel(m_first_channel + channels_number - 1),
m_current_channel(0),
m_channels_number(channels_number),
m_ecr_interval(ecr_interval), m_message_size(message_size),
m_ecr_interval(ecr_interval),
m_synch_interval(synch_interval),
m_message_size(message_size),
m_cpu(cpu),
m_cooperated_thread_num(cooperated),
m_l1id_offset(l1id_offset)
......@@ -203,6 +205,7 @@ private:
netio_tag_t m_current_channel;
uint32_t m_channels_number;
uint32_t m_ecr_interval;
uint32_t m_synch_interval;
uint32_t m_message_size;
uint32_t m_subscriptions = 0;
......@@ -241,14 +244,16 @@ void NetioPublisher::GBT_flood()
m_l1a_packet.message.l1id++;
m_l1a_packet.message.bcid = m_l1a_packet.message.l1id & BCID_MASK;
if ((m_l1a_packet.message.l1id % m_synch_interval) == 0) {
netio_buffered_publish_flush(&m_socket, 0, NULL);
m_barrier.wait();
}
if (m_l1a_packet.message.l1id == m_ecr_interval) {
m_l1a_packet.message.l1id = 0;
m_l1a_packet.message.bcid = m_l1a_packet.message.l1id & BCID_MASK;
m_l1a_packet.message.ecr++;
netio_buffered_publish_flush(&m_socket, 0, NULL);
m_barrier.wait();
netio_signal_fire(&m_signal);
return;
}
}
}
......@@ -275,14 +280,16 @@ void NetioPublisher::Full_flood()
m_l1a_packet.message.l1id += m_cooperated_thread_num;
m_l1a_packet.message.bcid = m_l1a_packet.message.l1id & BCID_MASK;
if ((m_l1a_packet.message.l1id % m_synch_interval) == 0) {
netio_buffered_publish_flush(&m_socket, 0, NULL);
m_barrier.wait();
}
if (m_l1a_packet.message.l1id >= m_ecr_interval) {
m_l1a_packet.message.l1id = m_l1id_offset;
m_l1a_packet.message.bcid = m_l1a_packet.message.l1id & BCID_MASK;
m_l1a_packet.message.ecr++;
netio_buffered_publish_flush(&m_socket, 0, NULL);
m_barrier.wait();
netio_signal_fire(&m_signal);
return;
}
}
m_current_channel = 0;
......@@ -298,21 +305,21 @@ void NetioPublisher::floodL1A()
if (ret != NETIO_STATUS_OK) {
netio_buffered_publish_flush(&m_socket, 0, NULL);
netio_signal_fire(&m_signal);
return; return;
return;
}
m_l1a_packet.message.l1id++;
m_l1a_packet.message.bcid = m_l1a_packet.message.l1id & BCID_MASK;
if ((m_l1a_packet.message.l1id % m_synch_interval) == 0) {
netio_buffered_publish_flush(&m_socket, 0, NULL);
m_barrier.wait();
}
if (m_l1a_packet.message.l1id == m_ecr_interval) {
m_l1a_packet.message.l1id = 0;
m_l1a_packet.message.bcid = 0;
m_l1a_packet.message.ecr++;
netio_buffered_publish_flush(&m_socket, 0, NULL);
m_barrier.wait();
netio_signal_fire(&m_signal);
return;
}
}
}
......@@ -336,13 +343,15 @@ int main(int ac, char *av[])
("elinks-number,e", value<uint32_t>()->default_value(192), "e-links number per emulated felix card")
("packet-size,s", value<uint32_t>()->default_value(40), "packet size")
("ecr-interval,l", value<uint32_t>()->default_value(500000), "ECR interval")
("sync-interval,L", value<uint32_t>()->default_value(10000), "Synchronisation interval for data and TTC packets")
("host,H", value<std::string>()->default_value("127.0.0.1"), "Network interface for publishing")
("netio-page-size,B", value<uint32_t>()->default_value(100000), "Netio page size in bytes")
("ttc-netio-page-size,A", value<uint32_t>()->default_value(10000), "Netio page size for TTC-to-Host e-link in bytes")
("netio-page-size,B", value<uint32_t>()->default_value(100000), "Netio page for data e-links size in bytes")
("netio-pages-number,C", value<uint32_t>()->default_value(32), "Netio pages number")
("felix-cards,d", value<uint32_t>()->default_value(1), "number of emulated felix cards")
("workers,w", value<uint32_t>()->default_value(1), "worker threads per emulated felix card")
("zero-copy,Z", "use Netio zero-copy mode")
("cpu-affinity,A", "attach worker threads to specific CPU cores")
("cpu-affinity,Y", "attach worker threads to specific CPU cores")
("full-mode,F", "publish data in Full Mode");
variables_map arguments;
......@@ -366,10 +375,12 @@ int main(int ac, char *av[])
uint32_t felix_number = arguments["felix-cards"].as<uint32_t>();
uint32_t packet_size = arguments["packet-size"].as<uint32_t>();
uint32_t ecr_interval = arguments["ecr-interval"].as<uint32_t>();
uint32_t sync_interval = arguments["sync-interval"].as<uint32_t>();
uint64_t first_elink_id = arguments["first-elink"].as<uint64_t>();
uint64_t l1a_elink_id = arguments["l1a-elink"].as<uint64_t>();
uint32_t port_number = arguments["port"].as<uint32_t>();
std::string host = arguments["host"].as<std::string>();
uint32_t ttc_netio_page_size = arguments["ttc-netio-page-size"].as<uint32_t>();
uint32_t netio_page_size = arguments["netio-page-size"].as<uint32_t>();
uint32_t netio_pages_number = arguments["netio-pages-number"].as<uint32_t>();
bool zero_copy = arguments.count("zero-copy");
......@@ -401,7 +412,7 @@ int main(int ac, char *av[])
full_mode ? workers_number : 0, full_mode ? i : 0,
barrier, host, port_number++,
netio_page_size, netio_pages_number, zero_copy,
packet_size, ecr_interval,
packet_size, ecr_interval, sync_interval,
elinks_start, elinks_per_worker, 0, cpu_affinity ? cpu++ : -1));
elinks_start += elinks_per_worker;
......@@ -411,13 +422,13 @@ int main(int ac, char *av[])
if (l1a_elink_id) {
std::string uuid = felixTable.addFelix(
"tcp://" + host + ":" + std::to_string(port_number),
false, true, netio_pages_number, netio_page_size);
false, true, netio_pages_number, ttc_netio_page_size);
elinkTable.addElink(l1a_elink_id, uuid);
workers.push_back(
std::make_shared<NetioPublisher>(1, 0, barrier, host, port_number++,
netio_page_size, netio_pages_number, zero_copy,
packet_size, ecr_interval,
ttc_netio_page_size, netio_pages_number, zero_copy,
packet_size, ecr_interval, sync_interval,
0, 1, l1a_elink_id, cpu_affinity ? cpu++ : -1));
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment