Commit ac5436e8 authored by Georgios Bitzes's avatar Georgios Bitzes
Browse files

Provide more information to caller about acquired lease

parent cca825ae
Pipeline #429368 passed with stages
in 44 minutes and 5 seconds
......@@ -296,7 +296,8 @@ RedisEncodedResponse RedisDispatcher::dispatchWrite(StagingArea &stagingArea, Re
qdb_assert(request[4].size() == 8u);
bool acquired;
rocksdb::Status st = store.lease_acquire(stagingArea, request[1], request[2], binaryStringToUnsignedInt(request[4].c_str()), duration, acquired);
LeaseInfo leaseInfo;
rocksdb::Status st = store.lease_acquire(stagingArea, request[1], request[2], binaryStringToUnsignedInt(request[4].c_str()), duration, leaseInfo, acquired);
if(!st.ok()) return Formatter::fromStatus(st);
......
......@@ -943,6 +943,10 @@ void StateMachine::WriteOperation::writeLocalityIndex(const std::string &field,
stagingArea.put(locator.toSlice(), hint);
}
void StateMachine::WriteOperation::cancel() {
finalized = true;
}
rocksdb::Status StateMachine::WriteOperation::finalize(int64_t newsize, bool forceUpdate) {
assertWritable();
......@@ -1135,12 +1139,16 @@ void StateMachine::getClock(ClockValue &value) {
getClock(stagingArea, value);
}
rocksdb::Status StateMachine::lease_acquire(StagingArea &stagingArea, const std::string &key, const std::string &value, ClockValue clockUpdate, uint64_t duration, bool &acquired) {
rocksdb::Status StateMachine::lease_acquire(StagingArea &stagingArea, const std::string &key, const std::string &value, ClockValue clockUpdate, uint64_t duration, LeaseInfo &info, bool &acquired) {
qdb_assert(!value.empty());
// First, some timekeeping, update clock time if necessary.
clockUpdate = maybeAdvanceClock(stagingArea, clockUpdate);
// Ensure the key pointed to is either a lease, or non-existent.
WriteOperation operation(stagingArea, key, KeyType::kLease);
if(!operation.valid()) return wrong_type();
// Quick check that no-one else holds the lease right now.
// Could it be that the lease has actually expired? Not at this point.
// advanceClock() should have taken care of removing expired leases.
......@@ -1152,15 +1160,16 @@ rocksdb::Status StateMachine::lease_acquire(StagingArea &stagingArea, const std:
if(st.ok()) {
if(oldLeaseHolder != value) {
KeyDescriptor &descriptor = operation.descriptor();
acquired = false;
return rocksdb::Status::InvalidArgument(SSTR("lease being currently held by " << oldLeaseHolder));
info = LeaseInfo(oldLeaseHolder, descriptor.getStartIndex(), descriptor.getEndIndex());
operation.cancel();
return rocksdb::Status::InvalidArgument(SSTR("lease being currently held by " << oldLeaseHolder << ", remaining time: " << descriptor.getEndIndex() - clockUpdate << " ms" ));
}
}
// Looks good.. Either the lease is held by the same holder, and this is
// simply an extension request, or this is a new lease altogether.
WriteOperation operation(stagingArea, key, KeyType::kLease);
if(!operation.valid()) return wrong_type();
KeyDescriptor &descriptor = operation.descriptor();
if(operation.keyExists()) {
......@@ -1182,6 +1191,7 @@ rocksdb::Status StateMachine::lease_acquire(StagingArea &stagingArea, const std:
// Update lease value.
operation.write(value);
acquired = true;
info = LeaseInfo(value, descriptor.getStartIndex(), descriptor.getEndIndex());
return operation.finalize(value.size(), true);
}
......@@ -1710,8 +1720,8 @@ rocksdb::Status StateMachine::lhset(const std::string &key, const std::string &f
CHAIN(index, lhset, key, field, hint, value, fieldcreated);
}
rocksdb::Status StateMachine::lease_acquire(const std::string &key, const std::string &value, ClockValue clockUpdate, uint64_t duration, bool &acquired, LogIndex index) {
CHAIN(index, lease_acquire, key, value, clockUpdate, duration, acquired);
rocksdb::Status StateMachine::lease_acquire(const std::string &key, const std::string &value, ClockValue clockUpdate, uint64_t duration, LeaseInfo &info, bool &acquired, LogIndex index) {
CHAIN(index, lease_acquire, key, value, clockUpdate, duration, info, acquired);
}
rocksdb::Status StateMachine::lease_get(const std::string &key, ClockValue clockUpdate, LeaseInfo &info, LogIndex index) {
......
......@@ -81,7 +81,7 @@ public:
rocksdb::Status rpop(StagingArea &stagingArea, const std::string &key, std::string &item);
void advanceClock(StagingArea &stagingArea, ClockValue newValue);
rocksdb::Status lease_acquire(StagingArea &stagingArea, const std::string &key, const std::string &value, ClockValue clockUpdate, uint64_t duration, bool &acquired);
rocksdb::Status lease_acquire(StagingArea &stagingArea, const std::string &key, const std::string &value, ClockValue clockUpdate, uint64_t duration, LeaseInfo &info, bool &acquired);
rocksdb::Status lease_release(StagingArea &stagingArea, const std::string &key);
rocksdb::Status lease_get(StagingArea &stagingArea, const std::string &key, ClockValue clockUpdate, LeaseInfo &info);
......@@ -149,7 +149,7 @@ public:
void advanceClock(ClockValue newValue, LogIndex index = 0);
void getClock(ClockValue &value);
rocksdb::Status rawGetAllVersions(const std::string &key, std::vector<rocksdb::KeyVersion> &versions);
rocksdb::Status lease_acquire(const std::string &key, const std::string &value, ClockValue clockUpdate, uint64_t duration, bool &acquired, LogIndex index = 0);
rocksdb::Status lease_acquire(const std::string &key, const std::string &value, ClockValue clockUpdate, uint64_t duration, LeaseInfo &info, bool &acquired, LogIndex index = 0);
rocksdb::Status lease_release(const std::string &key, LogIndex index = 0);
rocksdb::Status lease_get(const std::string &key, ClockValue clockUpdate, LeaseInfo &info, LogIndex index = 0);
......@@ -235,6 +235,7 @@ private:
bool keyExists();
bool getField(const std::string &field, std::string &out);
bool getLocalityIndex(const std::string &field, std::string &out);
void cancel();
int64_t keySize();
......
......@@ -708,8 +708,12 @@ TEST_F(State_Machine, Leases) {
bool acquired;
ASSERT_OK(stateMachine()->lease_acquire("my-lease", "some-string", ClockValue(1), 10, acquired));
LeaseInfo info;
ASSERT_OK(stateMachine()->lease_acquire("my-lease", "some-string", ClockValue(1), 10, info, acquired));
ASSERT_TRUE(acquired);
ASSERT_EQ(info.getDeadline(), 11u);
ASSERT_EQ(info.getLastRenewal(), 1u);
ASSERT_EQ(info.getValue(), "some-string");
stateMachine()->getClock(clk);
ASSERT_EQ(clk, 1u);
......@@ -724,8 +728,11 @@ TEST_F(State_Machine, Leases) {
ASSERT_FALSE(iterator.valid());
}
ASSERT_OK(stateMachine()->lease_acquire("my-lease", "some-string", ClockValue(9), 10, acquired));
ASSERT_OK(stateMachine()->lease_acquire("my-lease", "some-string", ClockValue(9), 10, info, acquired));
ASSERT_TRUE(acquired);
ASSERT_EQ(info.getDeadline(), 19u);
ASSERT_EQ(info.getLastRenewal(), 9u);
ASSERT_EQ(info.getValue(), "some-string");
stateMachine()->getClock(clk);
ASSERT_EQ(clk, 9u);
......@@ -740,14 +747,20 @@ TEST_F(State_Machine, Leases) {
ASSERT_FALSE(iterator.valid());
}
stateMachine()->lease_acquire("my-lease", "some-other-string", ClockValue(12), 10, acquired);
stateMachine()->lease_acquire("my-lease", "some-other-string", ClockValue(12), 10, info, acquired);
ASSERT_FALSE(acquired);
ASSERT_EQ(info.getDeadline(), 19u);
ASSERT_EQ(info.getLastRenewal(), 9u);
ASSERT_EQ(info.getValue(), "some-string");
stateMachine()->getClock(clk);
ASSERT_EQ(clk, 12u);
stateMachine()->lease_acquire("my-lease-2", "some-other-string", ClockValue(13), 10, acquired);
stateMachine()->lease_acquire("my-lease-2", "some-other-string", ClockValue(13), 10, info, acquired);
ASSERT_TRUE(acquired);
ASSERT_EQ(info.getDeadline(), 23u);
ASSERT_EQ(info.getLastRenewal(), 13u);
ASSERT_EQ(info.getValue(), "some-other-string");
{
StagingArea stagingArea(*stateMachine());
......@@ -792,11 +805,17 @@ TEST_F(State_Machine, Leases) {
ASSERT_FALSE(iterator.valid());
}
stateMachine()->lease_acquire("my-lease-3", "some-other-string", ClockValue(18), 10, acquired);
stateMachine()->lease_acquire("my-lease-3", "some-other-string", ClockValue(18), 10, info, acquired);
ASSERT_TRUE(acquired);
ASSERT_EQ(info.getDeadline(), 28u);
ASSERT_EQ(info.getLastRenewal(), 18u);
ASSERT_EQ(info.getValue(), "some-other-string");
stateMachine()->lease_acquire("my-lease-4", "some-other-string", ClockValue(18), 10, acquired);
stateMachine()->lease_acquire("my-lease-4", "some-other-string", ClockValue(18), 10, info, acquired);
ASSERT_TRUE(acquired);
ASSERT_EQ(info.getDeadline(), 28u);
ASSERT_EQ(info.getLastRenewal(), 18u);
ASSERT_EQ(info.getValue(), "some-other-string");
stateMachine()->getClock(clk);
ASSERT_EQ(clk, 18u);
......@@ -819,8 +838,11 @@ TEST_F(State_Machine, Leases) {
ASSERT_FALSE(iterator.valid());
}
stateMachine()->lease_acquire("my-lease-4", "some-other-string", ClockValue(25), 10, acquired);
stateMachine()->lease_acquire("my-lease-4", "some-other-string", ClockValue(25), 10, info, acquired);
ASSERT_TRUE(acquired);
ASSERT_EQ(info.getDeadline(), 35u);
ASSERT_EQ(info.getLastRenewal(), 25u);
ASSERT_EQ(info.getValue(), "some-other-string");
{
StagingArea stagingArea(*stateMachine());
......@@ -836,7 +858,6 @@ TEST_F(State_Machine, Leases) {
ASSERT_FALSE(iterator.valid());
}
LeaseInfo info;
ASSERT_OK(stateMachine()->lease_get("my-lease-4", ClockValue(25), info));
ASSERT_EQ(info.getLastRenewal(), ClockValue(25));
ASSERT_EQ(info.getDeadline(), ClockValue(35));
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment