Commit da2f6da3 authored by Georgios Bitzes's avatar Georgios Bitzes
Browse files

Implement read command to list all pending expiration events

parent 290ed980
Pipeline #816631 passed with stages
in 38 minutes and 21 seconds
......@@ -69,6 +69,7 @@ struct cmdMapInit {
redis_cmd_map["type"] = {RedisCommand::TYPE, CommandType::READ};
redis_cmd_map["vhgetall"] = {RedisCommand::VHGETALL, CommandType::READ};
redis_cmd_map["vhlen"] = {RedisCommand::VHLEN, CommandType::READ};
redis_cmd_map["lease_get_pending_expiration_events"] = {RedisCommand::LEASE_GET_PENDING_EXPIRATION_EVENTS, CommandType::READ};
redis_cmd_map["flushall"] = {RedisCommand::FLUSHALL, CommandType::WRITE};
redis_cmd_map["set"] = {RedisCommand::SET, CommandType::WRITE};
......
......@@ -109,6 +109,7 @@ enum class RedisCommand {
LEASE_GET,
LEASE_ACQUIRE,
LEASE_RELEASE,
LEASE_GET_PENDING_EXPIRATION_EVENTS,
VHSET,
VHGETALL,
......
......@@ -776,6 +776,23 @@ RedisEncodedResponse RedisDispatcher::dispatchRead(StagingArea &stagingArea, Red
return Formatter::statusVector(reply);
}
case RedisCommand::LEASE_GET_PENDING_EXPIRATION_EVENTS: {
if(request.size() != 1) return errArgs(request);
std::vector<StateMachine::ExpirationEvent> events;
ClockValue staticClock, dynamicClock;
store.lease_get_pending_expiration_events(stagingArea, staticClock, dynamicClock, events);
std::vector<std::string> reply;
reply.emplace_back(SSTR("STATIC-CLOCK: " << staticClock));
reply.emplace_back(SSTR("DYNAMIC-CLOCK: " << dynamicClock));
for(auto it = events.begin(); it != events.end(); it++) {
reply.emplace_back(SSTR(it->deadline << ": " << it->key));
}
return Formatter::vector(reply);
}
case RedisCommand::VHGETALL: {
if(request.size() != 2) return errArgs(request);
std::vector<std::string> vec;
......
......@@ -1430,6 +1430,20 @@ void StateMachine::getClock(StagingArea &stagingArea, ClockValue &value) {
value = binaryStringToUnsignedInt(prevValue.c_str());
}
void StateMachine::lease_get_pending_expiration_events(StagingArea &stagingArea, ClockValue &staticClock, ClockValue &dynamicClock, std::vector<ExpirationEvent> &events) {
events.clear();
getClock(stagingArea, staticClock);
dynamicClock = getDynamicClock();
ExpirationEventIterator iter(stagingArea);
while(iter.valid()) {
events.emplace_back(iter.getRedisKey(), iter.getDeadline());
iter.next();
}
}
void StateMachine::getType(StagingArea &stagingArea, std::string_view key, std::string &keyType) {
KeyDescriptor keyinfo = getKeyDescriptor(stagingArea, key);
keyType = keyTypeAsString(keyinfo.getKeyType());
......
......@@ -146,6 +146,16 @@ public:
rocksdb::Status vhgetall(StagingArea &stagingArea, std::string_view key, std::vector<std::string> &res, uint64_t &version);
rocksdb::Status vhlen(StagingArea &stagingArea, std::string_view key, size_t &len);
// misc
struct ExpirationEvent {
ExpirationEvent(std::string_view sv, ClockValue cv) : key(sv), deadline(cv) {}
std::string key;
ClockValue deadline;
};
void lease_get_pending_expiration_events(StagingArea &stagingArea, ClockValue &staticClock, ClockValue &dynamicClock, std::vector<ExpirationEvent> &events);
//----------------------------------------------------------------------------
// Simple API
//----------------------------------------------------------------------------
......
......@@ -765,11 +765,13 @@ TEST_F(Raft_e2e, test_many_redis_commands) {
std::future<redisReplyPtr> l12 = tunnel(leaderID)->exec("lease-acquire", "mykey", "holder2", "10000");
std::future<redisReplyPtr> l13 = tunnel(leaderID)->exec("lease-release", "mykey");
std::future<redisReplyPtr> l14 = tunnel(leaderID)->exec("lease-acquire", "mykey", "holder2", "10000");
std::future<redisReplyPtr> l15 = tunnel(leaderID)->exec("lease-get-pending-expiration-events");
ASSERT_REPLY(l11, "ACQUIRED");
ASSERT_REPLY(l12, "RENEWED");
ASSERT_REPLY(l13, "OK");
ASSERT_REPLY(l14, "ACQUIRED");
l15.get(); // ignore for now..
// Ensure the followers return the correct number of responses on MOVED for
// pipelined writes.
......
......@@ -895,6 +895,19 @@ TEST_F(State_Machine, Leases) {
ASSERT_EQ(iterator.getRedisKey(), "my-lease-2");
iterator.next();
ASSERT_FALSE(iterator.valid());
ClockValue staticClock;
ClockValue dynamicClock;
std::vector<StateMachine::ExpirationEvent> events;
stateMachine()->lease_get_pending_expiration_events(stagingArea, staticClock, dynamicClock, events);
ASSERT_EQ(staticClock, 13u);
ASSERT_EQ(events.size(), 2u);
ASSERT_EQ(events[0].key, "my-lease");
ASSERT_EQ(events[0].deadline, 19u);
ASSERT_EQ(events[1].key, "my-lease-2");
ASSERT_EQ(events[1].deadline, 23u);
}
ASSERT_OK(stateMachine()->lease_release("my-lease-2", ClockValue(13)));
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment