Commit 99212371 authored by Georgios Bitzes's avatar Georgios Bitzes
Browse files

raft: refactor election function, separate response parsing

parent a60e13bc
Pipeline #1509772 failed with stages
in 78 minutes and 58 seconds
......@@ -230,7 +230,6 @@ struct RaftVoteResponse {
return ret;
}
};
inline size_t calculateQuorumSize(size_t members) {
......
......@@ -32,6 +32,55 @@
namespace quarkdb {
static ElectionSingleTally parseSingleTally(std::future<qclient::redisReplyPtr> &fut, std::chrono::steady_clock::time_point deadline) {
ElectionSingleTally singleTally;
if(fut.wait_until(deadline) != std::future_status::ready) {
singleTally.timeout = true;
singleTally.error = false;
return singleTally;
}
redisReplyPtr reply = fut.get();
if(reply == nullptr || !RaftParser::voteResponse(reply, singleTally.resp)) {
singleTally.timeout = false;
singleTally.error = true;
return singleTally;
}
singleTally.timeout = false;
singleTally.error = false;
return singleTally;
}
static ElectionTally makeElectionTally(RaftVoteRequest votereq, const std::vector<RaftServer> &nodes, const RaftContactDetails &contactDetails) {
qdb_info("Starting election round for term " << votereq.term);
std::vector<std::unique_ptr<RaftTalker>> talkers;
std::map<RaftServer, std::future<redisReplyPtr>> futures;
for(const RaftServer &node : nodes) {
if(node != votereq.candidate) {
talkers.emplace_back(new RaftTalker(node, contactDetails, "internal-vote-request"));
futures[node] = talkers.back()->requestVote(votereq);
}
}
std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now();
std::chrono::steady_clock::time_point deadline = now + contactDetails.getRaftTimeouts().getHeartbeatInterval()*2;
qdb_info("Vote requests have been sent off, will allow a window of "
<< contactDetails.getRaftTimeouts().getLow().count() << "ms to receive replies.");
ElectionTally tally;
for(auto it = futures.begin(); it != futures.end(); it++) {
tally[it->first] = parseSingleTally(it->second, deadline);
}
qdb_info("No longer accepting replies to vote requests, time to make a tally.");
return tally;
}
ElectionOutcome RaftElection::perform(RaftVoteRequest votereq, RaftState &state, RaftLease &lease, const RaftContactDetails &contactDetails) {
if(!votereq.candidate.empty()) {
qdb_throw("candidate member of votereq must be empty, it is filled out by this function");
......@@ -55,63 +104,40 @@ ElectionOutcome RaftElection::perform(RaftVoteRequest votereq, RaftState &state,
return ElectionOutcome::kNotElected;
}
qdb_info(state.getMyself().toString() << ": Starting election round for term " << votereq.term);
std::vector<std::unique_ptr<RaftTalker>> talkers;
std::chrono::steady_clock::time_point broadcastTimepoint = std::chrono::steady_clock::now();
std::vector<std::future<redisReplyPtr>> futures;
for(const RaftServer &node : state.getNodes()) {
if(node != state.getMyself()) {
talkers.emplace_back(new RaftTalker(node, contactDetails, "internal-vote-request"));
futures.push_back(talkers.back()->requestVote(votereq));
}
}
std::vector<redisReplyPtr> replies;
std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now();
std::chrono::steady_clock::time_point deadline = now + contactDetails.getRaftTimeouts().getHeartbeatInterval()*2;
qdb_info(state.getMyself().toString() << ": Vote requests have been sent off, will allow a window of "
<< contactDetails.getRaftTimeouts().getLow().count() << "ms to receive replies.");
for(size_t i = 0; i < futures.size(); i++) {
if(futures[i].wait_until(deadline) == std::future_status::ready) {
redisReplyPtr reply = futures[i].get();
if(reply != nullptr) replies.push_back(reply);
}
}
qdb_info("No longer accepting replies to vote requests, time to make a tally.");
ElectionTally tally = makeElectionTally(votereq, state.getNodes(), contactDetails);
size_t granted = 0;
size_t refused = 0;
size_t veto = 0;
size_t replies = 0;
for(size_t i = 0; i < replies.size(); i++) {
RaftVoteResponse resp;
if(!RaftParser::voteResponse(replies[i], resp)) {
qdb_critical("unable to parse a vote response, ignoring");
for(auto it = tally.begin(); it != tally.end(); it++) {
ElectionSingleTally singleTally = it->second;
if(singleTally.timeout || singleTally.error) {
continue;
}
if(singleTally.resp.vote == RaftVote::GRANTED) {
lease.getHandler(it->first).heartbeat(broadcastTimepoint);
granted++;
}
else {
if(resp.vote == RaftVote::GRANTED) {
lease.getHandler(talkers[i]->getServer()).heartbeat(broadcastTimepoint);
granted++;
}
else if(resp.vote == RaftVote::REFUSED) {
refused++;
}
else if(resp.vote == RaftVote::VETO) {
veto++;
}
state.observed(resp.term, {});
else if(singleTally.resp.vote == RaftVote::REFUSED) {
refused++;
}
else if(singleTally.resp.vote == RaftVote::VETO) {
veto++;
}
state.observed(singleTally.resp.term, {});
}
talkers.clear();
replies = granted + refused + veto;
std::string description = SSTR("Contacted " << futures.size() << " nodes, received "
<< replies.size() << " replies with a tally of " << granted << " positive votes, " << refused << " refused votes, and " << veto << " vetoes.");
std::string description = SSTR("Contacted " << tally.size() << " nodes, received "
<< replies << " replies with a tally of " << granted << " positive votes, " << refused << " refused votes, and " << veto << " vetoes.");
if(granted+1 >= (state.getNodes().size() / 2)+1 ) {
if(veto > 0) {
......
......@@ -55,6 +55,14 @@ enum class ElectionOutcome {
kVetoed
};
struct ElectionSingleTally {
bool timeout;
bool error;
RaftVoteResponse resp;
};
using ElectionTally = std::map<RaftServer, ElectionSingleTally>;
class RaftElection {
public:
static ElectionOutcome perform(RaftVoteRequest votereq, RaftState &state, RaftLease &lease, const RaftContactDetails &contactDetails);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment