Commit 166183a6 authored by Georgios Bitzes's avatar Georgios Bitzes
Browse files

Implement journal method to scan through contents, optionally matching a search pattern

parent 825e2d4f
......@@ -132,6 +132,8 @@ struct cmdMapInit {
redis_cmd_map["raft_promote_observer"] = {RedisCommand::RAFT_PROMOTE_OBSERVER, CommandType::RAFT};
redis_cmd_map["raft_heartbeat"] = {RedisCommand::RAFT_HEARTBEAT, CommandType::RAFT};
redis_cmd_map["raft_fetch_last"] = {RedisCommand::RAFT_FETCH_LAST, CommandType::RAFT};
redis_cmd_map["raft_journal_scan"] = {RedisCommand::RAFT_JOURNAL_SCAN, CommandType::RAFT};
redis_cmd_map["activate_stale_reads"] = {RedisCommand::ACTIVATE_STALE_READS, CommandType::RAFT};
redis_cmd_map["quarkdb_info"] = {RedisCommand::QUARKDB_INFO, CommandType::QUARKDB};
......
......@@ -139,6 +139,8 @@ enum class RedisCommand {
RAFT_PROMOTE_OBSERVER,
RAFT_HEARTBEAT,
RAFT_FETCH_LAST,
RAFT_JOURNAL_SCAN,
ACTIVATE_STALE_READS,
QUARKDB_INFO,
......
......@@ -30,6 +30,7 @@
#include "../utils/IntToBinaryString.hh"
#include "../utils/StaticBuffer.hh"
#include "../utils/StringUtils.hh"
#include "../../deps/StringMatchLen.h"
#include "RaftState.hh"
#include <rocksdb/utilities/checkpoint.h>
#include <rocksdb/filter_policy.h>
......@@ -540,6 +541,40 @@ rocksdb::Status RaftJournal::checkpoint(const std::string &path) {
return st;
}
//------------------------------------------------------------------------------
// Scan through the contents of the journal, starting from the given index
//------------------------------------------------------------------------------
rocksdb::Status RaftJournal::scanContents(LogIndex startingPoint, size_t count, std::string_view match, std::vector<RaftEntry> &out, LogIndex &nextCursor) {
out.clear();
RaftJournal::Iterator iter = getIterator(startingPoint);
for(size_t i = 0; i < count; i++) {
if(!iter.valid()) {
break;
}
RaftSerializedEntry item;
iter.current(item);
if(match.empty() || stringmatchlen(match.data(), match.length(), item.data(), item.length(), 0) == 1) {
RaftEntry entry;
RaftEntry::deserialize(entry, item);
out.emplace_back(entry);
}
iter.next();
}
if(!iter.valid()) {
nextCursor = 0;
}
else {
nextCursor = iter.getCurrentIndex();
}
return rocksdb::Status::OK();
}
//------------------------------------------------------------------------------
// Iterator
//------------------------------------------------------------------------------
......@@ -597,3 +632,7 @@ void RaftJournal::Iterator::current(RaftSerializedEntry &entry) {
qdb_assert(this->valid());
entry = iter->value().ToString();
}
LogIndex RaftJournal::Iterator::getCurrentIndex() const {
return currentIndex;
}
......@@ -96,6 +96,7 @@ public:
bool valid();
void next();
void current(RaftSerializedEntry &entry);
LogIndex getCurrentIndex() const;
private:
void validate();
LogIndex currentIndex;
......@@ -103,6 +104,7 @@ public:
};
Iterator getIterator(LogIndex startingPoint);
rocksdb::Status scanContents(LogIndex startingPoint, size_t count, std::string_view match, std::vector<RaftEntry> &out, LogIndex &nextCursor);
private:
void openDB(const std::string &path);
......
......@@ -2153,6 +2153,36 @@ TEST_F(Raft_e2e, vhset) {
"2) 1) \"f9\"\n"
" 2) \"v9\"\n"
);
}
TEST_F(Raft_e2e, JournalScanning) {
for(size_t i = 1; i <= 5; i ++) {
RaftEntry entry(0, {"set", SSTR("k" << i), SSTR("v" << i) } );
ASSERT_TRUE(journal(0)->append(i, entry));
ASSERT_TRUE(journal(1)->append(i, entry));
ASSERT_TRUE(journal(2)->append(i, entry));
}
std::vector<RaftEntry> entries;
LogIndex cursor;
ASSERT_OK(journal(0)->scanContents(1, 3, "", entries, cursor));
ASSERT_EQ(entries.size(), 3u);
ASSERT_EQ(cursor, 4);
for(size_t i = 1; i <= 3; i ++) {
RaftEntry entry(0, {"set", SSTR("k" << i), SSTR("v" << i) } );
ASSERT_EQ(entries[i-1], entry);
}
ASSERT_OK(journal(0)->scanContents(0, 300, "*k2*", entries, cursor));
ASSERT_EQ(entries.size(), 1u);
ASSERT_EQ(cursor, 0);
RaftEntry entry(0, {"set", "k2", "v2"});
ASSERT_EQ(entries[0], entry);
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment