Commit 6d298097 authored by Georgios Bitzes's avatar Georgios Bitzes
Browse files

Implement deque-scan-back to read the contents of a deque

parent f9ff7834
Pipeline #487968 passed with stages
in 28 minutes and 40 seconds
Subproject commit b71d76c06850b45a86758e3ac6d1052364ded3b6
Subproject commit 3d2bc258c301baa0cca5a43d1f0c9c41c57a94d1
......@@ -56,6 +56,7 @@ struct cmdMapInit {
redis_cmd_map["scard"] = {RedisCommand::SCARD, CommandType::READ};
redis_cmd_map["sscan"] = {RedisCommand::SSCAN, CommandType::READ};
redis_cmd_map["deque_len"] = {RedisCommand::DEQUE_LEN, CommandType::READ};
redis_cmd_map["deque_scan_back"] = {RedisCommand::DEQUE_SCAN_BACK, CommandType::READ};
redis_cmd_map["config_get"] = {RedisCommand::CONFIG_GET, CommandType::READ};
redis_cmd_map["config_getall"] = {RedisCommand::CONFIG_GETALL, CommandType::READ};
redis_cmd_map["lhget"] = {RedisCommand::LHGET, CommandType::READ};
......
......@@ -90,6 +90,7 @@ enum class RedisCommand {
DEQUE_POP_BACK,
DEQUE_TRIM_FRONT,
DEQUE_LEN,
DEQUE_SCAN_BACK,
RAW_SCAN,
RAW_GET_ALL_VERSIONS,
......
......@@ -602,6 +602,28 @@ RedisEncodedResponse RedisDispatcher::dispatchRead(StagingArea &stagingArea, Red
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::integer(len);
}
case RedisCommand::DEQUE_SCAN_BACK: {
if(request.size() < 3) return errArgs(request);
ScanCommandArguments args = parseScanCommand(request.begin()+2, request.end());
if(!args.error.empty()) {
return Formatter::err(args.error);
}
// No support for MATCH here, maybe add later
if(!args.match.empty()) {
return Formatter::err("syntax error");
}
std::string newcursor;
std::vector<std::string> vec;
rocksdb::Status st = store.dequeScanBack(stagingArea, request[1], args.cursor, args.count, newcursor, vec);
if(!st.ok()) return Formatter::fromStatus(st);
if(newcursor == "") newcursor = "0";
else newcursor = "next:" + newcursor;
return Formatter::scan(newcursor, vec);
}
case RedisCommand::CONFIG_GET: {
if(request.size() != 2) return errArgs(request);
......
......@@ -288,7 +288,7 @@ static KeyDescriptor constructDescriptor(rocksdb::Status &st, const std::string
return KeyDescriptor(serialization);
}
KeyDescriptor StateMachine::getKeyDescriptor(StagingArea &stagingArea, const std::string &redisKey) {
KeyDescriptor StateMachine::getKeyDescriptor(StagingArea &stagingArea, std::string_view redisKey) {
std::string tmp;
DescriptorLocator dlocator(redisKey);
rocksdb::Status st = stagingArea.get(dlocator.toSlice(), tmp);
......@@ -666,6 +666,52 @@ rocksdb::Status StateMachine::sscan(StagingArea &stagingArea, const std::string
return rocksdb::Status::OK();
}
rocksdb::Status StateMachine::dequeScanBack(StagingArea &stagingArea, std::string_view key, std::string_view cursor, size_t count, std::string &newCursor, std::vector<std::string> &res) {
KeyDescriptor keyinfo = getKeyDescriptor(stagingArea, key);
if(isWrongType(keyinfo, KeyType::kDeque)) return wrong_type();
uint64_t cursorMarker;
if(cursor.size() == 0u) {
cursorMarker = keyinfo.getEndIndex();
}
else if(cursor.size() == 8u) {
cursorMarker = binaryStringToUnsignedInt(cursor.data());
if(cursorMarker > keyinfo.getEndIndex()) {
cursorMarker = keyinfo.getEndIndex();
}
}
else {
return malformed("invalid cursor");
}
uint64_t startingMarker = cursorMarker - count;
if(startingMarker <= keyinfo.getStartIndex() + 1) {
newCursor = "0";
startingMarker = keyinfo.getStartIndex() + 1;
}
else {
newCursor = unsignedIntToBinaryString(startingMarker);
}
FieldLocator locator(KeyType::kDeque, key, unsignedIntToBinaryString(startingMarker));
IteratorPtr iter(stagingArea.getIterator());
iter->Seek(locator.toSlice());
for(uint64_t i = startingMarker; i < cursorMarker; i++) {
qdb_assert(iter->Valid());
locator.resetField(unsignedIntToBinaryString(i));
qdb_assert(locator.toView() == toView(iter->key()));
res.emplace_back(toView(iter->value()));
iter->Next();
}
return rocksdb::Status::OK();
}
rocksdb::Status StateMachine::hvals(StagingArea &stagingArea, const std::string &key, std::vector<std::string> &vals) {
if(!assertKeyType(stagingArea, key, KeyType::kHash)) return wrong_type();
......@@ -1362,7 +1408,7 @@ void StateMachine::remove_all_with_prefix(const rocksdb::Slice &prefix, int64_t
for(iter->Seek(prefix); iter->Valid(); iter->Next()) {
// iter->key() may get deleted from under our feet, better keep a copy
std::string key = iter->key().ToString();
if(!StringUtils::startsWithSlice(key, prefix)) break;
if(!StringUtils::startsWith(key, toView(prefix))) break;
if(key.size() > 0 && (key[0] == char(InternalKeyType::kInternal) || key[0] == char(InternalKeyType::kConfiguration))) continue;
stagingArea.del(key);
......
......@@ -116,6 +116,7 @@ public:
rocksdb::Status lhget(StagingArea &stagingArea, const std::string &key, const std::string &field, const std::string &hint, std::string &value);
rocksdb::Status lhlen(StagingArea &stagingArea, const std::string &key, size_t &len);
rocksdb::Status rawScan(StagingArea &stagingArea, const std::string &key, size_t count, std::vector<std::string> &elements);
rocksdb::Status dequeScanBack(StagingArea &stagingArea, std::string_view key, std::string_view cursor, size_t count, std::string &newCursor, std::vector<std::string> &results);
void getClock(StagingArea &stagingArea, ClockValue &value);
//----------------------------------------------------------------------------
......@@ -282,7 +283,7 @@ private:
};
friend class WriteOperation;
KeyDescriptor getKeyDescriptor(StagingArea &stagingArea, const std::string &redisKey);
KeyDescriptor getKeyDescriptor(StagingArea &stagingArea, std::string_view redisKey);
KeyDescriptor lockKeyDescriptor(StagingArea &stagingArea, DescriptorLocator &dlocator);
void retrieveLastApplied();
......
......@@ -28,7 +28,17 @@
#include <string>
#include <string_view>
namespace quarkdb { namespace StringUtils {
namespace quarkdb {
inline std::string_view toView(rocksdb::Slice slice) {
return std::string_view(slice.data(), slice.size());
}
inline rocksdb::Slice toSlice(std::string_view sv) {
return rocksdb::Slice(sv.data(), sv.size());
}
namespace StringUtils {
inline size_t countOccurences(std::string_view key, char c) {
size_t ret = 0;
......@@ -42,14 +52,6 @@ inline size_t countOccurences(std::string_view key, char c) {
}
inline std::string_view sliceToView(rocksdb::Slice slice) {
return std::string_view(slice.data(), slice.size());
}
inline rocksdb::Slice viewToSlice(std::string_view sv) {
return rocksdb::Slice(sv.data(), sv.size());
}
inline bool startsWith(std::string_view str, std::string_view prefix) {
if(prefix.size() > str.size()) return false;
......@@ -59,10 +61,6 @@ inline bool startsWith(std::string_view str, std::string_view prefix) {
return true;
}
inline bool startsWithSlice(rocksdb::Slice str, rocksdb::Slice prefix) {
return startsWith(sliceToView(str), sliceToView(prefix));
}
inline bool isPrefix(const std::string &prefix, const char *buff, size_t n) {
if(n < prefix.size()) return false;
......
......@@ -560,6 +560,22 @@ TEST_F(Raft_e2e, test_many_redis_commands) {
futures.emplace_back(tunnel(leaderID)->exec("deque-pop-front", "list_test"));
futures.emplace_back(tunnel(leaderID)->exec("set", "list_test", "asdf"));
futures.emplace_back(tunnel(leaderID)->exec("deque-pop-front", "list_test"));
futures.emplace_back(tunnel(leaderID)->exec("deque-push-back", "my-deque", "1", "2", "3", "4", "5", "6", "7", "8", "9" ));
futures.emplace_back(tunnel(leaderID)->exec("deque-scan-back", "my-deque", "0", "COUNT", "3"));
futures.emplace_back(tunnel(leaderID)->exec("deque-scan-back", "my-deque", "0", "COUNT", "3000"));
futures.emplace_back(tunnel(leaderID)->exec("deque-scan-back", "my-deque", "next:\x80\x00\x00\x00\x00\x00\x00\x05", "COUNT", "3"));
futures.emplace_back(tunnel(leaderID)->exec("deque-scan-back", "my-deque", "next:\x80\x00\x00\x00\x00\x00\x00\x02", "COUNT", "3"));
futures.emplace_back(tunnel(leaderID)->exec("deque-scan-back", "my-deque", "next:\x80\x00\x00\x00\x00\x00\x00\x02", "COUNT", "4"));
futures.emplace_back(tunnel(leaderID)->exec("deque-scan-back", "my-deque", "next:\x80\x00\x00\x00\x00\x00\x00\x02", "COUNT", "2"));
futures.emplace_back(tunnel(leaderID)->exec("deque-scan-back", "my-deque", "next:\x80\x00\x00\x00\x00\x00\x00\x00", "COUNT", "2"));
futures.emplace_back(tunnel(leaderID)->exec("deque-scan-back", "my-deque", "next:\x80\x00\x00\x00\x00\x00\x00\x00", "COUNT", "1"));
futures.emplace_back(tunnel(leaderID)->exec("deque-scan-back", "my-deque", "next:\x70\x00\x00\x00\x00\x00\x00\x00", "COUNT", "1"));
futures.emplace_back(tunnel(leaderID)->exec("deque-scan-back", "my-deque", "next:\x7f\xff\xff\xff\xff\xff\xff\xff", "COUNT", "1"));
futures.emplace_back(tunnel(leaderID)->exec("deque-scan-back", "my-deque", "next:\x7f\xff\xff\xfd\xf3\xff\x1f\x0f", "COUNT", "1"));
futures.emplace_back(tunnel(leaderID)->exec("deque-scan-back", "my-deque", "next:\x7f\xff\xff\xfd\xf3\xff\x1f\x0f", "COUNT", "100"));
futures.emplace_back(tunnel(leaderID)->exec("deque-scan-back", "my-deque", "next:\x80\x00\x00\x00\x00\x00\x00\x06", "COUNT", "3"));
futures.emplace_back(tunnel(leaderID)->exec("deque-scan-back", "my-deque", "next:\x80\x00\x00\x00\x00\x00\x00\x08", "COUNT", "3"));
futures.emplace_back(tunnel(leaderID)->exec("deque-scan-back", "my-deque", "next:\x80\x00\x00\x00\x00\x00\x00\x09", "COUNT", "3"));
int i = 0;
ASSERT_REPLY(futures[i++], 4);
......@@ -580,6 +596,91 @@ TEST_F(Raft_e2e, test_many_redis_commands) {
ASSERT_REPLY(futures[i++], "i6");
ASSERT_REPLY(futures[i++], "OK");
ASSERT_REPLY(futures[i++], "ERR Invalid argument: WRONGTYPE Operation against a key holding the wrong kind of value");
ASSERT_REPLY(futures[i++], 9);
ASSERT_REPLY_DESCRIBE(futures[i++],
"1) \"next:\\x80\\x00\\x00\\x00\\x00\\x00\\x00\\x05\"\n"
"2) 1) \"7\"\n"
" 2) \"8\"\n"
" 3) \"9\"\n");
ASSERT_REPLY_DESCRIBE(futures[i++],
"1) \"next:0\"\n"
"2) 1) \"1\"\n"
" 2) \"2\"\n"
" 3) \"3\"\n"
" 4) \"4\"\n"
" 5) \"5\"\n"
" 6) \"6\"\n"
" 7) \"7\"\n"
" 8) \"8\"\n"
" 9) \"9\"\n");
ASSERT_REPLY_DESCRIBE(futures[i++],
"1) \"next:\\x80\\x00\\x00\\x00\\x00\\x00\\x00\\x02\"\n"
"2) 1) \"4\"\n"
" 2) \"5\"\n"
" 3) \"6\"\n"
);
ASSERT_REPLY_DESCRIBE(futures[i++],
"1) \"next:0\"\n"
"2) 1) \"1\"\n"
" 2) \"2\"\n"
" 3) \"3\"\n");
ASSERT_REPLY_DESCRIBE(futures[i++],
"1) \"next:0\"\n"
"2) 1) \"1\"\n"
" 2) \"2\"\n"
" 3) \"3\"\n");
ASSERT_REPLY_DESCRIBE(futures[i++],
"1) \"next:\\x80\\x00\\x00\\x00\\x00\\x00\\x00\\x00\"\n"
"2) 1) \"2\"\n"
" 2) \"3\"\n");
ASSERT_REPLY_DESCRIBE(futures[i++],
"1) \"next:0\"\n"
"2) 1) \"1\"\n");
ASSERT_REPLY_DESCRIBE(futures[i++],
"1) \"next:0\"\n"
"2) 1) \"1\"\n");
ASSERT_REPLY_DESCRIBE(futures[i++],
"1) \"next:0\"\n"
"2) (empty list or set)\n");
ASSERT_REPLY_DESCRIBE(futures[i++],
"1) \"next:0\"\n"
"2) (empty list or set)\n");
ASSERT_REPLY_DESCRIBE(futures[i++],
"1) \"next:0\"\n"
"2) (empty list or set)\n");
ASSERT_REPLY_DESCRIBE(futures[i++],
"1) \"next:0\"\n"
"2) (empty list or set)\n");
ASSERT_REPLY_DESCRIBE(futures[i++],
"1) \"next:\\x80\\x00\\x00\\x00\\x00\\x00\\x00\\x03\"\n"
"2) 1) \"5\"\n"
" 2) \"6\"\n"
" 3) \"7\"\n");
ASSERT_REPLY_DESCRIBE(futures[i++],
"1) \"next:\\x80\\x00\\x00\\x00\\x00\\x00\\x00\\x05\"\n"
"2) 1) \"7\"\n"
" 2) \"8\"\n"
" 3) \"9\"\n");
ASSERT_REPLY_DESCRIBE(futures[i++],
"1) \"next:\\x80\\x00\\x00\\x00\\x00\\x00\\x00\\x05\"\n"
"2) 1) \"7\"\n"
" 2) \"8\"\n"
" 3) \"9\"\n");
// Now test qclient callbacks, ensure things stay reasonable when we mix them
// with futures.
......
......@@ -27,6 +27,7 @@
#include <gtest/gtest.h>
#include <qclient/QClient.hh>
#define ASSERT_REPLY_DESCRIBE(reply, val) { ASSERT_EQ(getDescription(reply), val); }
#define ASSERT_REPLY(reply, val) { assert_reply(reply, val); if(::testing::Test::HasFatalFailure()) { FAIL(); return; } }
#define ASSERT_ERR(reply, val) { assert_error(reply, val); if(::testing::Test::HasFatalFailure()) { FAIL(); return; } }
#define ASSERT_NIL(reply) { assert_nil(reply); if(::testing::Test::HasFatalFailure()) { FAIL(); return; } }
......@@ -35,6 +36,14 @@ namespace quarkdb {
using redisReplyPtr = qclient::redisReplyPtr;
inline std::string getDescription(const redisReplyPtr &reply) {
return qclient::describeRedisReply(reply);
}
inline std::string getDescription(std::future<redisReplyPtr> &reply) {
return qclient::describeRedisReply(reply.get());
}
inline void assert_nil(const redisReplyPtr &reply) {
ASSERT_NE(reply, nullptr);
ASSERT_EQ(reply->type, REDIS_REPLY_NIL);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment