Commit 9769ca81 authored by Georgios Bitzes
Browse files

raft: use RaftVoteRegistry to tally votes

parent f8e9f025
Pipeline #1514910 failed with stages
in 86 minutes and 16 seconds
......@@ -27,60 +27,12 @@
#include "RaftState.hh"
#include "RaftLease.hh"
#include "RaftContactDetails.hh"
#include "raft/RaftVoteRegistry.hh"
#include "utils/ParseUtils.hh"
#include "utils/StringUtils.hh"
namespace quarkdb {
//------------------------------------------------------------------------------
// Wait for a single vote reply until the given deadline, and classify it:
// - reply not ready by the deadline: timeout is set, error is cleared.
// - reply is null or cannot be parsed as a vote response: error is set.
// - otherwise both flags are cleared, and resp holds the parsed response.
//------------------------------------------------------------------------------
static ElectionSingleTally parseSingleTally(std::future<qclient::redisReplyPtr> &fut, std::chrono::steady_clock::time_point deadline) {
  ElectionSingleTally outcome;
  outcome.timeout = false;
  outcome.error = false;

  const bool arrivedInTime = (fut.wait_until(deadline) == std::future_status::ready);
  if(!arrivedInTime) {
    outcome.timeout = true;
    return outcome;
  }

  redisReplyPtr reply = fut.get();

  // Parsing is only attempted on a non-null reply.
  const bool parsedOk = (reply != nullptr) && RaftParser::voteResponse(reply, outcome.resp);
  outcome.error = !parsedOk;
  return outcome;
}
//------------------------------------------------------------------------------
// Broadcast a vote request to every node other than the candidate, then
// collect the replies into a tally.
//
// - votereq: the request to send. The node equal to votereq.candidate is
//   skipped during the broadcast.
// - nodes: the nodes to contact.
// - contactDetails: handed to each RaftTalker, and the source of the timeouts.
//
// All replies share a single deadline: two heartbeat intervals, counted from
// the moment every request has been sent off. Returns one ElectionSingleTally
// per contacted node.
//------------------------------------------------------------------------------
static ElectionTally makeElectionTally(RaftVoteRequest votereq, const std::vector<RaftServer> &nodes, const RaftContactDetails &contactDetails) {
  qdb_info("Starting election round for term " << votereq.term);

  // Talkers are kept in this vector until the function returns.
  std::vector<std::unique_ptr<RaftTalker>> talkers;
  std::map<RaftServer, std::future<redisReplyPtr>> futures;

  for(const RaftServer &node : nodes) {
    if(node != votereq.candidate) {
      // Take ownership before inserting, so the talker cannot leak if the
      // vector fails to grow.
      talkers.push_back(std::unique_ptr<RaftTalker>(new RaftTalker(node, contactDetails, "internal-vote-request")));
      futures[node] = talkers.back()->requestVote(votereq);
    }
  }

  // The logged window is derived from the same value as the deadline, so the
  // message always matches what is actually enforced.
  const auto window = contactDetails.getRaftTimeouts().getHeartbeatInterval()*2;
  const std::chrono::steady_clock::time_point deadline = std::chrono::steady_clock::now() + window;

  qdb_info("Vote requests have been sent off, will allow a window of "
    << std::chrono::duration_cast<std::chrono::milliseconds>(window).count() << "ms to receive replies.");

  ElectionTally tally;
  for(auto &pending : futures) {
    tally[pending.first] = parseSingleTally(pending.second, deadline);
  }

  qdb_info("No longer accepting replies to vote requests, time to make a tally.");
  return tally;
}
ElectionOutcome RaftElection::perform(RaftVoteRequest votereq, RaftState &state, RaftLease &lease, const RaftContactDetails &contactDetails) {
if(!votereq.candidate.empty()) {
qdb_throw("candidate member of votereq must be empty, it is filled out by this function");
......@@ -104,63 +56,45 @@ ElectionOutcome RaftElection::perform(RaftVoteRequest votereq, RaftState &state,
return ElectionOutcome::kNotElected;
}
qdb_info("Starting election round for term " << votereq.term);
std::chrono::steady_clock::time_point broadcastTimepoint = std::chrono::steady_clock::now();
ElectionTally tally = makeElectionTally(votereq, state.getNodes(), contactDetails);
std::vector<std::unique_ptr<RaftTalker>> talkers;
std::map<RaftServer, std::future<redisReplyPtr>> futures;
size_t granted = 0;
size_t refused = 0;
size_t veto = 0;
size_t replies = 0;
for(const RaftServer &node : state.getNodes()) {
if(node != votereq.candidate) {
talkers.emplace_back(new RaftTalker(node, contactDetails, "internal-vote-request"));
futures[node] = talkers.back()->requestVote(votereq);
}
}
for(auto it = tally.begin(); it != tally.end(); it++) {
ElectionSingleTally singleTally = it->second;
std::chrono::steady_clock::time_point deadline = broadcastTimepoint + contactDetails.getRaftTimeouts().getHeartbeatInterval()*2;
if(singleTally.timeout || singleTally.error) {
continue;
}
qdb_info("Vote requests have been sent off, will allow a window of "
<< contactDetails.getRaftTimeouts().getLow().count() << "ms to receive replies.");
if(singleTally.resp.vote == RaftVote::GRANTED) {
lease.getHandler(it->first).heartbeat(broadcastTimepoint);
granted++;
}
else if(singleTally.resp.vote == RaftVote::REFUSED) {
refused++;
}
else if(singleTally.resp.vote == RaftVote::VETO) {
veto++;
}
RaftVoteRegistry registry(votereq.term, false);
state.observed(singleTally.resp.term, {});
for(auto it = futures.begin(); it != futures.end(); it++) {
registry.registerVote(it->first, it->second, deadline);
}
replies = granted + refused + veto;
registry.observeTermsAndLeases(state, lease, broadcastTimepoint);
std::string description = SSTR("Contacted " << tally.size() << " nodes, received "
<< replies << " replies with a tally of " << granted << " positive votes, " << refused << " refused votes, and " << veto << " vetoes.");
ElectionOutcome outcome = registry.determineOutcome();
qdb_info(registry.describeOutcome());
if(granted+1 >= (state.getNodes().size() / 2)+1 ) {
if(veto > 0) {
qdb_critical("Election round unsuccessful for term " << votereq.term << " because of vetoes, even though I received a quorum of positive votes. (!!!) " << description);
return ElectionOutcome::kVetoed;
}
qdb_event("Election round successful for term " << votereq.term << ". " << description);
if(outcome == ElectionOutcome::kElected) {
if(state.ascend(votereq.term)) {
return ElectionOutcome::kElected;
return outcome;
}
// Some strange race condition occurred, term must have progressed.
return ElectionOutcome::kNotElected;
// Race condition, term must have progressed.
outcome = ElectionOutcome::kNotElected;
}
else {
qdb_event("Election round unsuccessful for term " << votereq.term << ", did not receive a quorum of votes. " << description);
if(veto > 0) {
return ElectionOutcome::kVetoed;
}
return ElectionOutcome::kNotElected;
}
return outcome;
}
bool RaftParser::appendEntries(RedisRequest &&source, RaftAppendEntriesRequest &dest) {
......
......@@ -23,6 +23,8 @@
#include "raft/RaftVoteRegistry.hh"
#include "raft/RaftUtils.hh"
#include "raft/RaftState.hh"
#include "raft/RaftLease.hh"
namespace quarkdb {
......@@ -226,4 +228,26 @@ std::string RaftVoteRegistry::describeOutcome() const {
return ss.str();
}
//------------------------------------------------------------------------------
// Propagate the contents of this registry to the raft state and the leases.
//
// Entries flagged with a network or parse error are ignored. For every other
// entry, the term of the response is reported to the state; if the vote was
// granted, the lease handler of that node also receives a heartbeat stamped
// with broadcastTimepoint.
//
// Asserts that this registry is not in pre-vote mode.
//------------------------------------------------------------------------------
void RaftVoteRegistry::observeTermsAndLeases(RaftState &state, RaftLease &lease,
  std::chrono::steady_clock::time_point broadcastTimepoint) {

  qdb_assert(!mPreVote);

  for(const auto &entry : mContents) {
    const SingleVote &vote = entry.second;

    const bool usable = !(vote.netError || vote.parseError);
    if(usable) {
      state.observed(vote.resp.term, {});

      if(vote.resp.vote == RaftVote::GRANTED) {
        lease.getHandler(entry.first).heartbeat(broadcastTimepoint);
      }
    }
  }
}
}
\ No newline at end of file
......@@ -30,6 +30,8 @@
namespace quarkdb {
class RaftState; class RaftLease;
//------------------------------------------------------------------------------
// Helper class for counting votes received during an election
//------------------------------------------------------------------------------
......@@ -95,6 +97,13 @@ public:
//----------------------------------------------------------------------------
std::string describeOutcome() const;
//----------------------------------------------------------------------------
// Observe terms and leases: for every registered vote that had neither a
// network error nor a parse error, report the term of the response to the
// given RaftState. For every such vote that was granted, additionally
// heartbeat the lease handler of the voter with broadcastTimepoint.
// Asserts that this registry is not in pre-vote mode.
//----------------------------------------------------------------------------
void observeTermsAndLeases(RaftState &state, RaftLease &lease,
std::chrono::steady_clock::time_point broadcastTimepoint);
private:
RaftTerm mTerm;
bool mPreVote;
......
......@@ -1078,6 +1078,13 @@ TEST(RaftVoteRequest, Describe) {
ASSERT_EQ("pre-vote request [candidate=localhost:1234, term=777, lastIndex=999, lastTerm=555]", voteReq.describe(true));
}
// A second vote registered for the same server must raise a FatalException.
TEST(RaftVoteRegistry, DoubleVote) {
  RaftVoteRegistry registry(1, false);
  const RaftServer voter("localhost", 7777);

  registry.registerVote(voter, RaftVoteResponse(1, RaftVote::GRANTED));
  ASSERT_THROW(registry.registerVote(voter, RaftVoteResponse(1, RaftVote::REFUSED)), FatalException);
}
TEST(RaftVoteRegistry, OneForOneAgainst) {
RaftVoteRegistry registry(1, false);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment