Commit c5d6a409 authored by Georgios Bitzes's avatar Georgios Bitzes
Browse files

Test that Communicator works even when cluster is unstable

parent 2a88b465
Pipeline #1467689 failed with stages
in 58 minutes and 42 seconds
Subproject commit 503a8fbdbb0cc198ebe00b6f44581ef3392d08fc
Subproject commit d07e09a2cbcadf9b6de29d1c5e8c7af5eb9164f6
......@@ -29,6 +29,7 @@
#include "raft/RaftReplicator.hh"
#include "raft/RaftConfig.hh"
#include "raft/RaftTrimmer.hh"
#include "utils/ParseUtils.hh"
#include "storage/ConsistencyScanner.hh"
#include "Configuration.hh"
#include "QuarkDBNode.hh"
......@@ -36,11 +37,16 @@
#include "RedisParser.hh"
#include <gtest/gtest.h>
#include <qclient/QClient.hh>
#include <qclient/shared/Communicator.hh>
#include <qclient/shared/CommunicatorListener.hh>
#include <qclient/pubsub/Subscriber.hh>
#include <qclient/pubsub/Message.hh>
#include "utils/AssistedThread.hh"
#include "../test-reply-macros.hh"
using namespace quarkdb;
class Replication : public TestCluster3NodesFixture {};
class CommunicatorTest : public TestCluster3NodesFixture {};
class Membership : public TestCluster5NodesFixture {};
class SingleNodeInitially : public TestCluster10Nodes1InitialFixture {};
......@@ -296,6 +302,45 @@ TEST_F(Replication, linearizability_during_transition) {
ASSERT_TRUE(crossCheckJournals(node1, node2));
}
void consumeListener(CommunicatorRequest &&req) {
int64_t round = 0;
ParseUtils::parseInt64(req.getContents().substr(6), round);
req.sendReply(round, SSTR("reply-" << round));
}
TEST_F(CommunicatorTest, ChaosTest) {
// start the cluster
spinup(0); spinup(1); spinup(2);
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));
qclient::Subscriber subscriber1(members(), reasonableSubscriptionOptions(true));
qclient::Subscriber subscriber2(members(), reasonableSubscriptionOptions(true));
qclient::Communicator communicator(&subscriber1, "comm-channel", nullptr, std::chrono::seconds(1),
std::chrono::minutes(5));
qclient::CommunicatorListener communicatorListener(&subscriber2, "comm-channel");
using namespace std::placeholders;
communicatorListener.attach(std::bind(&consumeListener, _1));
ClusterDestabilizer destabilizer(this);
std::vector<std::future<CommunicatorReply>> futReplies;
for(size_t round = 0; round < 1000; round++) {
futReplies.emplace_back(communicator.issue(SSTR("round-" << round)));
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
for(size_t i = 0; i < futReplies.size(); i++) {
ASSERT_EQ(futReplies[i].wait_for(std::chrono::seconds(30)), std::future_status::ready) << i;
CommunicatorReply reply = futReplies[i].get();
ASSERT_EQ(reply.status, (int) i);
ASSERT_EQ(reply.contents, SSTR("reply-" << i));
}
}
TEST_F(Replication, several_transitions) {
Connection::setPhantomBatchLimit(1);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment