Commit b97de1cb authored by Georgios Bitzes's avatar Georgios Bitzes
Browse files

Implement get/set/len support for locality hashes

parent 5aa3f27c
Pipeline #384158 passed with stages
in 68 minutes and 17 seconds
......@@ -54,6 +54,8 @@ struct cmdMapInit {
redis_cmd_map["llen"] = {RedisCommand::LLEN, CommandType::READ};
redis_cmd_map["config_get"] = {RedisCommand::CONFIG_GET, CommandType::READ};
redis_cmd_map["config_getall"] = {RedisCommand::CONFIG_GETALL, CommandType::READ};
redis_cmd_map["lhget"] = {RedisCommand::LHGET, CommandType::READ};
redis_cmd_map["lhlen"] = {RedisCommand::LHLEN, CommandType::READ};
redis_cmd_map["flushall"] = {RedisCommand::FLUSHALL, CommandType::WRITE};
redis_cmd_map["set"] = {RedisCommand::SET, CommandType::WRITE};
......@@ -73,6 +75,7 @@ struct cmdMapInit {
redis_cmd_map["rpush"] = {RedisCommand::RPUSH, CommandType::WRITE};
redis_cmd_map["rpop"] = {RedisCommand::RPOP, CommandType::WRITE};
redis_cmd_map["config_set"] = {RedisCommand::CONFIG_SET, CommandType::WRITE};
redis_cmd_map["lhset"] = {RedisCommand::LHSET, CommandType::WRITE};
redis_cmd_map["exec"] = {RedisCommand::EXEC, CommandType::CONTROL};
redis_cmd_map["discard"] = {RedisCommand::DISCARD, CommandType::CONTROL};
......
......@@ -60,6 +60,10 @@ enum class RedisCommand {
HSETNX,
HINCRBYFLOAT,
LHSET,
LHGET,
LHLEN,
SADD,
SISMEMBER,
SREM,
......
......@@ -209,6 +209,15 @@ RedisEncodedResponse RedisDispatcher::dispatchWrite(StagingArea &stagingArea, Re
rocksdb::Status st = store.configSet(stagingArea, request[1], request[2]);
return Formatter::fromStatus(st);
}
case RedisCommand::LHSET: {
if(request.size() != 5) return errArgs(request);
bool fieldcreated;
rocksdb::Status st = store.lhset(stagingArea, request[1], request[2], request[3], request[4], fieldcreated);
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::integer(fieldcreated);
}
default: {
qdb_throw("internal dispatching error in RedisDispatcher for " << request);
}
......@@ -389,6 +398,30 @@ RedisEncodedResponse RedisDispatcher::dispatchRead(StagingArea &stagingArea, Red
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::vector(ret);
}
case RedisCommand::LHGET: {
std::string value;
rocksdb::Status st;
if(request.size() == 3) {
// Empty locality hint
st = store.lhget(stagingArea, request[1], request[2], "", value);
}
else if(request.size() != 4) {
return errArgs(request);
}
st = store.lhget(stagingArea, request[1], request[2], request[3], value);
if(st.IsNotFound()) return Formatter::null();
else if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::string(value);
}
case RedisCommand::LHLEN: {
if(request.size() != 2) return errArgs(request);
size_t len;
rocksdb::Status st = store.lhlen(stagingArea, request[1], len);
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::integer(len);
}
default: {
return dispatchingError(request, 0);
}
......
......@@ -296,6 +296,81 @@ rocksdb::Status StateMachine::hgetall(StagingArea &stagingArea, const std::strin
return rocksdb::Status::OK();
}
rocksdb::Status StateMachine::lhset(StagingArea &stagingArea, const std::string &key, const std::string &field, const std::string &hint, const std::string &value, bool &fieldcreated) {
fieldcreated = false;
WriteOperation operation(stagingArea, key, KeyType::kLocalityHash);
if(!operation.valid()) return wrong_type();
if(operation.localityFieldExists(hint, field)) {
// Cool, field exists, we take the fast path. Just update a single value,
// and we are done. No need to update any indexes or key descriptor size,
// as we simply override the old value.
operation.writeLocalityField(hint, field, value);
return operation.finalize(operation.keySize());
}
// Two cases: We've received a different locality hint, or we're creating
// a new field.
std::string previousHint;
if(operation.getLocalityIndex(field, previousHint)) {
// Changing locality hint. Drop old entry, insert new one.
qdb_assert(operation.deleteLocalityField(previousHint, field));
// Update field and index.
operation.writeLocalityField(hint, field, value);
operation.writeLocalityIndex(field, hint);
// No update on key size, we're just rewriting a key.
return operation.finalize(operation.keySize());
}
// New field!
fieldcreated = true;
operation.writeLocalityField(hint, field, value);
operation.writeLocalityIndex(field, hint);
return operation.finalize(operation.keySize() + 1);
}
rocksdb::Status StateMachine::lhget(StagingArea &stagingArea, const std::string &key, const std::string &field, const std::string &hint, std::string &value) {
if(!assertKeyType(stagingArea, key, KeyType::kLocalityHash)) return wrong_type();
if(!hint.empty()) {
// We were given a hint, whooo. Fast path.
LocalityFieldLocator locator(key, hint, field);
rocksdb::Status st = db->Get(stagingArea.snapshot->opts(), locator.toSlice(), &value);
ASSERT_OK_OR_NOTFOUND(st);
if(st.ok()) {
// Done!
return st;
}
// Hmh. Either the field does not exist, or we were given a wrong locality
// hint.
}
std::string correctHint;
LocalityIndexLocator indexLocator(key, field);
rocksdb::Status st = db->Get(stagingArea.snapshot->opts(), indexLocator.toSlice(), &correctHint);
ASSERT_OK_OR_NOTFOUND(st);
if(st.IsNotFound()) return st;
if(!hint.empty()) {
// Client is drunk and giving wrong locality hints, warn.
qdb_assert(hint != correctHint);
qdb_warn("Received invalid locality hint (" << hint << " vs " << correctHint << ") for locality hash with key " << key << ", targeting field " << field);
}
// Fetch correct hint.
LocalityFieldLocator fieldLocator(key, correctHint, field);
THROW_ON_ERROR(db->Get(stagingArea.snapshot->opts(), fieldLocator.toSlice(), &value));
return rocksdb::Status::OK();
}
rocksdb::Status StateMachine::hset(StagingArea &stagingArea, const std::string &key, const std::string &field, const std::string &value, bool &fieldcreated) {
WriteOperation operation(stagingArea, key, KeyType::kHash);
......@@ -414,6 +489,16 @@ rocksdb::Status StateMachine::hlen(StagingArea &stagingArea, const std::string &
return rocksdb::Status::OK();
}
rocksdb::Status StateMachine::lhlen(StagingArea &stagingArea, const std::string &key, size_t &len) {
len = 0;
KeyDescriptor keyinfo = getKeyDescriptor(stagingArea, key);
if(isWrongType(keyinfo, KeyType::kLocalityHash)) return wrong_type();
len = keyinfo.getSize();
return rocksdb::Status::OK();
}
rocksdb::Status StateMachine::hscan(StagingArea &stagingArea, const std::string &key, const std::string &cursor, size_t count, std::string &newCursor, std::vector<std::string> &res) {
if(!assertKeyType(stagingArea, key, KeyType::kHash)) return wrong_type();
......@@ -664,6 +749,16 @@ bool StateMachine::WriteOperation::getField(const std::string &field, std::strin
return st.ok();
}
bool StateMachine::WriteOperation::getLocalityIndex(const std::string &field, std::string &out) {
assertWritable();
qdb_assert(keyinfo.getKeyType() == KeyType::kLocalityHash);
LocalityIndexLocator locator(redisKey, field);
rocksdb::Status st = stagingArea.get(locator.toSlice(), out);
ASSERT_OK_OR_NOTFOUND(st);
return st.ok();
}
int64_t StateMachine::WriteOperation::keySize() {
return keyinfo.getSize();
}
......@@ -695,6 +790,24 @@ void StateMachine::WriteOperation::writeField(const std::string &field, const st
stagingArea.put(locator.toSlice(), value);
}
void StateMachine::WriteOperation::writeLocalityField(const std::string &hint, const std::string &field, const std::string &value) {
assertWritable();
qdb_assert(keyinfo.getKeyType() == KeyType::kLocalityHash);
LocalityFieldLocator locator(redisKey, hint, field);
stagingArea.put(locator.toSlice(), value);
}
void StateMachine::WriteOperation::writeLocalityIndex(const std::string &field, const std::string &hint) {
assertWritable();
qdb_assert(keyinfo.getKeyType() == KeyType::kLocalityHash);
LocalityIndexLocator locator(redisKey, field);
stagingArea.put(locator.toSlice(), hint);
}
rocksdb::Status StateMachine::WriteOperation::finalize(int64_t newsize) {
assertWritable();
......@@ -721,6 +834,16 @@ bool StateMachine::WriteOperation::fieldExists(const std::string &field) {
return st.ok();
}
bool StateMachine::WriteOperation::localityFieldExists(const std::string &hint, const std::string &field) {
assertWritable();
qdb_assert(keyinfo.getKeyType() == KeyType::kLocalityHash);
LocalityFieldLocator locator(redisKey, hint, field);
rocksdb::Status st = stagingArea.exists(locator.toSlice());
ASSERT_OK_OR_NOTFOUND(st);
return st.ok();
}
bool StateMachine::WriteOperation::deleteField(const std::string &field) {
assertWritable();
......@@ -734,6 +857,19 @@ bool StateMachine::WriteOperation::deleteField(const std::string &field) {
return st.ok();
}
bool StateMachine::WriteOperation::deleteLocalityField(const std::string &hint, const std::string &field) {
assertWritable();
qdb_assert(keyinfo.getKeyType() == KeyType::kLocalityHash);
std::string tmp;
LocalityFieldLocator locator(redisKey, hint, field);
rocksdb::Status st = stagingArea.get(locator.toSlice(), tmp);
ASSERT_OK_OR_NOTFOUND(st);
if(st.ok()) stagingArea.del(locator.toSlice());
return st.ok();
}
rocksdb::Status StateMachine::set(StagingArea &stagingArea, const std::string& key, const std::string& value) {
WriteOperation operation(stagingArea, key, KeyType::kString);
if(!operation.valid()) return wrong_type();
......
......@@ -64,6 +64,8 @@ public:
rocksdb::Status hincrbyfloat(StagingArea &stagingArea, const std::string &key, const std::string &field, const std::string &incrby, double &result);
rocksdb::Status hdel(StagingArea &stagingArea, const std::string &key, const VecIterator &start, const VecIterator &end, int64_t &removed);
rocksdb::Status lhset(StagingArea &stagingArea, const std::string &key, const std::string &field, const std::string &hint, const std::string &value, bool &fieldcreated);
rocksdb::Status sadd(StagingArea &stagingArea, const std::string &key, const VecIterator &start, const VecIterator &end, int64_t &added);
rocksdb::Status srem(StagingArea &stagingArea, const std::string &key, const VecIterator &start, const VecIterator &end, int64_t &removed);
rocksdb::Status smove(StagingArea &stagingArea, const std::string &source, const std::string &destination, const std::string &element, int64_t &outcome);
......@@ -92,6 +94,8 @@ public:
rocksdb::Status scard(StagingArea &stagingArea, const std::string &key, size_t &count);
rocksdb::Status sscan(StagingArea &stagingArea, const std::string &key, const std::string &cursor, size_t count, std::string &newCursor, std::vector<std::string> &res);
rocksdb::Status llen(StagingArea &stagingArea, const std::string &key, size_t &len);
rocksdb::Status lhget(StagingArea &stagingArea, const std::string &key, const std::string &field, const std::string &hint, std::string &value);
rocksdb::Status lhlen(StagingArea &stagingArea, const std::string &key, size_t &len);
//----------------------------------------------------------------------------
// Simple API
......@@ -196,14 +200,22 @@ private:
bool valid();
bool keyExists();
bool getField(const std::string &field, std::string &out);
bool getLocalityIndex(const std::string &field, std::string &out);
int64_t keySize();
void assertWritable();
void write(const std::string &value);
void writeField(const std::string &field, const std::string &value);
void writeLocalityField(const std::string &hint, const std::string &field, const std::string &value);
void writeLocalityIndex(const std::string &field, const std::string &hint);
bool fieldExists(const std::string &field);
bool localityFieldExists(const std::string &hint, const std::string &field);
bool deleteField(const std::string &field);
bool deleteLocalityField(const std::string &hint, const std::string &field);
rocksdb::Status finalize(int64_t newsize);
......
......@@ -942,3 +942,62 @@ TEST_F(Raft_e2e, sscan) {
ASSERT_EQ(pair.first, "0");
ASSERT_EQ(pair.second, make_vec());
}
TEST_F(Raft_e2e, LocalityHash) {
spinup(0); spinup(1); spinup(2);
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));
int leaderID = getLeaderID();
// Insert new field.
ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 0);
ASSERT_REPLY(tunnel(leaderID)->exec("lhset", "mykey", "f1", "hint1", "v1"), 1);
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1"), "v1");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1", "hint1"), "v1");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1", "ayy-lmao"), "v1");
ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 1);
// Update old field, no changes to locality hint.
ASSERT_REPLY(tunnel(leaderID)->exec("lhset", "mykey", "f1", "hint1", "v2"), 0);
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1"), "v2");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1", "hint1"), "v2");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1", "ayy-lmao"), "v2");
ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 1);
// Insert one more field.
ASSERT_REPLY(tunnel(leaderID)->exec("lhset", "mykey", "f2", "hint2", "v3"), 1);
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f2"), "v3");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f2", "hint2"), "v3");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f2", "hint1"), "v3");
ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 2);
// Update locality hint of first field.
ASSERT_REPLY(tunnel(leaderID)->exec("lhset", "mykey", "f1", "hint2", "v2"), 0);
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1"), "v2");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1", "hint2"), "v2");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1", "hint1"), "v2");
ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 2);
// Update value and locality hint of second field.
ASSERT_REPLY(tunnel(leaderID)->exec("lhset", "mykey", "f2", "hint3", "v4"), 0);
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f2"), "v4");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f2", "hint3"), "v4");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f2", "hint1"), "v4");
ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 2);
// Insert one more field.
ASSERT_REPLY(tunnel(leaderID)->exec("lhset", "mykey", "f3", "aaaaa", "v5"), 1);
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f3"), "v5");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f3", "aaaaa"), "v5");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f3", "wrong-hint"), "v5");
ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 3);
// Re-read everything.
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f2"), "v4");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f2", "hint3"), "v4");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f2", "hint1"), "v4");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1"), "v2");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1", "hint2"), "v2");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1", "hint1"), "v2");
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment