Commit 5baa229c authored by Georgios Bitzes's avatar Georgios Bitzes
Browse files

Add command to atomically convert hash fields to lhash ones

parent ea68e6d6
Pipeline #390599 passed with stages
in 25 minutes and 29 seconds
......@@ -83,6 +83,7 @@ struct cmdMapInit {
redis_cmd_map["lhmset"] = {RedisCommand::LHMSET, CommandType::WRITE};
redis_cmd_map["lhdel_with_fallback"] = {RedisCommand::LHDEL_WITH_FALLBACK, CommandType::WRITE};
redis_cmd_map["lhset_and_del_fallback"] = {RedisCommand::LHSET_AND_DEL_FALLBACK, CommandType::WRITE};
redis_cmd_map["convert_hash_field_to_lhash"] = {RedisCommand::CONVERT_HASH_FIELD_TO_LHASH, CommandType::WRITE};
redis_cmd_map["exec"] = {RedisCommand::EXEC, CommandType::CONTROL};
redis_cmd_map["discard"] = {RedisCommand::DISCARD, CommandType::CONTROL};
......
......@@ -69,6 +69,7 @@ enum class RedisCommand {
LHGET_WITH_FALLBACK,
LHDEL_WITH_FALLBACK,
LHSET_AND_DEL_FALLBACK,
CONVERT_HASH_FIELD_TO_LHASH,
SADD,
SISMEMBER,
......
......@@ -257,6 +257,30 @@ RedisEncodedResponse RedisDispatcher::dispatchWrite(StagingArea &stagingArea, Re
return resp;
}
case RedisCommand::CONVERT_HASH_FIELD_TO_LHASH: {
if(request.size() != 6) return errArgs(request);
std::string shouldBeEmpty;
rocksdb::Status st = store.lhget(stagingArea, request[3], request[4], request[5], shouldBeEmpty);
if(st.ok()) return Formatter::err("Destination field already exists!");
std::string value;
st = store.hget(stagingArea, request[1], request[2], value);
if(!st.ok()) return Formatter::fromStatus(st);
bool fieldCreated = false;
st = store.lhset(stagingArea, request[3], request[4], request[5], value, fieldCreated);
if(!st.ok()) return Formatter::fromStatus(st);
if(!fieldCreated) qdb_throw("should never happen");
int64_t removed = 0;
st = store.hdel(stagingArea, request[1], request.begin()+2, request.begin()+3, removed);
if(!st.ok()) qdb_throw("should never happen");
qdb_assert(removed == 1);
return Formatter::ok();
}
default: {
qdb_throw("internal dispatching error in RedisDispatcher for " << request);
}
......
......@@ -257,7 +257,7 @@ rocksdb::Status StateMachine::hget(StagingArea &stagingArea, const std::string &
if(!assertKeyType(stagingArea, key, KeyType::kHash)) return wrong_type();
FieldLocator locator(KeyType::kHash, key, field);
return db->Get(stagingArea.snapshot->opts(), locator.toSlice(), &value);
return stagingArea.get(locator.toSlice(), value);
}
rocksdb::Status StateMachine::hexists(StagingArea &stagingArea, const std::string &key, const std::string &field) {
......@@ -381,7 +381,7 @@ rocksdb::Status StateMachine::lhget(StagingArea &stagingArea, const std::string
// We were given a hint, whooo. Fast path.
LocalityFieldLocator locator(key, hint, field);
rocksdb::Status st = db->Get(stagingArea.snapshot->opts(), locator.toSlice(), &value);
rocksdb::Status st = stagingArea.get(locator.toSlice(), value);
ASSERT_OK_OR_NOTFOUND(st);
if(st.ok()) {
......@@ -396,7 +396,7 @@ rocksdb::Status StateMachine::lhget(StagingArea &stagingArea, const std::string
std::string correctHint;
LocalityIndexLocator indexLocator(key, field);
rocksdb::Status st = db->Get(stagingArea.snapshot->opts(), indexLocator.toSlice(), &correctHint);
rocksdb::Status st = stagingArea.get(indexLocator.toSlice(), correctHint);
ASSERT_OK_OR_NOTFOUND(st);
if(st.IsNotFound()) return st;
......@@ -409,7 +409,7 @@ rocksdb::Status StateMachine::lhget(StagingArea &stagingArea, const std::string
// Fetch correct hint.
LocalityFieldLocator fieldLocator(key, correctHint, field);
THROW_ON_ERROR(db->Get(stagingArea.snapshot->opts(), fieldLocator.toSlice(), &value));
THROW_ON_ERROR(stagingArea.get(fieldLocator.toSlice(), value));
return rocksdb::Status::OK();
}
......
......@@ -1187,3 +1187,29 @@ TEST_F(Raft_e2e, RawGetAllVersions) {
ASSERT_EQ(std::string(reply->element[5]->str, reply->element[5]->len), SSTR("VALUE: c" << intToBinaryString(1)));
ASSERT_EQ(std::string(reply->element[7]->str, reply->element[7]->len), "TYPE: 1");
}
TEST_F(Raft_e2e, ConvertHashToLHash) {
spinup(0); spinup(1); spinup(2);
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));
int leaderID = getLeaderID();
ASSERT_REPLY(tunnel(leaderID)->exec("hset", "hash", "f1", "v1"), 1);
ASSERT_REPLY(tunnel(leaderID)->exec("convert-hash-field-to-lhash", "hash", "f1", "lhash", "f1", "hint"), "OK");
ASSERT_REPLY(tunnel(leaderID)->exec("hlen", "hash"), 0);
ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "lhash"), 1);
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "lhash", "f1", "hint"), "v1");
ASSERT_REPLY(tunnel(leaderID)->exec("convert-hash-field-to-lhash", "hash", "f1", "lhash", "f1", "hint"), "ERR Destination field already exists!");
ASSERT_REPLY(tunnel(leaderID)->exec("convert-hash-field-to-lhash", "hash", "f2", "lhash", "f2", "hint"), "ERR NotFound: ");
ASSERT_REPLY(tunnel(leaderID)->exec("hset", "hash", "f2", "v2"), 1);
ASSERT_REPLY(tunnel(leaderID)->exec("hlen", "hash"), 1);
ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "lhash"), 1);
ASSERT_REPLY(tunnel(leaderID)->exec("convert-hash-field-to-lhash", "hash", "f2", "lhash", "f2", "hint"), "OK");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "lhash", "f2", "hint"), "v2");
ASSERT_REPLY(tunnel(leaderID)->exec("hlen", "hash"), 0);
ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "lhash"), 2);
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment