Commit 2208f1df authored by Georgios Bitzes
Browse files

Ensure timekeeper is hard-synchronized to static clock during leader failover

parent ab45f32a
Pipeline #427907 failed with stages
in 49 minutes and 43 seconds
......@@ -583,6 +583,16 @@ RedisEncodedResponse RedisDispatcher::handleTransaction(RedisRequest &request, L
RedisEncodedResponse RedisDispatcher::dispatch(RedisRequest &request, LogIndex commit) {
if(request.getCommand() == RedisCommand::INVALID) {
if(startswith(request[0], "JOURNAL_")) {
if(request[0] == "JOURNAL_LEADERSHIP_MARKER") {
// Hard-synchronize our dynamic clock to the static one. The dynamic
// clock is only used in leaders to timestamp incoming lease requests.
// So, strictly speaking, synchronizing the clock is only necessary for
// leader nodes, but it's so cheap to do that we don't care. Let's
// synchronize all nodes.
store.hardSynchronizeDynamicClock();
}
store.noop(commit);
return Formatter::ok();
}
......
......@@ -60,7 +60,7 @@ static rocksdb::Status malformed(const std::string &message) {
}
StateMachine::StateMachine(const std::string &f, bool write_ahead_log, bool bulk_load)
: filename(f), writeAheadLog(write_ahead_log), bulkLoad(bulk_load),
: filename(f), writeAheadLog(write_ahead_log), bulkLoad(bulk_load), timeKeeper(0u),
requestCounter(std::chrono::seconds(10)) {
if(writeAheadLog) {
......@@ -174,6 +174,7 @@ void StateMachine::ensureClockSanity(bool justCreated) {
}
// We survived!
timeKeeper.reset(binaryStringToUnsignedInt(value.c_str()));
}
StateMachine::~StateMachine() {
......@@ -198,6 +199,17 @@ void StateMachine::reset() {
retrieveLastApplied();
}
void StateMachine::hardSynchronizeDynamicClock() {
ClockValue syncPoint;
getClock(syncPoint);
timeKeeper.synchronize(syncPoint);
}
//------------------------------------------------------------------------------
// Return the dynamic time as currently reported by the timekeeper.
//------------------------------------------------------------------------------
ClockValue StateMachine::getDynamicTime() {
  const ClockValue dynamicNow = timeKeeper.getDynamicTime();
  return dynamicNow;
}
void StateMachine::ensureBulkloadSanity(bool justCreated) {
std::string inBulkload;
rocksdb::Status st = db->Get(rocksdb::ReadOptions(), KeyConstants::kStateMachine_InBulkload, &inBulkload);
......
......@@ -24,6 +24,7 @@
#ifndef QUARKDB_STATE_MACHINE_H
#define QUARKDB_STATE_MACHINE_H
#include "Timekeeper.hh"
#include "Common.hh"
#include "utils/Macros.hh"
#include "utils/RequestCounter.hh"
......@@ -201,6 +202,9 @@ public:
rocksdb::Status verifyChecksum();
RequestCounter& getRequestCounter() { return requestCounter; }
ClockValue getDynamicTime();
void hardSynchronizeDynamicClock();
private:
ClockValue maybeAdvanceClock(StagingArea &stagingArea, ClockValue newValue);
friend class StagingArea;
......@@ -290,6 +294,7 @@ private:
bool writeAheadLog;
bool bulkLoad;
Timekeeper timeKeeper;
RequestCounter requestCounter;
};
......
......@@ -22,33 +22,34 @@
************************************************************************/
#include "Timekeeper.hh"
#include "utils/Macros.hh"
using namespace quarkdb;
//------------------------------------------------------------------------------
// Construct a Timekeeper whose static clock starts at the given value, and
// whose anchor point is the steady-clock instant of construction.
//------------------------------------------------------------------------------
Timekeeper::Timekeeper(ClockValue startup)
: staticClock(startup), anchorPoint(std::chrono::steady_clock::now()) {}
bool Timekeeper::synchronize(ClockValue observed) {
//------------------------------------------------------------------------------
// Overwrite the static clock with the given value and re-anchor to the
// present steady-clock instant. No check is made against the previous state.
//------------------------------------------------------------------------------
void Timekeeper::reset(ClockValue startup) {
  // Exclusive lock: both members are rewritten together.
  std::unique_lock<std::shared_mutex> writeGuard(mtx);

  anchorPoint = std::chrono::steady_clock::now();
  staticClock = startup;
}
if(observed > staticClock + getTimeSinceAnchor().count() ) {
// We have a timejump. Re-anchor, update static clock
anchorPoint = std::chrono::steady_clock::now();
staticClock = observed;
return true;
}
else {
// Nothing to do, the clock never goes back in time
return false;
}
//------------------------------------------------------------------------------
// Adopt newval as the static clock and re-anchor to the present steady-clock
// instant. The static clock must not move backwards: the assertion below
// rejects any newval smaller than the currently stored value.
//------------------------------------------------------------------------------
void Timekeeper::synchronize(ClockValue newval) {
  // Exclusive lock: both members are rewritten together.
  std::unique_lock<std::shared_mutex> writeGuard(mtx);
  qdb_assert(staticClock <= newval);

  // Elapsed time is measured from this new anchor onwards.
  anchorPoint = std::chrono::steady_clock::now();
  staticClock = newval;
}
ClockValue Timekeeper::getCurrentTime() {
//------------------------------------------------------------------------------
// Dynamic time = static clock + milliseconds elapsed since the anchor point.
//------------------------------------------------------------------------------
ClockValue Timekeeper::getDynamicTime() const {
  // Shared lock: read-only access to the static clock and anchor point.
  std::shared_lock<std::shared_mutex> readGuard(mtx);

  const std::chrono::milliseconds sinceAnchor = getTimeSinceAnchor();
  return staticClock + sinceAnchor.count();
}
std::chrono::milliseconds Timekeeper::getTimeSinceAnchor() {
std::chrono::milliseconds Timekeeper::getTimeSinceAnchor() const {
return std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - anchorPoint
);
......
......@@ -40,28 +40,43 @@ public:
Timekeeper(ClockValue startup);
//----------------------------------------------------------------------------
// The static clock has been updated to the given value. Most of the time,
// the given value should actually be less than what we have. If that's not
// the case, update.
// Reset a Timekeeper object completely, disregarding its previous state.
// You probably want to use synchronize() to update the clock value!
//----------------------------------------------------------------------------
void reset(ClockValue startup);
//----------------------------------------------------------------------------
// The static clock has been updated to the given value. The static clock
// should _never_ go back in time, that indicates serious corruption - an
// assertion in synchronize() enforces this.
//
// However, the dynamic clock (as given by getDynamicTime) might go back
// if the following happens:
// - synchronize(0)
// - sleep(10 ms)
// - getDynamicTime() -> 10
// - synchronize(5)
// - getDynamicTime() -> 5
//
// A Timekeeper will never go back in time!
// The static clock only went forward in time, but the dynamic clock was
// set back, and that's okay in the context we're using this.
//----------------------------------------------------------------------------
bool synchronize(ClockValue observed);
void synchronize(ClockValue newval);
//----------------------------------------------------------------------------
// Get the current time in milliseconds.
// Get the current dynamic time in milliseconds.
//----------------------------------------------------------------------------
ClockValue getCurrentTime();
ClockValue getDynamicTime() const;
private:
std::shared_mutex mtx;
mutable std::shared_mutex mtx;
ClockValue staticClock;
std::chrono::steady_clock::time_point anchorPoint;
//----------------------------------------------------------------------------
// Get time elapsed since last anchor point
//----------------------------------------------------------------------------
std::chrono::milliseconds getTimeSinceAnchor();
std::chrono::milliseconds getTimeSinceAnchor() const;
};
}
......
......@@ -30,31 +30,47 @@ using namespace quarkdb;
TEST(Timekeeper, BasicSanity) {
Timekeeper tk(ClockValue(123));
ASSERT_GE(tk.getCurrentTime(), ClockValue(123));
std::cerr << "Initialization: " << tk.getCurrentTime() << std::endl;
ASSERT_GE(tk.getDynamicTime(), ClockValue(123));
std::cerr << "Initialization: " << tk.getDynamicTime() << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
ASSERT_GE(tk.getCurrentTime(), ClockValue(1123));
std::cerr << "After 1 sec: " << tk.getCurrentTime() << std::endl;
ASSERT_GE(tk.getDynamicTime(), ClockValue(1123));
std::cerr << "After 1 sec: " << tk.getDynamicTime() << std::endl;
// Timekeeper should not go back in time
ASSERT_FALSE(tk.synchronize(ClockValue(1000)));
ASSERT_GE(tk.getCurrentTime(), ClockValue(1123));
std::cerr << "After unsuccessful synchronization: " << tk.getCurrentTime() << std::endl;
// Static clock should not go back in time
ASSERT_THROW(tk.synchronize(15u), FatalException);
ASSERT_GE(tk.getDynamicTime(), ClockValue(1123));
std::cerr << "After unsuccessful synchronization: " << tk.getDynamicTime() << std::endl;
// Timejump
ASSERT_TRUE(tk.synchronize(ClockValue(2000)));
ASSERT_GE(tk.getCurrentTime(), ClockValue(2000));
std::cerr << "After successful synchronization at 2000 ClockValue: " << tk.getCurrentTime() << std::endl;
tk.synchronize(ClockValue(2000));
ASSERT_GE(tk.getDynamicTime(), ClockValue(2000));
std::cerr << "After successful synchronization at 2000 ClockValue: " << tk.getDynamicTime() << std::endl;
// Ensure the clock doesn't go back, or something
ClockValue prevValue = tk.getCurrentTime();
ClockValue prevValue = tk.getDynamicTime();
for(size_t i = 0; i < 10; i++) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
ClockValue curVal = tk.getCurrentTime();
ClockValue curVal = tk.getDynamicTime();
ASSERT_GE(curVal, prevValue);
prevValue = curVal;
std::cerr << "Tick: " << prevValue << std::endl;
}
// Timejump which actually sets the dynamic clock back
tk.synchronize(ClockValue(2001));
std::cerr << "Synchronized static clock to 2001" << std::endl;
prevValue = tk.getDynamicTime();
for(size_t i = 0; i < 10; i++) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
ClockValue curVal = tk.getDynamicTime();
ASSERT_GE(curVal, prevValue);
prevValue = curVal;
std::cerr << "Tick: " << prevValue << std::endl;
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment