// ----------------------------------------------------------------------
// File: e2e.cc
// Author: Georgios Bitzes - CERN
// ----------------------------------------------------------------------
/************************************************************************
* quarkdb - a redis-like highly available key-value store *
* Copyright (C) 2016 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see .*
************************************************************************/
#include "raft/RaftDispatcher.hh"
#include "raft/RaftReplicator.hh"
#include "raft/RaftTimeouts.hh"
#include "raft/RaftCommitTracker.hh"
#include "raft/RaftConfig.hh"
#include "raft/RaftContactDetails.hh"
#include "Poller.hh"
#include "Configuration.hh"
#include "QuarkDBNode.hh"
#include "test-utils.hh"
#include "RedisParser.hh"
#include
#include "test-reply-macros.hh"
#include "qclient/QScanner.hh"
#include "qclient/QSet.hh"
#include "qclient/ConnectionInitiator.hh"
#include "qclient/QHash.hh"
using namespace quarkdb;
#define ASSERT_OK(msg) ASSERT_TRUE(msg.ok())
class Raft_e2e : public TestCluster3NodesFixture {};
class Raft_e2e5 : public TestCluster5NodesFixture {};
TEST_F(Raft_e2e, coup) {
spinup(0); spinup(1); spinup(2);
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));
int leaderID = getLeaderID();
ASSERT_GE(leaderID, 0);
ASSERT_LE(leaderID, 2);
int instigator = (leaderID+1)%3;
for(int i = 1; i < 10; i++) {
RaftTerm term = state(instigator)->getCurrentTerm();
ASSERT_REPLY(tunnel(instigator)->exec("RAFT_ATTEMPT_COUP"), "vive la revolution");
RETRY_ASSERT_TRUE(state(instigator)->getCurrentTerm() > term);
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));
if(instigator == getLeaderID()) {
qdb_info("Successful coup in " << i << " attempts");
return; // pass test
}
}
ASSERT_TRUE(false) << "Test has failed";
}
TEST_F(Raft_e2e, simultaneous_clients) {
spinup(0); spinup(1); spinup(2);
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));
int leaderID = getServerID(state(0)->getSnapshot()->leader);
ASSERT_GE(leaderID, 0);
ASSERT_LE(leaderID, 2);
LogIndex lastEntry = journal(leaderID)->getLogSize() - 1;
std::vector> futures;
// send off many requests, pipeline them
futures.emplace_back(tunnel(leaderID)->exec("get", "asdf"));
futures.emplace_back(tunnel(leaderID)->exec("ping"));
futures.emplace_back(tunnel(leaderID)->exec("set", "asdf", "1234"));
futures.emplace_back(tunnel(leaderID)->exec("get", "asdf"));
futures.emplace_back(tunnel(leaderID)->exec("raft-fetch", SSTR(lastEntry+1), "raw"));
ASSERT_REPLY(futures[0], "");
ASSERT_REPLY(futures[1], "PONG");
ASSERT_REPLY(futures[2], "OK");
ASSERT_REPLY(futures[3], "1234");
RaftEntry entry;
ASSERT_TRUE(RaftParser::fetchResponse(futures[4].get().get(), entry));
ASSERT_EQ(entry.term, state(0)->getCurrentTerm());
ASSERT_EQ(entry.request, make_req("set", "asdf", "1234"));
futures.clear();
futures.emplace_back(tunnel(leaderID)->exec("set", "asdf", "3456"));
futures.emplace_back(tunnel(leaderID)->exec("get", "asdf"));
ASSERT_REPLY(futures[0], "OK");
ASSERT_REPLY(futures[1], "3456");
// make sure the log entry has been propagated to all nodes
for(size_t i = 0; i < 3; i++) {
std::string value;
RETRY_ASSERT_TRUE(stateMachine(i)->get("asdf", value).ok() && value == "3456");
}
ASSERT_REPLY(tunnel(leaderID)->exec("set", "qwerty", "789"), "OK");
futures.clear();
// interwine pipelined requests from three connections
qclient::QClient tunnel2(myself(leaderID).hostname, myself(leaderID).port, makeNoRedirectOptions());
qclient::QClient tunnel3(myself(leaderID).hostname, myself(leaderID).port, makeNoRedirectOptions());
futures.emplace_back(tunnel2.exec("get", "qwerty"));
futures.emplace_back(tunnel(leaderID)->exec("set", "client2", "val"));
futures.emplace_back(tunnel(leaderID)->exec("get", "client2"));
futures.emplace_back(tunnel(leaderID)->exec("sadd", "myset", "a"));
futures.emplace_back(tunnel2.exec("sadd", "myset", "b"));
futures.emplace_back(tunnel2.exec("sadd", "myset")); // malformed request
futures.emplace_back(tunnel3.exec("set", "client3", "myval"));
futures.emplace_back(tunnel3.exec("get", "client3"));
// not guaranteed that response will be "myval" here, since it's on a different connection
futures.emplace_back(tunnel2.exec("get", "client3"));
ASSERT_REPLY(futures[0], "789");
ASSERT_REPLY(futures[1], "OK");
ASSERT_REPLY(futures[2], "val");
ASSERT_REPLY(futures[3], 1);
ASSERT_REPLY(futures[4], 1);
ASSERT_REPLY(futures[5], "ERR wrong number of arguments for 'sadd' command");
ASSERT_REPLY(futures[6], "OK");
ASSERT_REPLY(futures[7], "myval");
redisReplyPtr reply = futures[8].get();
std::string str = std::string(reply->str, reply->len);
qdb_info("Race-y request: GET client3 ==> " << str);
ASSERT_TRUE(str == "myval" || str == "");
ASSERT_REPLY(tunnel2.exec("scard", "myset"), 2);
// but here we've received an ack - response _must_ be myval
ASSERT_REPLY(tunnel2.exec("get", "client3"), "myval");
RaftInfo info = dispatcher(leaderID)->info();
ASSERT_EQ(info.blockedWrites, 0u);
ASSERT_EQ(info.leader, myself(leaderID));
std::string err;
std::string checkpointPath = SSTR(commonState.testdir << "/checkpoint");
// Before taking a checkpoint, ensure node #0 is caught up
RETRY_ASSERT_TRUE(stateMachine(0)->getLastApplied() == stateMachine(leaderID)->getLastApplied());
ASSERT_TRUE(dispatcher()->checkpoint(checkpointPath, err));
ASSERT_FALSE(dispatcher()->checkpoint(checkpointPath, err)); // exists already
// pretty expensive to open two extra databases, but necessary
StateMachine checkpointSM(SSTR(checkpointPath << "/state-machine"));
std::string tmp;
ASSERT_OK(checkpointSM.get("client3", tmp));
ASSERT_EQ(tmp, "myval");
ASSERT_OK(checkpointSM.get("client2", tmp));
ASSERT_EQ(tmp, "val");
// TODO: verify checkpointSM last applied, once atomic commits are implemented
// ensure the checkpoint journal is identical to the original
RaftJournal checkpointJournal(SSTR(checkpointPath << "/raft-journal"));
ASSERT_EQ(checkpointJournal.getLogSize(), journal()->getLogSize());
for(LogIndex i = 0; i < journal()->getLogSize(); i++) {
RaftEntry entry1, entry2;
ASSERT_OK(checkpointJournal.fetch(i, entry1));
ASSERT_OK(journal()->fetch(i, entry2));
ASSERT_EQ(entry1, entry2);
}
}
TEST_F(Raft_e2e, hscan) {
spinup(0); spinup(1); spinup(2);
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));
int leaderID = getServerID(state(0)->getSnapshot()->leader);
for(size_t i = 1; i < 10; i++) {
ASSERT_REPLY(tunnel(leaderID)->exec("hset", "hash", SSTR("f" << i), SSTR("v" << i)), 1);
}
redisReplyPtr reply = tunnel(leaderID)->exec("hscan", "hash", "0", "cOUnT", "3").get();
ASSERT_REPLY(reply, std::make_pair("next:f4", make_vec("f1", "v1", "f2", "v2", "f3", "v3")));
reply = tunnel(leaderID)->exec("hscan", "hash", "0", "asdf", "123").get();
ASSERT_ERR(reply, "ERR syntax error");
reply = tunnel(leaderID)->exec("hscan", "hash", "next:f4", "COUNT", "3").get();
ASSERT_REPLY(reply, std::make_pair("next:f7", make_vec("f4", "v4", "f5", "v5", "f6", "v6")));
reply = tunnel(leaderID)->exec("hscan", "hash", "next:f7", "COUNT", "30").get();
ASSERT_REPLY(reply, std::make_pair("0", make_vec("f7", "v7", "f8", "v8", "f9", "v9")));
reply = tunnel(leaderID)->exec("hscan", "hash", "adfaf").get();
ASSERT_ERR(reply, "ERR invalid cursor");
reply = tunnel(leaderID)->exec("hscan", "hash", "next:zz").get();
ASSERT_REPLY(reply, std::make_pair("0", make_vec()));
}
TEST_F(Raft_e2e, scan) {
spinup(0); spinup(1); spinup(2);
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));
int leaderID = getLeaderID();
for(size_t i = 1; i < 10; i++) {
ASSERT_REPLY(tunnel(leaderID)->exec("set", SSTR("f" << i), SSTR("v" << i)), "OK");
}
redisReplyPtr reply = tunnel(leaderID)->exec("scan", "0", "MATCH", "f[1-2]").get();
ASSERT_REPLY(reply, std::make_pair("0", make_vec("f1", "f2")));
reply = tunnel(leaderID)->exec("scan", "0", "MATCH", "f*", "COUNT", "3").get();
ASSERT_REPLY(reply, std::make_pair("next:f4", make_vec("f1", "f2", "f3")));
// without MATCH
reply = tunnel(leaderID)->exec("scan", "0", "COUNT", "3").get();
ASSERT_REPLY(reply, std::make_pair("next:f4", make_vec("f1", "f2", "f3")));
// with "*" MATCH pattern
reply = tunnel(leaderID)->exec("scan", "0", "COUNT", "3", "MATCH", "*").get();
ASSERT_REPLY(reply, std::make_pair("next:f4", make_vec("f1", "f2", "f3")));
QScanner scanner(*tunnel(leaderID), "f*", 3);
std::vector ret;
ASSERT_TRUE(scanner.next(ret));
ASSERT_EQ(ret, make_vec("f1", "f2", "f3"));
ASSERT_TRUE(scanner.next(ret));
ASSERT_EQ(ret, make_vec("f4", "f5", "f6"));
ASSERT_TRUE(scanner.next(ret));
ASSERT_EQ(ret, make_vec("f7", "f8", "f9"));
ASSERT_FALSE(scanner.next(ret));
}
TEST_F(Raft_e2e, test_qclient_convenience_classes) {
spinup(0); spinup(1); spinup(2);
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));
int leaderID = getLeaderID();
// QHash iterator
std::vector> replies;
for(size_t i = 0; i < 9; i++) {
replies.push_back(tunnel(leaderID)->exec("HSET", "myhash", SSTR("f" << i), SSTR("v" << i)));
}
for(size_t i = 0; i < 9; i++) {
ASSERT_REPLY(replies[i], 1);
}
qclient::QHash qhash(*tunnel(leaderID), "myhash");
qclient::QHash::Iterator it = qhash.getIterator(2);
for(size_t i = 0; i < 9; i++) {
ASSERT_TRUE(it.valid());
ASSERT_EQ(it.getKey(), SSTR("f" << i));
ASSERT_EQ(it.getValue(), SSTR("v" << i));
it.next();
}
ASSERT_FALSE(it.valid());
ASSERT_EQ(it.requestsSoFar(), 5u);
// QSet iterator
replies.clear();
for(size_t i = 0; i < 9; i++) {
replies.push_back(tunnel(leaderID)->exec("SADD", "myset", SSTR("item-" << i)));
}
for(size_t i = 0; i < 9; i++) {
ASSERT_REPLY(replies[i], 1);
}
qclient::QSet qset(*tunnel(leaderID), "myset");
for(size_t count = 1; count < 15; count++) {
qclient::QSet::Iterator it = qset.getIterator(count);
for(size_t i = 0; i < 9; i++) {
ASSERT_TRUE(it.valid());
ASSERT_EQ(it.getElement(), SSTR("item-" << i));
it.next();
}
ASSERT_FALSE(it.valid());
ASSERT_EQ(it.requestsSoFar(), (9 / count) + (9%count != 0) );
}
qclient::QSet::Iterator it2 = qset.getIterator(3, "next:item-4");
for(size_t i = 4; i < 9; i++) {
ASSERT_TRUE(it2.valid());
ASSERT_EQ(it2.getElement(), SSTR("item-" << i));
it2.next();
}
ASSERT_FALSE(it2.valid());
ASSERT_EQ(it2.requestsSoFar(), 2u);
}
TEST_F(Raft_e2e, test_many_redis_commands) {
spinup(0); spinup(1); spinup(2);
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));
int leaderID = getServerID(state(0)->getSnapshot()->leader);
std::vector> futures;
futures.emplace_back(tunnel(leaderID)->exec("SADD", "myset", "a", "b", "c"));
futures.emplace_back(tunnel(leaderID)->exec("SCARD", "myset"));
futures.emplace_back(tunnel(leaderID)->exec("Smembers", "myset"));
futures.emplace_back(tunnel(leaderID)->exec("srem", "myset", "a", "b"));
futures.emplace_back(tunnel(leaderID)->exec("srem", "myset", "b"));
futures.emplace_back(tunnel(leaderID)->exec("scard", "myset"));
futures.emplace_back(tunnel(leaderID)->exec("smembers", "myset"));
futures.emplace_back(tunnel(leaderID)->exec("get", "empty_key"));
futures.emplace_back(tunnel(leaderID)->exec("timestamped-lease-acquire", "123"));
futures.emplace_back(tunnel(leaderID)->exec("timestamped-lease-get", "123"));
futures.emplace_back(tunnel(leaderID)->exec("timestamped-lease-release", "123"));
ASSERT_REPLY(futures[0], 3);
ASSERT_REPLY(futures[1], 3);
ASSERT_REPLY(futures[2], make_vec("a", "b", "c"));
ASSERT_REPLY(futures[3], 2);
ASSERT_REPLY(futures[4], 0);
ASSERT_REPLY(futures[5], 1);
ASSERT_REPLY(futures[6], make_vec("c"));
ASSERT_NIL(futures[7]);
ASSERT_REPLY(futures[8], "ERR unknown command 'timestamped-lease-acquire'" );
ASSERT_REPLY(futures[9], "ERR unknown command 'timestamped-lease-get'" );
ASSERT_REPLY(futures[10], "ERR unknown command 'timestamped-lease-release'" );
futures.clear();
futures.emplace_back(tunnel(leaderID)->exec("hset", "myhash", "a", "b"));
futures.emplace_back(tunnel(leaderID)->exec("hset", "myhash", "b", "c"));
futures.emplace_back(tunnel(leaderID)->exec("hset", "myhash", "c", "d"));
futures.emplace_back(tunnel(leaderID)->exec("hset", "myhash", "a", "d"));
futures.emplace_back(tunnel(leaderID)->exec("hdel", "myhash", "a", "b", "b"));
futures.emplace_back(tunnel(leaderID)->exec("hdel", "myhash", "a"));
futures.emplace_back(tunnel(leaderID)->exec("sadd", "myhash", "wrongtype"));
futures.emplace_back(tunnel(leaderID)->exec("exists", "myhash"));
futures.emplace_back(tunnel(leaderID)->exec("hdel", "myhash", "c"));
futures.emplace_back(tunnel(leaderID)->exec("exists", "myhash"));
futures.emplace_back(tunnel(leaderID)->exec("sadd", "myhash", "wrongtype"));
futures.emplace_back(tunnel(leaderID)->exec("exists", "myhash"));
futures.emplace_back(tunnel(leaderID)->exec("hset", "myhash", "a", "b"));
futures.emplace_back(tunnel(leaderID)->exec("srem", "myhash", "wrongtype"));
futures.emplace_back(tunnel(leaderID)->exec("exists", "myhash"));
futures.emplace_back(tunnel(leaderID)->exec("hset", "myhash", "a", "b"));
futures.emplace_back(tunnel(leaderID)->exec("exists", "myhash", "myhash", "asdf"));
futures.emplace_back(tunnel(leaderID)->exec("hexists", "myhash", "a"));
futures.emplace_back(tunnel(leaderID)->exec("hexists", "myhash", "b"));
futures.emplace_back(tunnel(leaderID)->exec("sismember", "myhash", "b"));
futures.emplace_back(tunnel(leaderID)->exec("scard", "myhash"));
futures.emplace_back(tunnel(leaderID)->exec("scard", "does-not-exist"));
futures.emplace_back(tunnel(leaderID)->exec("quarkdb_invalid_command"));
futures.emplace_back(tunnel(leaderID)->exec("raft-fetch-last", "7", "raw"));
size_t count = 0;
ASSERT_REPLY(futures[count++], 1);
ASSERT_REPLY(futures[count++], 1);
ASSERT_REPLY(futures[count++], 1);
ASSERT_REPLY(futures[count++], 0);
ASSERT_REPLY(futures[count++], 2);
ASSERT_REPLY(futures[count++], 0);
ASSERT_REPLY(futures[count++], "ERR Invalid argument: WRONGTYPE Operation against a key holding the wrong kind of value");
ASSERT_REPLY(futures[count++], 1);
ASSERT_REPLY(futures[count++], 1);
ASSERT_REPLY(futures[count++], 0);
ASSERT_REPLY(futures[count++], 1);
ASSERT_REPLY(futures[count++], 1);
ASSERT_REPLY(futures[count++], "ERR Invalid argument: WRONGTYPE Operation against a key holding the wrong kind of value");
ASSERT_REPLY(futures[count++], 1);
ASSERT_REPLY(futures[count++], 0);
ASSERT_REPLY(futures[count++], 1);
ASSERT_REPLY(futures[count++], 2);
ASSERT_REPLY(futures[count++], 1);
ASSERT_REPLY(futures[count++], 0);
ASSERT_REPLY(futures[count++], "ERR Invalid argument: WRONGTYPE Operation against a key holding the wrong kind of value");
ASSERT_REPLY(futures[count++], "ERR Invalid argument: WRONGTYPE Operation against a key holding the wrong kind of value");
ASSERT_REPLY(futures[count++], 0);
ASSERT_REPLY(futures[count++], "ERR internal dispatching error");
redisReplyPtr entries = futures[count++].get();
std::vector lastEntries;
ASSERT_TRUE(RaftParser::fetchLastResponse(entries, lastEntries));
for(size_t i = 1; i <= 7; i++) {
RaftEntry comparison;
LogIndex index = journal(leaderID)->getLogSize() - i;
ASSERT_OK(journal(leaderID)->fetch(index, comparison));
ASSERT_EQ(lastEntries[7 - i], comparison);
}
futures.clear();
futures.emplace_back(tunnel(leaderID)->exec("set", "mystring", "asdf"));
futures.emplace_back(tunnel(leaderID)->exec("keys", "*"));
futures.emplace_back(tunnel(leaderID)->exec("exists", "mystring", "myset", "myhash", "adfa", "myhash"));
futures.emplace_back(tunnel(leaderID)->exec("del", "myhash", "myset", "mystring"));
futures.emplace_back(tunnel(leaderID)->exec("exists", "mystring", "myset", "myhash", "adfa", "myhash"));
futures.emplace_back(tunnel(leaderID)->exec("del", "myhash", "myset"));
futures.emplace_back(tunnel(leaderID)->exec("clock-get"));
ASSERT_REPLY(futures[0], "OK");
ASSERT_REPLY(futures[1], make_vec("myhash", "myset", "mystring"));
ASSERT_REPLY(futures[2], 4);
ASSERT_REPLY(futures[3], 3);
ASSERT_REPLY(futures[4], 0);
ASSERT_REPLY(futures[5], 0);
qdb_info(qclient::describeRedisReply(futures[6].get()));
futures.clear();
futures.emplace_back(tunnel(leaderID)->exec("set", "a", "aa"));
futures.emplace_back(tunnel(leaderID)->exec("set", "aa", "a"));
futures.emplace_back(tunnel(leaderID)->exec("get", "a"));
futures.emplace_back(tunnel(leaderID)->exec("del", "a"));
futures.emplace_back(tunnel(leaderID)->exec("get", "aa"));
futures.emplace_back(tunnel(leaderID)->exec("keys", "*"));
ASSERT_REPLY(futures[0], "OK");
ASSERT_REPLY(futures[1], "OK");
ASSERT_REPLY(futures[2], "aa");
ASSERT_REPLY(futures[3], 1);
ASSERT_REPLY(futures[4], "a");
ASSERT_REPLY(futures[5], make_vec("aa"));
futures.clear();
futures.emplace_back(tunnel(leaderID)->exec("config_getall"));
futures.emplace_back(tunnel(leaderID)->exec("config_set", "some.config.value", "1234"));
futures.emplace_back(tunnel(leaderID)->exec("flushall"));
futures.emplace_back(tunnel(leaderID)->exec("del", "aa"));
futures.emplace_back(tunnel(leaderID)->exec("config_get", "some.config.value", "1234"));
futures.emplace_back(tunnel(leaderID)->exec("config_get", "some.config.value"));
futures.emplace_back(tunnel(leaderID)->exec("config_getall"));
ASSERT_REPLY(futures[0], "");
ASSERT_REPLY(futures[1], "OK");
ASSERT_REPLY(futures[2], "OK");
ASSERT_REPLY(futures[3], 0);
ASSERT_REPLY(futures[4], "ERR wrong number of arguments for 'config_get' command");
ASSERT_REPLY(futures[5], "1234");
ASSERT_REPLY(futures[6], make_vec("some.config.value", "1234"));
futures.clear();
futures.emplace_back(tunnel(leaderID)->exec("hset", "hash", "key1", "v1"));
futures.emplace_back(tunnel(leaderID)->exec("hset", "hash2", "key1", "v1"));
futures.emplace_back(tunnel(leaderID)->exec("exists", "hash", "hash2"));
futures.emplace_back(tunnel(leaderID)->exec("del", "hash"));
futures.emplace_back(tunnel(leaderID)->exec("raft_info"));
futures.emplace_back(tunnel(leaderID)->exec("bad_command"));
futures.emplace_back(tunnel(leaderID)->exec("exists", "hash"));
futures.emplace_back(tunnel(leaderID)->exec("exists", "hash2"));
futures.emplace_back(tunnel(leaderID)->exec("raft_info", "leader"));
futures.emplace_back(tunnel(leaderID)->exec("recovery_get", "test"));
ASSERT_REPLY(futures[0], 1);
ASSERT_REPLY(futures[1], 1);
ASSERT_REPLY(futures[2], 2);
ASSERT_REPLY(futures[3], 1);
// ignore futures[4]
ASSERT_REPLY(futures[5], "ERR unknown command 'bad_command'");
ASSERT_REPLY(futures[6], 0);
ASSERT_REPLY(futures[7], 1);
ASSERT_REPLY(futures[8], myself(leaderID).toString() );
ASSERT_REPLY(futures[9], "ERR recovery commands not allowed, not in recovery mode");
futures.clear();
futures.emplace_back(tunnel(leaderID)->exec("hmset", "hmset_test", "f1", "v1", "f2", "v2"));
futures.emplace_back(tunnel(leaderID)->exec("exists", "hmset_test"));
futures.emplace_back(tunnel(leaderID)->exec("hmset", "test"));
futures.emplace_back(tunnel(leaderID)->exec("hmset", "hmset_test", "f2", "v3", "f4"));
futures.emplace_back(tunnel(leaderID)->exec("hget", "hmset_test", "f1"));
futures.emplace_back(tunnel(leaderID)->exec("hlen", "hmset_test"));
futures.emplace_back(tunnel(leaderID)->exec("hmset", "hmset_test", "f2", "value2", "f3", "value3"));
futures.emplace_back(tunnel(leaderID)->exec("hlen", "hmset_test"));
futures.emplace_back(tunnel(leaderID)->exec("hget", "hmset_test", "f2"));
futures.emplace_back(tunnel(leaderID)->exec("hmset", "hmset_test", "f3", "v3"));
futures.emplace_back(tunnel(leaderID)->exec("hget", "hmset_test", "f3"));
futures.emplace_back(tunnel(leaderID)->exec("hlen", "hmset_test"));
ASSERT_REPLY(futures[0], "OK");
ASSERT_REPLY(futures[1], 1);
ASSERT_REPLY(futures[2], "ERR wrong number of arguments for 'hmset' command");
ASSERT_REPLY(futures[3], "ERR wrong number of arguments for 'hmset' command");
ASSERT_REPLY(futures[4], "v1");
ASSERT_REPLY(futures[5], 2);
ASSERT_REPLY(futures[6], "OK");
ASSERT_REPLY(futures[7], 3);
ASSERT_REPLY(futures[8], "value2");
ASSERT_REPLY(futures[9], "OK");
ASSERT_REPLY(futures[10], "v3");
ASSERT_REPLY(futures[11], 3);
futures.clear();
futures.emplace_back(tunnel(leaderID)->exec("lpush", "list_test", "i1", "i2", "i3", "i4"));
futures.emplace_back(tunnel(leaderID)->exec("exists", "list_test"));
futures.emplace_back(tunnel(leaderID)->exec("llen", "list_test"));
futures.emplace_back(tunnel(leaderID)->exec("lpop", "list_test"));
futures.emplace_back(tunnel(leaderID)->exec("llen", "list_test"));
futures.emplace_back(tunnel(leaderID)->exec("rpop", "list_test"));
futures.emplace_back(tunnel(leaderID)->exec("llen", "list_test"));
futures.emplace_back(tunnel(leaderID)->exec("del", "list_test"));
futures.emplace_back(tunnel(leaderID)->exec("llen", "list_test"));
futures.emplace_back(tunnel(leaderID)->exec("lpop", "list_test"));
futures.emplace_back(tunnel(leaderID)->exec("rpush", "list_test", "i5", "i6", "i7", "i8"));
futures.emplace_back(tunnel(leaderID)->exec("set", "list_test", "asdf"));
futures.emplace_back(tunnel(leaderID)->exec("lpop", "list_test"));
futures.emplace_back(tunnel(leaderID)->exec("rpop", "list_test"));
futures.emplace_back(tunnel(leaderID)->exec("rpop", "list_test"));
futures.emplace_back(tunnel(leaderID)->exec("lpop", "list_test"));
futures.emplace_back(tunnel(leaderID)->exec("set", "list_test", "asdf"));
futures.emplace_back(tunnel(leaderID)->exec("lpop", "list_test"));
int i = 0;
ASSERT_REPLY(futures[i++], 4);
ASSERT_REPLY(futures[i++], 1);
ASSERT_REPLY(futures[i++], 4);
ASSERT_REPLY(futures[i++], "i4");
ASSERT_REPLY(futures[i++], 3);
ASSERT_REPLY(futures[i++], "i1");
ASSERT_REPLY(futures[i++], 2);
ASSERT_REPLY(futures[i++], 1);
ASSERT_REPLY(futures[i++], 0);
ASSERT_NIL(futures[i++]);
ASSERT_REPLY(futures[i++], 4);
ASSERT_REPLY(futures[i++], "ERR Invalid argument: WRONGTYPE Operation against a key holding the wrong kind of value");
ASSERT_REPLY(futures[i++], "i5");
ASSERT_REPLY(futures[i++], "i8");
ASSERT_REPLY(futures[i++], "i7");
ASSERT_REPLY(futures[i++], "i6");
ASSERT_REPLY(futures[i++], "OK");
ASSERT_REPLY(futures[i++], "ERR Invalid argument: WRONGTYPE Operation against a key holding the wrong kind of value");
// Now test qclient callbacks, ensure things stay reasonable when we mix them
// with futures.
TrivialQCallback c1;
tunnel(leaderID)->execCB(&c1, "set", "qcl-counter", "1");
TrivialQCallback c2;
tunnel(leaderID)->execCB(&c2, "get", "qcl-counter");
std::future fut1 = tunnel(leaderID)->exec("get", "qcl-counter");
std::future fut2 = tunnel(leaderID)->exec("set", "qcl-counter", "2");
std::future fut3 = tunnel(leaderID)->exec("get", "qcl-counter");
TrivialQCallback c3;
tunnel(leaderID)->execCB(&c3, "get", "qcl-counter");
TrivialQCallback c4;
tunnel(leaderID)->execCB(&c4, "set", "qcl-counter", "3");
TrivialQCallback c5;
tunnel(leaderID)->execCB(&c5, "get", "qcl-counter");
std::future fut4 = tunnel(leaderID)->exec("get", "qcl-counter");
ASSERT_REPLY(c1.getFuture(), "OK");
ASSERT_REPLY(c2.getFuture(), "1");
ASSERT_REPLY(fut1, "1");
ASSERT_REPLY(fut2, "OK");
ASSERT_REPLY(fut3, "2");
ASSERT_REPLY(c3.getFuture(), "2");
ASSERT_REPLY(c4.getFuture(), "OK");
ASSERT_REPLY(c5.getFuture(), "3");
ASSERT_REPLY(fut4, "3");
// Test lease commands.
std::future l0 = tunnel(leaderID)->exec("lease-acquire", "qcl-counter", "holder1", "10000");
std::future l1 = tunnel(leaderID)->exec("lease-acquire", "mykey", "holder1", "10000");
std::future l2 = tunnel(leaderID)->exec("lease-acquire", "mykey", "holder2", "10000");
std::future l3 = tunnel(leaderID)->exec("lease-acquire", "mykey", "holder1", "10000");
ASSERT_REPLY(l0, "ERR Invalid Argument: WRONGTYPE Operation against a key holding the wrong kind of value");
ASSERT_REPLY(l1, "ACQUIRED");
redisReplyPtr replyL2 = l2.get();
std::string reply = std::string(replyL2->str, replyL2->len);
ASSERT_TRUE(StringUtils::startswith(reply, "ERR lease held by 'holder1', time remaining"));
ASSERT_REPLY(l3, "RENEWED");
std::future l4 = tunnel(leaderID)->exec("lease-get", "mykey");
std::future l5 = tunnel(leaderID)->exec("lease-get", "mykey-2");
redisReplyPtr replyL4 = l4.get();
qdb_info(qclient::describeRedisReply(replyL4));
ASSERT_TRUE(StringUtils::startswith(qclient::describeRedisReply(replyL4), "1) HOLDER: holder1\n2) REMAINING: "));
ASSERT_NIL(l5);
std::future l6 = tunnel(leaderID)->exec("lease-release", "mykey");
std::future l7 = tunnel(leaderID)->exec("lease-release", "mykey-2");
std::future l8 = tunnel(leaderID)->exec("lease-release", "qcl-counter");
std::future l9 = tunnel(leaderID)->exec("lease-release", "mykey");
std::future l10 = tunnel(leaderID)->exec("lease-get", "mykey");
ASSERT_REPLY(l6, "OK");
ASSERT_NIL(l7);
ASSERT_REPLY(l8, "ERR Invalid argument: WRONGTYPE Operation against a key holding the wrong kind of value");
ASSERT_NIL(l9);
ASSERT_NIL(l10);
std::future l11 = tunnel(leaderID)->exec("lease-acquire", "mykey", "holder2", "10000");
std::future l12 = tunnel(leaderID)->exec("lease-acquire", "mykey", "holder2", "10000");
std::future l13 = tunnel(leaderID)->exec("lease-release", "mykey");
std::future l14 = tunnel(leaderID)->exec("lease-acquire", "mykey", "holder2", "10000");
ASSERT_REPLY(l11, "ACQUIRED");
ASSERT_REPLY(l12, "RENEWED");
ASSERT_REPLY(l13, "OK");
ASSERT_REPLY(l14, "ACQUIRED");
// Ensure the followers return the correct number of responses on MOVED for
// pipelined writes.
int follower1 = (leaderID+1) % 3;
std::vector> moved;
for(size_t i = 0; i < 10; i++) {
moved.emplace_back(tunnel(follower1)->exec("set", "abc", "123"));
}
for(size_t i = 0; i < 10; i++) {
ASSERT_REPLY(moved[i], SSTR("MOVED 0 " << myself(leaderID).toString()));
}
// Make sure the connection did not hang.
ASSERT_REPLY(tunnel(follower1)->exec("ping", "zxcvbnm"), "zxcvbnm");
}
TEST_F(Raft_e2e, replication_with_trimmed_journal) {
spinup(0); spinup(1);
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1));
int leaderID = getServerID(state(0)->getSnapshot()->leader);
int firstSlaveID = (leaderID+1)%2;
ASSERT_GE(leaderID, 0);
ASSERT_LE(leaderID, 1);
// First, disable automatic resilvering..
EncodedConfigChange configChange = raftconfig(leaderID)->setResilveringEnabled(false);
ASSERT_TRUE(configChange.error.empty());
ASSERT_REPLY(tunnel(leaderID)->execute(configChange.request), "OK");
// send off many requests, pipeline them
std::vector> futures;
for(size_t i = 0; i < testreqs.size(); i++) {
futures.emplace_back(tunnel(leaderID)->execute(testreqs[i]));
}
for(size_t i = 0; i < 2; i++) {
ASSERT_REPLY(futures[i], "OK");
}
for(size_t i = 2; i < futures.size(); i++) {
ASSERT_REPLY(futures[i], 1);
}
// now let's trim leader's journal..
journal(leaderID)->trimUntil(4);
// and verify it's NOT possible to bring node #2 up to date
spinup(2);
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));
ASSERT_EQ(journal(2)->getLogSize(), 1);
ASSERT_EQ(journal(2)->getLogStart(), 0);
// a divine intervention fills up the missing entries in node #2 journal
for(size_t i = 1; i < 5; i++) {
RaftEntry entry;
ASSERT_TRUE(journal(firstSlaveID)->fetch(i, entry).ok());
journal(2)->append(i, entry);
}
// now verify node #2 can be brought up to date successfully
RETRY_ASSERT_TRUE(
journal(0)->getLogSize() == journal(1)->getLogSize() &&
journal(1)->getLogSize() == journal(2)->getLogSize()
);
ASSERT_EQ(journal(2)->getLogSize(), journal(leaderID)->getLogSize());
ASSERT_EQ(journal(2)->getLogSize(), journal(firstSlaveID)->getLogSize());
// Verify resilvering didn't happen.
ASSERT_EQ(journal(2)->getLogStart(), 0);
}
TEST_F(Raft_e2e, membership_updates) {
spinup(0); spinup(1); spinup(2);
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));
int leaderID = getServerID(state(0)->getSnapshot()->leader);
ASSERT_REPLY(tunnel(leaderID)->exec("set", "pi", "3.141516"), "OK");
// throw a node out of the cluster
int victim = (leaderID+1) % 3;
RETRY_ASSERT_TRUE(checkFullConsensus(0, 1, 2));
int index = journal(leaderID)->getLogSize() - 1;
ASSERT_REPLY(tunnel(leaderID)->exec("RAFT_REMOVE_MEMBER", myself(victim).toString()), "OK");
RETRY_ASSERT_TRUE(dispatcher(leaderID)->info().commitIndex == index + 1);
// verify the cluster has not been disrupted
ASSERT_EQ(state(leaderID)->getSnapshot()->leader, myself(leaderID));
// add it back as an observer, verify consensus
ASSERT_REPLY(tunnel(leaderID)->exec("RAFT_ADD_OBSERVER", myself(victim).toString()), "OK");
RETRY_ASSERT_TRUE(dispatcher(0)->info().commitIndex == index + 2);
RETRY_ASSERT_TRUE(dispatcher(1)->info().commitIndex == index + 2);
RETRY_ASSERT_TRUE(dispatcher(2)->info().commitIndex == index + 2);
ASSERT_EQ(state(victim)->getSnapshot()->status, RaftStatus::FOLLOWER);
ASSERT_EQ(state(0)->getSnapshot()->leader, state(1)->getSnapshot()->leader);
ASSERT_EQ(state(1)->getSnapshot()->leader, state(2)->getSnapshot()->leader);
ASSERT_EQ(journal(0)->getLogSize(), journal(1)->getLogSize());
ASSERT_EQ(journal(1)->getLogSize(), journal(2)->getLogSize());
// cannot be a leader, it's an observer
ASSERT_NE(state(0)->getSnapshot()->leader, myself(victim));
// add back as a full voting member
leaderID = getServerID(state(0)->getSnapshot()->leader);
ASSERT_REPLY(tunnel(leaderID)->exec("RAFT_PROMOTE_OBSERVER", myself(victim).toString()), "OK");
RETRY_ASSERT_TRUE(dispatcher(leaderID)->info().commitIndex == index + 3);
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));
}
TEST_F(Raft_e2e, reject_dangerous_membership_update) {
spinup(0); spinup(1);
RETRY_ASSERT_TRUE(checkFullConsensus(0, 1));
int leaderID = getLeaderID();
// make sure dangerous node removal is prevented
int victim = (leaderID+1) % 2;
redisReplyPtr reply = tunnel(leaderID)->exec("RAFT_REMOVE_MEMBER", myself(victim).toString()).get();
ASSERT_ERR(reply, "ERR membership update blocked, new cluster would not have an up-to-date quorum");
// Try to remove a non-existent node
ASSERT_REPLY(tunnel(leaderID)->exec("RAFT_REMOVE_MEMBER", RaftServer("random_host", 123).toString()),
"ERR random_host:123 is neither an observer nor a full node.");
// Make sure we can remove the third node
ASSERT_REPLY(tunnel(leaderID)->exec("RAFT_REMOVE_MEMBER", myself(2).toString()), "OK");
RaftMembership membership = journal(leaderID)->getMembership();
RETRY_ASSERT_TRUE(journal(leaderID)->getCommitIndex() == membership.epoch);
// Add it back as observer
ASSERT_REPLY(tunnel(leaderID)->exec("RAFT_ADD_OBSERVER", myself(2).toString()), "OK");
membership = journal(leaderID)->getMembership();
RETRY_ASSERT_TRUE(journal(leaderID)->getCommitIndex() == membership.epoch);
// Remove it again
ASSERT_REPLY(tunnel(leaderID)->exec("RAFT_REMOVE_MEMBER", myself(2).toString()), "OK");
membership = journal(leaderID)->getMembership();
RETRY_ASSERT_TRUE(journal(leaderID)->getCommitIndex() == membership.epoch);
}
TEST_F(Raft_e2e5, membership_updates_with_disruptions) {
// let's get this party started
spinup(0); spinup(1); spinup(2); spinup(3);
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2, 3));
// throw node #4 out of the cluster
int leaderID = getServerID(state(0)->getSnapshot()->leader);
ASSERT_REPLY(tunnel(leaderID)->exec("RAFT_REMOVE_MEMBER", myself(4).toString()), "OK");
// wait until membership update has been committed
RaftMembership membership = journal(leaderID)->getMembership();
ASSERT_GT(membership.epoch, 0u);
ASSERT_EQ(membership.nodes.size(), 4u);
RETRY_ASSERT_TRUE(journal(leaderID)->getCommitIndex() == membership.epoch);
// .. and now spinup node #4 :> Ensure it doesn't disrupt the current leader
spinup(4);
std::this_thread::sleep_for(raftclock()->getTimeouts().getHigh()*2);
ASSERT_EQ(leaderID, getServerID(state(0)->getSnapshot()->leader));
// verify the cluster has not been disrupted
ASSERT_EQ(state(leaderID)->getSnapshot()->leader, myself(leaderID));
// remove one more node
int victim = (leaderID+1) % 5;
if(victim == 4) victim = 2;
ASSERT_REPLY(tunnel(leaderID)->exec("RAFT_REMOVE_MEMBER", myself(victim).toString()), "OK");
std::this_thread::sleep_for(raftclock()->getTimeouts().getHigh()*2);
// verify the cluster has not been disrupted
ASSERT_EQ(state(leaderID)->getSnapshot()->leader, myself(leaderID));
// issue a bunch of writes and reads
ASSERT_REPLY(tunnel(leaderID)->exec("set", "123", "abc"), "OK");
ASSERT_REPLY(tunnel(leaderID)->exec("get", "123"), "abc");
}
TEST_F(Raft_e2e, leader_steps_down_after_follower_loss) {
// cluster with 2 nodes
spinup(0); spinup(1);
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1));
int leaderID = getLeaderID();
ASSERT_GE(leaderID, 0);
ASSERT_LE(leaderID, 1);
RaftTerm term = state(leaderID)->getCurrentTerm();
int followerID = (leaderID + 1)%2;
spindown(followerID);
RETRY_ASSERT_TRUE(term < state(leaderID)->getCurrentTerm());
ASSERT_TRUE(state(leaderID)->getSnapshot()->leader.empty());
}
TEST_F(Raft_e2e, stale_reads) {
spinup(0); spinup(1); spinup(2);
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));
int leaderID = getLeaderID();
int follower = (getLeaderID() + 1) % 3;
ASSERT_REPLY(tunnel(leaderID)->exec("set", "abc", "1234"), "OK");
ASSERT_REPLY(tunnel(follower)->exec("get", "abc"), SSTR("MOVED 0 " << myself(leaderID).toString()));
ASSERT_REPLY(tunnel(follower)->exec("activate-stale-reads"), "OK");
redisReplyPtr reply = tunnel(follower)->exec("get", "abc").get();
qdb_info("Race-y read: " << std::string(reply->str, reply->len));
RETRY_ASSERT_TRUE(checkFullConsensus(0, 1, 2));
ASSERT_REPLY(tunnel(follower)->exec("get", "abc"), "1234");
}
TEST_F(Raft_e2e, monitor) {
spinup(0); spinup(1); spinup(2);
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));
int leaderID = getLeaderID();
// Get connection ID
redisReplyPtr connIDReply = tunnel(leaderID)->exec("client-id").get();
std::string connID(connIDReply->str, connIDReply->len);
qdb_info("Connection ID: " << connID);
// We can't use QClient for this, it can't handle the output of MONITOR
qclient::ConnectionInitiator initiator("localhost", myself(leaderID).port);
ASSERT_TRUE(initiator.ok());
Link link(initiator.getFd());
BufferedReader reader(&link);
ASSERT_EQ(link.Send(SSTR("*2\r\n$4\r\nAUTH\r\n$" << contactDetails()->getPassword().size() << "\r\n" << contactDetails()->getPassword() << "\r\n")), 56);
std::string response;
RETRY_ASSERT_TRUE(reader.consume(5, response));
ASSERT_EQ(response, "+OK\r\n");
ASSERT_EQ(link.Send("*1\r\n$7\r\nMONITOR\r\n"), 17);
ASSERT_EQ(link.Send("random string"), 13);
RETRY_ASSERT_TRUE(reader.consume(5, response));
ASSERT_EQ(response, "+OK\r\n");
tunnel(leaderID)->exec("set", "abc", "aaaa" "\xab" "bbb");
response.clear();
std::string expectedReply = SSTR("+ [" << connID << "]: \"set\" \"abc\" \"aaaa\\xABbbb\"\r\n");
RETRY_ASSERT_TRUE(reader.consume(expectedReply.size(), response));
ASSERT_EQ(response, expectedReply);
tunnel(leaderID)->exec("get", "abc");
response.clear();
expectedReply = SSTR("+ [" << connID << "]: \"get\" \"abc\"\r\n");
RETRY_ASSERT_TRUE(reader.consume(expectedReply.size(), response));
ASSERT_EQ(response, expectedReply);
}
class PingCallback : qclient::QCallback {
public:
PingCallback(qclient::QClient &q) : qcl(q) {
flag = prom.get_future();
qcl.execCB(this, "PING", SSTR(pingCounter));
}
void finalize(bool result) {
isOk = result;
prom.set_value();
}
virtual void handleResponse(redisReplyPtr &&reply) {
if(!reply) finalize(false);
if(reply->type != REDIS_REPLY_STRING) finalize(false);
if(std::string(reply->str, reply->len) != SSTR(pingCounter)) finalize(false);
qdb_info("Received successful ping response: " << pingCounter);
pingCounter++;
if(pingCounter == 5) return finalize(true);
qcl.execCB(this, "PING", SSTR(pingCounter));
}
bool ok() {
return isOk;
}
void wait() {
flag.get();
}
private:
size_t pingCounter = 0;
std::promise prom;
std::future flag;
bool isOk = true;
qclient::QClient &qcl;
};
TEST_F(Raft_e2e, PingExtravaganza) {
// A most efficient and sophisticated ping machinery.
spinup(0); spinup(1); spinup(2);
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));
int leaderID = getLeaderID();
PingCallback pinger(*tunnel(leaderID));
pinger.wait();
ASSERT_TRUE(pinger.ok());
}
TEST_F(Raft_e2e, hincrbymulti) {
spinup(0); spinup(1); spinup(2);
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));
int leaderID = getLeaderID();
ASSERT_REPLY(tunnel(leaderID)->exec("hincrbymulti", "h1", "h2", "3", "h2", "h3", "4"), 7);
ASSERT_REPLY(tunnel(leaderID)->exec("hget", "h1", "h2"), "3");
ASSERT_REPLY(tunnel(leaderID)->exec("hget", "h2", "h3"), "4");
ASSERT_REPLY(tunnel(leaderID)->exec("hincrbymulti", "h1", "h2", "-5", "h2", "h3", "20", "h4", "h8"), "ERR wrong number of arguments for 'hincrbymulti' command");
ASSERT_REPLY(tunnel(leaderID)->exec("hincrbymulti", "h1", "h2", "-5", "h2", "h3", "20", "h4", "h8", "13"), 35);
ASSERT_REPLY(tunnel(leaderID)->exec("hget", "h1", "h2"), "-2");
ASSERT_REPLY(tunnel(leaderID)->exec("hget", "h2", "h3"), "24");
ASSERT_REPLY(tunnel(leaderID)->exec("hget", "h4", "h8"), "13");
}
TEST_F(Raft_e2e, smove) {
spinup(0); spinup(1); spinup(2);
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));
int leaderID = getLeaderID();
ASSERT_REPLY(tunnel(leaderID)->exec("sadd", "set1", "i1", "i2", "i3", "i4", "i5"), 5);
ASSERT_REPLY(tunnel(leaderID)->exec("scard", "set1"), 5);
ASSERT_REPLY(tunnel(leaderID)->exec("sadd", "set2", "t1", "t2", "t3", "t4", "t5"), 5);
ASSERT_REPLY(tunnel(leaderID)->exec("scard", "set2"), 5);
ASSERT_REPLY(tunnel(leaderID)->exec("set", "mykey", "myval"), "OK");
ASSERT_REPLY(tunnel(leaderID)->exec("smove", "set1", "mykey", "i1"), "ERR Invalid argument: WRONGTYPE Operation against a key holding the wrong kind of value");
ASSERT_REPLY(tunnel(leaderID)->exec("smove", "mykey", "set1", "i1"), "ERR Invalid argument: WRONGTYPE Operation against a key holding the wrong kind of value");
ASSERT_REPLY(tunnel(leaderID)->exec("scard", "set1"), 5);
ASSERT_REPLY(tunnel(leaderID)->exec("scard", "set2"), 5);
ASSERT_REPLY(tunnel(leaderID)->exec("smove", "set1", "set2", "i1"), 1);
ASSERT_REPLY(tunnel(leaderID)->exec("scard", "set1"), 4);
ASSERT_REPLY(tunnel(leaderID)->exec("scard", "set2"), 6);
ASSERT_REPLY(tunnel(leaderID)->exec("smembers", "set1"), make_vec("i2", "i3", "i4", "i5"));
ASSERT_REPLY(tunnel(leaderID)->exec("smembers", "set2"), make_vec("i1", "t1", "t2", "t3", "t4", "t5"));
ASSERT_REPLY(tunnel(leaderID)->exec("smove", "set1", "set2", "not-existing"), 0);
ASSERT_REPLY(tunnel(leaderID)->exec("scard", "set1"), 4);
ASSERT_REPLY(tunnel(leaderID)->exec("scard", "set2"), 6);
ASSERT_REPLY(tunnel(leaderID)->exec("sadd", "set1", "i1"), 1);
ASSERT_REPLY(tunnel(leaderID)->exec("smembers", "set1"), make_vec("i1", "i2", "i3", "i4", "i5"));
ASSERT_REPLY(tunnel(leaderID)->exec("smembers", "set2"), make_vec("i1", "t1", "t2", "t3", "t4", "t5"));
ASSERT_REPLY(tunnel(leaderID)->exec("scard", "set1"), 5);
ASSERT_REPLY(tunnel(leaderID)->exec("scard", "set2"), 6);
ASSERT_REPLY(tunnel(leaderID)->exec("smove", "set1", "set2", "i1"), 1);
ASSERT_REPLY(tunnel(leaderID)->exec("scard", "set1"), 4);
ASSERT_REPLY(tunnel(leaderID)->exec("scard", "set2"), 6);
ASSERT_REPLY(tunnel(leaderID)->exec("smembers", "set1"), make_vec("i2", "i3", "i4", "i5"));
ASSERT_REPLY(tunnel(leaderID)->exec("smembers", "set2"), make_vec("i1", "t1", "t2", "t3", "t4", "t5"));
ASSERT_REPLY(tunnel(leaderID)->exec("quarkdb-manual-compaction"), "OK");
qdb_info(qclient::describeRedisReply(tunnel(leaderID)->exec("quarkdb-level-stats").get()));
qdb_info(qclient::describeRedisReply(tunnel(leaderID)->exec("quarkdb-compression-stats").get()));
}
TEST_F(Raft_e2e, sscan) {
spinup(0); spinup(1); spinup(2);
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));
int leaderID = getLeaderID();
redisReplyPtr reply = tunnel(leaderID)->exec("sscan", "myset", "0", "asdf", "123").get();
ASSERT_ERR(reply, "ERR syntax error");
ASSERT_REPLY(tunnel(leaderID)->exec("sadd", "myset", "a", "b", "c", "d", "e", "f", "g"), 7);
reply = tunnel(leaderID)->exec("sscan", "myset", "0", "COUNT", "3").get();
ASSERT_REPLY(reply, std::make_pair("next:d", make_vec("a", "b", "c")));
reply = tunnel(leaderID)->exec("sscan", "myset", "next:d", "COUNT", "2").get();
ASSERT_REPLY(reply, std::make_pair("next:f", make_vec("d", "e")));
reply = tunnel(leaderID)->exec("sscan", "myset", "next:f", "COUNT", "2").get();
ASSERT_REPLY(reply, std::make_pair("0", make_vec("f", "g")));
reply = tunnel(leaderID)->exec("sscan", "myset", "next:zz").get();
ASSERT_REPLY(reply, std::make_pair("0", make_vec()));
reply = tunnel(leaderID)->exec("sscan", "not-existing", "next:zz").get();
ASSERT_REPLY(reply, std::make_pair("0", make_vec()));
QSet qset(*tunnel(leaderID), "myset");
auto pair = qset.sscan("0", 2);
ASSERT_EQ(pair.first, "next:c");
ASSERT_EQ(pair.second, make_vec("a", "b"));
pair = qset.sscan(pair.first, 2);
ASSERT_EQ(pair.first, "next:e");
ASSERT_EQ(pair.second, make_vec("c", "d"));
QSet qset2(*tunnel(leaderID), "not-existing");
pair = qset2.sscan("0", 2);
ASSERT_EQ(pair.first, "0");
ASSERT_EQ(pair.second, make_vec());
}
TEST_F(Raft_e2e, LocalityHash) {
spinup(0); spinup(1); spinup(2);
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));
int leaderID = getLeaderID();
// Insert new field.
ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 0);
ASSERT_REPLY(tunnel(leaderID)->exec("lhset", "mykey", "f1", "hint1", "v1"), 1);
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1"), "v1");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1", "hint1"), "v1");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1", "ayy-lmao"), "v1");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f1", "emptykey"), "v1");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f1", "hint1", "emptykey"), "v1");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f1", "ayy-lmao", "emptykey"), "v1");
// Update old field, no changes to locality hint.
ASSERT_REPLY(tunnel(leaderID)->exec("lhset-and-del-fallback", "mykey", "f1", "hint1", "v2", "fallback"), 0);
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1"), "v2");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1", "hint1"), "v2");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1", "ayy-lmao"), "v2");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f1", "emptykey"), "v2");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f1", "hint1", "emptykey"), "v2");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f1", "ayy-lmao", "emptykey"), "v2");
ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 1);
// Insert one more field.
ASSERT_REPLY(tunnel(leaderID)->exec("lhset", "mykey", "f2", "hint2", "v3"), 1);
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f2"), "v3");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f2", "hint2"), "v3");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f2", "hint1"), "v3");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f2", "emptykey"), "v3");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f2", "hint2", "emptykey"), "v3");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f2", "hint1", "emptykey"), "v3");
ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 2);
// Update locality hint of first field.
ASSERT_REPLY(tunnel(leaderID)->exec("lhset", "mykey", "f1", "hint2", "v2"), 0);
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1"), "v2");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1", "hint2"), "v2");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1", "hint1"), "v2");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f1", "emptykey"), "v2");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f1", "hint2", "emptykey"), "v2");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f1", "hint1", "emptykey"), "v2");
ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 2);
// Update value and locality hint of second field.
ASSERT_REPLY(tunnel(leaderID)->exec("lhset-and-del-fallback", "mykey", "f2", "hint3", "v4", "fallback"), 0);
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f2"), "v4");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f2", "hint3"), "v4");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f2", "hint1"), "v4");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f2", "emptykey"), "v4");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f2", "hint3", "emptykey"), "v4");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f2", "hint1", "emptykey"), "v4");
ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 2);
// Insert one more field.
ASSERT_REPLY(tunnel(leaderID)->exec("lhset", "mykey", "f3", "aaaaa", "v5"), 1);
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f3"), "v5");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f3", "aaaaa"), "v5");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f3", "wrong-hint"), "v5");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f3", "emptykey"), "v5");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f3", "aaaaa", "emptykey"), "v5");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f3", "wrong-hint"), "v5");
ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 3);
// Re-read everything.
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f2"), "v4");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f2", "hint3"), "v4");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f2", "hint1"), "v4");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1"), "v2");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1", "hint2"), "v2");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1", "hint1"), "v2");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f2", "emptykey"), "v4");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f2", "hint3", "emptykey"), "v4");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f2", "hint1", "emptykey"), "v4");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f1", "emptykey"), "v2");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f1", "hint2", "emptykey"), "v2");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f1", "hint1", "emptykey"), "v2");
// Delete key.
ASSERT_REPLY(tunnel(leaderID)->exec("exists", "mykey"), 1);
ASSERT_REPLY(tunnel(leaderID)->exec("exists", "mykey", "mykey"), 2);
ASSERT_REPLY(tunnel(leaderID)->exec("del", "mykey"), 1);
ASSERT_REPLY(tunnel(leaderID)->exec("exists", "mykey"), 0);
ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 0);
ASSERT_REPLY(tunnel(leaderID)->exec("del", "mykey"), 0);
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f3", "aaaaa"), "");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f3", "aaaaa", "emptykey"), "");
// Recreate with five fields.
ASSERT_REPLY(tunnel(leaderID)->exec("lhset", "mykey", "f1", "hint1", "v1"), 1);
ASSERT_REPLY(tunnel(leaderID)->exec("lhset", "mykey", "f2", "hint2", "v2"), 1);
ASSERT_REPLY(tunnel(leaderID)->exec("lhset", "mykey", "f3", "hint3", "v3"), 1);
ASSERT_REPLY(tunnel(leaderID)->exec("lhset", "mykey", "f4", "hint4", "v4"), 1);
ASSERT_REPLY(tunnel(leaderID)->exec("lhset", "mykey", "f5", "hint5", "v5"), 1);
ASSERT_REPLY(tunnel(leaderID)->exec("exists", "mykey"), 1);
ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 5);
ASSERT_REPLY(tunnel(leaderID)->exec("lhdel", "mykey", "f2", "hint1"), 1);
ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 4);
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f2"), "");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f2", "hint2"), "");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f2", "emptykey"), "");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f2", "hint2", "emptykey"), "");
ASSERT_REPLY(tunnel(leaderID)->exec("lhdel", "mykey", "f2", "hint1"), 0);
ASSERT_REPLY(tunnel(leaderID)->exec("lhdel", "mykey", "f1", "f3"), 2);
ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 2);
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f4"), "v4");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f5"), "v5");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f4", "emptykey"), "v4");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f5", "emptykey"), "v5");
ASSERT_REPLY(tunnel(leaderID)->exec("lhdel", "mykey", "f4", "f4", "f4", "f4"), 1);
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f4"), "");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f4", "emptykey"), "");
ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 1);
ASSERT_REPLY(tunnel(leaderID)->exec("get", "mykey"), "ERR Invalid argument: WRONGTYPE Operation against a key holding the wrong kind of value");
ASSERT_REPLY(tunnel(leaderID)->exec("lhdel", "mykey", "f4"), 0);
ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 1);
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f5", "hint5"), "v5");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f5", "hint5", "emptykey"), "v5");
ASSERT_REPLY(tunnel(leaderID)->exec("lhdel", "mykey", "f5"), 1);
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f5", "hint5"), "");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f5", "hint5", "emptykey"), "");
ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 0);
ASSERT_REPLY(tunnel(leaderID)->exec("lhmset", "mykey", "f1", "hint1", "v1", "ayy"), "ERR wrong number of arguments for 'lhmset' command");
ASSERT_REPLY(tunnel(leaderID)->exec("lhmset", "a", "b", "c"), "ERR wrong number of arguments for 'lhmset' command");
ASSERT_REPLY(tunnel(leaderID)->exec("lhmset", "a", "b"), "ERR wrong number of arguments for 'lhmset' command");
ASSERT_REPLY(tunnel(leaderID)->exec("lhmset", "a"), "ERR wrong number of arguments for 'lhmset' command");
ASSERT_REPLY(tunnel(leaderID)->exec("lhmset", "mykey", "f1", "hint1", "v1"), "OK");
ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 1);
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1"), "v1");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f1", "emptykey"), "v1");
ASSERT_REPLY(tunnel(leaderID)->exec("lhmset", "mykey", "f1", "hint1", "v2", "f1", "hint3", "v3"), "OK");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1"), "v3");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f1", "emptykey"), "v3");
ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 1);
ASSERT_REPLY(tunnel(leaderID)->exec("lhmset", "mykey", "f2", "hint2", "v5", "f3", "hint1", "v6"), "OK");
ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 3);
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1"), "v3");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f2"), "v5");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f3"), "v6");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f1", "emptykey"), "v3");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f2", "emptykey"), "v5");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f3", "emptykey"), "v6");
// Test fallback
ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f9", "fb"), "");
ASSERT_REPLY(tunnel(leaderID)->exec("hset", "fb", "f9", "V"), 1);
ASSERT_REPLY(tunnel(leaderID)->exec("hset", "fb", "f8", "Z"), 1);
ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f9", "fb"), "V");
ASSERT_REPLY(tunnel(leaderID)->exec("lhset", "mykey", "f9", "hint1", "VVV"), 1);
ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f9", "fb"), "VVV");
ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 4);
ASSERT_REPLY(tunnel(leaderID)->exec("hlen", "fb"), 2);
ASSERT_REPLY(tunnel(leaderID)->exec("lhset-and-del-fallback", "mykey", "f9", "hint", "ZZZ", "fb"), 0);
ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 4);
ASSERT_REPLY(tunnel(leaderID)->exec("hlen", "fb"), 1);
ASSERT_REPLY(tunnel(leaderID)->exec("hget", "fb", "f9"), "");
ASSERT_REPLY(tunnel(leaderID)->exec("hget", "fb", "f8"), "Z");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f9", "fb"), "ZZZ");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f9"), "ZZZ");
ASSERT_REPLY(tunnel(leaderID)->exec("lhdel-with-fallback", "mykey", "f9", "fb"), 1);
ASSERT_REPLY(tunnel(leaderID)->exec("lhdel-with-fallback", "mykey", "f9", "fb"), 0);
ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 3);
ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f9", "fb"), "");
ASSERT_REPLY(tunnel(leaderID)->exec("lhdel-with-fallback", "mykey", "f8", "fb"), 1);
ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 3);
ASSERT_REPLY(tunnel(leaderID)->exec("hlen", "fb"), 0);
ASSERT_REPLY(tunnel(leaderID)->exec("hget", "fb", "f8"), "");
redisReplyPtr reply = tunnel(leaderID)->exec("raw-scan", "\x01", "count", "2000").get();
ASSERT_EQ(
qclient::describeRedisReply(reply),
"1) \"!mykey\"\n"
"2) \"e\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x03\"\n"
"3) \"__clock\"\n"
"4) \"\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\"\n"
"5) \"__format\"\n"
"6) \"0\"\n"
"7) \"__in-bulkload\"\n"
"8) \"FALSE\"\n"
"9) \"__last-applied\"\n"
"10) \"\\x00\\x00\\x00\\x00\\x00\\x00\\x00\"\"\n"
"11) \"emykey##dhint1##f3\"\n"
"12) \"v6\"\n"
"13) \"emykey##dhint2##f2\"\n"
"14) \"v5\"\n"
"15) \"emykey##dhint3##f1\"\n"
"16) \"v3\"\n"
"17) \"emykey##if1\"\n"
"18) \"hint3\"\n"
"19) \"emykey##if2\"\n"
"20) \"hint2\"\n"
"21) \"emykey##if3\"\n"
"22) \"hint1\"\n"
);
qdb_info(qclient::describeRedisReply(reply));
}
TEST_F(Raft_e2e, RawGetAllVersions) {
spinup(0); spinup(1); spinup(2);
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));
int leaderID = getLeaderID();
ASSERT_REPLY(tunnel(leaderID)->exec("sadd", "myset-for-raw-get", "s1"), 1);
ASSERT_REPLY(tunnel(leaderID)->exec("sadd", "myset-for-raw-get", "s2"), 1);
redisReplyPtr reply = tunnel(leaderID)->exec("raw-get-all-versions", "cmyset-for-raw-get##s1").get();
qdb_info(qclient::describeRedisReply(reply));
ASSERT_EQ(reply->elements, 4u);
ASSERT_EQ(std::string(reply->element[0]->str, reply->element[0]->len), "KEY: cmyset-for-raw-get##s1");
ASSERT_EQ(std::string(reply->element[1]->str, reply->element[1]->len), "VALUE: 1");
// Ignore sequence number
ASSERT_EQ(std::string(reply->element[3]->str, reply->element[3]->len), "TYPE: 1");
reply = tunnel(leaderID)->exec("raw-get-all-versions", "!myset-for-raw-get").get();
ASSERT_EQ(reply->elements, 8u);
qdb_info(qclient::describeRedisReply(reply));
ASSERT_EQ(std::string(reply->element[0]->str, reply->element[0]->len), "KEY: !myset-for-raw-get");
ASSERT_EQ(std::string(reply->element[1]->str, reply->element[1]->len), SSTR("VALUE: c" << intToBinaryString(2)));
ASSERT_EQ(std::string(reply->element[3]->str, reply->element[3]->len), "TYPE: 1");
ASSERT_EQ(std::string(reply->element[4]->str, reply->element[4]->len), "KEY: !myset-for-raw-get");
ASSERT_EQ(std::string(reply->element[5]->str, reply->element[5]->len), SSTR("VALUE: c" << intToBinaryString(1)));
ASSERT_EQ(std::string(reply->element[7]->str, reply->element[7]->len), "TYPE: 1");
}
TEST_F(Raft_e2e, ConvertHashToLHash) {
spinup(0); spinup(1); spinup(2);
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));
int leaderID = getLeaderID();
ASSERT_REPLY(tunnel(leaderID)->exec("hset", "hash", "f1", "v1"), 1);
ASSERT_REPLY(tunnel(leaderID)->exec("convert-hash-field-to-lhash", "hash", "f1", "lhash", "f1", "hint"), "OK");
ASSERT_REPLY(tunnel(leaderID)->exec("hlen", "hash"), 0);
ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "lhash"), 1);
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "lhash", "f1", "hint"), "v1");
ASSERT_REPLY(tunnel(leaderID)->exec("convert-hash-field-to-lhash", "hash", "f1", "lhash", "f1", "hint"), "ERR Destination field already exists!");
ASSERT_REPLY(tunnel(leaderID)->exec("convert-hash-field-to-lhash", "hash", "f2", "lhash", "f2", "hint"), "ERR NotFound: ");
ASSERT_REPLY(tunnel(leaderID)->exec("hset", "hash", "f2", "v2"), 1);
ASSERT_REPLY(tunnel(leaderID)->exec("hlen", "hash"), 1);
ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "lhash"), 1);
ASSERT_REPLY(tunnel(leaderID)->exec("convert-hash-field-to-lhash", "hash", "f2", "lhash", "f2", "hint"), "OK");
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "lhash", "f2", "hint"), "v2");
ASSERT_REPLY(tunnel(leaderID)->exec("hlen", "hash"), 0);
ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "lhash"), 2);
}
TEST_F(Raft_e2e, InconsistentIteratorsTest) {
// Try to trigger "inconsistent iterators" condition
spinup(0); spinup(1); spinup(2);
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));
int leaderID = getLeaderID();
std::vector> futs;
for(size_t i = 0; i < 100; i++) {
futs.emplace_back(tunnel(leaderID)->exec("hset", "hash", SSTR("f" << i), SSTR("v" << i)));
}
std::future delReply = tunnel(leaderID)->exec("del", "hash");
for(size_t i = 0; i < 100; i++) {
ASSERT_REPLY(futs[i], 1);
}
ASSERT_REPLY(delReply, 1);
}