Commit 12a4bc68 authored by Georgios Bitzes's avatar Georgios Bitzes
Browse files

Implement VHLEN, VHDEL

parent 86aa35a1
Pipeline #717898 failed with stages
in 29 minutes and 24 seconds
......@@ -68,6 +68,7 @@ struct cmdMapInit {
redis_cmd_map["clock_get"] = {RedisCommand::CLOCK_GET, CommandType::READ};
redis_cmd_map["type"] = {RedisCommand::TYPE, CommandType::READ};
redis_cmd_map["vhgetall"] = {RedisCommand::VHGETALL, CommandType::READ};
redis_cmd_map["vhlen"] = {RedisCommand::VHLEN, CommandType::READ};
redis_cmd_map["flushall"] = {RedisCommand::FLUSHALL, CommandType::WRITE};
redis_cmd_map["set"] = {RedisCommand::SET, CommandType::WRITE};
......@@ -102,6 +103,7 @@ struct cmdMapInit {
redis_cmd_map["timestamped_lease_get"] = {RedisCommand::TIMESTAMPED_LEASE_GET, CommandType::WRITE};
redis_cmd_map["timestamped_lease_release"] = {RedisCommand::TIMESTAMPED_LEASE_RELEASE, CommandType::WRITE};
redis_cmd_map["vhset"] = {RedisCommand::VHSET, CommandType::WRITE};
redis_cmd_map["vhdel"] = {RedisCommand::VHDEL, CommandType::WRITE};
redis_cmd_map["exec"] = {RedisCommand::EXEC, CommandType::CONTROL};
redis_cmd_map["discard"] = {RedisCommand::DISCARD, CommandType::CONTROL};
......
......@@ -112,6 +112,8 @@ enum class RedisCommand {
VHSET,
VHGETALL,
VHDEL,
VHLEN,
TIMESTAMPED_LEASE_GET,
TIMESTAMPED_LEASE_ACQUIRE,
......
......@@ -422,13 +422,21 @@ RedisEncodedResponse RedisDispatcher::dispatchWrite(StagingArea &stagingArea, Re
return Formatter::ok();
}
case RedisCommand::VHSET: {
if(request.size() != 4) return Formatter::errArgs("vhset");
if(request.size() != 4) return errArgs(request);
uint64_t version;
rocksdb::Status st = store.vhset(stagingArea, request[1], request[2], request[3], version);
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::integer(version);
}
case RedisCommand::VHDEL: {
if(request.size() <= 2) return errArgs(request);
uint64_t version = 0;
rocksdb::Status st = store.vhdel(stagingArea, request[1], request.begin()+2, request.end(), version);
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::integer(version);
}
case RedisCommand::TX_READWRITE: {
// Unpack transaction and process
Transaction transaction;
......@@ -782,6 +790,13 @@ RedisEncodedResponse RedisDispatcher::dispatchRead(StagingArea &stagingArea, Red
qdb_assert(transaction.deserialize(request));
return dispatchReadOnly(stagingArea, transaction);
}
case RedisCommand::VHLEN: {
if(request.size() != 2) return errArgs(request);
size_t len;
rocksdb::Status st = store.vhlen(stagingArea, request[1], len);
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::integer(len);
}
default: {
return dispatchingError(request, 0);
}
......
......@@ -1074,7 +1074,7 @@ rocksdb::Status StateMachine::WriteOperation::finalize(int64_t newsize, bool for
if(newsize < 0) qdb_throw("invalid newsize: " << newsize);
if(newsize == 0) {
if(newsize == 0 && keyinfo.getKeyType() != KeyType::kVersionedHash) {
stagingArea.del(dlocator.toView());
}
else if(keyinfo.getSize() != newsize || forceUpdate) {
......@@ -1278,6 +1278,44 @@ rocksdb::Status StateMachine::vhset(StagingArea &stagingArea, std::string_view k
return operation.finalize(newsize, true);
}
rocksdb::Status StateMachine::vhdel(StagingArea &stagingArea, std::string_view key, const ReqIterator &start, const ReqIterator &end, uint64_t &version) {
int64_t removed = 0;
WriteOperation operation(stagingArea, key, KeyType::kVersionedHash);
if(!operation.valid()) return wrong_type();
for(ReqIterator it = start; it != end; it++) {
removed += operation.deleteField(*it);
}
// Have we modified this key in the same write batch already?
// If yes:
// - We have already incremented the version, nothing to do. Each transaction
// towards the state machine counts as a single version.
// If not:
// - We need to increment the version by one.
KeyDescriptor &descriptor = operation.descriptor();
version = descriptor.getStartIndex();
if(removed != 0 && !operation.descriptorModifiedAlreadyInWriteBatch()) {
version++;
descriptor.setStartIndex(version);
}
int64_t newsize = operation.keySize() - removed;
return operation.finalize(newsize, true);
}
rocksdb::Status StateMachine::vhlen(StagingArea &stagingArea, std::string_view key, size_t &len) {
len = 0;
KeyDescriptor keyinfo = getKeyDescriptor(stagingArea, key);
if(isWrongType(keyinfo, KeyType::kVersionedHash)) return wrong_type();
len = keyinfo.getSize();
return rocksdb::Status::OK();
}
rocksdb::Status StateMachine::vhgetall(StagingArea &stagingArea, std::string_view key, std::vector<std::string> &res, uint64_t &version) {
KeyDescriptor keyinfo = getKeyDescriptor(stagingArea, key);
......
......@@ -101,6 +101,7 @@ public:
// versioned hashes
rocksdb::Status vhset(StagingArea &stagingArea, std::string_view key, std::string_view field, std::string_view value, uint64_t &version);
rocksdb::Status vhdel(StagingArea &stagingArea, std::string_view key, const ReqIterator &start, const ReqIterator &end, uint64_t &version);
//----------------------------------------------------------------------------
// API for transactional reads. Can be part of a mixed read-write transaction.
......@@ -143,6 +144,7 @@ public:
// versioned hashes
rocksdb::Status vhgetall(StagingArea &stagingArea, std::string_view key, std::vector<std::string> &res, uint64_t &version);
rocksdb::Status vhlen(StagingArea &stagingArea, std::string_view key, size_t &len);
//----------------------------------------------------------------------------
// Simple API
......
......@@ -1846,4 +1846,57 @@ TEST_F(Raft_e2e, vhset) {
" 7) \"f4\"\n"
" 8) \"v4\"\n"
);
ASSERT_REPLY(tunnel(leaderID)->exec("vhdel", "key-1", "f3"), 5);
ASSERT_REPLY_DESCRIBE(tunnel(leaderID)->exec("vhgetall", "key-1").get(),
"1) (integer) 5\n"
"2) 1) \"f1\"\n"
" 2) \"v1\"\n"
" 3) \"f2\"\n"
" 4) \"v2\"\n"
" 5) \"f4\"\n"
" 6) \"v4\"\n"
);
ASSERT_REPLY(tunnel(leaderID)->exec("vhlen", "key-1"), 3);
ASSERT_REPLY(tunnel(leaderID)->exec("vhdel", "key-1", "f1"), 6);
ASSERT_REPLY_DESCRIBE(tunnel(leaderID)->exec("vhgetall", "key-1").get(),
"1) (integer) 6\n"
"2) 1) \"f2\"\n"
" 2) \"v2\"\n"
" 3) \"f4\"\n"
" 4) \"v4\"\n"
);
ASSERT_REPLY(tunnel(leaderID)->exec("vhlen", "key-1"), 2);
ASSERT_REPLY(tunnel(leaderID)->exec("vhdel", "key-1", "f4"), 7);
ASSERT_REPLY_DESCRIBE(tunnel(leaderID)->exec("vhgetall", "key-1").get(),
"1) (integer) 7\n"
"2) 1) \"f2\"\n"
" 2) \"v2\"\n"
);
ASSERT_REPLY(tunnel(leaderID)->exec("vhlen", "key-1"), 1);
ASSERT_REPLY(tunnel(leaderID)->exec("vhdel", "key-1", "not-existing"), 7);
ASSERT_REPLY(tunnel(leaderID)->exec("vhlen", "key-1"), 1);
ASSERT_REPLY(tunnel(leaderID)->exec("vhdel", "key-1", "f2"), 8);
ASSERT_REPLY_DESCRIBE(tunnel(leaderID)->exec("vhgetall", "key-1").get(),
"1) (integer) 8\n"
"2) (empty list or set)\n"
);
ASSERT_REPLY(tunnel(leaderID)->exec("vhdel", "key-1", "f2"), 8);
ASSERT_REPLY(tunnel(leaderID)->exec("vhset", "key-1", "f3", "v3"), 9);
ASSERT_REPLY_DESCRIBE(tunnel(leaderID)->exec("vhgetall", "key-1").get(),
"1) (integer) 9\n"
"2) 1) \"f3\"\n"
" 2) \"v3\"\n"
);
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment