Commit d97fb0be authored by Georgios Bitzes's avatar Georgios Bitzes
Browse files

Implement core logic for lease acquisition

parent 0ead6144
Pipeline #423201 passed with stages
in 37 minutes and 23 seconds
......@@ -888,12 +888,17 @@ void StateMachine::WriteOperation::assertWritable() {
void StateMachine::WriteOperation::write(const std::string &value) {
assertWritable();
if(keyinfo.getKeyType() != KeyType::kString) {
qdb_throw("writing without a field makes sense only for strings");
if(keyinfo.getKeyType() == KeyType::kString) {
StringLocator locator(redisKey);
stagingArea.put(locator.toSlice(), value);
}
else if(keyinfo.getKeyType() == KeyType::kLease) {
LeaseLocator locator(redisKey);
stagingArea.put(locator.toSlice(), value);
}
else {
qdb_throw("writing without a field makes sense only for strings and leases");
}
StringLocator slocator(redisKey);
stagingArea.put(slocator.toSlice(), value);
}
void StateMachine::WriteOperation::writeField(const std::string &field, const std::string &value) {
......@@ -1067,6 +1072,70 @@ void StateMachine::getClock(ClockValue &value) {
getClock(stagingArea, value);
}
rocksdb::Status StateMachine::lease_acquire(StagingArea &stagingArea, const std::string &key, const std::string &value, ClockValue clockUpdate, uint64_t duration, bool &acquired) {
qdb_assert(!value.empty());
// First, some timekeeping. Get current clock time.
ClockValue currentClock;
getClock(stagingArea, currentClock);
// Two cases:
// - currentClock is behind clockUpdate - should be by far the most common.
// Simply update currentClock to clockUpdate.
// - currentClock is ahead.. we were hit by a rare race condition. Advance
// clockUpdate to currentClock instead.
if(currentClock < clockUpdate) {
advanceClock(stagingArea, clockUpdate);
}
else {
clockUpdate = currentClock;
}
// Quick check that no-one else holds the lease right now.
// Could it be that the lease has actually expired? Not at this point.
// advanceClock() should have taken care of removing expired leases.
// TODO: Actually implement that in advanceClock ;>
LeaseLocator locator(key);
std::string oldLeaseHolder;
rocksdb::Status st = stagingArea.get(locator.toSlice(), oldLeaseHolder);
ASSERT_OK_OR_NOTFOUND(st);
if(st.ok()) {
if(oldLeaseHolder != value) {
acquired = false;
return rocksdb::Status::InvalidArgument(SSTR("lease being currently held by " << oldLeaseHolder));
}
}
// Looks good.. Either the lease is held by the same holder, and this is
// simply an extension request, or this is a new lease altogether.
WriteOperation operation(stagingArea, key, KeyType::kLease);
if(!operation.valid()) return wrong_type();
KeyDescriptor &descriptor = operation.descriptor();
if(operation.keyExists()) {
// Lease extension.. need to wipe out old pending expiration event
ExpirationEventLocator oldEvent(descriptor.getEndIndex(), key);
THROW_ON_ERROR(stagingArea.exists(oldEvent.toSlice()));
stagingArea.del(oldEvent.toSlice());
}
// Anchor expiration timestamp based on clockUpdate.
ClockValue expirationTimestamp = clockUpdate + duration;
descriptor.setStartIndex(clockUpdate);
descriptor.setEndIndex(expirationTimestamp);
// Store expiration event.
ExpirationEventLocator newEvent(expirationTimestamp, key);
stagingArea.put(newEvent.toSlice(), "1");
// Update lease value.
operation.write(value);
acquired = true;
return operation.finalize(value.size());
}
rocksdb::Status StateMachine::llen(StagingArea &stagingArea, const std::string &key, size_t &len) {
len = 0;
......@@ -1569,3 +1638,7 @@ rocksdb::Status StateMachine::configSet(const std::string &key, const std::strin
rocksdb::Status StateMachine::lhset(const std::string &key, const std::string &field, const std::string &hint, const std::string &value, bool &fieldcreated, LogIndex index) {
CHAIN(index, lhset, key, field, hint, value, fieldcreated);
}
rocksdb::Status StateMachine::lease_acquire(const std::string &key, const std::string &value, ClockValue clockUpdate, uint64_t duration, bool &acquired, LogIndex index) {
CHAIN(index, lease_acquire, key, value, clockUpdate, duration, acquired);
}
......@@ -79,6 +79,7 @@ public:
rocksdb::Status rpop(StagingArea &stagingArea, const std::string &key, std::string &item);
void advanceClock(StagingArea &stagingArea, ClockValue newValue);
rocksdb::Status lease_acquire(StagingArea &stagingArea, const std::string &key, const std::string &value, ClockValue clockUpdate, uint64_t duration, bool &acquired);
//----------------------------------------------------------------------------
// API for transactional reads. Can be part of a mixed read-write transaction.
......@@ -107,7 +108,6 @@ public:
//----------------------------------------------------------------------------
// Simple API
//----------------------------------------------------------------------------
rocksdb::Status hget(const std::string &key, const std::string &field, std::string &value);
rocksdb::Status hexists(const std::string &key, const std::string &field);
rocksdb::Status hkeys(const std::string &key, std::vector<std::string> &keys);
......@@ -145,6 +145,7 @@ public:
void advanceClock(ClockValue newValue, LogIndex index = 0);
void getClock(ClockValue &value);
rocksdb::Status rawGetAllVersions(const std::string &key, std::vector<rocksdb::KeyVersion> &versions);
rocksdb::Status lease_acquire(const std::string &key, const std::string &value, ClockValue clockUpdate, uint64_t duration, bool &acquired, LogIndex index = 0);
//----------------------------------------------------------------------------
// Internal configuration, not exposed to users through 'KEYS' and friends.
......
......@@ -694,6 +694,31 @@ TEST_F(State_Machine, Clock) {
ASSERT_EQ(clk, 345u);
}
TEST_F(State_Machine, Leases) {
ClockValue clk;
stateMachine()->getClock(clk);
ASSERT_EQ(clk, 0u);
bool acquired;
ASSERT_OK(stateMachine()->lease_acquire("my-lease", "some-string", ClockValue(1), 10, acquired));
ASSERT_TRUE(acquired);
stateMachine()->getClock(clk);
ASSERT_EQ(clk, 1u);
ASSERT_OK(stateMachine()->lease_acquire("my-lease", "some-string", ClockValue(9), 10, acquired));
ASSERT_TRUE(acquired);
stateMachine()->getClock(clk);
ASSERT_EQ(clk, 9u);
stateMachine()->lease_acquire("my-lease", "some-other-string", ClockValue(12), 10, acquired);
ASSERT_FALSE(acquired);
stateMachine()->getClock(clk);
ASSERT_EQ(clk, 12u);
}
static std::string sliceToString(const rocksdb::Slice &slice) {
return std::string(slice.data(), slice.size());
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment