Commit bf25ef43 authored by Georgios Bitzes's avatar Georgios Bitzes
Browse files

Implement scan command for locality hashes

parent cf1a0049
Pipeline #527993 passed with stages
in 29 minutes and 12 seconds
......@@ -61,6 +61,7 @@ struct cmdMapInit {
redis_cmd_map["config_getall"] = {RedisCommand::CONFIG_GETALL, CommandType::READ};
redis_cmd_map["lhget"] = {RedisCommand::LHGET, CommandType::READ};
redis_cmd_map["lhlen"] = {RedisCommand::LHLEN, CommandType::READ};
redis_cmd_map["lhscan"] = {RedisCommand::LHSCAN, CommandType::READ};
redis_cmd_map["lhget_with_fallback"] = {RedisCommand::LHGET_WITH_FALLBACK, CommandType::READ};
redis_cmd_map["raw_scan"] = {RedisCommand::RAW_SCAN, CommandType::READ};
redis_cmd_map["raw_get_all_versions"] = {RedisCommand::RAW_GET_ALL_VERSIONS, CommandType::READ};
......
......@@ -70,6 +70,7 @@ enum class RedisCommand {
LHGET,
LHLEN,
LHDEL,
LHSCAN,
LHGET_WITH_FALLBACK,
LHDEL_WITH_FALLBACK,
......
......@@ -658,6 +658,28 @@ RedisEncodedResponse RedisDispatcher::dispatchRead(StagingArea &stagingArea, Red
return errArgs(request);
}
case RedisCommand::LHSCAN: {
if(request.size() < 3) return errArgs(request);
ScanCommandArguments args = parseScanCommand(request.begin()+2, request.end());
if(!args.error.empty()) {
return Formatter::err(args.error);
}
// No support for MATCH here, maybe add later
if(!args.match.empty()) {
return Formatter::err("syntax error");
}
std::string newcursor;
std::vector<std::string> vec;
rocksdb::Status st = store.lhscan(stagingArea, request[1], args.cursor, args.count, newcursor, vec);
if(!st.ok()) return Formatter::fromStatus(st);
if(newcursor == "") newcursor = "0";
else newcursor = "next:" + newcursor;
return Formatter::scan(newcursor, vec);
}
case RedisCommand::LHGET_WITH_FALLBACK: {
// First, try LHGET...
RedisEncodedResponse resp;
......
......@@ -30,6 +30,7 @@
#include "storage/KeyDescriptorBuilder.hh"
#include "storage/PatternMatching.hh"
#include "storage/ExpirationEventIterator.hh"
#include "storage/ReverseLocator.hh"
#include "utils/IntToBinaryString.hh"
#include "utils/TimeFormatting.hh"
#include <sys/stat.h>
......@@ -279,6 +280,8 @@ static rocksdb::Status wrong_type() {
return rocksdb::Status::InvalidArgument("WRONGTYPE Operation against a key holding the wrong kind of value");
}
static KeyDescriptor constructDescriptor(rocksdb::Status &st, const std::string &serialization) {
if(st.IsNotFound()) {
return KeyDescriptor();
......@@ -301,7 +304,7 @@ KeyDescriptor StateMachine::lockKeyDescriptor(StagingArea &stagingArea, Descript
return constructDescriptor(st, tmp);
}
bool StateMachine::assertKeyType(StagingArea &stagingArea, const std::string &key, KeyType keytype) {
bool StateMachine::assertKeyType(StagingArea &stagingArea, std::string_view key, KeyType keytype) {
KeyDescriptor keyinfo = getKeyDescriptor(stagingArea, key);
if(!keyinfo.empty() && keyinfo.getKeyType() != keytype) return false;
return true;
......@@ -641,6 +644,66 @@ rocksdb::Status StateMachine::hscan(StagingArea &stagingArea, const std::string
return rocksdb::Status::OK();
}
rocksdb::Status StateMachine::lhscan(StagingArea &stagingArea, std::string_view key, std::string_view cursor, size_t count, std::string &newCursor, std::vector<std::string> &results) {
if(!assertKeyType(stagingArea, key, KeyType::kLocalityHash)) return wrong_type();
std::string_view cursorHint;
std::string_view cursorField;
LocalityFieldLocator locator(key);
results.clear();
// Any rocksdb keys we touch must have this prefix.
std::string requiredPrefix(locator.toView());
if(!cursor.empty()) {
// Decompose cursor into hint + field.
EscapedPrefixExtractor extractor;
if(!extractor.parse(cursor)) {
return malformed("invalid cursor");
}
cursorHint = extractor.getOriginalPrefix();
cursorField = extractor.getRawSuffix();
// We start from the given hint + field.
locator.resetHint(cursorHint);
locator.resetField(cursorField);
}
newCursor = "";
IteratorPtr iter(stagingArea.getIterator());
for(iter->Seek(locator.toSlice()); iter->Valid(); iter->Next()) {
std::string_view rocksdbKey = toView(iter->key());
if(!StringUtils::startsWith(rocksdbKey, requiredPrefix)) {
// It's over, we've iterated through the entire locality hash
break;
}
// Split hint + field
std::string_view hintPlusField = rocksdbKey;
hintPlusField.remove_prefix(requiredPrefix.size());
EscapedPrefixExtractor splitter;
qdb_assert(splitter.parse(hintPlusField));
if(results.size() >= count*3) {
// We've hit result sizelimit, calculate new cursor and break
newCursor = hintPlusField;
break;
}
// Populate new entry consisting of three items
results.emplace_back(splitter.getOriginalPrefix());
results.emplace_back(splitter.getRawSuffix());
results.emplace_back(toView(iter->value()));
}
return rocksdb::Status::OK();
}
rocksdb::Status StateMachine::sscan(StagingArea &stagingArea, const std::string &key, const std::string &cursor, size_t count, std::string &newCursor, std::vector<std::string> &res) {
if(!assertKeyType(stagingArea, key, KeyType::kSet)) return wrong_type();
......
......@@ -115,6 +115,7 @@ public:
rocksdb::Status dequeLen(StagingArea &stagingArea, const std::string &key, size_t &len);
rocksdb::Status lhget(StagingArea &stagingArea, const std::string &key, const std::string &field, const std::string &hint, std::string &value);
rocksdb::Status lhlen(StagingArea &stagingArea, const std::string &key, size_t &len);
rocksdb::Status lhscan(StagingArea &stagingArae, std::string_view key, std::string_view cursor, size_t count, std::string &newcursor, std::vector<std::string> &results);
rocksdb::Status rawScan(StagingArea &stagingArea, const std::string &key, size_t count, std::vector<std::string> &elements);
rocksdb::Status dequeScanBack(StagingArea &stagingArea, std::string_view key, std::string_view cursor, size_t count, std::string &newCursor, std::vector<std::string> &results);
void getClock(StagingArea &stagingArea, ClockValue &value);
......@@ -234,7 +235,7 @@ private:
};
void commitTransaction(rocksdb::WriteBatchWithIndex &wb, LogIndex index);
bool assertKeyType(StagingArea &stagingArea, const std::string &key, KeyType keytype);
bool assertKeyType(StagingArea &stagingArea, std::string_view key, KeyType keytype);
rocksdb::Status dequePop(StagingArea &stagingArea, Direction direction, const std::string &key, std::string &item);
rocksdb::Status dequePush(StagingArea &stagingArea, Direction direction, const std::string &key, const VecIterator &start, const VecIterator &end, int64_t &length);
......
......@@ -148,11 +148,11 @@ public:
}
}
KeyType getKeyType() {
KeyType getKeyType() const {
return keyType;
}
std::string_view getOriginalKey() {
std::string_view getOriginalKey() const {
qdb_assert(keyType != KeyType::kParseError);
if(keyType == KeyType::kString) {
......@@ -162,20 +162,20 @@ public:
return firstChunk.getOriginalPrefix();
}
std::string_view getField() {
std::string_view getField() const {
qdb_assert(keyType != KeyType::kParseError && keyType != KeyType::kString);
return firstChunk.getRawSuffix();
}
std::string_view getRawPrefixUntilBoundary() {
std::string_view getRawPrefixUntilBoundary() const {
qdb_assert(keyType != KeyType::kParseError && keyType != KeyType::kString);
return std::string_view(slice.data(), firstChunk.getBoundary()+1);
}
bool isLocalityIndex() {
bool isLocalityIndex() const {
if(keyType != KeyType::kLocalityHash) return false;
std::string_view field = getField();
std::string_view field = firstChunk.getRawSuffix();
qdb_assert(field.size() != 0u);
return *(field.data()) == char(InternalLocalityFieldType::kIndex);
}
......
......@@ -1458,6 +1458,41 @@ TEST_F(Raft_e2e, LocalityHash) {
);
qdb_info(qclient::describeRedisReply(reply));
std::vector<std::future<redisReplyPtr>> replies;
replies.emplace_back(tunnel(leaderID)->exec("lhscan", "mykey", "0" ));
replies.emplace_back(tunnel(leaderID)->exec("lhscan", "mykey", "0", "COUNT", "2" ));
replies.emplace_back(tunnel(leaderID)->exec("lhscan", "mykey", "next:hint3##f1", "COUNT", "2" ));
ASSERT_REPLY_DESCRIBE(replies[0],
"1) \"0\"\n"
"2) 1) \"hint1\"\n"
" 2) \"f3\"\n"
" 3) \"v6\"\n"
" 4) \"hint2\"\n"
" 5) \"f2\"\n"
" 6) \"v5\"\n"
" 7) \"hint3\"\n"
" 8) \"f1\"\n"
" 9) \"v3\"\n"
);
ASSERT_REPLY_DESCRIBE(replies[1],
"1) \"next:hint3##f1\"\n"
"2) 1) \"hint1\"\n"
" 2) \"f3\"\n"
" 3) \"v6\"\n"
" 4) \"hint2\"\n"
" 5) \"f2\"\n"
" 6) \"v5\"\n"
);
ASSERT_REPLY_DESCRIBE(replies[2],
"1) \"0\"\n"
"2) 1) \"hint3\"\n"
" 2) \"f1\"\n"
" 3) \"v3\"\n"
);
}
TEST_F(Raft_e2e, RawGetAllVersions) {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment