Commit ea68e6d6 authored by Georgios Bitzes's avatar Georgios Bitzes
Browse files

Add command to retrieve all versions of a rocksdb key stored in LSM tree

parent 2f4350bd
Pipeline #388289 passed with stages
in 22 minutes and 19 seconds
......@@ -58,6 +58,7 @@ struct cmdMapInit {
redis_cmd_map["lhlen"] = {RedisCommand::LHLEN, CommandType::READ};
redis_cmd_map["lhget_with_fallback"] = {RedisCommand::LHGET_WITH_FALLBACK, CommandType::READ};
redis_cmd_map["raw_scan"] = {RedisCommand::RAW_SCAN, CommandType::READ};
redis_cmd_map["raw_get_all_versions"] = {RedisCommand::RAW_GET_ALL_VERSIONS, CommandType::READ};
redis_cmd_map["flushall"] = {RedisCommand::FLUSHALL, CommandType::WRITE};
redis_cmd_map["set"] = {RedisCommand::SET, CommandType::WRITE};
......
......@@ -85,6 +85,7 @@ enum class RedisCommand {
LLEN,
RAW_SCAN,
RAW_GET_ALL_VERSIONS,
EXEC,
DISCARD,
......
......@@ -507,6 +507,23 @@ RedisEncodedResponse RedisDispatcher::dispatchRead(StagingArea &stagingArea, Red
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::vector(results);
}
case RedisCommand::RAW_GET_ALL_VERSIONS: {
if(request.size() != 2) return errArgs(request);
std::vector<rocksdb::KeyVersion> versions;
rocksdb::Status st = store.rawGetAllVersions(request[1], versions);
if(!st.ok()) return Formatter::fromStatus(st);
std::vector<std::string> reply;
for(const rocksdb::KeyVersion& ver : versions) {
reply.emplace_back(SSTR("KEY: " << ver.user_key));
reply.emplace_back(SSTR("VALUE: " << ver.value));
reply.emplace_back(SSTR("SEQUENCE: " << ver.sequence));
reply.emplace_back(SSTR("TYPE: " << ver.type));
}
return Formatter::vector(reply);
}
default: {
return dispatchingError(request, 0);
}
......
......@@ -540,6 +540,9 @@ rocksdb::Status StateMachine::lhlen(StagingArea &stagingArea, const std::string
return rocksdb::Status::OK();
}
rocksdb::Status StateMachine::rawGetAllVersions(const std::string &key, std::vector<rocksdb::KeyVersion> &versions) {
return rocksdb::GetAllKeyVersions(db, key, key, &versions);
}
rocksdb::Status StateMachine::rawScan(StagingArea &stagingArea, const std::string &key, size_t count, std::vector<std::string> &elements) {
elements.clear();
......@@ -548,7 +551,6 @@ rocksdb::Status StateMachine::rawScan(StagingArea &stagingArea, const std::strin
size_t items = 0;
for(iter->Seek(key); iter->Valid(); iter->Next()) {
DBG(items);
if(items >= 1000000u || items >= count) break;
items++;
......
......@@ -33,6 +33,7 @@
#include "storage/KeyConstants.hh"
#include <rocksdb/db.h>
#include <rocksdb/utilities/write_batch_with_index.h>
#include <rocksdb/utilities/debug.h>
#include <condition_variable>
namespace quarkdb {
......@@ -99,6 +100,7 @@ public:
rocksdb::Status lhget(StagingArea &stagingArea, const std::string &key, const std::string &field, const std::string &hint, std::string &value);
rocksdb::Status lhlen(StagingArea &stagingArea, const std::string &key, size_t &len);
rocksdb::Status rawScan(StagingArea &stagingArea, const std::string &key, size_t count, std::vector<std::string> &elements);
rocksdb::Status rawGetAllVersions(const std::string &key, std::vector<rocksdb::KeyVersion> &versions);
//----------------------------------------------------------------------------
// Simple API
......
......@@ -1156,3 +1156,34 @@ TEST_F(Raft_e2e, LocalityHash) {
qdb_info(qclient::describeRedisReply(reply));
}
TEST_F(Raft_e2e, RawGetAllVersions) {
spinup(0); spinup(1); spinup(2);
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));
int leaderID = getLeaderID();
ASSERT_REPLY(tunnel(leaderID)->exec("sadd", "myset-for-raw-get", "s1"), 1);
ASSERT_REPLY(tunnel(leaderID)->exec("sadd", "myset-for-raw-get", "s2"), 1);
redisReplyPtr reply = tunnel(leaderID)->exec("raw-get-all-versions", "cmyset-for-raw-get##s1").get();
qdb_info(qclient::describeRedisReply(reply));
ASSERT_EQ(reply->elements, 4u);
ASSERT_EQ(std::string(reply->element[0]->str, reply->element[0]->len), "KEY: cmyset-for-raw-get##s1");
ASSERT_EQ(std::string(reply->element[1]->str, reply->element[1]->len), "VALUE: 1");
// Ignore sequence number
ASSERT_EQ(std::string(reply->element[3]->str, reply->element[3]->len), "TYPE: 1");
reply = tunnel(leaderID)->exec("raw-get-all-versions", "!myset-for-raw-get").get();
ASSERT_EQ(reply->elements, 8u);
qdb_info(qclient::describeRedisReply(reply));
ASSERT_EQ(std::string(reply->element[0]->str, reply->element[0]->len), "KEY: !myset-for-raw-get");
ASSERT_EQ(std::string(reply->element[1]->str, reply->element[1]->len), SSTR("VALUE: c" << intToBinaryString(2)));
ASSERT_EQ(std::string(reply->element[3]->str, reply->element[3]->len), "TYPE: 1");
ASSERT_EQ(std::string(reply->element[4]->str, reply->element[4]->len), "KEY: !myset-for-raw-get");
ASSERT_EQ(std::string(reply->element[5]->str, reply->element[5]->len), SSTR("VALUE: c" << intToBinaryString(1)));
ASSERT_EQ(std::string(reply->element[7]->str, reply->element[7]->len), "TYPE: 1");
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment