Commit e996694b authored by Georgios Bitzes's avatar Georgios Bitzes
Browse files

Update QClient, use new constructor API

parent 818aa8a1
Pipeline #409071 passed with stages
in 29 minutes and 20 seconds
Subproject commit f819f588e4e13957ab243cd2a01e12a3684028a5
Subproject commit d23f16afe5321c3a0ccdc1bf75cd443014b82bcc
......@@ -86,12 +86,20 @@ private:
RaftTalker::RaftTalker(const RaftServer &server_, const RaftContactDetails &contactDetails)
: server(server_), tlsconfig(), tunnel(server.hostname, server.port, false, qclient::RetryStrategy::NoRetries(), qclient::BackpressureStrategy::Default(), tlsconfig, std::unique_ptr<Handshake>(new RaftHandshake(contactDetails)) ) {
: server(server_) {
qclient::Options opts;
opts.transparentRedirects = false;
opts.retryStrategy = qclient::RetryStrategy::NoRetries();
opts.backpressureStrategy = qclient::BackpressureStrategy::Default();
opts.handshake.reset(new RaftHandshake(contactDetails));
qcl.reset(new QClient(server.hostname, server.port, std::move(opts)));
}
RaftTalker::RaftTalker(const RaftServer &server_)
: server(server_), tunnel(server.hostname, server.port) {
: server(server_), qcl(new QClient(server.hostname, server.port, {} )) {
}
std::future<redisReplyPtr> RaftTalker::heartbeat(RaftTerm term, const RaftServer &leader) {
......@@ -101,7 +109,7 @@ std::future<redisReplyPtr> RaftTalker::heartbeat(RaftTerm term, const RaftServer
payload.emplace_back(std::to_string(term));
payload.emplace_back(leader.toString());
return tunnel.execute(payload);
return qcl->execute(payload);
}
std::future<redisReplyPtr> RaftTalker::appendEntries(
......@@ -133,7 +141,7 @@ std::future<redisReplyPtr> RaftTalker::appendEntries(
qdb_assert(RaftEntry::fetchTerm(entries[i]) <= term);
}
return tunnel.execute(payload);
return qcl->execute(payload);
}
std::future<redisReplyPtr> RaftTalker::requestVote(const RaftVoteRequest &req) {
......@@ -145,7 +153,7 @@ std::future<redisReplyPtr> RaftTalker::requestVote(const RaftVoteRequest &req) {
payload.emplace_back(std::to_string(req.lastIndex));
payload.emplace_back(std::to_string(req.lastTerm));
return tunnel.execute(payload);
return qcl->execute(payload);
}
std::future<redisReplyPtr> RaftTalker::fetch(LogIndex index) {
......@@ -154,21 +162,21 @@ std::future<redisReplyPtr> RaftTalker::fetch(LogIndex index) {
payload.emplace_back("RAFT_FETCH");
payload.emplace_back(std::to_string(index));
return tunnel.execute(payload);
return qcl->execute(payload);
}
std::future<redisReplyPtr> RaftTalker::resilveringStart(const ResilveringEventID &id) {
return tunnel.exec("quarkdb_start_resilvering", id);
return qcl->exec("quarkdb_start_resilvering", id);
}
std::future<redisReplyPtr> RaftTalker::resilveringCopy(const ResilveringEventID &id, const std::string &filename, const std::string &contents) {
return tunnel.exec("quarkdb_resilvering_copy_file", id, filename, contents);
return qcl->exec("quarkdb_resilvering_copy_file", id, filename, contents);
}
std::future<redisReplyPtr> RaftTalker::resilveringFinish(const ResilveringEventID &id) {
return tunnel.exec("quarkdb_finish_resilvering", id);
return qcl->exec("quarkdb_finish_resilvering", id);
}
std::future<redisReplyPtr> RaftTalker::resilveringCancel(const ResilveringEventID &id, const std::string &reason) {
return tunnel.exec("quarkdb_cancel_resilvering");
return qcl->exec("quarkdb_cancel_resilvering");
}
......@@ -24,10 +24,14 @@
#ifndef __QUARKDB_RAFT_TALKER_H__
#define __QUARKDB_RAFT_TALKER_H__
#include <qclient/QClient.hh>
#include "qclient/QClient.hh"
#include "RaftCommon.hh"
#include <mutex>
namespace qclient {
class QClient; class Options;
}
namespace quarkdb {
using namespace qclient;
......@@ -54,8 +58,7 @@ public:
RaftServer getServer() { return server; }
private:
RaftServer server;
TlsConfig tlsconfig;
QClient tunnel;
std::unique_ptr<QClient> qcl;
};
}
......
......@@ -109,7 +109,7 @@ public:
}
void mainRedis(int threadId) {
qclient::QClient tunnel(server.hostname, server.port);
qclient::QClient tunnel(server.hostname, server.port, {} );
while(true) {
size_t next = nextEvent++;
if(next > events) break;
......
......@@ -112,8 +112,8 @@ TEST_F(Raft_e2e, simultaneous_clients) {
futures.clear();
// interwine pipelined requests from three connections
qclient::QClient tunnel2(myself(leaderID).hostname, myself(leaderID).port);
qclient::QClient tunnel3(myself(leaderID).hostname, myself(leaderID).port);
qclient::QClient tunnel2(myself(leaderID).hostname, myself(leaderID).port, makeNoRedirectOptions());
qclient::QClient tunnel3(myself(leaderID).hostname, myself(leaderID).port, makeNoRedirectOptions());
futures.emplace_back(tunnel2.exec("get", "qwerty"));
futures.emplace_back(tunnel(leaderID)->exec("set", "client2", "val"));
......
......@@ -65,7 +65,7 @@ TEST_F(Multi, HandlerBasicSanity) {
ASSERT_REPLY(replies[2], "QUEUED");
// No dirty reads
QClient connection2(myself(leaderID).hostname, myself(leaderID).port);
QClient connection2(myself(leaderID).hostname, myself(leaderID).port, makeNoRedirectOptions());
ASSERT_REPLY(connection2.exec("GET", "key"), "");
redisReplyPtr reply = tunnel(leaderID)->exec("EXEC").get();
......
......@@ -39,7 +39,7 @@ TEST_F(tPoller, T1) {
Poller smPoller(myself().port, &dispatcher);
// start first connection
QClient tunnel(myself().hostname, myself().port);
QClient tunnel(myself().hostname, myself().port, makeNoRedirectOptions());
redisReplyPtr reply = tunnel.exec("set", "abc", "1234").get();
ASSERT_REPLY(reply, "OK");
......@@ -48,7 +48,7 @@ TEST_F(tPoller, T1) {
ASSERT_REPLY(reply, "1234");
// start second connection, ensure the poller can handle them concurrently
QClient tunnel2(myself().hostname, myself().port);
QClient tunnel2(myself().hostname, myself().port, makeNoRedirectOptions());
reply = tunnel2.exec("get", "abc").get();
ASSERT_REPLY(reply, "1234");
......@@ -57,7 +57,7 @@ TEST_F(tPoller, T1) {
ASSERT_REPLY(reply, "OK");
// now try a third
QClient tunnel3(myself().hostname, myself().port);
QClient tunnel3(myself().hostname, myself().port, makeNoRedirectOptions());
reply = tunnel3.exec("get", "qwert").get();
ASSERT_REPLY(reply, "asdf");
}
......@@ -65,7 +65,7 @@ TEST_F(tPoller, T1) {
TEST_F(tPoller, test_reconnect) {
RedisDispatcher dispatcher(*stateMachine());
QClient tunnel(myself().hostname, myself().port);
QClient tunnel(myself().hostname, myself().port, makeNoRedirectOptions());
for(size_t reconnects = 0; reconnects < 5; reconnects++) {
Poller rocksdbpoller(myself().port, &dispatcher);
......
......@@ -46,7 +46,7 @@ static std::string str_from_reply(redisReplyPtr &reply) {
}
TEST(Tunnel, T1) {
QClient tunnel("localhost", 1234);
QClient tunnel("localhost", 1234, {} );
RedisRequest req { "set", "abc", "123" };
std::future<redisReplyPtr> fut = tunnel.execute(req);
......@@ -101,7 +101,10 @@ TEST(QClient, T2) {
};
// with handshake
QClient tunnel("localhost", 1234, false, qclient::RetryStrategy::NoRetries(), qclient::BackpressureStrategy::Default(), qclient::TlsConfig(), std::unique_ptr<Handshake>(new SimpleHandshake()));
qclient::Options options;
options.handshake.reset(new SimpleHandshake());
QClient tunnel("localhost", 1234, std::move(options));
RedisRequest req { "set", "abc", "123" };
std::future<redisReplyPtr> fut = tunnel.execute(req);
......@@ -158,8 +161,11 @@ TEST(QClient, T3) {
};
// with handshake
qclient::RetryStrategy strategy = qclient::RetryStrategy::WithTimeout(std::chrono::seconds(60));
QClient tunnel("localhost", 1234, false, strategy, qclient::BackpressureStrategy::Default(), qclient::TlsConfig(), std::unique_ptr<Handshake>(new PingHandshake()));
qclient::Options options;
options.retryStrategy = qclient::RetryStrategy::WithTimeout(std::chrono::seconds(60));
options.handshake.reset(new PingHandshake());
QClient tunnel("localhost", 1234, std::move(options));
for(size_t attempts = 0; attempts < 2; attempts++) {
......@@ -200,8 +206,10 @@ TEST(QClient, T3) {
TEST(QClient, AuthHandshake) {
// with handshake
qclient::RetryStrategy strategy = qclient::RetryStrategy::WithTimeout(std::chrono::seconds(60));
QClient tunnel("localhost", 1235, false, strategy, qclient::BackpressureStrategy::Default(), qclient::TlsConfig(), std::unique_ptr<Handshake>(new qclient::AuthHandshake("hunter2")));
qclient::Options opts;
opts.retryStrategy = qclient::RetryStrategy::WithTimeout(std::chrono::seconds(60));
opts.handshake.reset(new qclient::AuthHandshake("hunter2"));
QClient tunnel("localhost", 1235, std::move(opts));
for(size_t attempts = 0; attempts < 2; attempts++) {
SocketListener listener(1235);
......
......@@ -93,7 +93,7 @@ TEST(Recovery, RemoveJournalEntriesAndChangeClusterID) {
{
RecoveryRunner runner("/tmp/quarkdb-recovery-test", 30100);
qclient::QClient qcl("localhost", 30100);
qclient::QClient qcl("localhost", 30100, {} );
ASSERT_REPLY(qcl.exec("get", KeyConstants::kJournal_ClusterID), "some-cluster-id");
ASSERT_REPLY(qcl.exec("set", KeyConstants::kJournal_ClusterID, "different-cluster-id"), "OK");
......
......@@ -48,7 +48,7 @@ TEST_F(Background_Flusher, basic_sanity) {
qclient::Notifier dummyNotifier;
ASSERT_EQ(system("rm -rf /tmp/quarkdb-tests-flusher"), 0);
qclient::BackgroundFlusher flusher(qclient::Members(myself(follower).hostname, myself(follower).port),
dummyNotifier, new qclient::RocksDBPersistency("/tmp/quarkdb-tests-flusher")
qclient::Options(), dummyNotifier, new qclient::RocksDBPersistency("/tmp/quarkdb-tests-flusher")
);
const int nentries = 10000;
......@@ -87,7 +87,7 @@ TEST_F(Background_Flusher, with_transition) {
qclient::Notifier dummyNotifier;
ASSERT_EQ(system("rm -rf /tmp/quarkdb-tests-flusher"), 0);
qclient::BackgroundFlusher flusher(members, dummyNotifier,
qclient::BackgroundFlusher flusher(members, qclient::Options(), dummyNotifier,
new qclient::RocksDBPersistency("/tmp/quarkdb-tests-flusher")
);
......@@ -122,7 +122,7 @@ TEST_F(Background_Flusher, persistency) {
ASSERT_EQ(system("rm -rf /tmp/quarkdb-tests-flusher"), 0);
std::unique_ptr<qclient::BackgroundFlusher> flusher(
new qclient::BackgroundFlusher(qclient::Members(myself(follower).hostname, myself(follower).port), dummyNotifier, new qclient::RocksDBPersistency("/tmp/quarkdb-tests-flusher"))
new qclient::BackgroundFlusher(qclient::Members(myself(follower).hostname, myself(follower).port), qclient::Options(), dummyNotifier, new qclient::RocksDBPersistency("/tmp/quarkdb-tests-flusher"))
);
// queue entries
......@@ -135,7 +135,7 @@ TEST_F(Background_Flusher, persistency) {
// stop the flusher, recover contents from persistency layer
flusher.reset();
flusher.reset(new qclient::BackgroundFlusher(qclient::Members(myself(follower).hostname, myself(follower).port), dummyNotifier, new qclient::RocksDBPersistency("/tmp/quarkdb-tests-flusher")));
flusher.reset(new qclient::BackgroundFlusher(qclient::Members(myself(follower).hostname, myself(follower).port), qclient::Options(), dummyNotifier, new qclient::RocksDBPersistency("/tmp/quarkdb-tests-flusher")));
ASSERT_GT(flusher->size(), 0u);
RETRY_ASSERT_TRUE(flusher->size() == 0u);
......
......@@ -45,8 +45,10 @@ TEST_F(QClientTests, hide_transient_failures) {
members.push_back(myself(1).hostname, myself(1).port);
members.push_back(myself(2).hostname, myself(2).port);
qclient::RetryStrategy retryStrategy = qclient::RetryStrategy::WithTimeout(std::chrono::seconds(30));
QClient qcl(members, true, retryStrategy);
qclient::Options opts;
opts.transparentRedirects = true;
opts.retryStrategy = qclient::RetryStrategy::WithTimeout(std::chrono::seconds(30));
QClient qcl(members, std::move(opts));
// Issue request _before_ spinning up the cluster! Verify it succeeds.
std::future<redisReplyPtr> reply = qcl.exec("HSET", "aaaaa", "bbbbb", "cccc");
......@@ -98,8 +100,10 @@ TEST_F(QClientTests, nullptr_only_after_timeout) {
members.push_back(myself(1).hostname, myself(1).port);
members.push_back(myself(2).hostname, myself(2).port);
qclient::RetryStrategy retryStrategy = qclient::RetryStrategy::WithTimeout(std::chrono::seconds(3));
QClient qcl(members, true, retryStrategy);
qclient::Options opts;
opts.transparentRedirects = true;
opts.retryStrategy = qclient::RetryStrategy::WithTimeout(std::chrono::seconds(3));
QClient qcl(members, std::move(opts));
ASSERT_REPLY(qcl.exec("HSET", "aaaaa", "bbbbb", "cccc"), 1);
ASSERT_REPLY(qcl.exec("HGET", "aaaaa", "bbbbb"), "cccc");
......@@ -171,8 +175,10 @@ TEST_F(QClientTests, MultipleWriterThreads) {
int leaderID = getLeaderID();
// Launch many threads doing pings, using the same QClient object.
QClient qcl(myself(leaderID).hostname, myself(leaderID).port, false,
RetryStrategy::NoRetries(), BackpressureStrategy::RateLimitPendingRequests(2048));
qclient::Options opts;
opts.backpressureStrategy = BackpressureStrategy::RateLimitPendingRequests(2048);
QClient qcl(myself(leaderID).hostname, myself(leaderID).port, std::move(opts));
std::vector<std::thread> threads;
for(size_t i = 0; i < 20; i++) {
......
......@@ -159,6 +159,10 @@ qclient::QClient* TestCluster::tunnel(int id) {
return node(id)->tunnel();
}
qclient::Options TestCluster::makeNoRedirectOptions(int id) {
return node(id)->makeNoRedirectOptions();
}
RaftClock* TestCluster::raftclock(int id) {
return node(id)->group()->raftclock();
}
......@@ -297,9 +301,15 @@ Poller* TestNode::poller() {
return pollerptr;
}
qclient::Options TestNode::makeNoRedirectOptions() {
qclient::Options options;
options.transparentRedirects = false;
return options;
}
qclient::QClient* TestNode::tunnel() {
if(tunnelptr == nullptr) {
tunnelptr = new qclient::QClient(myself().hostname, myself().port);
tunnelptr = new qclient::QClient(myself().hostname, myself().port, makeNoRedirectOptions());
}
return tunnelptr;
}
......
......@@ -142,6 +142,7 @@ public:
RaftGroup* group();
Poller *poller();
qclient::QClient *tunnel();
qclient::Options makeNoRedirectOptions();
RaftServer myself();
std::vector<RaftServer> nodes();
......@@ -183,6 +184,7 @@ public:
RaftTrimmer* trimmer(int id = 0);
const RaftContactDetails* contactDetails(int id = 0);
qclient::Options makeNoRedirectOptions(int id = 0);
void killTunnel(int id = 0);
// manage node state
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment