Commit c7f03d4a authored by Georgios Bitzes's avatar Georgios Bitzes
Browse files

Implement RECOVERY-GET-ALL-VERSIONS

parent b6cc02b5
Pipeline #998333 passed with stages
in 40 minutes and 40 seconds
......@@ -160,6 +160,7 @@ struct cmdMapInit {
redis_cmd_map["recovery_del"] = {RedisCommand::RECOVERY_DEL, CommandType::RECOVERY};
redis_cmd_map["recovery_force_reconfigure_journal"] = {RedisCommand::RECOVERY_FORCE_RECONFIGURE_JOURNAL, CommandType::RECOVERY};
redis_cmd_map["recovery_scan"] = {RedisCommand::RECOVERY_SCAN, CommandType::RECOVERY};
redis_cmd_map["recovery_get_all_versions"] = {RedisCommand::RECOVERY_GET_ALL_VERSIONS, CommandType::RECOVERY};
redis_cmd_map["convert_string_to_int"] = {RedisCommand::CONVERT_STRING_TO_INT, CommandType::CONTROL};
redis_cmd_map["convert_int_to_string"] = {RedisCommand::CONVERT_INT_TO_STRING, CommandType::CONTROL};
......
......@@ -164,6 +164,7 @@ enum class RedisCommand {
RECOVERY_INFO,
RECOVERY_FORCE_RECONFIGURE_JOURNAL,
RECOVERY_SCAN,
RECOVERY_GET_ALL_VERSIONS,
CONVERT_STRING_TO_INT,
CONVERT_INT_TO_STRING,
......
......@@ -125,6 +125,13 @@ RedisEncodedResponse RecoveryDispatcher::dispatch(RedisRequest &request) {
else nextCursor = "next:" + nextCursor;
return Formatter::scan(nextCursor, results);
}
case RedisCommand::RECOVERY_GET_ALL_VERSIONS: {
if(request.size() != 2) return Formatter::errArgs(request[0]);
std::vector<std::string> results;
editor.getAllVersions(request[0], results);
return Formatter::vector(results);
}
default: {
qdb_throw("should never reach here");
}
......
......@@ -31,6 +31,7 @@
#include <rocksdb/utilities/checkpoint.h>
#include <rocksdb/filter_policy.h>
#include <rocksdb/table.h>
#include <rocksdb/utilities/debug.h>
using namespace quarkdb;
......@@ -129,3 +130,18 @@ rocksdb::Status RecoveryEditor::scan(std::string_view key, size_t count, std::st
return rocksdb::Status::OK();
}
rocksdb::Status RecoveryEditor::getAllVersions(std::string_view key, std::vector<std::string> &output) {
std::vector<rocksdb::KeyVersion> versions;
rocksdb::GetAllKeyVersions(db.get(), key, key, std::numeric_limits<size_t>::max(), &versions);
for(const rocksdb::KeyVersion& ver : versions) {
output.emplace_back(SSTR("KEY: " << ver.user_key));
output.emplace_back(SSTR("VALUE: " << ver.value));
output.emplace_back(SSTR("SEQUENCE: " << ver.sequence));
output.emplace_back(SSTR("TYPE: " << ver.type));
}
return rocksdb::Status::OK();
}
......@@ -43,6 +43,7 @@ public:
rocksdb::Status set(std::string_view key, std::string_view value);
rocksdb::Status del(std::string_view key);
rocksdb::Status scan(std::string_view key, size_t count, std::string &nextCursor, std::vector<std::string> &elements);
rocksdb::Status getAllVersions(std::string_view key, std::vector<std::string> &output);
private:
std::string path;
......
......@@ -111,6 +111,9 @@ TEST(Recovery, RemoveJournalEntriesAndChangeClusterID) {
ASSERT_REPLY(qcl.exec("recovery-get", SSTR("E" << intToBinaryString(2))), RaftEntry(4, "set", "abc", "cdf").serialize());
ASSERT_REPLY(qcl.exec("recovery-del", SSTR("E" << intToBinaryString(2))), "OK");
ASSERT_REPLY_DESCRIBE(qcl.exec("recovery-get-all-versions", SSTR("E" << intToBinaryString(2))).get(),
"(empty list or set)\n");
std::vector<std::string> rep = {
"RAFT_CURRENT_TERM",
intToBinaryString(4),
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment