Skip to content
Snippets Groups Projects
Commit 4f380b7f authored by Georgios Bitzes's avatar Georgios Bitzes
Browse files

Implement raft coup d'etat

parent 251031cd
Branches
Tags
No related merge requests found
......@@ -65,5 +65,6 @@ struct cmdMapInit {
redis_cmd_map["raft_panic"] = {RedisCommand::RAFT_PANIC, CommandType::RAFT};
redis_cmd_map["raft_fetch"] = {RedisCommand::RAFT_FETCH, CommandType::RAFT};
redis_cmd_map["raft_checkpoint"] = {RedisCommand::RAFT_CHECKPOINT, CommandType::RAFT};
redis_cmd_map["raft_coup_detat"] = {RedisCommand::RAFT_COUP_DETAT, CommandType::RAFT};
}
} cmd_map_init;
......@@ -64,7 +64,8 @@ enum class RedisCommand {
RAFT_REQUEST_VOTE,
RAFT_PANIC,
RAFT_FETCH,
RAFT_CHECKPOINT
RAFT_CHECKPOINT,
RAFT_COUP_DETAT
};
enum class CommandType {
......
......@@ -61,6 +61,11 @@ LinkStatus Connection::fromStatus(const rocksdb::Status &status) {
return Response::fromStatus(link, status);
}
LinkStatus Connection::status(const std::string &msg) {
if(!link) return 1;
return Response::status(link, msg);
}
LinkStatus Connection::ok() {
if(!link) return 1;
return Response::ok(link);
......
......@@ -53,6 +53,7 @@ public:
LinkStatus pong();
LinkStatus string(const std::string &str);
LinkStatus fromStatus(const rocksdb::Status &status);
LinkStatus status(const std::string &msg);
LinkStatus ok();
LinkStatus null();
LinkStatus integer(int64_t number);
......
......@@ -42,6 +42,10 @@ LinkStatus Response::string(Link *link, const std::string &str) {
return link->Send(SSTR("$" << str.length() << "\r\n" << str << "\r\n"));
}
LinkStatus Response::status(Link *link, const std::string &str) {
return link->Send(SSTR("+" << str << "\r\n"));
}
LinkStatus Response::ok(Link *link) {
return link->Send("+OK\r\n");
}
......
......@@ -38,6 +38,7 @@ public:
static LinkStatus pong(Link *link);
static LinkStatus string(Link *link, const std::string &str);
static LinkStatus fromStatus(Link *link, const rocksdb::Status &status);
static LinkStatus status(Link *link, const std::string &str);
static LinkStatus ok(Link *link);
static LinkStatus null(Link *link);
static LinkStatus integer(Link *link, int64_t number);
......
......@@ -120,6 +120,14 @@ bool erase_element(std::vector<T> &v, const T& element) {
return false;
}
template<class T>
bool all_identical(const std::vector<T> &v) {
for(size_t i = 1; i < v.size(); i++) {
if( !(v[i] == v[i-1]) ) return false;
}
return true;
}
template<class T>
class ScopedAdder {
public:
......
......@@ -106,6 +106,21 @@ LinkStatus RaftDispatcher::dispatch(Connection *conn, RedisRequest &req, LogInde
return conn->ok();
}
case RedisCommand::RAFT_COUP_DETAT: {
RaftStateSnapshot snapshot = state.getSnapshot();
if(snapshot.leader.empty()) {
return conn->err("I have no leader to depose of");
}
if(snapshot.leader == state.getMyself()) {
return conn->err("I am the leader! I can't revolt against myself, you know.");
}
qdb_event("Received request to attempt a coup d'etat against the current leader.");
raftClock.triggerTimeout();
return conn->status("vive la revolution");
}
default: {
return this->service(conn, req, cmd, it->second.second);
}
......
......@@ -71,8 +71,19 @@ void RaftClock::heartbeat() {
lastHeartbeat = std::chrono::steady_clock::now();
}
void RaftClock::triggerTimeout() {
std::lock_guard<std::mutex> lock(lastHeartbeatMutex);
artificialTimeout = true;
}
bool RaftClock::timeout() {
std::lock_guard<std::mutex> lock(lastHeartbeatMutex);
if(artificialTimeout) {
qdb_event("Triggering an artificial timeout.");
artificialTimeout = false;
return true;
}
return std::chrono::steady_clock::now() - lastHeartbeat > randomTimeout;
}
......
......@@ -63,12 +63,14 @@ public:
RaftTimeouts getTimeouts() { return timeouts; }
milliseconds getRandomTimeout();
void triggerTimeout();
private:
std::mutex lastHeartbeatMutex;
std::chrono::steady_clock::time_point lastHeartbeat;
milliseconds randomTimeout;
RaftTimeouts timeouts;
bool artificialTimeout = false;
};
extern RaftTimeouts defaultTimeouts;
......
......@@ -62,6 +62,25 @@ void assert_reply(std::future<redisReplyPtr> &&fut, T&& check) {
assert_reply(fut.get(), check);
}
TEST_F(Raft_e2e, coup) {
prepare(0); prepare(1); prepare(2);
spinup(0); spinup(1); spinup(2);
// wait for consensus
std::this_thread::sleep_for(std::chrono::milliseconds(300));
int leaderID = getLeaderID();
ASSERT_GE(leaderID, 0);
ASSERT_LE(leaderID, 2);
int instigator = (leaderID+1)%3;
ASSERT_REPLY(tunnel(instigator)->exec("RAFT_COUP_DETAT"), "vive la revolution");
std::this_thread::sleep_for(std::chrono::milliseconds(100));
ASSERT_EQ(instigator, getLeaderID());
ASSERT_TRUE(all_identical(retrieveLeaders()));
}
TEST_F(Raft_e2e, simultaneous_clients) {
prepare(0); prepare(1); prepare(2);
spinup(0); spinup(1); spinup(2);
......
......@@ -49,4 +49,7 @@ TEST(Response, T1) {
ASSERT_EQ(reader.consume(5, buffer), 5);
ASSERT_EQ(buffer, "$-1\r\n");
ASSERT_EQ(Response::status(&link, "test"), 7);
ASSERT_EQ(reader.consume(7, buffer), 7);
ASSERT_EQ(buffer, "+test\r\n");
}
......@@ -186,6 +186,19 @@ int TestCluster::getServerID(const RaftServer &srv) {
return -1;
}
std::vector<RaftServer> TestCluster::retrieveLeaders() {
std::vector<RaftServer> ret;
for(size_t i = 0; i < initialNodes.size(); i++) {
if(testnodes.count(i) > 0) {
ret.push_back(state(i)->getSnapshot().leader);
}
}
return ret;
}
int TestCluster::getLeaderID() {
return getServerID(state(0)->getSnapshot().leader);
}
TestNode::TestNode(RaftServer me, RaftClusterID clust, const std::vector<RaftServer> &nd)
: myselfSrv(me), clusterID(clust), initialNodes(nd) {
......
......@@ -138,6 +138,8 @@ public:
RaftClusterID clusterID();
int getServerID(const RaftServer &srv);
std::vector<RaftServer> retrieveLeaders();
int getLeaderID();
private:
std::string rocksdbPath(int id = 0);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment