Commit 2ba03b36 authored by Georgios Bitzes's avatar Georgios Bitzes
Browse files

Implement atomic LHSET + delete fallback field

parent 0d8a7b4f
Pipeline #386840 passed with stages
in 24 minutes
......@@ -74,6 +74,20 @@ RedisEncodedResponse RedisDispatcher::dispatch(MultiOp &multiOp, LogIndex commit
return builder.buildResponse();
}
RedisEncodedResponse RedisDispatcher::dispatchLHSET(StagingArea &stagingArea, const std::string &key, const std::string &field, const std::string &hint, const std::string &value) {
bool fieldcreated;
rocksdb::Status st = store.lhset(stagingArea, key, field, hint, value, fieldcreated);
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::integer(fieldcreated);
}
RedisEncodedResponse RedisDispatcher::dispatchHDEL(StagingArea &stagingArea, const std::string &key, const VecIterator &start, const VecIterator &end) {
int64_t count = 0;
rocksdb::Status st = store.hdel(stagingArea, key, start, end, count);
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::integer(count);
}
RedisEncodedResponse RedisDispatcher::dispatchWrite(StagingArea &stagingArea, RedisRequest &request) {
qdb_assert(request.getCommandType() == CommandType::WRITE);
......@@ -148,10 +162,7 @@ RedisEncodedResponse RedisDispatcher::dispatchWrite(StagingArea &stagingArea, Re
}
case RedisCommand::HDEL: {
if(request.size() <= 2) return errArgs(request);
int64_t count = 0;
rocksdb::Status st = store.hdel(stagingArea, request[1], request.begin()+2, request.end(), count);
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::integer(count);
return dispatchHDEL(stagingArea, request[1], request.begin()+2, request.end());
}
case RedisCommand::SADD: {
if(request.size() <= 2) return errArgs(request);
......@@ -211,12 +222,7 @@ RedisEncodedResponse RedisDispatcher::dispatchWrite(StagingArea &stagingArea, Re
}
case RedisCommand::LHSET: {
if(request.size() != 5) return errArgs(request);
bool fieldcreated;
rocksdb::Status st = store.lhset(stagingArea, request[1], request[2], request[3], request[4], fieldcreated);
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::integer(fieldcreated);
return dispatchLHSET(stagingArea, request[1], request[2], request[3], request[4]);
}
case RedisCommand::LHDEL: {
if(request.size() <= 2) return errArgs(request);
......@@ -231,6 +237,12 @@ RedisEncodedResponse RedisDispatcher::dispatchWrite(StagingArea &stagingArea, Re
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::ok();
}
case RedisCommand::LHSET_AND_DEL_FALLBACK: {
if(request.size() != 6) return errArgs(request);
RedisEncodedResponse resp = dispatchLHSET(stagingArea, request[1], request[2], request[3], request[4]);
dispatchHDEL(stagingArea, request[5], request.begin()+2, request.begin()+3);
return resp;
}
default: {
qdb_throw("internal dispatching error in RedisDispatcher for " << request);
}
......
......@@ -42,6 +42,8 @@ public:
class StateMachine; class StagingArea;
using VecIterator = std::vector<std::string>::const_iterator;
class RedisDispatcher : public Dispatcher {
public:
RedisDispatcher(StateMachine &rocksdb);
......@@ -61,6 +63,8 @@ private:
RedisEncodedResponse dispatchHGET(StagingArea &stagingArea, const std::string &key, const std::string &field);
RedisEncodedResponse dispatchLHGET(StagingArea &stagingArea, const std::string &key, const std::string &field, const std::string &hint);
RedisEncodedResponse dispatchLHSET(StagingArea &stagingArea, const std::string &key, const std::string &field, const std::string &hint, const std::string &value);
RedisEncodedResponse dispatchHDEL(StagingArea &stagingArea, const std::string &key, const VecIterator &start, const VecIterator &end);
};
......
......@@ -960,7 +960,7 @@ TEST_F(Raft_e2e, LocalityHash) {
ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f1", "ayy-lmao", "emptykey"), "v1");
// Update old field, no changes to locality hint.
ASSERT_REPLY(tunnel(leaderID)->exec("lhset", "mykey", "f1", "hint1", "v2"), 0);
ASSERT_REPLY(tunnel(leaderID)->exec("lhset-and-del-fallback", "mykey", "f1", "hint1", "v2", "fallback"), 0);
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1"), "v2");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1", "hint1"), "v2");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1", "ayy-lmao"), "v2");
......@@ -990,7 +990,7 @@ TEST_F(Raft_e2e, LocalityHash) {
ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 2);
// Update value and locality hint of second field.
ASSERT_REPLY(tunnel(leaderID)->exec("lhset", "mykey", "f2", "hint3", "v4"), 0);
ASSERT_REPLY(tunnel(leaderID)->exec("lhset-and-del-fallback", "mykey", "f2", "hint3", "v4", "fallback"), 0);
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f2"), "v4");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f2", "hint3"), "v4");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f2", "hint1"), "v4");
......@@ -1104,8 +1104,26 @@ TEST_F(Raft_e2e, LocalityHash) {
// Test fallback
ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f9", "fb"), "");
ASSERT_REPLY(tunnel(leaderID)->exec("hset", "fb", "f9", "V"), 1);
ASSERT_REPLY(tunnel(leaderID)->exec("hset", "fb", "f8", "Z"), 1);
ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f9", "fb"), "V");
ASSERT_REPLY(tunnel(leaderID)->exec("lhset", "mykey", "f9", "hint1", "VVV"), 1);
ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f9", "fb"), "VVV");
ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 4);
ASSERT_REPLY(tunnel(leaderID)->exec("hlen", "fb"), 2);
ASSERT_REPLY(tunnel(leaderID)->exec("lhset-and-del-fallback", "mykey", "f9", "hint", "ZZZ", "fb"), 0);
ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 4);
ASSERT_REPLY(tunnel(leaderID)->exec("hlen", "fb"), 1);
ASSERT_REPLY(tunnel(leaderID)->exec("hget", "fb", "f9"), "");
ASSERT_REPLY(tunnel(leaderID)->exec("hget", "fb", "f8"), "Z");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f9", "fb"), "ZZZ");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f9"), "ZZZ");
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment