diff --git a/src/Commands.cc b/src/Commands.cc index fc23ff1cad0a09abfc596187d1cee8081401be7a..fb6362a891f935e6304e5813d015d961b11e50ab 100644 --- a/src/Commands.cc +++ b/src/Commands.cc @@ -54,6 +54,8 @@ struct cmdMapInit { redis_cmd_map["llen"] = {RedisCommand::LLEN, CommandType::READ}; redis_cmd_map["config_get"] = {RedisCommand::CONFIG_GET, CommandType::READ}; redis_cmd_map["config_getall"] = {RedisCommand::CONFIG_GETALL, CommandType::READ}; + redis_cmd_map["lhget"] = {RedisCommand::LHGET, CommandType::READ}; + redis_cmd_map["lhlen"] = {RedisCommand::LHLEN, CommandType::READ}; redis_cmd_map["flushall"] = {RedisCommand::FLUSHALL, CommandType::WRITE}; redis_cmd_map["set"] = {RedisCommand::SET, CommandType::WRITE}; @@ -73,6 +75,7 @@ struct cmdMapInit { redis_cmd_map["rpush"] = {RedisCommand::RPUSH, CommandType::WRITE}; redis_cmd_map["rpop"] = {RedisCommand::RPOP, CommandType::WRITE}; redis_cmd_map["config_set"] = {RedisCommand::CONFIG_SET, CommandType::WRITE}; + redis_cmd_map["lhset"] = {RedisCommand::LHSET, CommandType::WRITE}; redis_cmd_map["exec"] = {RedisCommand::EXEC, CommandType::CONTROL}; redis_cmd_map["discard"] = {RedisCommand::DISCARD, CommandType::CONTROL}; diff --git a/src/Commands.hh b/src/Commands.hh index c02d15ecc493295b825fa42c51c6b4b1ff10f93f..9a18a87d4482c4f82a037bbdc3bbcc5b4ce0b30d 100644 --- a/src/Commands.hh +++ b/src/Commands.hh @@ -60,6 +60,10 @@ enum class RedisCommand { HSETNX, HINCRBYFLOAT, + LHSET, + LHGET, + LHLEN, + SADD, SISMEMBER, SREM, diff --git a/src/Dispatcher.cc b/src/Dispatcher.cc index 354964352f4c38258dd85870a093209f0a438d27..f983aef40742b13071a573169b89fdaa6d11b11e 100644 --- a/src/Dispatcher.cc +++ b/src/Dispatcher.cc @@ -209,6 +209,15 @@ RedisEncodedResponse RedisDispatcher::dispatchWrite(StagingArea &stagingArea, Re rocksdb::Status st = store.configSet(stagingArea, request[1], request[2]); return Formatter::fromStatus(st); } + case RedisCommand::LHSET: { + if(request.size() != 5) return errArgs(request); + + bool fieldcreated; + rocksdb::Status st = store.lhset(stagingArea, request[1], request[2], request[3], request[4], fieldcreated); + if(!st.ok()) return Formatter::fromStatus(st); + + return Formatter::integer(fieldcreated); + } default: { qdb_throw("internal dispatching error in RedisDispatcher for " << request); } @@ -389,6 +398,30 @@ RedisEncodedResponse RedisDispatcher::dispatchRead(StagingArea &stagingArea, Red if(!st.ok()) return Formatter::fromStatus(st); return Formatter::vector(ret); } + case RedisCommand::LHGET: { + std::string value; + rocksdb::Status st; + + if(request.size() == 3) { + // Empty locality hint + st = store.lhget(stagingArea, request[1], request[2], "", value); + } + else if(request.size() != 4) { + return errArgs(request); + } + + st = store.lhget(stagingArea, request[1], request[2], request[3], value); + if(st.IsNotFound()) return Formatter::null(); + else if(!st.ok()) return Formatter::fromStatus(st); + return Formatter::string(value); + } + case RedisCommand::LHLEN: { + if(request.size() != 2) return errArgs(request); + size_t len; + rocksdb::Status st = store.lhlen(stagingArea, request[1], len); + if(!st.ok()) return Formatter::fromStatus(st); + return Formatter::integer(len); + } default: { return dispatchingError(request, 0); } diff --git a/src/StateMachine.cc b/src/StateMachine.cc index 2d5713f2fe62321df964427a14bdc36daec5a6b3..015692b4188ec3e1cfc55eb4114988863468d420 100644 --- a/src/StateMachine.cc +++ b/src/StateMachine.cc @@ -296,6 +296,81 @@ rocksdb::Status StateMachine::hgetall(StagingArea &stagingArea, const std::strin return rocksdb::Status::OK(); } +rocksdb::Status StateMachine::lhset(StagingArea &stagingArea, const std::string &key, const std::string &field, const std::string &hint, const std::string &value, bool &fieldcreated) { + fieldcreated = false; + + WriteOperation operation(stagingArea, key, KeyType::kLocalityHash); + if(!operation.valid()) return wrong_type(); + + if(operation.localityFieldExists(hint, field)) { + // Cool, field exists, we take the fast path. Just update a single value, + // and we are done. No need to update any indexes or key descriptor size, + // as we simply override the old value. + operation.writeLocalityField(hint, field, value); + return operation.finalize(operation.keySize()); + } + + // Two cases: We've received a different locality hint, or we're creating + // a new field. + + std::string previousHint; + if(operation.getLocalityIndex(field, previousHint)) { + // Changing locality hint. Drop old entry, insert new one. + qdb_assert(operation.deleteLocalityField(previousHint, field)); + + // Update field and index. + operation.writeLocalityField(hint, field, value); + operation.writeLocalityIndex(field, hint); + + // No update on key size, we're just rewriting a key. + return operation.finalize(operation.keySize()); + } + + // New field! + fieldcreated = true; + operation.writeLocalityField(hint, field, value); + operation.writeLocalityIndex(field, hint); + return operation.finalize(operation.keySize() + 1); +} + +rocksdb::Status StateMachine::lhget(StagingArea &stagingArea, const std::string &key, const std::string &field, const std::string &hint, std::string &value) { + if(!assertKeyType(stagingArea, key, KeyType::kLocalityHash)) return wrong_type(); + + if(!hint.empty()) { + // We were given a hint, whooo. Fast path. + LocalityFieldLocator locator(key, hint, field); + + rocksdb::Status st = db->Get(stagingArea.snapshot->opts(), locator.toSlice(), &value); + ASSERT_OK_OR_NOTFOUND(st); + + if(st.ok()) { + // Done! + return st; + } + + // Hmh. Either the field does not exist, or we were given a wrong locality + // hint. + } + + std::string correctHint; + + LocalityIndexLocator indexLocator(key, field); + rocksdb::Status st = db->Get(stagingArea.snapshot->opts(), indexLocator.toSlice(), &correctHint); + ASSERT_OK_OR_NOTFOUND(st); + + if(st.IsNotFound()) return st; + + if(!hint.empty()) { + // Client is drunk and giving wrong locality hints, warn. + qdb_assert(hint != correctHint); + qdb_warn("Received invalid locality hint (" << hint << " vs " << correctHint << ") for locality hash with key " << key << ", targeting field " << field); + } + + // Fetch correct hint. + LocalityFieldLocator fieldLocator(key, correctHint, field); + THROW_ON_ERROR(db->Get(stagingArea.snapshot->opts(), fieldLocator.toSlice(), &value)); + return rocksdb::Status::OK(); +} rocksdb::Status StateMachine::hset(StagingArea &stagingArea, const std::string &key, const std::string &field, const std::string &value, bool &fieldcreated) { WriteOperation operation(stagingArea, key, KeyType::kHash); @@ -414,6 +489,16 @@ rocksdb::Status StateMachine::hlen(StagingArea &stagingArea, const std::string & return rocksdb::Status::OK(); } +rocksdb::Status StateMachine::lhlen(StagingArea &stagingArea, const std::string &key, size_t &len) { + len = 0; + + KeyDescriptor keyinfo = getKeyDescriptor(stagingArea, key); + if(isWrongType(keyinfo, KeyType::kLocalityHash)) return wrong_type(); + + len = keyinfo.getSize(); + return rocksdb::Status::OK(); +} + rocksdb::Status StateMachine::hscan(StagingArea &stagingArea, const std::string &key, const std::string &cursor, size_t count, std::string &newCursor, std::vector<std::string> &res) { if(!assertKeyType(stagingArea, key, KeyType::kHash)) return wrong_type(); @@ -664,6 +749,16 @@ bool StateMachine::WriteOperation::getField(const std::string &field, std::strin return st.ok(); } +bool StateMachine::WriteOperation::getLocalityIndex(const std::string &field, std::string &out) { + assertWritable(); + qdb_assert(keyinfo.getKeyType() == KeyType::kLocalityHash); + + LocalityIndexLocator locator(redisKey, field); + rocksdb::Status st = stagingArea.get(locator.toSlice(), out); + ASSERT_OK_OR_NOTFOUND(st); + return st.ok(); +} + int64_t StateMachine::WriteOperation::keySize() { return keyinfo.getSize(); } @@ -695,6 +790,24 @@ void StateMachine::WriteOperation::writeField(const std::string &field, const st stagingArea.put(locator.toSlice(), value); } +void StateMachine::WriteOperation::writeLocalityField(const std::string &hint, const std::string &field, const std::string &value) { + assertWritable(); + + qdb_assert(keyinfo.getKeyType() == KeyType::kLocalityHash); + + LocalityFieldLocator locator(redisKey, hint, field); + stagingArea.put(locator.toSlice(), value); +} + +void StateMachine::WriteOperation::writeLocalityIndex(const std::string &field, const std::string &hint) { + assertWritable(); + + qdb_assert(keyinfo.getKeyType() == KeyType::kLocalityHash); + + LocalityIndexLocator locator(redisKey, field); + stagingArea.put(locator.toSlice(), hint); +} + rocksdb::Status StateMachine::WriteOperation::finalize(int64_t newsize) { assertWritable(); @@ -721,6 +834,16 @@ bool StateMachine::WriteOperation::fieldExists(const std::string &field) { return st.ok(); } +bool StateMachine::WriteOperation::localityFieldExists(const std::string &hint, const std::string &field) { + assertWritable(); + qdb_assert(keyinfo.getKeyType() == KeyType::kLocalityHash); + + LocalityFieldLocator locator(redisKey, hint, field); + rocksdb::Status st = stagingArea.exists(locator.toSlice()); + ASSERT_OK_OR_NOTFOUND(st); + return st.ok(); +} + bool StateMachine::WriteOperation::deleteField(const std::string &field) { assertWritable(); @@ -734,6 +857,19 @@ bool StateMachine::WriteOperation::deleteField(const std::string &field) { return st.ok(); } +bool StateMachine::WriteOperation::deleteLocalityField(const std::string &hint, const std::string &field) { + assertWritable(); + qdb_assert(keyinfo.getKeyType() == KeyType::kLocalityHash); + + std::string tmp; + LocalityFieldLocator locator(redisKey, hint, field); + rocksdb::Status st = stagingArea.get(locator.toSlice(), tmp); + ASSERT_OK_OR_NOTFOUND(st); + + if(st.ok()) stagingArea.del(locator.toSlice()); + return st.ok(); +} + rocksdb::Status StateMachine::set(StagingArea &stagingArea, const std::string& key, const std::string& value) { WriteOperation operation(stagingArea, key, KeyType::kString); if(!operation.valid()) return wrong_type(); diff --git a/src/StateMachine.hh b/src/StateMachine.hh index 5c6bac1a9e7a720b169d7f95e4337995ab9d4ae3..5f51ee0cb830492b9b5ae890c217551125aacadd 100644 --- a/src/StateMachine.hh +++ b/src/StateMachine.hh @@ -64,6 +64,8 @@ public: rocksdb::Status hincrbyfloat(StagingArea &stagingArea, const std::string &key, const std::string &field, const std::string &incrby, double &result); rocksdb::Status hdel(StagingArea &stagingArea, const std::string &key, const VecIterator &start, const VecIterator &end, int64_t &removed); + rocksdb::Status lhset(StagingArea &stagingArea, const std::string &key, const std::string &field, const std::string &hint, const std::string &value, bool &fieldcreated); + rocksdb::Status sadd(StagingArea &stagingArea, const std::string &key, const VecIterator &start, const VecIterator &end, int64_t &added); rocksdb::Status srem(StagingArea &stagingArea, const std::string &key, const VecIterator &start, const VecIterator &end, int64_t &removed); rocksdb::Status smove(StagingArea &stagingArea, const std::string &source, const std::string &destination, const std::string &element, int64_t &outcome); @@ -92,6 +94,8 @@ public: rocksdb::Status scard(StagingArea &stagingArea, const std::string &key, size_t &count); rocksdb::Status sscan(StagingArea &stagingArea, const std::string &key, const std::string &cursor, size_t count, std::string &newCursor, std::vector<std::string> &res); rocksdb::Status llen(StagingArea &stagingArea, const std::string &key, size_t &len); + rocksdb::Status lhget(StagingArea &stagingArea, const std::string &key, const std::string &field, const std::string &hint, std::string &value); + rocksdb::Status lhlen(StagingArea &stagingArea, const std::string &key, size_t &len); //---------------------------------------------------------------------------- // Simple API @@ -196,14 +200,22 @@ private: bool valid(); bool keyExists(); bool getField(const std::string &field, std::string &out); + bool getLocalityIndex(const std::string &field, std::string &out); + int64_t keySize(); void assertWritable(); void write(const std::string &value); void writeField(const std::string &field, const std::string &value); + void writeLocalityField(const std::string &hint, const std::string &field, const std::string &value); + void writeLocalityIndex(const std::string &field, const std::string &hint); + bool fieldExists(const std::string &field); + bool localityFieldExists(const std::string &hint, const std::string &field); + bool deleteField(const std::string &field); + bool deleteLocalityField(const std::string &hint, const std::string &field); rocksdb::Status finalize(int64_t newsize); diff --git a/test/e2e.cc b/test/e2e.cc index 43468269d7c487abcb5d567c343a4c247649de9d..909929d73d0d32a1e0ea0686cce087fee2d989ec 100644 --- a/test/e2e.cc +++ b/test/e2e.cc @@ -942,3 +942,62 @@ TEST_F(Raft_e2e, sscan) { ASSERT_EQ(pair.first, "0"); ASSERT_EQ(pair.second, make_vec()); } + +TEST_F(Raft_e2e, LocalityHash) { + spinup(0); spinup(1); spinup(2); + RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2)); + + int leaderID = getLeaderID(); + + // Insert new field. + ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 0); + ASSERT_REPLY(tunnel(leaderID)->exec("lhset", "mykey", "f1", "hint1", "v1"), 1); + ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1"), "v1"); + ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1", "hint1"), "v1"); + ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1", "ayy-lmao"), "v1"); + ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 1); + + // Update old field, no changes to locality hint. + ASSERT_REPLY(tunnel(leaderID)->exec("lhset", "mykey", "f1", "hint1", "v2"), 0); + ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1"), "v2"); + ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1", "hint1"), "v2"); + ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1", "ayy-lmao"), "v2"); + ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 1); + + // Insert one more field. + ASSERT_REPLY(tunnel(leaderID)->exec("lhset", "mykey", "f2", "hint2", "v3"), 1); + ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f2"), "v3"); + ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f2", "hint2"), "v3"); + ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f2", "hint1"), "v3"); + ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 2); + + // Update locality hint of first field. + ASSERT_REPLY(tunnel(leaderID)->exec("lhset", "mykey", "f1", "hint2", "v2"), 0); + ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1"), "v2"); + ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1", "hint2"), "v2"); + ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1", "hint1"), "v2"); + ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 2); + + // Update value and locality hint of second field. + ASSERT_REPLY(tunnel(leaderID)->exec("lhset", "mykey", "f2", "hint3", "v4"), 0); + ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f2"), "v4"); + ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f2", "hint3"), "v4"); + ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f2", "hint1"), "v4"); + ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 2); + + // Insert one more field. + ASSERT_REPLY(tunnel(leaderID)->exec("lhset", "mykey", "f3", "aaaaa", "v5"), 1); + ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f3"), "v5"); + ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f3", "aaaaa"), "v5"); + ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f3", "wrong-hint"), "v5"); + ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 3); + + // Re-read everything. + ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f2"), "v4"); + ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f2", "hint3"), "v4"); + ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f2", "hint1"), "v4"); + + ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1"), "v2"); + ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1", "hint2"), "v2"); + ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1", "hint1"), "v2"); +}