Commit 773ed2fa authored by Georgios Bitzes's avatar Georgios Bitzes
Browse files

Implement membership changes

parent 4f380b7f
......@@ -66,5 +66,8 @@ struct cmdMapInit {
redis_cmd_map["raft_fetch"] = {RedisCommand::RAFT_FETCH, CommandType::RAFT};
redis_cmd_map["raft_checkpoint"] = {RedisCommand::RAFT_CHECKPOINT, CommandType::RAFT};
redis_cmd_map["raft_coup_detat"] = {RedisCommand::RAFT_COUP_DETAT, CommandType::RAFT};
redis_cmd_map["raft_add_observer"] = {RedisCommand::RAFT_ADD_OBSERVER, CommandType::RAFT};
redis_cmd_map["raft_remove_member"] = {RedisCommand::RAFT_REMOVE_MEMBER, CommandType::RAFT};
redis_cmd_map["raft_promote_observer"] = {RedisCommand::RAFT_PROMOTE_OBSERVER, CommandType::RAFT};
}
} cmd_map_init;
......@@ -65,7 +65,10 @@ enum class RedisCommand {
RAFT_PANIC,
RAFT_FETCH,
RAFT_CHECKPOINT,
RAFT_COUP_DETAT
RAFT_COUP_DETAT,
RAFT_ADD_OBSERVER,
RAFT_REMOVE_MEMBER,
RAFT_PROMOTE_OBSERVER
};
enum class CommandType {
......
......@@ -34,6 +34,10 @@ LinkStatus RedisDispatcher::dispatch(Connection *conn, RedisRequest &req, LogInd
auto it = redis_cmd_map.find(req[0]);
if(it != redis_cmd_map.end()) return dispatch(conn, req, it->second.first, commit);
if(startswith(req[0], "JOURNAL_")) {
store.noop(commit);
}
return conn->err(SSTR("unknown command " << quotes(req[0])));
}
......
......@@ -69,7 +69,7 @@ void RaftDirector::applyCommits() {
void RaftDirector::main() {
raftClock.heartbeat();
while(true) {
qdb_info("Random timeout refresh: " << raftClock.refreshRandomTimeout().count() << "ms");
raftClock.refreshRandomTimeout();
RaftStateSnapshot snapshot = state.getSnapshot();
if(snapshot.status == RaftStatus::SHUTDOWN) {
......@@ -79,25 +79,37 @@ void RaftDirector::main() {
actAsFollower(snapshot);
}
else if(snapshot.status == RaftStatus::LEADER) {
qdb_info("Starting replicator");
RaftReplicator replicator(journal, state, raftClock.getTimeouts());
for(const RaftServer& srv : state.getNodes()) {
if(srv != state.getMyself()) {
replicator.launch(srv, snapshot);
}
}
while(snapshot.term == state.getCurrentTerm() && state.getSnapshot().status == RaftStatus::LEADER) {
state.wait(raftClock.getTimeouts().getHeartbeatInterval());
}
actAsLeader(snapshot);
}
else if(snapshot.status == RaftStatus::CANDIDATE) {
else {
qdb_throw("should never happen");
}
}
}
void RaftDirector::actAsLeader(RaftStateSnapshot &snapshot) {
RaftMembership membership = journal.getMembership();
qdb_info("Starting replicator for membership epoch " << membership.epoch);
RaftReplicator replicator(journal, state, raftClock.getTimeouts());
for(const RaftServer& srv : membership.nodes) {
if(srv != state.getMyself()) {
replicator.launch(srv, snapshot);
}
}
for(const RaftServer& srv : membership.observers) {
if(srv == state.getMyself()) qdb_throw("found myself in the list of observers, even though I'm leader: " << serializeNodes(membership.observers));
replicator.launch(srv, snapshot);
}
while(membership.epoch == journal.getEpoch() &&
snapshot.term == state.getCurrentTerm() &&
state.getSnapshot().status == RaftStatus::LEADER) {
state.wait(raftClock.getTimeouts().getHeartbeatInterval());
}
}
void RaftDirector::runForLeader() {
// don't reuse the snapshot from the main loop,
// it could have changed in-between
......@@ -130,9 +142,12 @@ void RaftDirector::actAsFollower(RaftStateSnapshot &snapshot) {
state.wait(randomTimeout);
if(raftClock.timeout()) {
qdb_event(state.getMyself().toString() << ": TIMEOUT after " << randomTimeout.count() << "ms, I am not receiving heartbeats. Attempting to start election.");
runForLeader();
return;
if(contains(journal.getMembership().nodes, state.getMyself())) {
qdb_event(state.getMyself().toString() << ": TIMEOUT after " << randomTimeout.count() << "ms, I am not receiving heartbeats. Attempting to start election.");
runForLeader();
return;
}
qdb_warn("I am not receiving heartbeats - not running for leader since in membership epoch " << journal.getEpoch() << " I am not a full node. Will keep on waiting.");
}
}
}
......@@ -41,6 +41,7 @@ public:
private:
void main();
void actAsFollower(RaftStateSnapshot &snapshot);
void actAsLeader(RaftStateSnapshot &snapshot);
void runForLeader();
void applyCommits();
void trimJournal();
......
......@@ -121,6 +121,39 @@ LinkStatus RaftDispatcher::dispatch(Connection *conn, RedisRequest &req, LogInde
raftClock.triggerTimeout();
return conn->status("vive la revolution");
}
case RedisCommand::RAFT_ADD_OBSERVER:
case RedisCommand::RAFT_REMOVE_MEMBER:
case RedisCommand::RAFT_PROMOTE_OBSERVER: {
if(req.size() != 2) return conn->errArgs(req[0]);
RaftServer srv;
if(!parseServer(req[1], srv)) {
return conn->err(SSTR("cannot parse server: " << req[1]));
}
RaftStateSnapshot snapshot = state.getSnapshot();
if(snapshot.status != RaftStatus::LEADER) return conn->err("not a leader");
if(srv == state.getMyself()) conn->err(SSTR("cannot perform membership changes on current leader"));
std::string err;
bool rc;
if(cmd == RedisCommand::RAFT_ADD_OBSERVER) {
rc = journal.addObserver(snapshot.term, srv, err);
}
else if(cmd == RedisCommand::RAFT_REMOVE_MEMBER) {
rc = journal.removeMember(snapshot.term, srv, err);
}
else if(cmd == RedisCommand::RAFT_PROMOTE_OBSERVER) {
rc = journal.promoteObserver(snapshot.term, srv, err);
}
else {
qdb_throw("should never happen");
}
if(!rc) return conn->err(err);
return conn->ok();
}
default: {
return this->service(conn, req, cmd, it->second.second);
}
......@@ -278,7 +311,31 @@ RaftAppendEntriesResponse RaftDispatcher::appendEntries(RaftAppendEntriesRequest
RaftVoteResponse RaftDispatcher::requestVote(RaftVoteRequest &req) {
std::lock_guard<std::mutex> lock(raftCommand);
if(req.candidate == state.getMyself()) {
qdb_throw("received request vote from myself");
qdb_throw("received request vote from myself: " << state.getMyself().toString());
}
//----------------------------------------------------------------------------
// Defend against disruptive servers.
// A node that has been removed from the cluster will often not know that,
// and will repeatedly start elections trying to depose the current leader,
// effectively making the cluster unavailable.
//
// If this node is not part of the cluster (that I know of) and I am already
// in contact with the leader, completely ignore its vote request, and don't
// take its term into consideration.
//
// If I don't have a leader, the situation is different though. Maybe this
// node was added later, and I just don't know about it yet. Process the
// request normally, since there's no leader to depose of, anyway.
//----------------------------------------------------------------------------
if(!contains(state.getNodes(), req.candidate)) {
RaftStateSnapshot snapshot = state.getSnapshot();
if(!snapshot.leader.empty()) {
qdb_critical("Non-voting " << req.candidate.toString() << " attempted to disrupt the cluster by starting an election for term " << req.term << ". Ignoring its request.");
return {snapshot.term, false};
}
qdb_warn("Non-voting " << req.candidate.toString() << " is requesting a vote, even though it is not a voting member of the cluster as far I know. Will still process its request, since I have no leader.");
}
state.observed(req.term, {});
......
......@@ -83,7 +83,7 @@ struct RaftMembers {
return false;
}
bool promoteObserver(const RaftServer &observer, std::string err) {
bool promoteObserver(const RaftServer &observer, std::string &err) {
if(erase_element(observers, observer)) {
nodes.push_back(observer);
return true;
......
......@@ -97,10 +97,7 @@ void RaftReplicator::tracker(const RaftServer &target, const RaftStateSnapshot &
if(nextIndex <= 0) qdb_throw("nextIndex has invalid value: " << nextIndex);
if(nextIndex <= journal.getLogStart()) nextIndex = journal.getLogSize();
// TODO: check if configuration epoch has changed
RaftTerm prevTerm;
if(!journal.fetch(nextIndex-1, prevTerm).ok()) {
qdb_critical("unable to fetch log entry " << nextIndex-1 << " when tracking " << target.toString() << ". My log start: " << journal.getLogStart());
state.wait(timeouts.getHeartbeatInterval());
......
......@@ -37,6 +37,7 @@ using namespace quarkdb;
#define ASSERT_REPLY(reply, val) { assert_reply(reply, val); if(::testing::Test::HasFatalFailure()) { FAIL(); return; } }
class Raft_e2e : public TestCluster3Nodes {};
class Raft_e2e5 : public TestCluster5Nodes {};
void assert_reply(const redisReplyPtr &reply, int integer) {
ASSERT_NE(reply, nullptr);
......@@ -245,3 +246,82 @@ TEST_F(Raft_e2e, replication_with_trimmed_journal) {
ASSERT_EQ(journal(2)->getLogSize(), journal(leaderID)->getLogSize());
ASSERT_EQ(journal(2)->getLogSize(), journal(firstSlaveID)->getLogSize());
}
TEST_F(Raft_e2e, membership_updates) {
prepare(0); prepare(1); prepare(2);
spinup(0); spinup(1); spinup(2);
std::this_thread::sleep_for(std::chrono::milliseconds(300));
int leaderID = getServerID(state(0)->getSnapshot().leader);
ASSERT_REPLY(tunnel(leaderID)->exec("set", "pi", "3.141516"), "OK");
// throw a node out of the cluster
int victim = (leaderID+1) % 3;
ASSERT_REPLY(tunnel(leaderID)->exec("RAFT_REMOVE_MEMBER", myself(victim).toString()), "OK");
std::this_thread::sleep_for(std::chrono::milliseconds(100));
// verify the cluster has not been disrupted
ASSERT_EQ(state(leaderID)->getSnapshot().leader, myself(leaderID));
// add it back as an observer, verify consensus
ASSERT_REPLY(tunnel(leaderID)->exec("RAFT_ADD_OBSERVER", myself(victim).toString()), "OK");
std::this_thread::sleep_for(std::chrono::milliseconds(100));
ASSERT_EQ(state(victim)->getSnapshot().status, RaftStatus::FOLLOWER);
ASSERT_EQ(state(0)->getSnapshot().leader, state(1)->getSnapshot().leader);
ASSERT_EQ(state(1)->getSnapshot().leader, state(2)->getSnapshot().leader);
ASSERT_EQ(journal(0)->getLogSize(), journal(1)->getLogSize());
ASSERT_EQ(journal(1)->getLogSize(), journal(2)->getLogSize());
// cannot be a leader, it's an observer
ASSERT_NE(state(0)->getSnapshot().leader, myself(victim));
// add back as a full voting member
leaderID = getServerID(state(0)->getSnapshot().leader);
ASSERT_REPLY(tunnel(leaderID)->exec("RAFT_PROMOTE_OBSERVER", myself(victim).toString()), "OK");
std::this_thread::sleep_for(std::chrono::milliseconds(100));
ASSERT_EQ(state(0)->getSnapshot().leader, state(1)->getSnapshot().leader);
ASSERT_EQ(state(1)->getSnapshot().leader, state(2)->getSnapshot().leader);
}
TEST_F(Raft_e2e5, membership_updates_with_disruptions) {
// let's get this party started
prepare(0); prepare(1); prepare(2); prepare(3); prepare(4);
spinup(0); spinup(1); spinup(2); spinup(3);
std::this_thread::sleep_for(std::chrono::milliseconds(200));
// verify consensus
for(size_t i = 1; i < 3; i++) {
ASSERT_EQ(state(i)->getSnapshot().leader, state(i-1)->getSnapshot().leader);
}
// throw node #4 out of the cluster
int leaderID = getServerID(state(0)->getSnapshot().leader);
ASSERT_REPLY(tunnel(leaderID)->exec("RAFT_REMOVE_MEMBER", myself(4).toString()), "OK");
std::this_thread::sleep_for(std::chrono::milliseconds(100));
// .. and now spinup node #4 :> Ensure it doesn't disrupt the current leader
spinup(4);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
ASSERT_EQ(leaderID, getServerID(state(0)->getSnapshot().leader));
// verify the cluster has not been disrupted
ASSERT_EQ(state(leaderID)->getSnapshot().leader, myself(leaderID));
// remove one more node
int victim = (leaderID+1) % 5;
if(victim == 4) victim = 2;
ASSERT_REPLY(tunnel(leaderID)->exec("RAFT_REMOVE_MEMBER", myself(victim).toString()), "OK");
std::this_thread::sleep_for(std::chrono::milliseconds(100));
// verify the cluster has not been disrupted
ASSERT_EQ(state(leaderID)->getSnapshot().leader, myself(leaderID));
// issue a bunch of writes and reads
ASSERT_REPLY(tunnel(leaderID)->exec("set", "123", "abc"), "OK");
ASSERT_REPLY(tunnel(leaderID)->exec("get", "123"), "abc");
}
......@@ -160,6 +160,17 @@ public:
}) { };
};
class TestCluster5Nodes : public TestCluster {
public:
TestCluster5Nodes() : TestCluster("a9b9e979-5428-42e9-8a52-f675c39fdf80", {
GlobalEnv::server(0),
GlobalEnv::server(1),
GlobalEnv::server(2),
GlobalEnv::server(3),
GlobalEnv::server(4)
}) { };
};
class SocketListener {
private:
int s;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment