Commit 70993a45 authored by Georgios Bitzes's avatar Georgios Bitzes
Browse files

Implement user-facing lease-release command

parent 67df5e6a
Pipeline #434579 passed with stages
in 30 minutes and 17 seconds
......@@ -91,8 +91,10 @@ struct cmdMapInit {
redis_cmd_map["convert_hash_field_to_lhash"] = {RedisCommand::CONVERT_HASH_FIELD_TO_LHASH, CommandType::WRITE};
redis_cmd_map["lease_acquire"] = {RedisCommand::LEASE_ACQUIRE, CommandType::WRITE};
redis_cmd_map["lease_get"] = {RedisCommand::LEASE_GET, CommandType::WRITE};
redis_cmd_map["lease_release"] = {RedisCommand::LEASE_RELEASE, CommandType::WRITE};
redis_cmd_map["timestamped_lease_acquire"] = {RedisCommand::TIMESTAMPED_LEASE_ACQUIRE, CommandType::WRITE};
redis_cmd_map["timestamped_lease_get"] = {RedisCommand::TIMESTAMPED_LEASE_GET, CommandType::WRITE};
redis_cmd_map["timestamped_lease_release"] = {RedisCommand::TIMESTAMPED_LEASE_RELEASE, CommandType::WRITE};
redis_cmd_map["exec"] = {RedisCommand::EXEC, CommandType::CONTROL};
redis_cmd_map["discard"] = {RedisCommand::DISCARD, CommandType::CONTROL};
......
......@@ -102,9 +102,11 @@ enum class RedisCommand {
LEASE_GET,
LEASE_ACQUIRE,
LEASE_RELEASE,
TIMESTAMPED_LEASE_GET,
TIMESTAMPED_LEASE_ACQUIRE,
TIMESTAMPED_LEASE_RELEASE,
CONFIG_GET,
CONFIG_SET,
......
......@@ -333,6 +333,21 @@ RedisEncodedResponse RedisDispatcher::dispatchWrite(StagingArea &stagingArea, Re
reply.emplace_back(SSTR("REMAINING: " << leaseInfo.getDeadline() - timestamp << " ms"));
return Formatter::statusVector(reply);
}
case RedisCommand::TIMESTAMPED_LEASE_RELEASE: {
if(request.size() != 3) return Formatter::errArgs("lease_release");
qdb_assert(request[2].size() == 8u);
ClockValue timestamp = binaryStringToUnsignedInt(request[2].c_str());
rocksdb::Status st = store.lease_release(stagingArea, request[1], timestamp);
if(st.IsNotFound()) {
return Formatter::null();
}
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::ok();
}
default: {
qdb_throw("internal dispatching error in RedisDispatcher for " << request);
}
......
......@@ -123,7 +123,7 @@ LinkStatus Shard::dispatch(Connection *conn, Transaction &transaction) {
ClockValue txTimestamp = stateMachine->getDynamicClock();
for(size_t i = 0; i < transaction.size(); i++) {
if(transaction[i].getCommand() == RedisCommand::LEASE_GET || transaction[i].getCommand() == RedisCommand::LEASE_ACQUIRE) {
if(transaction[i].getCommand() == RedisCommand::LEASE_GET || transaction[i].getCommand() == RedisCommand::LEASE_ACQUIRE || transaction[i].getCommand() == RedisCommand::LEASE_RELEASE) {
// TODO(gbitzes): This is racy.. we should timestampt after getting a raft
// snapshot, but we need to refactor transactions a bit first.
LeaseFilter::transform(transaction[i], txTimestamp);
......
......@@ -1066,7 +1066,7 @@ void StateMachine::advanceClock(StagingArea &stagingArea, ClockValue newValue) {
// Clear out any leases past the deadline
ExpirationEventIterator iter(stagingArea);
while(iter.valid() && iter.getDeadline() <= newValue) {
qdb_assert(lease_release(stagingArea, std::string(iter.getRedisKey())).ok());
qdb_assert(lease_release(stagingArea, std::string(iter.getRedisKey()), ClockValue(0)).ok());
iter.next();
}
......@@ -1197,7 +1197,14 @@ LeaseAcquisitionStatus StateMachine::lease_acquire(StagingArea &stagingArea, con
return LeaseAcquisitionStatus::kAcquired;
}
rocksdb::Status StateMachine::lease_release(StagingArea &stagingArea, const std::string &key) {
rocksdb::Status StateMachine::lease_release(StagingArea &stagingArea, const std::string &key, ClockValue clockUpdate) {
// First, some timekeeping, update clock time if necessary.
if(clockUpdate != 0u) {
// maybeAdvanceClock will also call this function.. Avoid infinite loop
// by supplying clockUpdate == 0u.
maybeAdvanceClock(stagingArea, clockUpdate);
}
WriteOperation operation(stagingArea, key, KeyType::kLease);
if(!operation.valid()) return wrong_type();
......@@ -1730,6 +1737,6 @@ rocksdb::Status StateMachine::lease_get(const std::string &key, ClockValue clock
CHAIN(index, lease_get, key, clockUpdate, info);
}
rocksdb::Status StateMachine::lease_release(const std::string &key, LogIndex index) {
CHAIN(index, lease_release, key);
rocksdb::Status StateMachine::lease_release(const std::string &key, ClockValue clockUpdate, LogIndex index) {
CHAIN(index, lease_release, key, clockUpdate);
}
......@@ -89,7 +89,7 @@ public:
void advanceClock(StagingArea &stagingArea, ClockValue newValue);
LeaseAcquisitionStatus lease_acquire(StagingArea &stagingArea, const std::string &key, const std::string &value, ClockValue clockUpdate, uint64_t duration, LeaseInfo &info);
rocksdb::Status lease_release(StagingArea &stagingArea, const std::string &key);
rocksdb::Status lease_release(StagingArea &stagingArea, const std::string &key, ClockValue clockValue);
rocksdb::Status lease_get(StagingArea &stagingArea, const std::string &key, ClockValue clockUpdate, LeaseInfo &info);
//----------------------------------------------------------------------------
......@@ -157,7 +157,7 @@ public:
void getClock(ClockValue &value);
rocksdb::Status rawGetAllVersions(const std::string &key, std::vector<rocksdb::KeyVersion> &versions);
LeaseAcquisitionStatus lease_acquire(const std::string &key, const std::string &value, ClockValue clockUpdate, uint64_t duration, LeaseInfo &info, LogIndex index = 0);
rocksdb::Status lease_release(const std::string &key, LogIndex index = 0);
rocksdb::Status lease_release(const std::string &key, ClockValue clockUpdate, LogIndex index = 0);
rocksdb::Status lease_get(const std::string &key, ClockValue clockUpdate, LeaseInfo &info, LogIndex index = 0);
//----------------------------------------------------------------------------
......
......@@ -53,7 +53,7 @@ LinkStatus RaftDispatcher::dispatch(Connection *conn, Transaction &transaction)
ClockValue txTimestamp = stateMachine.getDynamicClock();
for(size_t i = 0; i < transaction.size(); i++) {
if(transaction[i].getCommand() == RedisCommand::LEASE_GET || transaction[i].getCommand() == RedisCommand::LEASE_ACQUIRE) {
if(transaction[i].getCommand() == RedisCommand::LEASE_GET || transaction[i].getCommand() == RedisCommand::LEASE_ACQUIRE || transaction[i].getCommand() == RedisCommand::LEASE_RELEASE) {
// TODO(gbitzes): This is racy.. we should timestampt after getting a raft
// snapshot, but we need to refactor transactions a bit first.
LeaseFilter::transform(transaction[i], txTimestamp);
......
......@@ -27,6 +27,7 @@ using namespace quarkdb;
void InternalFilter::process(RedisRequest &req) {
switch(req.getCommand()) {
case RedisCommand::TIMESTAMPED_LEASE_RELEASE:
case RedisCommand::TIMESTAMPED_LEASE_ACQUIRE:
case RedisCommand::TIMESTAMPED_LEASE_GET: {
// Bad client, bad. No cookie for you.
......
......@@ -31,7 +31,7 @@
using namespace quarkdb;
void LeaseFilter::transform(RedisRequest &req, ClockValue timestamp) {
qdb_assert(req.getCommand() == RedisCommand::LEASE_GET || req.getCommand() == RedisCommand::LEASE_ACQUIRE);
qdb_assert(req.getCommand() == RedisCommand::LEASE_GET || req.getCommand() == RedisCommand::LEASE_ACQUIRE || req.getCommand() == RedisCommand::LEASE_RELEASE);
if(req.getCommand() == RedisCommand::LEASE_GET) {
req[0] = "TIMESTAMPED_LEASE_GET";
......@@ -45,6 +45,12 @@ void LeaseFilter::transform(RedisRequest &req, ClockValue timestamp) {
req.parseCommand();
return;
}
else if(req.getCommand() == RedisCommand::LEASE_RELEASE) {
req[0] = "TIMESTAMPED_LEASE_RELEASE";
req.emplace_back(unsignedIntToBinaryString(timestamp));
req.parseCommand();
return;
}
qdb_throw("should never reach here");
}
......@@ -329,6 +329,8 @@ TEST_F(Raft_e2e, test_many_redis_commands) {
futures.emplace_back(tunnel(leaderID)->exec("smembers", "myset"));
futures.emplace_back(tunnel(leaderID)->exec("get", "empty_key"));
futures.emplace_back(tunnel(leaderID)->exec("timestamped-lease-acquire", "123"));
futures.emplace_back(tunnel(leaderID)->exec("timestamped-lease-get", "123"));
futures.emplace_back(tunnel(leaderID)->exec("timestamped-lease-release", "123"));
ASSERT_REPLY(futures[0], 3);
ASSERT_REPLY(futures[1], 3);
......@@ -339,6 +341,8 @@ TEST_F(Raft_e2e, test_many_redis_commands) {
ASSERT_REPLY(futures[6], make_vec("c"));
ASSERT_NIL(futures[7]);
ASSERT_REPLY(futures[8], "ERR unknown command 'timestamped-lease-acquire'" );
ASSERT_REPLY(futures[9], "ERR unknown command 'timestamped-lease-get'" );
ASSERT_REPLY(futures[10], "ERR unknown command 'timestamped-lease-release'" );
futures.clear();
......@@ -576,7 +580,7 @@ TEST_F(Raft_e2e, test_many_redis_commands) {
ASSERT_REPLY(fut4, "3");
// Test lease commands.
std::future<redisReplyPtr> l0 = tunnel(leaderID)->exec("lease-acquire", "get");
std::future<redisReplyPtr> l0 = tunnel(leaderID)->exec("lease-acquire", "qcl-counter", "holder1", "10000");
std::future<redisReplyPtr> l1 = tunnel(leaderID)->exec("lease-acquire", "mykey", "holder1", "10000");
std::future<redisReplyPtr> l2 = tunnel(leaderID)->exec("lease-acquire", "mykey", "holder2", "10000");
std::future<redisReplyPtr> l3 = tunnel(leaderID)->exec("lease-acquire", "mykey", "holder1", "10000");
......@@ -596,6 +600,30 @@ TEST_F(Raft_e2e, test_many_redis_commands) {
qdb_info(qclient::describeRedisReply(replyL4));
ASSERT_TRUE(StringUtils::startswith(qclient::describeRedisReply(replyL4), "1) HOLDER: holder1\n2) REMAINING: "));
ASSERT_NIL(l5);
std::future<redisReplyPtr> l6 = tunnel(leaderID)->exec("lease-release", "mykey");
std::future<redisReplyPtr> l7 = tunnel(leaderID)->exec("lease-release", "mykey-2");
std::future<redisReplyPtr> l8 = tunnel(leaderID)->exec("lease-release", "qcl-counter");
std::future<redisReplyPtr> l9 = tunnel(leaderID)->exec("lease-release", "mykey");
std::future<redisReplyPtr> l10 = tunnel(leaderID)->exec("lease-get", "mykey");
ASSERT_REPLY(l6, "OK");
ASSERT_NIL(l7);
ASSERT_REPLY(l8, "ERR Invalid argument: WRONGTYPE Operation against a key holding the wrong kind of value");
ASSERT_NIL(l9);
ASSERT_NIL(l10);
std::future<redisReplyPtr> l11 = tunnel(leaderID)->exec("lease-acquire", "mykey", "holder2", "10000");
std::future<redisReplyPtr> l12 = tunnel(leaderID)->exec("lease-acquire", "mykey", "holder2", "10000");
std::future<redisReplyPtr> l13 = tunnel(leaderID)->exec("lease-release", "mykey");
std::future<redisReplyPtr> l14 = tunnel(leaderID)->exec("lease-acquire", "mykey", "holder2", "10000");
ASSERT_REPLY(l11, "ACQUIRED");
ASSERT_REPLY(l12, "RENEWED");
ASSERT_REPLY(l13, "OK");
ASSERT_REPLY(l14, "ACQUIRED");
}
TEST_F(Raft_e2e, replication_with_trimmed_journal) {
......
......@@ -780,13 +780,13 @@ TEST_F(State_Machine, Leases) {
ASSERT_FALSE(iterator.valid());
}
ASSERT_OK(stateMachine()->lease_release("my-lease-2"));
ASSERT_OK(stateMachine()->lease_release("my-lease-2", ClockValue(13)));
int64_t count = 0;
std::vector<std::string> keys = { "my-lease-2" };
ASSERT_OK(stateMachine()->exists(keys.begin(), keys.end(), count) );
ASSERT_EQ(count, 0);
ASSERT_NOTFOUND(stateMachine()->lease_release("not-existing"));
ASSERT_NOTFOUND(stateMachine()->lease_release("not-existing", ClockValue(13)));
{
StagingArea stagingArea(*stateMachine());
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment