Commit 1844aa48 authored by Georgios Bitzes's avatar Georgios Bitzes
Browse files

Start storing an internal clock value in StateMachine

To be used with leases.
parent f8c03c61
Pipeline #421623 failed with stages
in 27 minutes and 26 seconds
......@@ -143,11 +143,39 @@ StateMachine::StateMachine(const std::string &f, bool write_ahead_log, bool bulk
ensureCompatibleFormat(!dirExists);
ensureBulkloadSanity(!dirExists);
ensureClockSanity(!dirExists);
retrieveLastApplied();
consistencyScanner.reset(new ConsistencyScanner(*this));
}
void StateMachine::ensureClockSanity(bool justCreated) {
std::string value;
rocksdb::Status st = db->Get(rocksdb::ReadOptions(), KeyConstants::kStateMachine_Clock, &value);
if(justCreated) {
if(!st.IsNotFound()) qdb_throw("Error when reading __clock, which should not exist: " << st.ToString());
THROW_ON_ERROR(db->Put(rocksdb::WriteOptions(), KeyConstants::kStateMachine_Clock, unsignedIntToBinaryString(0u)));
}
else {
if(st.IsNotFound()) {
// Compatibility: When opening old state machines, set expected __clock key.
// TODO: Remove in a couple of releases.
THROW_ON_ERROR(db->Put(rocksdb::WriteOptions(), KeyConstants::kStateMachine_Clock, unsignedIntToBinaryString(0u)));
}
}
st = db->Get(rocksdb::ReadOptions(), KeyConstants::kStateMachine_Clock, &value);
if(!st.ok()) qdb_throw("Error when reading __clock: " << st.ToString());
if(value.size() != 8u) {
qdb_throw("Detected corruption of __clock, received size " << value.size() << ", was expecting 8");
}
// We survived!
qdb_info("StateMachine clock is at " << binaryStringToUnsignedInt(value.c_str()));
}
StateMachine::~StateMachine() {
consistencyScanner.reset();
......@@ -166,6 +194,7 @@ void StateMachine::reset() {
ensureCompatibleFormat(true);
ensureBulkloadSanity(true);
ensureClockSanity(true);
retrieveLastApplied();
}
......
......@@ -261,6 +261,7 @@ private:
void retrieveLastApplied();
void ensureCompatibleFormat(bool justCreated);
void ensureBulkloadSanity(bool justCreated);
void ensureClockSanity(bool justCreated);
void remove_all_with_prefix(const rocksdb::Slice &prefix, int64_t &removed, StagingArea &stagingArea);
void lhsetInternal(WriteOperation &operation, const std::string &key, const std::string &field, const std::string &hint, const std::string &value, bool &fieldcreated);
......
......@@ -49,6 +49,7 @@ public:
ADD_TO_ALLKEYS(kStateMachine_Format);
ADD_TO_ALLKEYS(kStateMachine_LastApplied);
ADD_TO_ALLKEYS(kStateMachine_InBulkload);
ADD_TO_ALLKEYS(kStateMachine_Clock);
}
};
......
......@@ -44,6 +44,7 @@ namespace KeyConstants {
constexpr char kStateMachine_Format[] = "__format";
constexpr char kStateMachine_LastApplied[] = "__last-applied";
constexpr char kStateMachine_InBulkload[] = "__in-bulkload";
constexpr char kStateMachine_Clock[] = "__clock";
extern std::vector<std::string> allKeys;
};
......
......@@ -1174,24 +1174,26 @@ TEST_F(Raft_e2e, LocalityHash) {
qclient::describeRedisReply(reply),
"1) \"!mykey\"\n"
"2) \"e\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x03\"\n"
"3) \"__format\"\n"
"4) \"0\"\n"
"5) \"__in-bulkload\"\n"
"6) \"FALSE\"\n"
"7) \"__last-applied\"\n"
"8) \"\\x00\\x00\\x00\\x00\\x00\\x00\\x00\"\"\n"
"9) \"emykey##dhint1##f3\"\n"
"10) \"v6\"\n"
"11) \"emykey##dhint2##f2\"\n"
"12) \"v5\"\n"
"13) \"emykey##dhint3##f1\"\n"
"14) \"v3\"\n"
"15) \"emykey##if1\"\n"
"16) \"hint3\"\n"
"17) \"emykey##if2\"\n"
"18) \"hint2\"\n"
"19) \"emykey##if3\"\n"
"20) \"hint1\"\n"
"3) \"__clock\"\n"
"4) \"\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\"\n"
"5) \"__format\"\n"
"6) \"0\"\n"
"7) \"__in-bulkload\"\n"
"8) \"FALSE\"\n"
"9) \"__last-applied\"\n"
"10) \"\\x00\\x00\\x00\\x00\\x00\\x00\\x00\"\"\n"
"11) \"emykey##dhint1##f3\"\n"
"12) \"v6\"\n"
"13) \"emykey##dhint2##f2\"\n"
"14) \"v5\"\n"
"15) \"emykey##dhint3##f1\"\n"
"16) \"v3\"\n"
"17) \"emykey##if1\"\n"
"18) \"hint3\"\n"
"19) \"emykey##if2\"\n"
"20) \"hint2\"\n"
"21) \"emykey##if3\"\n"
"22) \"hint1\"\n"
);
qdb_info(qclient::describeRedisReply(reply));
......
......@@ -51,7 +51,7 @@ TEST(Recovery, BasicSanity) {
std::vector<std::string> magicValues = recovery.retrieveMagicValues();
ASSERT_EQ(magicValues.size(), 16u);
ASSERT_EQ(magicValues.size(), 18u);
ASSERT_EQ(magicValues[0], "RAFT_CURRENT_TERM: NotFound: ");
ASSERT_EQ(magicValues[1], "RAFT_LOG_SIZE: NotFound: ");
......@@ -69,6 +69,8 @@ TEST(Recovery, BasicSanity) {
ASSERT_EQ(magicValues[13], intToBinaryString(2));
ASSERT_EQ(magicValues[14], "__in-bulkload");
ASSERT_EQ(magicValues[15], boolToString(false));
ASSERT_EQ(magicValues[16], "__clock");
ASSERT_EQ(magicValues[17], unsignedIntToBinaryString(0u));
}
TEST(Recovery, RemoveJournalEntriesAndChangeClusterID) {
......@@ -123,7 +125,8 @@ TEST(Recovery, RemoveJournalEntriesAndChangeClusterID) {
"RAFT_PREVIOUS_MEMBERSHIP_EPOCH: NotFound: ",
"__format: NotFound: ",
"__last-applied: NotFound: ",
"__in-bulkload: NotFound: "
"__in-bulkload: NotFound: ",
"__clock: NotFound: "
};
ASSERT_REPLY(qcl.exec("recovery-info"), rep);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment