Commit bd36be04 authored by Georgios Bitzes's avatar Georgios Bitzes
Browse files

Implement first end-to-end Communicator test

parent b129f21f
Pipeline #1457022 failed with stages
in 80 minutes and 48 seconds
Subproject commit 7641c061b271e5c8f776ebf0a51a3d90ead2972d Subproject commit 0df12118ad52d24d2d9d652a03ba19eb6300d7e9
...@@ -49,6 +49,8 @@ ...@@ -49,6 +49,8 @@
#include "qclient/shared/SharedHash.hh" #include "qclient/shared/SharedHash.hh"
#include "qclient/shared/SharedManager.hh" #include "qclient/shared/SharedManager.hh"
#include "qclient/shared/TransientSharedHash.hh" #include "qclient/shared/TransientSharedHash.hh"
#include "qclient/shared/Communicator.hh"
#include "qclient/shared/CommunicatorListener.hh"
using namespace quarkdb; using namespace quarkdb;
#define ASSERT_OK(msg) ASSERT_TRUE(msg.ok()) #define ASSERT_OK(msg) ASSERT_TRUE(msg.ok())
...@@ -2453,6 +2455,35 @@ TEST_F(Raft_e2e, SharedHash) { ...@@ -2453,6 +2455,35 @@ TEST_F(Raft_e2e, SharedHash) {
ASSERT_EQ(tmp, "v1"); ASSERT_EQ(tmp, "v1");
} }
TEST_F(Raft_e2e, Communicator) {
spinup(0); spinup(1); spinup(2);
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));
qclient::Subscriber subscriber1(members(), reasonableSubscriptionOptions(true));
qclient::Subscriber subscriber2(members(), reasonableSubscriptionOptions(true));
Communicator communicator(&subscriber1, "comm-channel", nullptr, std::chrono::milliseconds(1),
std::chrono::seconds(60));
CommunicatorListener communicatorListener(&subscriber2, "comm-channel");
std::string reqID;
std::future<CommunicatorReply> fut = communicator.issue("i-like-trains", reqID);
RETRY_ASSERT_EQ(communicatorListener.size(), 1u);
qclient::CommunicatorRequest req = communicatorListener.front();
ASSERT_EQ(req.getID(), reqID);
ASSERT_EQ(req.getContents(), "i-like-trains");
req.sendReply(888, "aaaaa");
ASSERT_EQ(fut.wait_for(std::chrono::seconds(3)), std::future_status::ready);
CommunicatorReply reply = fut.get();
ASSERT_EQ(reply.status, 888);
ASSERT_EQ(reply.contents, "aaaaa");
}
TEST_F(Raft_e2e, NoAuth) { TEST_F(Raft_e2e, NoAuth) {
spinup(0); spinup(1); spinup(2); spinup(0); spinup(1); spinup(2);
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2)); RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));
......
...@@ -273,8 +273,9 @@ int TestCluster::getLeaderID() { ...@@ -273,8 +273,9 @@ int TestCluster::getLeaderID() {
return getServerID(state(0)->getSnapshot()->leader); return getServerID(state(0)->getSnapshot()->leader);
} }
qclient::SubscriptionOptions TestCluster::reasonableSubscriptionOptions() { qclient::SubscriptionOptions TestCluster::reasonableSubscriptionOptions(bool pushtypes) {
qclient::SubscriptionOptions opts; qclient::SubscriptionOptions opts;
opts.usePushTypes = pushtypes;
opts.handshake = makeQClientHandshake(); opts.handshake = makeQClientHandshake();
return opts; return opts;
} }
......
...@@ -385,7 +385,7 @@ public: ...@@ -385,7 +385,7 @@ public:
std::vector<RaftServer> retrieveLeaders(); std::vector<RaftServer> retrieveLeaders();
int getLeaderID(); int getLeaderID();
qclient::SubscriptionOptions reasonableSubscriptionOptions(); qclient::SubscriptionOptions reasonableSubscriptionOptions(bool pushtypes = false);
private: private:
std::string rocksdbPath(int id = 0); std::string rocksdbPath(int id = 0);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment