Commit dc625278 authored by Georgios Bitzes's avatar Georgios Bitzes
Browse files

Collect into a single class all info needed for node-to-node communication

parent 8ae88180
Pipeline #408642 passed with stages
in 29 minutes and 27 seconds
// ----------------------------------------------------------------------
// File: RaftContactDetails.hh
// Author: Georgios Bitzes - CERN
// ----------------------------------------------------------------------
/************************************************************************
* quarkdb - a redis-like highly available key-value store *
* Copyright (C) 2016 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see <http://www.gnu.org/licenses/>.*
************************************************************************/
#ifndef QUARKDB_RAFT_CONTACT_DETAILS_H
#define QUARKDB_RAFT_CONTACT_DETAILS_H
#include "RaftTimeouts.hh"
namespace quarkdb {
using RaftClusterID = std::string;
//------------------------------------------------------------------------------
//! An immutable class containing all auxiliary information required to
//! establish a node-to-node connection.
//------------------------------------------------------------------------------
class RaftContactDetails {
public:
RaftContactDetails(const RaftClusterID &cid, const RaftTimeouts &t)
: clusterID(cid), timeouts(t) { }
const RaftClusterID& getClusterID() const {
return clusterID;
}
const RaftTimeouts& getRaftTimeouts() const {
return timeouts;
}
private:
const RaftClusterID clusterID;
const RaftTimeouts timeouts;
};
}
#endif
......@@ -28,8 +28,8 @@
#include "../Dispatcher.hh"
using namespace quarkdb;
RaftDirector::RaftDirector(RaftJournal &jour, StateMachine &sm, RaftState &st, RaftLease &ls, RaftCommitTracker &ct, RaftClock &rc, RaftWriteTracker &wt, ShardDirectory &sharddir, RaftConfig &conf, RaftReplicator &rep)
: journal(jour), stateMachine(sm), state(st), raftClock(rc), lease(ls), commitTracker(ct), writeTracker(wt), shardDirectory(sharddir), config(conf), replicator(rep) {
RaftDirector::RaftDirector(RaftJournal &jour, StateMachine &sm, RaftState &st, RaftLease &ls, RaftCommitTracker &ct, RaftClock &rc, RaftWriteTracker &wt, ShardDirectory &sharddir, RaftConfig &conf, RaftReplicator &rep, const RaftContactDetails &cd)
: journal(jour), stateMachine(sm), state(st), raftClock(rc), lease(ls), commitTracker(ct), writeTracker(wt), shardDirectory(sharddir), config(conf), replicator(rep), contactDetails(cd) {
mainThread = std::thread(&RaftDirector::main, this);
}
......@@ -100,7 +100,7 @@ void RaftDirector::runForLeader() {
return;
}
if(!RaftElection::perform(votereq, state, lease, raftClock.getTimeouts())) {
if(!RaftElection::perform(votereq, state, lease, contactDetails)) {
state.dropOut(snapshot->term+1);
}
}
......
......@@ -37,9 +37,11 @@
namespace quarkdb {
class RaftTrimmer; class ShardDirectory; class RaftConfig; class RaftReplicator;
class RaftContactDetails;
class RaftDirector {
public:
RaftDirector(RaftJournal &journal, StateMachine &stateMachine, RaftState &state, RaftLease &lease, RaftCommitTracker &commitTracker, RaftClock &rc, RaftWriteTracker &wt, ShardDirectory &sharddir, RaftConfig &config, RaftReplicator &replicator);
RaftDirector(RaftJournal &journal, StateMachine &stateMachine, RaftState &state, RaftLease &lease, RaftCommitTracker &commitTracker, RaftClock &rc, RaftWriteTracker &wt, ShardDirectory &sharddir, RaftConfig &config, RaftReplicator &replicator, const RaftContactDetails &contactDetails);
~RaftDirector();
DISALLOW_COPY_AND_ASSIGN(RaftDirector);
private:
......@@ -59,6 +61,7 @@ private:
ShardDirectory &shardDirectory;
RaftConfig &config;
RaftReplicator &replicator;
const RaftContactDetails &contactDetails;
std::thread mainThread;
};
......
......@@ -39,7 +39,8 @@ using namespace quarkdb;
RaftGroup::RaftGroup(ShardDirectory &shardDir, const RaftServer &myself, const RaftTimeouts &t)
: shardDirectory(shardDir), stateMachineRef(*shardDirectory.getStateMachine()),
raftJournalRef(*shardDirectory.getRaftJournal()), me(myself), timeouts(t) {
raftJournalRef(*shardDirectory.getRaftJournal()), me(myself),
raftContactDetails(raftJournalRef.getClusterID(), t) {
}
......@@ -123,7 +124,7 @@ RaftDispatcher* RaftGroup::dispatcher() {
RaftClock* RaftGroup::raftclock() {
std::lock_guard<std::recursive_mutex> lock(mtx);
if(clockptr == nullptr) {
clockptr = new RaftClock(timeouts);
clockptr = new RaftClock(contactDetails()->getRaftTimeouts());
}
return clockptr;
}
......@@ -139,7 +140,7 @@ RaftState* RaftGroup::state() {
RaftDirector* RaftGroup::director() {
std::lock_guard<std::recursive_mutex> lock(mtx);
if(directorptr == nullptr) {
directorptr = new RaftDirector(*journal(), *stateMachine(), *state(), *lease(), *commitTracker(), *raftclock(), *writeTracker(), shardDirectory, *config(), *replicator());
directorptr = new RaftDirector(*journal(), *stateMachine(), *state(), *lease(), *commitTracker(), *raftclock(), *writeTracker(), shardDirectory, *config(), *replicator(), *contactDetails());
}
return directorptr;
}
......@@ -187,7 +188,11 @@ RaftConfig* RaftGroup::config() {
RaftReplicator* RaftGroup::replicator() {
std::lock_guard<std::recursive_mutex> lock(mtx);
if(replicatorptr == nullptr) {
replicatorptr = new RaftReplicator(*journal(), *state(), *lease(), *commitTracker(), *trimmer(), shardDirectory, *config(), timeouts);
replicatorptr = new RaftReplicator(*journal(), *state(), *lease(), *commitTracker(), *trimmer(), shardDirectory, *config(), *contactDetails());
}
return replicatorptr;
}
const RaftContactDetails* RaftGroup::contactDetails() const {
return &raftContactDetails;
}
......@@ -25,6 +25,7 @@
#define __QUARKDB_RAFT_GROUP_H__
#include "../Utils.hh"
#include "RaftContactDetails.hh"
namespace quarkdb {
......@@ -35,7 +36,7 @@ class StateMachine; class RaftJournal; class RaftDispatcher;
class RaftState; class RaftReplicator; class RaftClock;
class RaftDirector; class RaftLease; class RaftWriteTracker;
class RaftTrimmer; class RaftCommitTracker; class RaftConfig;
class ShardDirectory;
class ShardDirectory; class RaftContactDetails;
//------------------------------------------------------------------------------
// This class keeps track of and owns all objects needed for the raft party.
......@@ -60,6 +61,7 @@ public:
RaftTrimmer *trimmer();
RaftConfig *config();
RaftReplicator *replicator();
const RaftContactDetails* contactDetails() const;
RaftServer myself();
......@@ -74,7 +76,7 @@ private:
RaftJournal &raftJournalRef;
const RaftServer me;
const RaftTimeouts timeouts;
const RaftContactDetails raftContactDetails;
// All components needed for the raft party - owned by this class.
RaftDispatcher *dispatcherptr = nullptr;
......
......@@ -34,14 +34,15 @@
#include "RaftTimeouts.hh"
#include "RaftLease.hh"
#include "RaftTrimmer.hh"
#include "RaftContactDetails.hh"
#include "../utils/FileUtils.hh"
#include <dirent.h>
#include <fstream>
using namespace quarkdb;
RaftReplicator::RaftReplicator(RaftJournal &journal_, RaftState &state_, RaftLease &lease_, RaftCommitTracker &ct, RaftTrimmer &trim, ShardDirectory &sharddir, RaftConfig &conf, const RaftTimeouts t)
: journal(journal_), state(state_), lease(lease_), commitTracker(ct), trimmer(trim), shardDirectory(sharddir), config(conf), timeouts(t) {
RaftReplicator::RaftReplicator(RaftJournal &journal_, RaftState &state_, RaftLease &lease_, RaftCommitTracker &ct, RaftTrimmer &trim, ShardDirectory &sharddir, RaftConfig &conf, const RaftContactDetails &cd)
: journal(journal_), state(state_), lease(lease_), commitTracker(ct), trimmer(trim), shardDirectory(sharddir), config(conf), contactDetails(cd) {
}
......@@ -49,9 +50,9 @@ RaftReplicator::~RaftReplicator() {
deactivate();
}
RaftReplicaTracker::RaftReplicaTracker(const RaftServer &target_, const RaftStateSnapshotPtr &snapshot_, RaftJournal &journal_, RaftState &state_, RaftLease &lease_, RaftCommitTracker &ct, RaftTrimmer &trim, ShardDirectory &sharddir, RaftConfig &conf, const RaftTimeouts t)
RaftReplicaTracker::RaftReplicaTracker(const RaftServer &target_, const RaftStateSnapshotPtr &snapshot_, RaftJournal &journal_, RaftState &state_, RaftLease &lease_, RaftCommitTracker &ct, RaftTrimmer &trim, ShardDirectory &sharddir, RaftConfig &conf, const RaftContactDetails &cd)
: target(target_), snapshot(snapshot_), journal(journal_),
state(state_), lease(lease_), commitTracker(ct), trimmer(trim), shardDirectory(sharddir), config(conf), timeouts(t),
state(state_), lease(lease_), commitTracker(ct), trimmer(trim), shardDirectory(sharddir), config(conf), contactDetails(cd),
matchIndex(commitTracker.getHandler(target)),
lastContact(lease.getHandler(target)),
trimmingBlock(trimmer, 0) {
......@@ -184,7 +185,7 @@ void RaftReplicaTracker::triggerResilvering() {
}
// Start the resilverer
resilverer.reset(new RaftResilverer(shardDirectory, target, journal.getClusterID(), timeouts, trimmer));
resilverer.reset(new RaftResilverer(shardDirectory, target, contactDetails, trimmer));
}
void RaftReplicaTracker::monitorAckReception(ThreadAssistant &assistant) {
......@@ -193,7 +194,7 @@ void RaftReplicaTracker::monitorAckReception(ThreadAssistant &assistant) {
while(!assistant.terminationRequested()) {
if(inFlight.size() == 0) {
// Empty queue, sleep
inFlightCV.wait_for(lock, timeouts.getHeartbeatInterval());
inFlightCV.wait_for(lock, contactDetails.getRaftTimeouts().getHeartbeatInterval());
continue;
}
......@@ -367,7 +368,7 @@ LogIndex RaftReplicaTracker::streamUpdates(RaftTalker &talker, LogIndex firstNex
updateStatus(true, nextIndex);
if(nextIndex >= journal.getLogSize()) {
journal.waitForUpdates(nextIndex, timeouts.getHeartbeatInterval());
journal.waitForUpdates(nextIndex, contactDetails.getRaftTimeouts().getHeartbeatInterval());
}
else {
// fire next round
......@@ -389,7 +390,7 @@ ReplicaStatus RaftReplicaTracker::getStatus() {
}
void RaftReplicaTracker::sendHeartbeats(ThreadAssistant &assistant) {
RaftTalker talker(target, journal.getClusterID(), timeouts);
RaftTalker talker(target, contactDetails);
while(!assistant.terminationRequested() && shutdown == 0 && snapshot->term == state.getCurrentTerm() && !state.inShutdown()) {
std::chrono::steady_clock::time_point contact = std::chrono::steady_clock::now();
......@@ -405,7 +406,7 @@ void RaftReplicaTracker::sendHeartbeats(ThreadAssistant &assistant) {
lastContact.heartbeat(contact);
nextRound:
state.wait(timeouts.getHeartbeatInterval());
state.wait(contactDetails.getRaftTimeouts().getHeartbeatInterval());
}
}
......@@ -437,7 +438,7 @@ private:
};
void RaftReplicaTracker::main() {
RaftTalker talker(target, journal.getClusterID(), timeouts);
RaftTalker talker(target, contactDetails);
LogIndex nextIndex = journal.getLogSize();
RaftMatchIndexTracker &matchIndex = commitTracker.getHandler(target);
......@@ -566,10 +567,10 @@ nextRound:
updateStatus(onlineTracker.isOnline(), nextIndex);
if(!onlineTracker.isOnline() || needResilvering) {
state.wait(timeouts.getHeartbeatInterval());
state.wait(contactDetails.getRaftTimeouts().getHeartbeatInterval());
}
else if(onlineTracker.isOnline() && nextIndex >= journal.getLogSize()) {
journal.waitForUpdates(nextIndex, timeouts.getHeartbeatInterval());
journal.waitForUpdates(nextIndex, contactDetails.getRaftTimeouts().getHeartbeatInterval());
}
else {
// don't wait, fire next round of updates
......@@ -660,7 +661,7 @@ void RaftReplicator::setTargets(const std::vector<RaftServer> &newTargets) {
// add targets?
for(size_t i = 0; i < newTargets.size(); i++) {
if(targets.find(newTargets[i]) == targets.end()) {
targets[newTargets[i]] = new RaftReplicaTracker(newTargets[i], snapshot, journal, state, lease, commitTracker, trimmer, shardDirectory, config, timeouts);
targets[newTargets[i]] = new RaftReplicaTracker(newTargets[i], snapshot, journal, state, lease, commitTracker, trimmer, shardDirectory, config, contactDetails);
}
}
......
......@@ -21,8 +21,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.*
************************************************************************/
#ifndef __QUARKDB_RAFT_REPLICATOR_H__
#define __QUARKDB_RAFT_REPLICATOR_H__
#ifndef QUARKDB_RAFT_REPLICATOR_H
#define QUARKDB_RAFT_REPLICATOR_H
#include "RaftTimeouts.hh"
#include <mutex>
......@@ -41,6 +41,7 @@ class RaftTalker; class RaftResilverer; class RaftTrimmer;
class ShardDirectory; class RaftConfig; class RaftState;
class StateMachine; class RaftJournal; class RaftLease;
class RaftCommitTracker; class RaftMatchIndexTracker; class RaftLastContact;
class RaftContactDetails;
//------------------------------------------------------------------------------
// Tracks a single raft replica
......@@ -48,7 +49,7 @@ class RaftCommitTracker; class RaftMatchIndexTracker; class RaftLastContact;
class RaftReplicaTracker {
public:
RaftReplicaTracker(const RaftServer &target, const RaftStateSnapshotPtr &snapshot, RaftJournal &journal, RaftState &state, RaftLease &lease, RaftCommitTracker &commitTracker, RaftTrimmer &trimmer, ShardDirectory &shardDirectory, RaftConfig &config, const RaftTimeouts t);
RaftReplicaTracker(const RaftServer &target, const RaftStateSnapshotPtr &snapshot, RaftJournal &journal, RaftState &state, RaftLease &lease, RaftCommitTracker &commitTracker, RaftTrimmer &trimmer, ShardDirectory &shardDirectory, RaftConfig &config, const RaftContactDetails &contactDetails);
~RaftReplicaTracker();
ReplicaStatus getStatus();
......@@ -103,7 +104,7 @@ private:
RaftTrimmer &trimmer;
ShardDirectory &shardDirectory;
RaftConfig &config;
const RaftTimeouts timeouts;
const RaftContactDetails &contactDetails;
RaftMatchIndexTracker &matchIndex;
RaftLastContact &lastContact;
......@@ -126,7 +127,7 @@ private:
class RaftReplicator {
public:
RaftReplicator(RaftJournal &journal, RaftState &state, RaftLease &lease, RaftCommitTracker &commitTracker, RaftTrimmer &trimmer, ShardDirectory &shardDirectory, RaftConfig &config, const RaftTimeouts t);
RaftReplicator(RaftJournal &journal, RaftState &state, RaftLease &lease, RaftCommitTracker &commitTracker, RaftTrimmer &trimmer, ShardDirectory &shardDirectory, RaftConfig &config, const RaftContactDetails &contactDetails);
~RaftReplicator();
void activate(RaftStateSnapshotPtr &snapshot);
......@@ -145,7 +146,7 @@ private:
RaftTrimmer &trimmer;
ShardDirectory &shardDirectory;
RaftConfig &config;
const RaftTimeouts timeouts;
const RaftContactDetails &contactDetails;
std::map<RaftServer, RaftReplicaTracker*> targets;
std::recursive_mutex mtx;
......
......@@ -23,6 +23,7 @@
#include "RaftResilverer.hh"
#include "RaftTrimmer.hh"
#include "RaftContactDetails.hh"
#include "../ShardDirectory.hh"
#include "../Utils.hh"
#include <dirent.h>
......@@ -71,10 +72,10 @@ private:
std::string error;
};
RaftResilverer::RaftResilverer(ShardDirectory &dir, const RaftServer &trg, const RaftClusterID &cid, const RaftTimeouts &timeouts, RaftTrimmer &trimmer)
: shardDirectory(dir), target(trg), clusterID(cid),
RaftResilverer::RaftResilverer(ShardDirectory &dir, const RaftServer &trg, const RaftContactDetails &contactDetails, RaftTrimmer &trimmer)
: shardDirectory(dir), target(trg),
trimmingBlock(new RaftTrimmingBlock(trimmer, 0)),
talker(target, clusterID, timeouts) {
talker(target, contactDetails) {
resilveringID = generateUuid();
setStatus(ResilveringState::INPROGRESS, "");
......
......@@ -46,18 +46,17 @@ struct ResilveringStatus {
};
class ShardDirectory; class RaftTrimmer;
class RaftTrimmingBlock;
class RaftTrimmingBlock; class RaftContactDetails;
class RaftResilverer {
public:
RaftResilverer(ShardDirectory &directory, const RaftServer &target, const RaftClusterID &clusterID, const RaftTimeouts &timeouts, RaftTrimmer &trimmer);
RaftResilverer(ShardDirectory &directory, const RaftServer &target, const RaftContactDetails &cd, RaftTrimmer &trimmer);
~RaftResilverer();
ResilveringStatus getStatus();
private:
ShardDirectory &shardDirectory;
RaftServer target;
RaftClusterID clusterID;
std::unique_ptr<RaftTrimmingBlock> trimmingBlock;
RaftTalker talker;
......
......@@ -23,6 +23,7 @@
#include "../utils/IntToBinaryString.hh"
#include "RaftTalker.hh"
#include "RaftContactDetails.hh"
#include "RaftTimeouts.hh"
#include "../Version.hh"
......@@ -32,11 +33,11 @@ class RaftHandshake : public qclient::Handshake {
public:
virtual ~RaftHandshake() override {}
RaftHandshake(const RaftClusterID &clusterID_, const RaftTimeouts &timeouts_)
: clusterID(clusterID_), timeouts(timeouts_) { }
RaftHandshake(const RaftContactDetails &cd)
: contactDetails(cd) { }
virtual std::vector<std::string> provideHandshake() override {
return {"RAFT_HANDSHAKE", VERSION_FULL_STRING, clusterID, timeouts.toString()};
return {"RAFT_HANDSHAKE", VERSION_FULL_STRING, contactDetails.getClusterID(), contactDetails.getRaftTimeouts().toString() };
}
virtual Status validateResponse(const redisReplyPtr &reply) override {
......@@ -57,13 +58,12 @@ public:
virtual void restart() override { }
private:
RaftClusterID clusterID;
RaftTimeouts timeouts;
const RaftContactDetails &contactDetails;
};
RaftTalker::RaftTalker(const RaftServer &server_, const RaftClusterID &clusterID, const RaftTimeouts &timeouts)
: server(server_), tlsconfig(), tunnel(server.hostname, server.port, false, qclient::RetryStrategy::NoRetries(), qclient::BackpressureStrategy::Default(), tlsconfig, std::unique_ptr<Handshake>(new RaftHandshake(clusterID, timeouts)) ) {
RaftTalker::RaftTalker(const RaftServer &server_, const RaftContactDetails &contactDetails)
: server(server_), tlsconfig(), tunnel(server.hostname, server.port, false, qclient::RetryStrategy::NoRetries(), qclient::BackpressureStrategy::Default(), tlsconfig, std::unique_ptr<Handshake>(new RaftHandshake(contactDetails)) ) {
}
......
......@@ -31,12 +31,12 @@
namespace quarkdb {
using namespace qclient;
class RaftTimeouts;
class RaftTimeouts; class RaftContactDetails;
using ResilveringEventID = std::string;
class RaftTalker {
public:
RaftTalker(const RaftServer &server, const RaftClusterID &clusterID, const RaftTimeouts &timeouts);
RaftTalker(const RaftServer &server, const RaftContactDetails &contactDetails);
RaftTalker(const RaftServer &server);
std::future<redisReplyPtr> appendEntries(RaftTerm term, RaftServer leader, LogIndex prevIndex,
RaftTerm prevTerm, LogIndex commit,
......
......@@ -26,10 +26,11 @@
#include "RaftTalker.hh"
#include "RaftState.hh"
#include "RaftLease.hh"
#include "RaftContactDetails.hh"
#include "../Utils.hh"
using namespace quarkdb;
bool RaftElection::perform(RaftVoteRequest votereq, RaftState &state, RaftLease &lease, const RaftTimeouts timeouts) {
bool RaftElection::perform(RaftVoteRequest votereq, RaftState &state, RaftLease &lease, const RaftContactDetails &contactDetails) {
if(!votereq.candidate.empty()) {
qdb_throw("candidate member of votereq must be empty, it is filled out by this function");
}
......@@ -60,17 +61,17 @@ bool RaftElection::perform(RaftVoteRequest votereq, RaftState &state, RaftLease
std::vector<std::future<redisReplyPtr>> futures;
for(const RaftServer &node : state.getNodes()) {
if(node != state.getMyself()) {
talkers.emplace_back(new RaftTalker(node, state.getClusterID(), timeouts));
talkers.emplace_back(new RaftTalker(node, contactDetails));
futures.push_back(talkers.back()->requestVote(votereq));
}
}
std::vector<redisReplyPtr> replies;
std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now();
std::chrono::steady_clock::time_point deadline = now + timeouts.getHeartbeatInterval()*2;
std::chrono::steady_clock::time_point deadline = now + contactDetails.getRaftTimeouts().getHeartbeatInterval()*2;
qdb_info(state.getMyself().toString() << ": Vote requests have been sent off, will allow a window of "
<< timeouts.getLow().count() << "ms to receive replies.");
<< contactDetails.getRaftTimeouts().getLow().count() << "ms to receive replies.");
for(size_t i = 0; i < futures.size(); i++) {
if(futures[i].wait_until(deadline) == std::future_status::ready) {
......
......@@ -34,7 +34,7 @@ namespace quarkdb {
//------------------------------------------------------------------------------
// Forward declarations
//------------------------------------------------------------------------------
class RaftState; class RaftLease;
class RaftState; class RaftLease; class RaftContactDetails;
class RaftParser {
public:
......@@ -50,7 +50,7 @@ public:
class RaftElection {
public:
static bool perform(RaftVoteRequest votereq, RaftState &state, RaftLease &lease, const RaftTimeouts timeouts);
static bool perform(RaftVoteRequest votereq, RaftState &state, RaftLease &lease, const RaftContactDetails &contactDetails);
};
}
......
......@@ -23,7 +23,6 @@
#include "raft/RaftDispatcher.hh"
#include "raft/RaftReplicator.hh"
#include "raft/RaftTalker.hh"
#include "raft/RaftTimeouts.hh"
#include "raft/RaftCommitTracker.hh"
#include "raft/RaftConfig.hh"
......
......@@ -23,6 +23,7 @@
#include "utils/IntToBinaryString.hh"
#include "raft/RaftTalker.hh"
#include "raft/RaftContactDetails.hh"
#include "Common.hh"
#include "Version.hh"
#include "test-utils.hh"
......@@ -37,7 +38,8 @@ TEST(RaftTalker, T1) {
std::chrono::milliseconds(3));
RaftServer node = {"localhost", 12344};
RaftServer myself = {"its_me_ur_leader", 1337};
RaftTalker talker(node, clusterID, timeouts);
RaftContactDetails cd(clusterID, timeouts);
RaftTalker talker(node, cd);
SocketListener listener(12344);
int s2 = listener.accept();
......
......@@ -30,6 +30,7 @@
#include "raft/RaftMembers.hh"
#include "raft/RaftJournal.hh"
#include "raft/RaftLease.hh"
#include "raft/RaftContactDetails.hh"
#include "Poller.hh"
#include "test-utils.hh"
#include "RedisParser.hh"
......@@ -51,11 +52,11 @@ TEST_F(Raft_Replicator, no_replication_on_myself) {
ASSERT_TRUE(state()->observed(2, {}));
ASSERT_TRUE(state()->becomeCandidate(2));
ASSERT_TRUE(state()->ascend(2));
ASSERT_THROW(RaftReplicaTracker(myself(), state()->getSnapshot(), *journal(), *state(), *lease(), *commitTracker(), *trimmer(), *shardDirectory(), *raftconfig(), raftclock()->getTimeouts()), FatalException);
ASSERT_THROW(RaftReplicaTracker(myself(), state()->getSnapshot(), *journal(), *state(), *lease(), *commitTracker(), *trimmer(), *shardDirectory(), *raftconfig(), *contactDetails()), FatalException);
}
TEST_F(Raft_Replicator, only_leader_can_launch_replicator) {
ASSERT_THROW(RaftReplicaTracker(nodes()[1], state()->getSnapshot(), *journal(), *state(), *lease(), *commitTracker(), *trimmer(), *shardDirectory(), *raftconfig(), raftclock()->getTimeouts()), FatalException);
ASSERT_THROW(RaftReplicaTracker(nodes()[1], state()->getSnapshot(), *journal(), *state(), *lease(), *commitTracker(), *trimmer(), *shardDirectory(), *raftconfig(), *contactDetails()), FatalException);
}
TEST_F(Raft_Replicator, verify_sane_snapshot_term) {
......@@ -67,11 +68,11 @@ TEST_F(Raft_Replicator, verify_sane_snapshot_term) {
RaftStateSnapshotPtr snapshot = state()->getSnapshot();
RaftStateSnapshot snapshot2(*snapshot.get());
snapshot2.term = 3;
ASSERT_THROW(RaftReplicaTracker(nodes()[1], RaftStateSnapshotPtr(new RaftStateSnapshot(snapshot2)), *journal(), *state(), *lease(), *commitTracker(), *trimmer(), *shardDirectory(), *raftconfig(), raftclock()->getTimeouts()), FatalException);
ASSERT_THROW(RaftReplicaTracker(nodes()[1], RaftStateSnapshotPtr(new RaftStateSnapshot(snapshot2)), *journal(), *state(), *lease(), *commitTracker(), *trimmer(), *shardDirectory(), *raftconfig(), *contactDetails()), FatalException);
// stale term - this can naturally happen, so it is not an exception
ASSERT_TRUE(state()->observed(4, {}));
RaftReplicaTracker tracker(nodes()[1], snapshot, *journal(), *state(), *lease(), *commitTracker(), *trimmer(), *shardDirectory(), *raftconfig(), raftclock()->getTimeouts());
RaftReplicaTracker tracker(nodes()[1], snapshot, *journal(), *state(), *lease(), *commitTracker(), *trimmer(), *shardDirectory(), *raftconfig(), *contactDetails());
ASSERT_FALSE(tracker.isRunning());
}
......@@ -90,7 +91,7 @@ TEST_F(Raft_Replicator, do_simple_replication) {
poller(1);
// launch!
RaftReplicaTracker tracker(myself(1), state(0)->getSnapshot(), *journal(), *state(), *lease(), *commitTracker(), *trimmer(), *shardDirectory(), *raftconfig(), raftclock()->getTimeouts());
RaftReplicaTracker tracker(myself(1), state(0)->getSnapshot(), *journal(), *state(), *lease(), *commitTracker(), *trimmer(), *shardDirectory(), *raftconfig(), *contactDetails());
ASSERT_TRUE(tracker.isRunning());
// populate #0's journal
......@@ -125,7 +126,7 @@ TEST_F(Raft_Replicator, test_replication_with_empty_journals) {
poller(1);
// launch
RaftReplicaTracker tracker(myself(1), state(0)->getSnapshot(), *journal(), *state(), *lease(), *commitTracker(), *trimmer(), *shardDirectory(), *raftconfig(), raftclock()->getTimeouts());
RaftReplicaTracker tracker(myself(1), state(0)->getSnapshot(), *journal(), *state(), *lease(), *commitTracker(), *trimmer(), *shardDirectory(), *raftconfig(), *contactDetails());
ASSERT_TRUE(tracker.isRunning());
// verify everything's sane
......@@ -159,7 +160,7 @@ TEST_F(Raft_Replicator, follower_has_larger_journal_than_leader) {
poller(1);
// launch!
RaftReplicaTracker tracker(myself(1), state(0)->getSnapshot(), *journal(), *state(), *lease(), *commitTracker(), *trimmer(), *shardDirectory(), *raftconfig(), raftclock()->getTimeouts());
RaftReplicaTracker tracker(myself(1), state(0)->getSnapshot(), *journal(), *state(), *lease(), *commitTracker(), *trimmer(), *shardDirectory(), *raftconfig(), *contactDetails());
ASSERT_TRUE(tracker.isRunning());
// verify #1 recognized #0 as leader and that replication was successful
......@@ -188,7 +189,7 @@ TEST_F(Raft_Replicator, no_replication_of_higher_term_entries) {
poller(1);
// launch!
RaftReplicaTracker tracker(myself(1), state(0)->getSnapshot(), *journal(), *state(), *lease(), *commitTracker(), *trimmer(), *shardDirectory(), *raftconfig(), raftclock()->getTimeouts());
RaftReplicaTracker tracker(myself(1), state(0)->getSnapshot(), *journal(), *state(), *lease(), *commitTracker(), *trimmer(), *shardDirectory(), *raftconfig(), *contactDetails());
RETRY_ASSERT_TRUE(!tracker.isRunning());
ASSERT_TRUE(journal(0)->getCommitIndex() == 0);
ASSERT_TRUE(journal(1)->getCommitIndex() == 0);
......@@ -351,9 +352,9 @@ TEST_F(Raft_Dispatcher, incompatible_timeouts) {
// try to talk to a raft server while providing the wrong timeouts
poller(0);
RaftTimeouts timeouts(std::chrono::milliseconds(1), std::chrono::milliseconds(2),
std::chrono::milliseconds(3));
RaftTalker talker(myself(0), clusterID(), timeouts);
RaftContactDetails cd(clusterID(), RaftTimeouts(std::chrono::milliseconds(1), std::chrono::milliseconds(2),
std::chrono::milliseconds(3)));
RaftTalker talker(myself(0), cd);
RaftVoteRequest votereq;</