Commit 2ea201c2 authored by Georgios Bitzes's avatar Georgios Bitzes
Browse files

Implement multi-set for locality hashes

parent 319ed8d6
Pipeline #385848 passed with stages
in 24 minutes and 8 seconds
......@@ -77,6 +77,7 @@ struct cmdMapInit {
redis_cmd_map["config_set"] = {RedisCommand::CONFIG_SET, CommandType::WRITE};
redis_cmd_map["lhset"] = {RedisCommand::LHSET, CommandType::WRITE};
redis_cmd_map["lhdel"] = {RedisCommand::LHDEL, CommandType::WRITE};
redis_cmd_map["lhmset"] = {RedisCommand::LHMSET, CommandType::WRITE};
redis_cmd_map["exec"] = {RedisCommand::EXEC, CommandType::CONTROL};
redis_cmd_map["discard"] = {RedisCommand::DISCARD, CommandType::CONTROL};
......
......@@ -60,6 +60,7 @@ enum class RedisCommand {
HSETNX,
HINCRBYFLOAT,
LHMSET,
LHSET,
LHGET,
LHLEN,
......
......@@ -225,6 +225,12 @@ RedisEncodedResponse RedisDispatcher::dispatchWrite(StagingArea &stagingArea, Re
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::integer(count);
}
case RedisCommand::LHMSET: {
if(request.size() <= 4 || (request.size()-2) % 3 != 0) return Formatter::errArgs(request[0]);
rocksdb::Status st = store.lhmset(stagingArea, request[1], request.begin()+2, request.end());
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::ok();
}
default: {
qdb_throw("internal dispatching error in RedisDispatcher for " << request);
}
......
......@@ -296,18 +296,15 @@ rocksdb::Status StateMachine::hgetall(StagingArea &stagingArea, const std::strin
return rocksdb::Status::OK();
}
rocksdb::Status StateMachine::lhset(StagingArea &stagingArea, const std::string &key, const std::string &field, const std::string &hint, const std::string &value, bool &fieldcreated) {
void StateMachine::lhsetInternal(WriteOperation &operation, const std::string &key, const std::string &field, const std::string &hint, const std::string &value, bool &fieldcreated) {
fieldcreated = false;
WriteOperation operation(stagingArea, key, KeyType::kLocalityHash);
if(!operation.valid()) return wrong_type();
if(operation.localityFieldExists(hint, field)) {
// Cool, field exists, we take the fast path. Just update a single value,
// and we are done. No need to update any indexes or key descriptor size,
// as we simply override the old value.
operation.writeLocalityField(hint, field, value);
return operation.finalize(operation.keySize());
return;
}
// Two cases: We've received a different locality hint, or we're creating
......@@ -323,14 +320,39 @@ rocksdb::Status StateMachine::lhset(StagingArea &stagingArea, const std::string
operation.writeLocalityIndex(field, hint);
// No update on key size, we're just rewriting a key.
return operation.finalize(operation.keySize());
return;
}
// New field!
fieldcreated = true;
operation.writeLocalityField(hint, field, value);
operation.writeLocalityIndex(field, hint);
return operation.finalize(operation.keySize() + 1);
return;
}
rocksdb::Status StateMachine::lhmset(StagingArea &stagingArea, const std::string &key, const VecIterator &start, const VecIterator &end) {
if((end - start) % 3 != 0) qdb_throw("lhmset: distance between start and end iterators must be a multiple of three");
WriteOperation operation(stagingArea, key, KeyType::kLocalityHash);
if(!operation.valid()) return wrong_type();
int64_t created = 0;
for(auto it = start; it != end; it += 3) {
bool fieldcreated = false;
lhsetInternal(operation, key, *it, *(it+1), *(it+2), fieldcreated);
created += fieldcreated;
}
return operation.finalize(operation.keySize() + created);
}
rocksdb::Status StateMachine::lhset(StagingArea &stagingArea, const std::string &key, const std::string &field, const std::string &hint, const std::string &value, bool &fieldcreated) {
WriteOperation operation(stagingArea, key, KeyType::kLocalityHash);
if(!operation.valid()) return wrong_type();
fieldcreated = false;
lhsetInternal(operation, key, field, hint, value, fieldcreated);
return operation.finalize(operation.keySize() + fieldcreated);
}
rocksdb::Status StateMachine::lhdel(StagingArea &stagingArea, const std::string &key, const VecIterator &start, const VecIterator &end, int64_t &removed) {
......
......@@ -66,6 +66,7 @@ public:
rocksdb::Status lhset(StagingArea &stagingArea, const std::string &key, const std::string &field, const std::string &hint, const std::string &value, bool &fieldcreated);
rocksdb::Status lhdel(StagingArea &stagingArea, const std::string &key, const VecIterator &start, const VecIterator &end, int64_t &removed);
rocksdb::Status lhmset(StagingArea &stagingArea, const std::string &key, const VecIterator &start, const VecIterator &end);
rocksdb::Status sadd(StagingArea &stagingArea, const std::string &key, const VecIterator &start, const VecIterator &end, int64_t &added);
rocksdb::Status srem(StagingArea &stagingArea, const std::string &key, const VecIterator &start, const VecIterator &end, int64_t &removed);
......@@ -248,6 +249,7 @@ private:
void ensureCompatibleFormat(bool justCreated);
void ensureBulkloadSanity(bool justCreated);
void remove_all_with_prefix(const rocksdb::Slice &prefix, int64_t &removed, StagingArea &stagingArea);
void lhsetInternal(WriteOperation &operation, const std::string &key, const std::string &field, const std::string &hint, const std::string &value, bool &fieldcreated);
std::atomic<LogIndex> lastApplied;
std::condition_variable lastAppliedCV;
......
......@@ -1041,4 +1041,23 @@ TEST_F(Raft_e2e, LocalityHash) {
ASSERT_REPLY(tunnel(leaderID)->exec("lhdel", "mykey", "f5"), 1);
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f5", "hint5"), "");
ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 0);
ASSERT_REPLY(tunnel(leaderID)->exec("lhmset", "mykey", "f1", "hint1", "v1", "ayy"), "ERR wrong number of arguments for 'lhmset' command");
ASSERT_REPLY(tunnel(leaderID)->exec("lhmset", "a", "b", "c"), "ERR wrong number of arguments for 'lhmset' command");
ASSERT_REPLY(tunnel(leaderID)->exec("lhmset", "a", "b"), "ERR wrong number of arguments for 'lhmset' command");
ASSERT_REPLY(tunnel(leaderID)->exec("lhmset", "a"), "ERR wrong number of arguments for 'lhmset' command");
ASSERT_REPLY(tunnel(leaderID)->exec("lhmset", "mykey", "f1", "hint1", "v1"), "OK");
ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 1);
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1"), "v1");
ASSERT_REPLY(tunnel(leaderID)->exec("lhmset", "mykey", "f1", "hint1", "v2", "f1", "hint3", "v3"), "OK");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1"), "v3");
ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 1);
ASSERT_REPLY(tunnel(leaderID)->exec("lhmset", "mykey", "f2", "hint2", "v5", "f3", "hint1", "v6"), "OK");
ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 3);
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1"), "v3");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f2"), "v5");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f3"), "v6");
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment