Commit ce375eee authored by Georgios Bitzes
Browse files

Use shared pointers for raft state snapshots, prevent unnecessary copying

parent 5b6f2d33
Pipeline #259351 failed with stages
in 11 minutes and 44 seconds
......@@ -21,6 +21,9 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.*
************************************************************************/
#include <stdlib.h>
#include <algorithm>
#include "XrdVersion.hh"
#include "XrdQuarkDB.hh"
#include "XrdOuc/XrdOucEnv.hh"
......@@ -28,9 +31,6 @@
#include "utils/ScopedAdder.hh"
#include "QuarkDBNode.hh"
#include <stdlib.h>
#include <algorithm>
using namespace quarkdb;
......
......@@ -42,15 +42,15 @@ void RaftDirector::main() {
raftClock.heartbeat();
while(true) {
raftClock.refreshRandomTimeout();
RaftStateSnapshot snapshot = state.getSnapshot();
RaftStateSnapshotPtr snapshot = state.getSnapshot();
if(snapshot.status == RaftStatus::SHUTDOWN) {
if(snapshot->status == RaftStatus::SHUTDOWN) {
return;
}
else if(snapshot.status == RaftStatus::FOLLOWER) {
else if(snapshot->status == RaftStatus::FOLLOWER) {
actAsFollower(snapshot);
}
else if(snapshot.status == RaftStatus::LEADER) {
else if(snapshot->status == RaftStatus::LEADER) {
actAsLeader(snapshot);
raftClock.heartbeat();
}
......@@ -60,17 +60,17 @@ void RaftDirector::main() {
}
}
void RaftDirector::actAsLeader(RaftStateSnapshot &snapshot) {
if(snapshot.leader != state.getMyself()) qdb_throw("attempted to act as leader, even though snapshot shows a different one");
void RaftDirector::actAsLeader(RaftStateSnapshotPtr &snapshot) {
if(snapshot->leader != state.getMyself()) qdb_throw("attempted to act as leader, even though snapshot shows a different one");
replicator.activate(snapshot);
while(snapshot.term == state.getCurrentTerm() &&
state.getSnapshot().status == RaftStatus::LEADER) {
while(snapshot->term == state.getCurrentTerm() &&
state.getSnapshot()->status == RaftStatus::LEADER) {
std::chrono::steady_clock::time_point deadline = lease.getDeadline();
if(deadline < std::chrono::steady_clock::now()) {
qdb_event("My leader lease has expired, I no longer control a quorum, stepping down.");
state.observed(snapshot.term+1, {});
state.observed(snapshot->term+1, {});
writeTracker.flushQueues(Formatter::err("unavailable"));
break;
}
......@@ -83,32 +83,32 @@ void RaftDirector::actAsLeader(RaftStateSnapshot &snapshot) {
// Attempt to get elected for the term following the current one: take a fresh
// state snapshot, register term+1 and become a candidate, build a vote request
// from the last journal entry, then run the election. Returns early if the
// state refuses either transition. Calls state.dropOut(term+1) if the journal
// entry cannot be fetched or if RaftElection::perform() returns false.
//
// NOTE(review): this text is a rendered diff - each changed statement appears
// twice, first with the by-value snapshot (`snapshot.`), then with the
// shared-pointer snapshot (`snapshot->`).
void RaftDirector::runForLeader() {
// don't reuse the snapshot from the main loop,
// it could have changed in-between
RaftStateSnapshot snapshot = state.getSnapshot();
RaftStateSnapshotPtr snapshot = state.getSnapshot();
// advance the term by one, become a candidate.
// Either call returning false aborts the attempt before any vote is requested.
if(!state.observed(snapshot.term+1, {})) return;
if(!state.becomeCandidate(snapshot.term+1)) return;
if(!state.observed(snapshot->term+1, {})) return;
if(!state.becomeCandidate(snapshot->term+1)) return;
// prepare vote request
RaftVoteRequest votereq;
votereq.term = snapshot.term+1;
votereq.term = snapshot->term+1;
// lastIndex / lastTerm describe the final entry currently in the journal.
votereq.lastIndex = journal.getLogSize()-1;
if(!journal.fetch(votereq.lastIndex, votereq.lastTerm).ok()) {
qdb_critical("Unable to fetch journal entry " << votereq.lastIndex << " when running for leader");
// Cannot build a valid request: drop out for the term we just entered.
state.dropOut(snapshot.term+1);
state.dropOut(snapshot->term+1);
return;
}
// perform() returned false: drop out for this term as well.
if(!RaftElection::perform(votereq, state, lease, raftClock.getTimeouts())) {
state.dropOut(snapshot.term+1);
state.dropOut(snapshot->term+1);
}
}
void RaftDirector::actAsFollower(RaftStateSnapshot &snapshot) {
void RaftDirector::actAsFollower(RaftStateSnapshotPtr &snapshot) {
milliseconds randomTimeout = raftClock.getRandomTimeout();
while(true) {
RaftStateSnapshot now = state.getSnapshot();
if(snapshot.term != now.term || snapshot.status != now.status) return;
RaftStateSnapshotPtr now = state.getSnapshot();
if(snapshot->term != now->term || snapshot->status != now->status) return;
state.wait(randomTimeout);
if(raftClock.timeout()) {
......
......@@ -44,8 +44,8 @@ public:
DISALLOW_COPY_AND_ASSIGN(RaftDirector);
private:
void main();
void actAsFollower(RaftStateSnapshot &snapshot);
void actAsLeader(RaftStateSnapshot &snapshot);
void actAsFollower(RaftStateSnapshotPtr &snapshot);
void actAsLeader(RaftStateSnapshotPtr &snapshot);
void runForLeader();
void applyCommits();
......
......@@ -131,13 +131,13 @@ LinkStatus RaftDispatcher::dispatch(Connection *conn, RedisRequest &req) {
return conn->ok();
}
case RedisCommand::RAFT_ATTEMPT_COUP: {
RaftStateSnapshot snapshot = state.getSnapshot();
RaftStateSnapshotPtr snapshot = state.getSnapshot();
if(snapshot.leader.empty()) {
if(snapshot->leader.empty()) {
return conn->err("I have no leader, cannot start a coup");
}
if(snapshot.leader == state.getMyself()) {
if(snapshot->leader == state.getMyself()) {
return conn->err("I am the leader! I can't revolt against myself, you know.");
}
......@@ -155,15 +155,15 @@ LinkStatus RaftDispatcher::dispatch(Connection *conn, RedisRequest &req) {
return conn->err(SSTR("cannot parse server: " << req[1]));
}
RaftStateSnapshot snapshot = state.getSnapshot();
if(snapshot.status != RaftStatus::LEADER) return conn->err("not a leader");
RaftStateSnapshotPtr snapshot = state.getSnapshot();
if(snapshot->status != RaftStatus::LEADER) return conn->err("not a leader");
if(srv == state.getMyself()) return conn->err("cannot perform membership changes on current leader");
std::string err;
bool rc;
if(req.getCommand() == RedisCommand::RAFT_ADD_OBSERVER) {
rc = journal.addObserver(snapshot.term, srv, err);
rc = journal.addObserver(snapshot->term, srv, err);
}
else if(req.getCommand() == RedisCommand::RAFT_REMOVE_MEMBER) {
// Build a replication status object with how the full members would
......@@ -180,7 +180,7 @@ LinkStatus RaftDispatcher::dispatch(Connection *conn, RedisRequest &req) {
if(!replicationStatus.quorumUpToDate(leaderStatus.nextIndex)) {
return conn->err("membership update blocked, new cluster would not have an up-to-date quorum");
}
rc = journal.removeMember(snapshot.term, srv, err);
rc = journal.removeMember(snapshot->term, srv, err);
}
else if(req.getCommand() == RedisCommand::RAFT_PROMOTE_OBSERVER) {
ReplicationStatus replicationStatus = replicator.getStatus();
......@@ -188,7 +188,7 @@ LinkStatus RaftDispatcher::dispatch(Connection *conn, RedisRequest &req) {
return conn->err("membership update blocked, observer is not up-to-date");
}
rc = journal.promoteObserver(snapshot.term, srv, err);
rc = journal.promoteObserver(snapshot->term, srv, err);
}
else {
qdb_throw("should never happen");
......@@ -218,9 +218,9 @@ LinkStatus RaftDispatcher::service(Connection *conn, RedisRequest &req) {
// if not leader, redirect... except if this is a read,
// and stale reads are active!
RaftStateSnapshot snapshot = state.getSnapshot();
if(snapshot.status != RaftStatus::LEADER) {
if(snapshot.leader.empty()) {
RaftStateSnapshotPtr snapshot = state.getSnapshot();
if(snapshot->status != RaftStatus::LEADER) {
if(snapshot->leader.empty()) {
return conn->err("unavailable");
}
......@@ -230,7 +230,7 @@ LinkStatus RaftDispatcher::service(Connection *conn, RedisRequest &req) {
}
// Redirect.
return conn->moved(0, snapshot.leader);
return conn->moved(0, snapshot->leader);
}
// read request: What happens if I was just elected as leader, but my state
......@@ -242,11 +242,11 @@ LinkStatus RaftDispatcher::service(Connection *conn, RedisRequest &req) {
// Ensure the state machine is all caught-up before servicing reads, in order
// to prevent a linearizability violation.
if(req.getCommandType() == CommandType::READ) {
if(stateMachine.getLastApplied() < snapshot.leadershipMarker) {
if(stateMachine.getLastApplied() < snapshot->leadershipMarker) {
// Stall client request until state machine is caught-up, or we lose leadership
while(!stateMachine.waitUntilTargetLastApplied(snapshot.leadershipMarker, std::chrono::milliseconds(500))) {
if(snapshot.term != state.getCurrentTerm()) {
while(!stateMachine.waitUntilTargetLastApplied(snapshot->leadershipMarker, std::chrono::milliseconds(500))) {
if(snapshot->term != state.getCurrentTerm()) {
// Ouch, we're no longer a leader.. start from scratch
return this->service(conn, req);
}
......@@ -254,7 +254,7 @@ LinkStatus RaftDispatcher::service(Connection *conn, RedisRequest &req) {
// If we've made it this far, the state machine should be all caught-up
// by now. Proceed to service this request.
qdb_assert(snapshot.leadershipMarker <= stateMachine.getLastApplied());
qdb_assert(snapshot->leadershipMarker <= stateMachine.getLastApplied());
}
return conn->addPendingRequest(&redisDispatcher, std::move(req));
......@@ -271,9 +271,9 @@ LinkStatus RaftDispatcher::service(Connection *conn, RedisRequest &req) {
LogIndex index = journal.getLogSize();
if(!writeTracker.append(index, RaftEntry(snapshot.term, std::move(req)), conn->getQueue(), redisDispatcher)) {
if(!writeTracker.append(index, RaftEntry(snapshot->term, std::move(req)), conn->getQueue(), redisDispatcher)) {
qdb_critical("appending write for index = " << index <<
" and term " << snapshot.term << " failed when servicing client request");
" and term " << snapshot->term << " failed when servicing client request");
return conn->err("unknown error");
}
......@@ -281,11 +281,11 @@ LinkStatus RaftDispatcher::service(Connection *conn, RedisRequest &req) {
}
// Convenience overload: forwards to heartbeat(req, snapshot) with a local
// snapshot variable that is discarded on return, for callers that only need
// the response.
//
// NOTE(review): rendered diff - the first declaration is the old by-value
// snapshot, the second is the new shared-pointer form, which starts out empty
// and is passed by reference to the two-argument overload.
RaftHeartbeatResponse RaftDispatcher::heartbeat(const RaftHeartbeatRequest &req) {
RaftStateSnapshot snapshot;
RaftStateSnapshotPtr snapshot;
return heartbeat(req, snapshot);
}
RaftHeartbeatResponse RaftDispatcher::heartbeat(const RaftHeartbeatRequest &req, RaftStateSnapshot &snapshot) {
RaftHeartbeatResponse RaftDispatcher::heartbeat(const RaftHeartbeatRequest &req, RaftStateSnapshotPtr &snapshot) {
//----------------------------------------------------------------------------
// This RPC is a custom extension to raft - coupling appendEntries to
......@@ -312,21 +312,21 @@ RaftHeartbeatResponse RaftDispatcher::heartbeat(const RaftHeartbeatRequest &req,
snapshot = state.getSnapshot();
if(state.inShutdown()) {
return {snapshot.term, false, "in shutdown"};
return {snapshot->term, false, "in shutdown"};
}
if(req.term < snapshot.term) {
return {snapshot.term, false, "My raft term is newer"};
if(req.term < snapshot->term) {
return {snapshot->term, false, "My raft term is newer"};
}
qdb_assert(req.term == snapshot.term);
qdb_assert(req.term == snapshot->term);
if(req.leader != snapshot.leader) {
qdb_throw("Received append entries from " << req.leader.toString() << ", while I believe leader for term " << snapshot.term << " is " << snapshot.leader.toString());
if(req.leader != snapshot->leader) {
qdb_throw("Received append entries from " << req.leader.toString() << ", while I believe leader for term " << snapshot->term << " is " << snapshot->leader.toString());
}
raftClock.heartbeat();
return {snapshot.term, true, ""};
return {snapshot->term, true, ""};
}
RaftAppendEntriesResponse RaftDispatcher::appendEntries(RaftAppendEntriesRequest &&req) {
......@@ -337,7 +337,7 @@ RaftAppendEntriesResponse RaftDispatcher::appendEntries(RaftAppendEntriesRequest
// state snapshot taken inside heartbeat.
//----------------------------------------------------------------------------
RaftStateSnapshot snapshot;
RaftStateSnapshotPtr snapshot;
RaftHeartbeatResponse heartbeatResponse = heartbeat({req.term, req.leader}, snapshot);
if(!heartbeatResponse.nodeRecognizedAsLeader) {
......@@ -349,10 +349,10 @@ RaftAppendEntriesResponse RaftDispatcher::appendEntries(RaftAppendEntriesRequest
// requested journal modifications, if any.
//----------------------------------------------------------------------------
writeTracker.flushQueues(Formatter::moved(0, snapshot.leader));
writeTracker.flushQueues(Formatter::moved(0, snapshot->leader));
if(!journal.matchEntries(req.prevIndex, req.prevTerm)) {
return {snapshot.term, journal.getLogSize(), false, "Log entry mismatch"};
return {snapshot->term, journal.getLogSize(), false, "Log entry mismatch"};
}
//----------------------------------------------------------------------------
......@@ -383,14 +383,14 @@ RaftAppendEntriesResponse RaftDispatcher::appendEntries(RaftAppendEntriesRequest
for(size_t i = appendFrom; i < req.entries.size(); i++) {
if(!journal.append(req.prevIndex+1+i, req.entries[i])) {
qdb_warn("something odd happened when adding entries to the journal.. probably a race condition, but should be harmless");
return {snapshot.term, journal.getLogSize(), false, "Unknown error"};
return {snapshot->term, journal.getLogSize(), false, "Unknown error"};
}
}
}
journal.setCommitIndex(std::min(journal.getLogSize()-1, req.commitIndex));
warnIfLagging(req.commitIndex);
return {snapshot.term, journal.getLogSize(), true, ""};
return {snapshot->term, journal.getLogSize(), true, ""};
}
void RaftDispatcher::warnIfLagging(LogIndex leaderCommitIndex) {
......@@ -430,16 +430,16 @@ RaftVoteResponse RaftDispatcher::requestVote(RaftVoteRequest &req) {
//----------------------------------------------------------------------------
if(!contains(state.getNodes(), req.candidate)) {
RaftStateSnapshot snapshot = state.getSnapshot();
if(!snapshot.leader.empty()) {
RaftStateSnapshotPtr snapshot = state.getSnapshot();
if(!snapshot->leader.empty()) {
qdb_misconfig("Non-voting " << req.candidate.toString() << " attempted to disrupt the cluster by starting an election for term " << req.term << ". Ignoring its request - shut down that node!");
return {snapshot.term, RaftVote::VETO};
return {snapshot->term, RaftVote::VETO};
}
qdb_warn("Non-voting " << req.candidate.toString() << " is requesting a vote, even though it is not a voting member of the cluster as far I know. Will still process its request, since I have no leader.");
}
state.observed(req.term, {});
RaftStateSnapshot snapshot = state.getSnapshot();
RaftStateSnapshotPtr snapshot = state.getSnapshot();
//----------------------------------------------------------------------------
// If the contacting node were to be elected, would they potentially overwrite
......@@ -461,7 +461,7 @@ RaftVoteResponse RaftDispatcher::requestVote(RaftVoteRequest &req) {
if(req.lastIndex <= journal.getCommitIndex()) {
if(req.lastIndex < journal.getLogStart()) {
qdb_event("Vetoing vote request from " << req.candidate.toString() << " because its lastIndex (" << req.lastIndex << ") is before my log start (" << journal.getLogStart() << ") - way too far behind me.");
return {snapshot.term, RaftVote::VETO};
return {snapshot->term, RaftVote::VETO};
}
RaftTerm myLastIndexTerm;
......@@ -470,7 +470,7 @@ RaftVoteResponse RaftDispatcher::requestVote(RaftVoteRequest &req) {
// It could be that I just have a corrupted journal - don't prevent the
// node from ascending in this case... If I crash afterwards during
// replication, so be it.
return {snapshot.term, RaftVote::REFUSED};
return {snapshot->term, RaftVote::REFUSED};
}
// If the node were to ascend, it'll try and remove my req.lastIndex entry
......@@ -481,62 +481,62 @@ RaftVoteResponse RaftDispatcher::requestVote(RaftVoteRequest &req) {
// May the Gods be kind on our souls, and we never see this message in production.
qdb_throw("Candidate " << req.candidate.toString() << " has a log entry at " << req.lastIndex << " with term " << req.lastTerm << " while I have a COMMITTED entry with a LOWER term: " << myLastIndexTerm << ". MAJOR CORRUPTION, DB IS ON FIRE");
}
return {snapshot.term, RaftVote::VETO};
return {snapshot->term, RaftVote::VETO};
}
if(req.lastIndex+1 <= journal.getCommitIndex()) {
// If the node were to ascend, it would add a leadership marker, and try
// to remove my committed req.lastIndex+1 entry as conflicting. Veto!
qdb_event("Vetoing vote request from " << req.candidate.toString() << " because its ascension would overwrite my committed entry with index " << req.lastIndex+1 << " through the addition of a leadership marker.");
return {snapshot.term, RaftVote::VETO};
return {snapshot->term, RaftVote::VETO};
}
}
if(snapshot.term != req.term) {
qdb_event("Rejecting vote request from " << req.candidate.toString() << " because of a term mismatch: " << snapshot.term << " vs " << req.term);
return {snapshot.term, RaftVote::REFUSED};
if(snapshot->term != req.term) {
qdb_event("Rejecting vote request from " << req.candidate.toString() << " because of a term mismatch: " << snapshot->term << " vs " << req.term);
return {snapshot->term, RaftVote::REFUSED};
}
if(!snapshot.votedFor.empty() && snapshot.votedFor != req.candidate) {
qdb_event("Rejecting vote request from " << req.candidate.toString() << " since I've voted already in this term (" << snapshot.term << ") for " << snapshot.votedFor.toString());
return {snapshot.term, RaftVote::REFUSED};
if(!snapshot->votedFor.empty() && snapshot->votedFor != req.candidate) {
qdb_event("Rejecting vote request from " << req.candidate.toString() << " since I've voted already in this term (" << snapshot->term << ") for " << snapshot->votedFor.toString());
return {snapshot->term, RaftVote::REFUSED};
}
LogIndex myLastIndex = journal.getLogSize()-1;
RaftTerm myLastTerm;
if(!journal.fetch(myLastIndex, myLastTerm).ok()) {
qdb_critical("Error when reading journal entry " << myLastIndex << " when processing request vote.");
return {snapshot.term, RaftVote::REFUSED};
return {snapshot->term, RaftVote::REFUSED};
}
if(req.lastTerm < myLastTerm) {
qdb_event("Rejecting vote request from " << req.candidate.toString() << " since my log is more up-to-date, based on last term: " << myLastIndex << "," << myLastTerm << " vs " << req.lastIndex << "," << req.lastTerm);
return {snapshot.term, RaftVote::REFUSED};
return {snapshot->term, RaftVote::REFUSED};
}
if(req.lastTerm == myLastTerm && req.lastIndex < myLastIndex) {
qdb_event("Rejecting vote request from " << req.candidate.toString() << " since my log is more up-to-date, based on last index: " << myLastIndex << "," << myLastTerm << " vs " << req.lastIndex << "," << req.lastTerm);
return {snapshot.term, RaftVote::REFUSED};
return {snapshot->term, RaftVote::REFUSED};
}
// grant vote
if(!state.grantVote(req.term, req.candidate)) {
qdb_warn("RaftState rejected the vote request from " << req.candidate.toString() << " and term " << req.term << " - probably benign race condition?");
return {snapshot.term, RaftVote::REFUSED};
return {snapshot->term, RaftVote::REFUSED};
}
raftClock.heartbeat();
return {snapshot.term, RaftVote::GRANTED};
return {snapshot->term, RaftVote::GRANTED};
}
// Build a RaftInfo summary of this node. While holding the raftCommand mutex,
// reads the current state snapshot, the journal membership and the replicator
// status, then combines them with journal, state machine and write tracker
// counters into the returned aggregate.
//
// NOTE(review): rendered diff - the snapshot declaration and the return
// statement each appear twice, old (`snapshot.`) followed by new (`snapshot->`).
RaftInfo RaftDispatcher::info() {
std::lock_guard<std::mutex> lock(raftCommand);
RaftStateSnapshot snapshot = state.getSnapshot();
RaftStateSnapshotPtr snapshot = state.getSnapshot();
RaftMembership membership = journal.getMembership();
ReplicationStatus replicationStatus = replicator.getStatus();
return {journal.getClusterID(), state.getMyself(), snapshot.leader, membership.epoch, membership.nodes, membership.observers, snapshot.term, journal.getLogStart(),
journal.getLogSize(), snapshot.status, journal.getCommitIndex(), stateMachine.getLastApplied(), writeTracker.size(), replicationStatus };
return {journal.getClusterID(), state.getMyself(), snapshot->leader, membership.epoch, membership.nodes, membership.observers, snapshot->term, journal.getLogStart(),
journal.getLogSize(), snapshot->status, journal.getCommitIndex(), stateMachine.getLastApplied(), writeTracker.size(), replicationStatus };
}
bool RaftDispatcher::fetch(LogIndex index, RaftEntry &entry) {
......
......@@ -37,8 +37,10 @@ namespace quarkdb {
// Forward declarations
//------------------------------------------------------------------------------
class RaftJournal; class RaftState; class RaftClock; class RaftWriteTracker;
class RaftReplicator; struct RaftStateSnapshot;
class RaftReplicator;
class RaftStateSnapshot;
using RaftStateSnapshotPtr = std::shared_ptr<const RaftStateSnapshot>;
class RaftDispatcher : public Dispatcher {
public:
......@@ -55,7 +57,7 @@ public:
RaftAppendEntriesResponse appendEntries(RaftAppendEntriesRequest &&req);
RaftVoteResponse requestVote(RaftVoteRequest &req);
private:
RaftHeartbeatResponse heartbeat(const RaftHeartbeatRequest &req, RaftStateSnapshot &snapshot);
RaftHeartbeatResponse heartbeat(const RaftHeartbeatRequest &req, RaftStateSnapshotPtr &snapshot);
LinkStatus service(Connection *conn, RedisRequest &req);
//----------------------------------------------------------------------------
......
......@@ -48,7 +48,7 @@ RaftReplicator::~RaftReplicator() {
deactivate();
}
RaftReplicaTracker::RaftReplicaTracker(const RaftServer &target_, const RaftStateSnapshot &snapshot_, RaftJournal &journal_, RaftState &state_, RaftLease &lease_, RaftCommitTracker &ct, RaftTrimmer &trim, ShardDirectory &sharddir, RaftConfig &conf, const RaftTimeouts t)
RaftReplicaTracker::RaftReplicaTracker(const RaftServer &target_, const RaftStateSnapshotPtr &snapshot_, RaftJournal &journal_, RaftState &state_, RaftLease &lease_, RaftCommitTracker &ct, RaftTrimmer &trim, ShardDirectory &sharddir, RaftConfig &conf, const RaftTimeouts t)
: target(target_), snapshot(snapshot_), journal(journal_),
state(state_), lease(lease_), commitTracker(ct), trimmer(trim), shardDirectory(sharddir), config(conf), timeouts(t),
matchIndex(commitTracker.getHandler(target)),
......@@ -57,16 +57,16 @@ RaftReplicaTracker::RaftReplicaTracker(const RaftServer &target_, const RaftStat
qdb_throw("attempted to run replication on myself");
}
RaftStateSnapshot current = state.getSnapshot();
if(snapshot.term > current.term) {
RaftStateSnapshotPtr current = state.getSnapshot();
if(snapshot->term > current->term) {
qdb_throw("bug, a state snapshot has a larger term than the current state");
}
if(snapshot.term < current.term) {
if(snapshot->term < current->term) {
return;
}
if(current.status != RaftStatus::LEADER && current.status != RaftStatus::SHUTDOWN) {
if(current->status != RaftStatus::LEADER && current->status != RaftStatus::SHUTDOWN) {
qdb_throw("bug, attempted to initiate replication for a term in which I'm not a leader");
}
......@@ -227,7 +227,7 @@ void RaftReplicaTracker::monitorAckReception(ThreadAssistant &assistant) {
return;
}
if(response.term != snapshot.term) {
if(response.term != snapshot->term) {
streamingUpdates = false;
return;
}
......@@ -263,7 +263,7 @@ LogIndex RaftReplicaTracker::streamUpdates(RaftTalker &talker, LogIndex firstNex
const int64_t payloadLimit = 512;
LogIndex nextIndex = firstNextIndex;
while(shutdown == 0 && snapshot.term == state.getCurrentTerm() && !state.inShutdown()) {
while(shutdown == 0 && snapshot->term == state.getCurrentTerm() && !state.inShutdown()) {
if(!streamingUpdates) {
// Something went wrong while streaming, return to parent to stabilize
return nextIndex;
......@@ -285,7 +285,7 @@ LogIndex RaftReplicaTracker::streamUpdates(RaftTalker &talker, LogIndex firstNex
}
std::chrono::steady_clock::time_point contact = std::chrono::steady_clock::now();
std::future<redisReplyPtr> fut = talker.appendEntries(snapshot.term, state.getMyself(), nextIndex-1, prevTerm, journal.getCommitIndex(), entries);
std::future<redisReplyPtr> fut = talker.appendEntries(snapshot->term, state.getMyself(), nextIndex-1, prevTerm, journal.getCommitIndex(), entries);
std::unique_lock<std::mutex> lock(inFlightMtx);
inFlight.emplace(
......@@ -328,9 +328,9 @@ ReplicaStatus RaftReplicaTracker::getStatus() {
void RaftReplicaTracker::sendHeartbeats(ThreadAssistant &assistant) {
RaftTalker talker(target, journal.getClusterID(), timeouts);
while(!assistant.terminationRequested() && shutdown == 0 && snapshot.term == state.getCurrentTerm() && !state.inShutdown()) {
while(!assistant.terminationRequested() && shutdown == 0 && snapshot->term == state.getCurrentTerm() && !state.inShutdown()) {
std::chrono::steady_clock::time_point contact = std::chrono::steady_clock::now();
std::future<redisReplyPtr> fut = talker.heartbeat(snapshot.term, state.getMyself());
std::future<redisReplyPtr> fut = talker.heartbeat(snapshot->term, state.getMyself());
RaftHeartbeatResponse resp;
if(!retrieve_heartbeat_reply(fut, resp)) {
......@@ -338,7 +338,7 @@ void RaftReplicaTracker::sendHeartbeats(ThreadAssistant &assistant) {
}
state.observed(resp.term, {});
if(snapshot.term < resp.term || !resp.nodeRecognizedAsLeader) continue;
if(snapshot->term < resp.term || !resp.nodeRecognizedAsLeader) continue;
lastContact.heartbeat(contact);
nextRound:
......@@ -358,7 +358,7 @@ void RaftReplicaTracker::main() {
bool warnStreamingHiccup = false;
bool needResilvering = false;
while(shutdown == 0 && snapshot.term == state.getCurrentTerm() && !state.inShutdown()) {
while(shutdown == 0 && snapshot->term == state.getCurrentTerm() && !state.inShutdown()) {
if(warnStreamingHiccup) {
qdb_warn("Hiccup during streaming replication of " << target.toString() << ", switching back to conservative replication.");
......@@ -396,7 +396,7 @@ void RaftReplicaTracker::main() {
}
std::chrono::steady_clock::time_point contact = std::chrono::steady_clock::now();
std::future<redisReplyPtr> fut = talker.appendEntries(snapshot.term, state.getMyself(), nextIndex-1, prevTerm, journal.getCommitIndex(), entries);
std::future<redisReplyPtr> fut = talker.appendEntries(snapshot->term, state.getMyself(), nextIndex-1, prevTerm, journal.getCommitIndex(), entries);
RaftAppendEntriesResponse resp;
// Check: Is the target even online?
......@@ -417,7 +417,7 @@ void RaftReplicaTracker::main() {
}
state.observed(resp.term, {});
if(snapshot.term < resp.term) continue;
if(snapshot->term < resp.term) continue;
lastContact.heartbeat(contact);
// Check: Does the target need resilvering?
......@@ -456,7 +456,7 @@ void RaftReplicaTracker::main() {
// All checks have passed
if(nextIndex+payloadSize != resp.logSize) {
qdb_warn("mismatch in expected logSize. nextIndex = " << nextIndex << ", payloadSize = " << payloadSize << ", logSize: " << resp.logSize << ", resp.term: " << resp.term << ", my term: " << snapshot.term << ", journal size: " << journal.getLogSize());
qdb_warn("mismatch in expected logSize. nextIndex = " << nextIndex << ", payloadSize = " << payloadSize << ", logSize: " << resp.logSize << ", resp.term: " << resp.term << ", my term: " << snapshot->term << ", journal size: " << journal.getLogSize());
}
matchIndex.update(resp.logSize-1);
......@@ -481,9 +481,9 @@ nextRound:
running = false;
}
void RaftReplicator::activate(RaftStateSnapshot &snapshot_) {
void RaftReplicator::activate(RaftStateSnapshotPtr &snapshot_) {
std::lock_guard<std::recursive_mutex> lock(mtx);
qdb_event("Activating replicator for term " << snapshot_.term);
qdb_event("Activating replicator for term " << snapshot_->term);
qdb_assert(targets.empty());
snapshot = snapshot_;
......
......@@ -47,7 +47,7 @@ class RaftCommitTracker; class RaftMatchIndexTracker; class RaftLastContact;
class RaftReplicaTracker {
public:
RaftReplicaTracker(const RaftServer &target, const RaftStateSnapshot &snapshot, RaftJournal &journal, RaftState &state, RaftLease &lease, RaftCommitTracker &commitTracker, RaftTrimmer &trimmer, ShardDirectory &shardDirectory, RaftConfig &config, const RaftTimeouts t);
RaftReplicaTracker(const RaftServer &target, const RaftStateSnapshotPtr &snapshot, RaftJournal &journal, RaftState &state, RaftLease &lease, RaftCommitTracker &commitTracker, RaftTrimmer &trimmer, ShardDirectory &shardDirectory, RaftConfig &config, const RaftTimeouts t);
~RaftReplicaTracker();
ReplicaStatus getStatus();
......@@ -78,7 +78,7 @@ private:
bool buildPayload(LogIndex nextIndex, int64_t payloadLimit, std::vector<RaftSerializedEntry> &entries, int64_t &payloadSize);
RaftServer target;
RaftStateSnapshot snapshot;
RaftStateSnapshotPtr snapshot;
void updateStatus(bool online, LogIndex nextIndex);
......@@ -121,7 +121,7 @@ public:
RaftReplicator(RaftJournal &journal, RaftState &state, RaftLease &lease, RaftCommitTracker &commitTracker, RaftTrimmer &trimmer, ShardDirectory &shardDirectory, RaftConfig &config, const RaftTimeouts t);
~RaftReplicator();
void activate(RaftStateSnapshot &snapshot);
void activate(RaftStateSnapshotPtr &snapshot);
void deactivate();