Commit 14ca1e59 authored by Georgios Bitzes's avatar Georgios Bitzes
Browse files

Implement methods to retrieve and advance the state machine clock

parent 1844aa48
Pipeline #422862 passed with stages
in 35 minutes and 44 seconds
......@@ -124,6 +124,7 @@ struct RaftServer {
using RaftClusterID = std::string;
using RaftTerm = int64_t;
using LogIndex = int64_t;
using ClockValue = uint64_t;
}
......
......@@ -173,7 +173,6 @@ void StateMachine::ensureClockSanity(bool justCreated) {
}
// We survived!
qdb_info("StateMachine clock is at " << binaryStringToUnsignedInt(value.c_str()));
}
StateMachine::~StateMachine() {
......@@ -1033,6 +1032,41 @@ rocksdb::Status StateMachine::rpop(StagingArea &stagingArea, const std::string &
return this->listPop(stagingArea, Direction::kRight, key, item);
}
void StateMachine::advanceClock(StagingArea &stagingArea, ClockValue newValue) {
// Assert we're not setting the clock back..
ClockValue prevValue;
getClock(stagingArea, prevValue);
if(newValue < prevValue) {
qdb_throw("Attempted to set state machine clock in the past: " << prevValue << " ==> " << newValue);
}
// Update value
stagingArea.put(KeyConstants::kStateMachine_Clock, unsignedIntToBinaryString(newValue));
}
void StateMachine::advanceClock(ClockValue newValue, LogIndex index) {
StagingArea stagingArea(*this);
advanceClock(stagingArea, newValue);
stagingArea.commit(index);
}
void StateMachine::getClock(StagingArea &stagingArea, ClockValue &value) {
std::string prevValue;
THROW_ON_ERROR(stagingArea.get(KeyConstants::kStateMachine_Clock, prevValue));
if(prevValue.size() != 8u) {
qdb_throw("Clock corruption, expected exactly 8 bytes, got " << prevValue.size());
}
value = binaryStringToUnsignedInt(prevValue.c_str());
}
void StateMachine::getClock(ClockValue &value) {
StagingArea stagingArea(*this, true);
getClock(stagingArea, value);
}
rocksdb::Status StateMachine::llen(StagingArea &stagingArea, const std::string &key, size_t &len) {
len = 0;
......
......@@ -78,6 +78,8 @@ public:
rocksdb::Status lpop(StagingArea &stagingArea, const std::string &key, std::string &item);
rocksdb::Status rpop(StagingArea &stagingArea, const std::string &key, std::string &item);
void advanceClock(StagingArea &stagingArea, ClockValue newValue);
//----------------------------------------------------------------------------
// API for transactional reads. Can be part of a mixed read-write transaction.
//----------------------------------------------------------------------------
......@@ -100,7 +102,7 @@ public:
rocksdb::Status lhget(StagingArea &stagingArea, const std::string &key, const std::string &field, const std::string &hint, std::string &value);
rocksdb::Status lhlen(StagingArea &stagingArea, const std::string &key, size_t &len);
rocksdb::Status rawScan(StagingArea &stagingArea, const std::string &key, size_t count, std::vector<std::string> &elements);
rocksdb::Status rawGetAllVersions(const std::string &key, std::vector<rocksdb::KeyVersion> &versions);
void getClock(StagingArea &stagingArea, ClockValue &value);
//----------------------------------------------------------------------------
// Simple API
......@@ -140,6 +142,9 @@ public:
rocksdb::Status lhset(const std::string &key, const std::string &field, const std::string &hint, const std::string &value, bool &fieldcreated, LogIndex index = 0);
rocksdb::Status lhlen(const std::string &key, size_t &len);
rocksdb::Status lhget(const std::string &key, const std::string &field, const std::string &hint, std::string &value);
void advanceClock(ClockValue newValue, LogIndex index = 0);
void getClock(ClockValue &value);
rocksdb::Status rawGetAllVersions(const std::string &key, std::vector<rocksdb::KeyVersion> &versions);
//----------------------------------------------------------------------------
// Internal configuration, not exposed to users through 'KEYS' and friends.
......
......@@ -668,6 +668,32 @@ TEST_F(State_Machine, SnapshotReads) {
ASSERT_EQ(count, 1);
}
TEST_F(State_Machine, Clock) {
ClockValue clk;
stateMachine()->getClock(clk);
ASSERT_EQ(clk, 0u);
stateMachine()->advanceClock(ClockValue(123));
stateMachine()->getClock(clk);
ASSERT_EQ(clk, 123u);
stateMachine()->advanceClock(ClockValue(234));
stateMachine()->getClock(clk);
ASSERT_EQ(clk, 234u);
ASSERT_THROW(stateMachine()->advanceClock(ClockValue(233)), FatalException);
stateMachine()->getClock(clk);
ASSERT_EQ(clk, 234u);
stateMachine()->advanceClock(ClockValue(234));
stateMachine()->getClock(clk);
ASSERT_EQ(clk, 234u);
stateMachine()->advanceClock(ClockValue(345));
stateMachine()->getClock(clk);
ASSERT_EQ(clk, 345u);
}
static std::string sliceToString(const rocksdb::Slice &slice) {
return std::string(slice.data(), slice.size());
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment