Commit b7936bcb authored by Georgios Bitzes's avatar Georgios Bitzes
Browse files

Implement HCLONE for copying entire hashes

parent 317e8d2b
Pipeline #457058 failed with stages
in 12 minutes and 26 seconds
# Changelog
All notable changes to this project will be documented in this file.
## Unreleased
### Added
- Command `hclone` for creating identical copies of entire hashes.
## 0.2.9 (2018-07-16)
### Added
- Commands `convert-string-to-int`, `convert-int-to-string` to convert between
......
......@@ -75,6 +75,7 @@ struct cmdMapInit {
redis_cmd_map["hincrbyfloat"] = {RedisCommand::HINCRBYFLOAT, CommandType::WRITE};
redis_cmd_map["hincrbymulti"] = {RedisCommand::HINCRBYMULTI, CommandType::WRITE};
redis_cmd_map["hdel"] = {RedisCommand::HDEL, CommandType::WRITE};
redis_cmd_map["hclone"] = {RedisCommand::HCLONE, CommandType::WRITE};
redis_cmd_map["sadd"] = {RedisCommand::SADD, CommandType::WRITE};
redis_cmd_map["srem"] = {RedisCommand::SREM, CommandType::WRITE};
redis_cmd_map["smove"] = {RedisCommand::SMOVE, CommandType::WRITE};
......
......@@ -63,6 +63,7 @@ enum class RedisCommand {
HSCAN,
HSETNX,
HINCRBYFLOAT,
HCLONE,
LHMSET,
LHSET,
......
......@@ -235,6 +235,13 @@ RedisEncodedResponse RedisDispatcher::dispatchWrite(StagingArea &stagingArea, Re
if(request.size() <= 2) return errArgs(request);
return dispatchHDEL(stagingArea, request[1], request.begin()+2, request.end());
}
case RedisCommand::HCLONE: {
if(request.size() != 3) return errArgs(request);
rocksdb::Status st = store.hclone(stagingArea, request[1], request[2]);
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::ok();
}
case RedisCommand::SADD: {
if(request.size() <= 2) return errArgs(request);
int64_t count = 0;
......
......@@ -1098,6 +1098,44 @@ rocksdb::Status StateMachine::lease_get(StagingArea &stagingArea, const std::str
return rocksdb::Status::OK();
}
rocksdb::Status StateMachine::hclone(StagingArea &stagingArea, const std::string &source, const std::string &target) {
WriteOperation operation(stagingArea, target, KeyType::kHash);
if(!operation.valid()) return wrong_type();
if(operation.keyExists()) {
operation.cancel();
return rocksdb::Status::InvalidArgument("ERR target key already exists, will not overwrite");
}
KeyDescriptor sourceKeyInfo = getKeyDescriptor(stagingArea, source);
if(sourceKeyInfo.empty()) {
operation.cancel();
return rocksdb::Status::OK(); // source key is empty, do nothing
}
if(sourceKeyInfo.getKeyType() != KeyType::kHash) {
operation.cancel();
return wrong_type();
}
int64_t newsize = 0;
FieldLocator locator(KeyType::kHash, source);
IteratorPtr iter(stagingArea.getIterator());
for(iter->Seek(locator.getPrefix()); iter->Valid(); iter->Next()) {
std::string tmp = iter->key().ToString();
if(!StringUtils::startswith(tmp, locator.toSlice())) break;
operation.writeField(
std::string(tmp.begin()+locator.getPrefixSize(), tmp.end()),
iter->value().ToString()
);
newsize++;
}
qdb_assert(newsize == sourceKeyInfo.getSize());
return operation.finalize(newsize);
}
void StateMachine::advanceClock(ClockValue newValue, LogIndex index) {
StagingArea stagingArea(*this);
advanceClock(stagingArea, newValue);
......
......@@ -73,6 +73,7 @@ public:
rocksdb::Status hincrby(StagingArea &stagingArea, const std::string &key, const std::string &field, const std::string &incrby, int64_t &result);
rocksdb::Status hincrbyfloat(StagingArea &stagingArea, const std::string &key, const std::string &field, const std::string &incrby, double &result);
rocksdb::Status hdel(StagingArea &stagingArea, const std::string &key, const VecIterator &start, const VecIterator &end, int64_t &removed);
rocksdb::Status hclone(StagingArea &stagingArea, const std::string &source, const std::string &target);
rocksdb::Status lhset(StagingArea &stagingArea, const std::string &key, const std::string &field, const std::string &hint, const std::string &value, bool &fieldcreated);
rocksdb::Status lhdel(StagingArea &stagingArea, const std::string &key, const VecIterator &start, const VecIterator &end, int64_t &removed);
......
......@@ -1354,3 +1354,56 @@ TEST_F(Raft_e2e, InconsistentIteratorsTest) {
ASSERT_REPLY(delReply, 1);
}
TEST_F(Raft_e2e, CloneHash) {
spinup(0); spinup(1); spinup(2);
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));
int leaderID = getLeaderID();
std::vector<std::future<redisReplyPtr>> replies;
for(size_t i = 0; i < 10; i++) {
replies.emplace_back(tunnel(leaderID)->exec("HSET", "hash", SSTR("f" << i), SSTR("v" << i)));
}
for(size_t i = 0; i < 10; i++) {
ASSERT_REPLY(replies[i], 1);
}
ASSERT_REPLY(tunnel(leaderID)->exec("hclone", "hash", "hash2"), "OK");
redisReplyPtr hgetall = tunnel(leaderID)->exec("hgetall", "hash2").get();
ASSERT_EQ(
qclient::describeRedisReply(hgetall),
"1) \"f0\"\n"
"2) \"v0\"\n"
"3) \"f1\"\n"
"4) \"v1\"\n"
"5) \"f2\"\n"
"6) \"v2\"\n"
"7) \"f3\"\n"
"8) \"v3\"\n"
"9) \"f4\"\n"
"10) \"v4\"\n"
"11) \"f5\"\n"
"12) \"v5\"\n"
"13) \"f6\"\n"
"14) \"v6\"\n"
"15) \"f7\"\n"
"16) \"v7\"\n"
"17) \"f8\"\n"
"18) \"v8\"\n"
"19) \"f9\"\n"
"20) \"v9\"\n"
);
ASSERT_REPLY(tunnel(leaderID)->exec("hclone", "hash", "hash2"), "ERR Invalid argument: ERR target key already exists, will not overwrite");
ASSERT_REPLY(tunnel(leaderID)->exec("sadd", "my-set", "s1"), 1);
ASSERT_REPLY(tunnel(leaderID)->exec("hclone", "my-set", "hash3"), "ERR Invalid argument: WRONGTYPE Operation against a key holding the wrong kind of value");
ASSERT_REPLY(tunnel(leaderID)->exec("hclone", "hash", "my-set"), "ERR Invalid argument: WRONGTYPE Operation against a key holding the wrong kind of value");
ASSERT_REPLY(tunnel(leaderID)->exec("hclone", "not-existing", "hash"), "ERR Invalid argument: ERR target key already exists, will not overwrite");
ASSERT_REPLY(tunnel(leaderID)->exec("hclone", "not-existing", "not-existing-2"), "OK");
ASSERT_REPLY(tunnel(leaderID)->exec("exists", "not-existing", "not-existing-2"), 0);
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment