Commit ab7e39a0 authored by Georgios Bitzes
Browse files

Fix 0.4.0 regression which could cause replication to get stuck

parent 3dacfdd2
Pipeline #1479739 failed with stages
in 71 minutes and 31 seconds
......@@ -3,6 +3,8 @@
## Unreleased
### Bug fixes
- Under complicated conditions, replication towards a particular follower could become stuck.
(To work around the problem, restart the leader node.)
- Running ``DEL`` on a lease key would cause all nodes in a cluster to crash
with an assertion. ``DEL`` will now simply release the given lease, as if
``lease-release`` had been called.
......@@ -19,6 +21,8 @@ checkpoint for long-term storage.
- Attempt to detect potential ``MANIFEST`` corruption early by measuring mtime lag
compared to newest SST file.
Thanks to Pete Eby (ORNL) for the bug report relating to replication becoming stuck.
## 0.4.1 (2020-01-17)
### Bug fixes
......
......@@ -354,12 +354,7 @@ LogIndex RaftReplicaTracker::streamUpdates(RaftTalker &talker, LogIndex firstNex
const int64_t payloadLimit = 512;
LogIndex nextIndex = firstNextIndex;
while(shutdown == 0 && state.isSnapshotCurrent(snapshot.get())) {
if(!streamingUpdates) {
// Something went wrong while streaming, return to parent to stabilize
return nextIndex;
}
while(shutdown == 0 && streamingUpdates && state.isSnapshotCurrent(snapshot.get())) {
std::chrono::steady_clock::time_point contact;
std::future<redisReplyPtr> fut;
int64_t payloadSize;
......@@ -381,7 +376,7 @@ LogIndex RaftReplicaTracker::streamUpdates(RaftTalker &talker, LogIndex firstNex
inFlightCV.notify_one();
while(inFlight.size() >= 512 && shutdown == 0 && state.isSnapshotCurrent(snapshot.get())) {
while(inFlight.size() >= 512 && shutdown == 0 && streamingUpdates && state.isSnapshotCurrent(snapshot.get())) {
inFlightPoppedCV.wait_for(lock, contactDetails.getRaftTimeouts().getHeartbeatInterval());
}
......
......@@ -733,7 +733,7 @@ TEST_F(Raft_Director, achieve_natural_election) {
spinup(0); spinup(1); spinup(2);
std::vector<RaftStateSnapshotPtr> snapshots;
RETRY_ASSERT_TRUE(checkStateConsensusWithSnapshots(snapshots, 0, 1, 2));
RETRY_ASSERT_TRUE(checkStateConsensusWithSnapshots(true, snapshots, 0, 1, 2));
// verify all have agreed on the same term
ASSERT_EQ(snapshots[0]->term, snapshots[1]->term);
......
......@@ -380,6 +380,32 @@ TEST_F(Replication, several_transitions) {
ASSERT_TRUE(crossCheckJournals(0, 1, 2));
}
// Test scenario: a follower joins after 1,000,000 writes have already been
// acknowledged by the leader, and must still catch up completely.
//
// Steps:
//  1. Start only nodes 0 and 1 and wait until they agree on the cluster state.
//  2. Issue 1,000,000 "set abc <i>" requests to the leader and check that
//     every reply is "OK".
//  3. Start node 2, wait for state consensus among all three nodes, then wait
//     (with the extended 10-minute retry macro) for full consensus.
//  4. Cross-check the journals of all three nodes.
TEST_F(Replication, FollowerLaggingBy1m) {
// NOTE(review): presumably restricts request batching to a single entry per
// batch so that each write is handled individually - confirm against
// Connection::setPhantomBatchLimit.
Connection::setPhantomBatchLimit(1);
// Node 2 is deliberately left offline while the writes are issued.
spinup(0); spinup(1);
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1));
int leaderID = getLeaderID();
// Fire all requests first and collect the futures; replies are only
// checked afterwards.
std::vector<std::future<redisReplyPtr>> futs;
for(size_t i = 0; i < 1'000'000; i++) {
futs.emplace_back(tunnel(leaderID)->exec("set", "abc", SSTR(i)));
}
// Every single write must have been answered with "OK".
for(size_t i = 0; i < futs.size(); i++) {
ASSERT_REPLY_DESCRIBE(futs[i].get(), "OK");
}
qdb_info("All writes processed, waiting until follower catches up...");
// Only now bring up the third node, which starts out lagging behind.
spinup(2);
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));
// bringing the lagging follower up-to-date could take a while
RETRY_ASSERT_TRUE_10MIN(checkFullConsensus(0, 1, 2));
ASSERT_TRUE(crossCheckJournals(0, 1, 2));
}
TEST_F(Membership, prevent_promotion_of_outdated_observer) {
// Only start nodes 0, 1, 2 of a 5-node cluster
spinup(0); spinup(1); spinup(2);
......
......@@ -107,6 +107,8 @@ class Publisher;
#define RETRY_ASSERT_EQ(cond1, cond2) RETRY_ASSERT_EQ_3(cond1, cond2, NUMBER_OF_RETRIES, 10)
#define RETRY_ASSERT_NE(cond1, cond2) RETRY_ASSERT_NE_3(cond1, cond2, NUMBER_OF_RETRIES, 10)
#define RETRY_ASSERT_TRUE_10MIN(cond) RETRY_ASSERT_TRUE_3(cond, 100*60*10, 10)
extern std::vector<RedisRequest> testreqs;
// necessary because C macros are dumb and don't understand
......@@ -266,7 +268,7 @@ public:
// - All state machines have already applied all entries in the journal
template<typename... Args>
bool checkFullConsensus(const Args... args) {
if(!checkStateConsensus(args...)) return false;
if(!checkStateConsensusQuiet(args...)) return false;
std::vector<int> arguments = { args... };
LogIndex targetEntry = journal(arguments[0])->getLogSize() - 1;
......@@ -278,6 +280,7 @@ public:
if(stateMachine(arguments[i])->getLastApplied() != targetEntry) return false;
}
qdb_info("Achieved full consensus with journal size " << targetEntry);
return true;
}
......@@ -335,7 +338,7 @@ public:
}
template<typename... Args>
bool checkStateConsensusWithSnapshots(std::vector<RaftStateSnapshotPtr> &snapshots,
bool checkStateConsensusWithSnapshots(bool quiet, std::vector<RaftStateSnapshotPtr> &snapshots,
const Args... args) {
std::vector<int> arguments = { args... };
......@@ -372,16 +375,26 @@ public:
return false;
}
qdb_info("Achieved state consensus for term " << snapshots[0]->term << " with leader " << snapshots[0]->leader.toString());
if(!quiet) {
qdb_info("Achieved state consensus for term " << snapshots[0]->term << " with leader " << snapshots[0]->leader.toString());
}
return true;
}
template<typename... Args>
bool checkStateConsensus(const Args... args) {
std::vector<RaftStateSnapshotPtr> snapshots;
return checkStateConsensusWithSnapshots(snapshots, args...);
return checkStateConsensusWithSnapshots(false, snapshots, args...);
}
// Quiet variant of the state-consensus check: forwards the given node ids to
// checkStateConsensusWithSnapshots with quiet=true and returns its result.
// The snapshots it collects go into a local vector that is discarded on
// return.
template<typename... Args>
bool checkStateConsensusQuiet(const Args... args) {
std::vector<RaftStateSnapshotPtr> snapshots;
return checkStateConsensusWithSnapshots(true, snapshots, args...);
}
int getServerID(const RaftServer &srv);
std::vector<RaftServer> retrieveLeaders();
int getLeaderID();
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment