diff --git a/src/Commands.cc b/src/Commands.cc index 1e27b8aa5de94b12aa3fd1f73551d3da6e100465..f918341d069f623fa9d9d80e56adfb00b91f23d3 100644 --- a/src/Commands.cc +++ b/src/Commands.cc @@ -68,6 +68,7 @@ struct cmdMapInit { redis_cmd_map["clock_get"] = {RedisCommand::CLOCK_GET, CommandType::READ}; redis_cmd_map["type"] = {RedisCommand::TYPE, CommandType::READ}; redis_cmd_map["vhgetall"] = {RedisCommand::VHGETALL, CommandType::READ}; + redis_cmd_map["vhlen"] = {RedisCommand::VHLEN, CommandType::READ}; redis_cmd_map["flushall"] = {RedisCommand::FLUSHALL, CommandType::WRITE}; redis_cmd_map["set"] = {RedisCommand::SET, CommandType::WRITE}; @@ -102,6 +103,7 @@ struct cmdMapInit { redis_cmd_map["timestamped_lease_get"] = {RedisCommand::TIMESTAMPED_LEASE_GET, CommandType::WRITE}; redis_cmd_map["timestamped_lease_release"] = {RedisCommand::TIMESTAMPED_LEASE_RELEASE, CommandType::WRITE}; redis_cmd_map["vhset"] = {RedisCommand::VHSET, CommandType::WRITE}; + redis_cmd_map["vhdel"] = {RedisCommand::VHDEL, CommandType::WRITE}; redis_cmd_map["exec"] = {RedisCommand::EXEC, CommandType::CONTROL}; redis_cmd_map["discard"] = {RedisCommand::DISCARD, CommandType::CONTROL}; diff --git a/src/Commands.hh b/src/Commands.hh index 86096df87b1bf78570606ce349c6ff5afbbef366..32e1d69f77aa3204039957ccf8ed7462d21d072a 100644 --- a/src/Commands.hh +++ b/src/Commands.hh @@ -112,6 +112,8 @@ enum class RedisCommand { VHSET, VHGETALL, + VHDEL, + VHLEN, TIMESTAMPED_LEASE_GET, TIMESTAMPED_LEASE_ACQUIRE, diff --git a/src/Dispatcher.cc b/src/Dispatcher.cc index baab04992b05ffcd7540fa57a5efd79f732112b2..fd17c37fe52d81264dd6dd182964df77a37d78b2 100644 --- a/src/Dispatcher.cc +++ b/src/Dispatcher.cc @@ -422,13 +422,21 @@ RedisEncodedResponse RedisDispatcher::dispatchWrite(StagingArea &stagingArea, Re return Formatter::ok(); } case RedisCommand::VHSET: { - if(request.size() != 4) return Formatter::errArgs("vhset"); + if(request.size() != 4) return errArgs(request); uint64_t version; rocksdb::Status st = store.vhset(stagingArea, request[1], request[2], request[3], version); if(!st.ok()) return Formatter::fromStatus(st); return Formatter::integer(version); } + case RedisCommand::VHDEL: { + if(request.size() <= 2) return errArgs(request); + + uint64_t version = 0; + rocksdb::Status st = store.vhdel(stagingArea, request[1], request.begin()+2, request.end(), version); + if(!st.ok()) return Formatter::fromStatus(st); + return Formatter::integer(version); + } case RedisCommand::TX_READWRITE: { // Unpack transaction and process Transaction transaction; @@ -782,6 +790,13 @@ RedisEncodedResponse RedisDispatcher::dispatchRead(StagingArea &stagingArea, Red qdb_assert(transaction.deserialize(request)); return dispatchReadOnly(stagingArea, transaction); } + case RedisCommand::VHLEN: { + if(request.size() != 2) return errArgs(request); + size_t len; + rocksdb::Status st = store.vhlen(stagingArea, request[1], len); + if(!st.ok()) return Formatter::fromStatus(st); + return Formatter::integer(len); + } default: { return dispatchingError(request, 0); } diff --git a/src/StateMachine.cc b/src/StateMachine.cc index 9a553e80919011991c7f7d9f7f631272de3bdb49..33751545c28048f698438b462dc0d39fba873490 100644 --- a/src/StateMachine.cc +++ b/src/StateMachine.cc @@ -1074,7 +1074,7 @@ rocksdb::Status StateMachine::WriteOperation::finalize(int64_t newsize, bool for if(newsize < 0) qdb_throw("invalid newsize: " << newsize); - if(newsize == 0) { + if(newsize == 0 && keyinfo.getKeyType() != KeyType::kVersionedHash) { stagingArea.del(dlocator.toView()); } else if(keyinfo.getSize() != newsize || forceUpdate) { @@ -1278,6 +1278,44 @@ rocksdb::Status StateMachine::vhset(StagingArea &stagingArea, std::string_view k return operation.finalize(newsize, true); } +rocksdb::Status StateMachine::vhdel(StagingArea &stagingArea, std::string_view key, const ReqIterator &start, const ReqIterator &end, uint64_t &version) { + int64_t removed = 0; + + WriteOperation operation(stagingArea, key, KeyType::kVersionedHash); + if(!operation.valid()) return wrong_type(); + + for(ReqIterator it = start; it != end; it++) { + removed += operation.deleteField(*it); + } + + // Have we modified this key in the same write batch already? + // If yes: + // - We have already incremented the version, nothing to do. Each transaction + // towards the state machine counts as a single version. + // If not: + // - We need to increment the version by one. + KeyDescriptor &descriptor = operation.descriptor(); + version = descriptor.getStartIndex(); + + if(removed != 0 && !operation.descriptorModifiedAlreadyInWriteBatch()) { + version++; + descriptor.setStartIndex(version); + } + + int64_t newsize = operation.keySize() - removed; + return operation.finalize(newsize, true); +} + +rocksdb::Status StateMachine::vhlen(StagingArea &stagingArea, std::string_view key, size_t &len) { + len = 0; + + KeyDescriptor keyinfo = getKeyDescriptor(stagingArea, key); + if(isWrongType(keyinfo, KeyType::kVersionedHash)) return wrong_type(); + + len = keyinfo.getSize(); + return rocksdb::Status::OK(); +} + rocksdb::Status StateMachine::vhgetall(StagingArea &stagingArea, std::string_view key, std::vector<std::string> &res, uint64_t &version) { KeyDescriptor keyinfo = getKeyDescriptor(stagingArea, key); diff --git a/src/StateMachine.hh b/src/StateMachine.hh index 8efabd8f1b758e8886cf856c1a70dc4e5fa0b728..4f51ebf2ad11dc8f966f3970b26e3abd3a91c785 100644 --- a/src/StateMachine.hh +++ b/src/StateMachine.hh @@ -101,6 +101,7 @@ public: // versioned hashes rocksdb::Status vhset(StagingArea &stagingArea, std::string_view key, std::string_view field, std::string_view value, uint64_t &version); + rocksdb::Status vhdel(StagingArea &stagingArea, std::string_view key, const ReqIterator &start, const ReqIterator &end, uint64_t &version); //---------------------------------------------------------------------------- // API for transactional reads. Can be part of a mixed read-write transaction. @@ -143,6 +144,7 @@ public: // versioned hashes rocksdb::Status vhgetall(StagingArea &stagingArea, std::string_view key, std::vector<std::string> &res, uint64_t &version); + rocksdb::Status vhlen(StagingArea &stagingArea, std::string_view key, size_t &len); //---------------------------------------------------------------------------- // Simple API diff --git a/test/e2e.cc b/test/e2e.cc index 0607edd8cc9c9859fa87da774be2e325fcc4bb52..157bf503ff4aeb56bc9b681bf0ef6f431b394d08 100644 --- a/test/e2e.cc +++ b/test/e2e.cc @@ -1846,4 +1846,57 @@ TEST_F(Raft_e2e, vhset) { " 7) \"f4\"\n" " 8) \"v4\"\n" ); + + ASSERT_REPLY(tunnel(leaderID)->exec("vhdel", "key-1", "f3"), 5); + + ASSERT_REPLY_DESCRIBE(tunnel(leaderID)->exec("vhgetall", "key-1").get(), + "1) (integer) 5\n" + "2) 1) \"f1\"\n" + " 2) \"v1\"\n" + " 3) \"f2\"\n" + " 4) \"v2\"\n" + " 5) \"f4\"\n" + " 6) \"v4\"\n" + ); + + ASSERT_REPLY(tunnel(leaderID)->exec("vhlen", "key-1"), 3); + ASSERT_REPLY(tunnel(leaderID)->exec("vhdel", "key-1", "f1"), 6); + + ASSERT_REPLY_DESCRIBE(tunnel(leaderID)->exec("vhgetall", "key-1").get(), + "1) (integer) 6\n" + "2) 1) \"f2\"\n" + " 2) \"v2\"\n" + " 3) \"f4\"\n" + " 4) \"v4\"\n" + ); + + ASSERT_REPLY(tunnel(leaderID)->exec("vhlen", "key-1"), 2); + ASSERT_REPLY(tunnel(leaderID)->exec("vhdel", "key-1", "f4"), 7); + + ASSERT_REPLY_DESCRIBE(tunnel(leaderID)->exec("vhgetall", "key-1").get(), + "1) (integer) 7\n" + "2) 1) \"f2\"\n" + " 2) \"v2\"\n" + ); + + ASSERT_REPLY(tunnel(leaderID)->exec("vhlen", "key-1"), 1); + ASSERT_REPLY(tunnel(leaderID)->exec("vhdel", "key-1", "not-existing"), 7); + ASSERT_REPLY(tunnel(leaderID)->exec("vhlen", "key-1"), 1); + + ASSERT_REPLY(tunnel(leaderID)->exec("vhdel", "key-1", "f2"), 8); + + ASSERT_REPLY_DESCRIBE(tunnel(leaderID)->exec("vhgetall", "key-1").get(), + "1) (integer) 8\n" + "2) (empty list or set)\n" + ); + + ASSERT_REPLY(tunnel(leaderID)->exec("vhdel", "key-1", "f2"), 8); + ASSERT_REPLY(tunnel(leaderID)->exec("vhset", "key-1", "f3", "v3"), 9); + + ASSERT_REPLY_DESCRIBE(tunnel(leaderID)->exec("vhgetall", "key-1").get(), + "1) (integer) 9\n" + "2) 1) \"f3\"\n" + " 2) \"v3\"\n" + ); + }