Commit de92a78a authored by Georgios Bitzes's avatar Georgios Bitzes
Browse files

raft: fully activate pre-voting

parent 9769ca81
Pipeline #1545905 passed with stages
in 121 minutes and 38 seconds
......@@ -127,20 +127,27 @@ void RaftDirector::runForLeader() {
// it could have changed in-between
RaftStateSnapshotPtr snapshot = state.getSnapshot();
// advance the term by one, become a candidate.
if(!state.observed(snapshot->term+1, {})) return;
if(!state.becomeCandidate(snapshot->term+1)) return;
// prepare vote request
RaftVoteRequest votereq;
votereq.term = snapshot->term+1;
votereq.lastIndex = journal.getLogSize()-1;
if(!journal.fetch(votereq.lastIndex, votereq.lastTerm).ok()) {
qdb_critical("Unable to fetch journal entry " << votereq.lastIndex << " when running for leader");
state.dropOut(snapshot->term+1);
return;
}
ElectionOutcome prevoteOutcome = RaftElection::performPreVote(votereq, state, contactDetails);
if(prevoteOutcome == ElectionOutcome::kVetoed) {
lastHeartbeatBeforeVeto = lastHeartbeat;
qdb_info("Pre-vote round for term " << snapshot->term + 1 << " resulted in a veto. This means, the next leader of this cluster cannot be me. Stopping election attempts until I receive a heartbeat.");
}
if(prevoteOutcome != ElectionOutcome::kElected) return;
// pre-vote succeeded, advance the term by one, become a candidate.
if(!state.observed(snapshot->term+1, {})) return;
if(!state.becomeCandidate(snapshot->term+1)) return;
ElectionOutcome electionOutcome = RaftElection::perform(votereq, state, lease, contactDetails);
if(electionOutcome != ElectionOutcome::kElected) {
......
......@@ -203,10 +203,15 @@ std::future<redisReplyPtr> RaftTalker::appendEntries(
return qcl->execute(payload);
}
std::future<redisReplyPtr> RaftTalker::requestVote(const RaftVoteRequest &req) {
std::future<redisReplyPtr> RaftTalker::requestVote(const RaftVoteRequest &req, bool preVote) {
RedisRequest payload;
payload.emplace_back("RAFT_REQUEST_VOTE");
if(preVote) {
payload.emplace_back("RAFT_REQUEST_PRE_VOTE");
}
else {
payload.emplace_back("RAFT_REQUEST_VOTE");
}
payload.emplace_back(std::to_string(req.term));
payload.emplace_back(req.candidate.toString());
payload.emplace_back(std::to_string(req.lastIndex));
......
......@@ -47,7 +47,7 @@ public:
RaftTerm prevTerm, LogIndex commit,
const std::vector<RaftSerializedEntry> &entries);
std::future<redisReplyPtr> requestVote(const RaftVoteRequest &req);
std::future<redisReplyPtr> requestVote(const RaftVoteRequest &req, bool preVote = false);
std::future<redisReplyPtr> fetch(LogIndex index);
std::future<redisReplyPtr> resilveringStart(const ResilveringEventID &id);
......
......@@ -33,6 +33,41 @@
namespace quarkdb {
ElectionOutcome RaftElection::performPreVote(RaftVoteRequest votereq, RaftState &state, const RaftContactDetails &contactDetails) {
if(!votereq.candidate.empty()) {
qdb_throw("candidate member of votereq must be empty, it is filled out by this function");
}
votereq.candidate = state.getMyself();
qdb_info("Starting pre-vote round for term " << votereq.term);
std::chrono::steady_clock::time_point broadcastTimepoint = std::chrono::steady_clock::now();
std::vector<std::unique_ptr<RaftTalker>> talkers;
std::map<RaftServer, std::future<redisReplyPtr>> futures;
for(const RaftServer &node : state.getNodes()) {
if(node != votereq.candidate) {
talkers.emplace_back(new RaftTalker(node, contactDetails, "internal-vote-request"));
futures[node] = talkers.back()->requestVote(votereq, true);
}
}
std::chrono::steady_clock::time_point deadline = broadcastTimepoint + contactDetails.getRaftTimeouts().getHeartbeatInterval()*2;
qdb_info("Pre-vote requests have been sent off, will allow a window of "
<< contactDetails.getRaftTimeouts().getLow().count() << "ms to receive replies.");
RaftVoteRegistry registry(votereq.term, true);
for(auto it = futures.begin(); it != futures.end(); it++) {
registry.registerVote(it->first, it->second, deadline);
}
qdb_info(registry.describeOutcome());
return registry.determineOutcome();
}
ElectionOutcome RaftElection::perform(RaftVoteRequest votereq, RaftState &state, RaftLease &lease, const RaftContactDetails &contactDetails) {
if(!votereq.candidate.empty()) {
qdb_throw("candidate member of votereq must be empty, it is filled out by this function");
......
......@@ -48,16 +48,9 @@ public:
static bool fetchLastResponse(const qclient::redisReplyPtr &source, std::vector<RaftEntry> &entries);
};
struct ElectionSingleTally {
bool timeout;
bool error;
RaftVoteResponse resp;
};
using ElectionTally = std::map<RaftServer, ElectionSingleTally>;
class RaftElection {
public:
static ElectionOutcome performPreVote(RaftVoteRequest votereq, RaftState &state, const RaftContactDetails &contactDetails);
static ElectionOutcome perform(RaftVoteRequest votereq, RaftState &state, RaftLease &lease, const RaftContactDetails &contactDetails);
};
......
......@@ -811,7 +811,6 @@ TEST_F(Raft_Director, late_consensus) {
// verify the node tried to ascend, and failed
RaftStateSnapshotPtr snapshot = state(0)->getSnapshot();
ASSERT_GE(snapshot->term, 1);
ASSERT_TRUE(snapshot->leader.empty());
ASSERT_TRUE( (snapshot->status == RaftStatus::FOLLOWER) || (snapshot->status == RaftStatus::CANDIDATE) );
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment