Commit c9f15873 authored by Georgios Bitzes's avatar Georgios Bitzes
Browse files

Add first functioning pubsub support - only channels for now, no patterns

parent 091e7228
Pipeline #640277 passed with stages
in 34 minutes and 32 seconds
Subproject commit c32021b36063dcc3a4a58d95c8fdb8d221a2a762
Subproject commit 1bc8e58f299f674b3da6e19543149296fdd5e851
......@@ -40,6 +40,8 @@ add_library(XrdQuarkDB SHARED
health/HealthIndicator.hh
pubsub/Publisher.cc pubsub/Publisher.hh
raft/RaftBlockedWrites.cc raft/RaftBlockedWrites.hh
raft/RaftConfig.cc raft/RaftConfig.hh
raft/RaftJournal.cc raft/RaftJournal.hh
......
......@@ -153,5 +153,10 @@ struct cmdMapInit {
redis_cmd_map["convert_string_to_int"] = {RedisCommand::CONVERT_STRING_TO_INT, CommandType::CONTROL};
redis_cmd_map["convert_int_to_string"] = {RedisCommand::CONVERT_INT_TO_STRING, CommandType::CONTROL};
redis_cmd_map["publish"] = {RedisCommand::PUBLISH, CommandType::PUBSUB};
redis_cmd_map["subscribe"] = {RedisCommand::SUBSCRIBE, CommandType::PUBSUB};
redis_cmd_map["psubscribe"] = {RedisCommand::PSUBSCRIBE, CommandType::PUBSUB};
redis_cmd_map["unsubscribe"] = {RedisCommand::UNSUBSCRIBE, CommandType::PUBSUB};
redis_cmd_map["punsubscribe"] = {RedisCommand::PUNSUBSCRIBE, CommandType::PUBSUB};
}
} cmd_map_init;
......@@ -155,6 +155,12 @@ enum class RedisCommand {
CONVERT_STRING_TO_INT,
CONVERT_INT_TO_STRING,
PUBLISH,
SUBSCRIBE,
PSUBSCRIBE,
UNSUBSCRIBE,
PUNSUBSCRIBE,
};
enum class CommandType {
......@@ -166,7 +172,8 @@ enum class CommandType {
RAFT,
QUARKDB,
AUTHENTICATION,
RECOVERY
RECOVERY,
PUBSUB
};
#define QDB_ALWAYS_INLINE __attribute__((always_inline))
......
......@@ -65,6 +65,8 @@ public:
LinkStatus addPendingTransaction(RedisDispatcher *dispatcher, Transaction &&tx, LogIndex index = -1);
LogIndex dispatchPending(RedisDispatcher *dispatcher, LogIndex commitIndex);
bool appendIfAttached(RedisEncodedResponse &&raw);
size_t subscriptions = 0u;
private:
LinkStatus appendResponseNoLock(RedisEncodedResponse &&raw);
Connection *conn;
......
// ----------------------------------------------------------------------
// File: Publisher.cc
// Author: Georgios Bitzes - CERN
// ----------------------------------------------------------------------
/************************************************************************
* quarkdb - a redis-like highly available key-value store *
* Copyright (C) 2016 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see <http://www.gnu.org/licenses/>.*
************************************************************************/
#include "Publisher.hh"
#include "../Formatter.hh"
using namespace quarkdb;
void Publisher::purge(RedisEncodedResponse resp) {
std::unique_lock<std::mutex> lock(mtx);
for(auto it1 = channelSubscriptions.begin(); it1 != channelSubscriptions.end(); it1++) {
for(auto it2 = it1->second.begin(); it2 != it1->second.end(); it2++) {
(*it2)->appendIfAttached(RedisEncodedResponse(resp));
}
}
channelSubscriptions.clear();
}
int Publisher::subscribe(std::shared_ptr<PendingQueue> connection, std::string_view channel) {
std::unique_lock<std::mutex> lock(mtx);
int additions = 0u;
auto res = channelSubscriptions[std::string(channel)].emplace(connection);
if(res.second) {
additions++;
}
return additions;
}
int Publisher::publish(std::string_view channel, std::string_view payload) {
std::unique_lock<std::mutex> lock(mtx);
auto existenceCheck = channelSubscriptions.find(std::string(channel));
if(existenceCheck == channelSubscriptions.end()) {
return 0u;
}
auto &targetSet = existenceCheck->second;
int hits = 0;
for(auto it = targetSet.begin(); it != targetSet.end(); ) {
bool stillAlive = (*it)->appendIfAttached(Formatter::message(channel, payload));
if(!stillAlive) {
it = targetSet.erase(it);
}
else {
it++;
hits++;
}
}
if(targetSet.size() == 0u) {
channelSubscriptions.erase(std::string(channel));
}
return hits;
}
LinkStatus Publisher::dispatch(Connection *conn, RedisRequest &req) {
switch(req.getCommand()) {
case RedisCommand::SUBSCRIBE: {
if(req.size() <= 1) return conn->errArgs(req[0]);
int retval = 1;
for(size_t i = 1; i < req.size(); i++) {
conn->getQueue()->subscriptions += subscribe(conn->getQueue(), req[i]);
if(retval >= 0) {
retval = conn->raw(Formatter::subscribe(req[i], conn->getQueue()->subscriptions));
}
}
return retval;
}
case RedisCommand::PUBLISH: {
if(req.size() != 3) return conn->errArgs(req[0]);
int hits = publish(req[1], req[2]);
return conn->integer(hits);
}
default: {
qdb_throw("should never reach here");
}
}
}
LinkStatus Publisher::dispatch(Connection *conn, Transaction &tx) {
qdb_throw("internal dispatching error, Publisher does not support transactions");
}
// ----------------------------------------------------------------------
// File: Publisher.hh
// Author: Georgios Bitzes - CERN
// ----------------------------------------------------------------------
/************************************************************************
* quarkdb - a redis-like highly available key-value store *
* Copyright (C) 2016 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see <http://www.gnu.org/licenses/>.*
************************************************************************/
#ifndef QUARKDB_PUBLISHER_HH
#define QUARKDB_PUBLISHER_HH
#include "../Connection.hh"
#include "../Dispatcher.hh"
#include <map>
#include <mutex>
#include <memory>
#include <set>
namespace quarkdb {
class PendingQueue;
class RedisRequest;
class Publisher : public Dispatcher {
public:
// Subscribe connection to given channels - returns how many subscriptions were new.
int subscribe(std::shared_ptr<PendingQueue> connection, std::string_view channel);
int publish(std::string_view channel, std::string_view payload);
void purge(RedisEncodedResponse resp);
virtual LinkStatus dispatch(Connection *conn, RedisRequest &req) override final;
virtual LinkStatus dispatch(Connection *conn, Transaction &tx) override final;
private:
std::mutex mtx;
// Map of subscribed-to channels
std::map<std::string, std::set<std::shared_ptr<PendingQueue>>> channelSubscriptions;
};
}
#endif
......@@ -53,7 +53,27 @@ LinkStatus RaftDispatcher::dispatch(Connection *conn, Transaction &transaction)
return this->service(conn, transaction);
}
LinkStatus RaftDispatcher::dispatchPubsub(Connection *conn, RedisRequest &req) {
// Only leaders should service pubsub requests.
RaftStateSnapshotPtr snapshot = state.getSnapshot();
if(snapshot->status != RaftStatus::LEADER) {
if(snapshot->leader.empty()) {
return conn->raw(Formatter::err("unavailable"));
}
// Redirect.
return conn->raw(Formatter::moved(0, snapshot->leader));
}
// We're good, submit to publisher.
return publisher.dispatch(conn, req);
}
LinkStatus RaftDispatcher::dispatch(Connection *conn, RedisRequest &req) {
if(req.getCommandType() == CommandType::PUBSUB) {
return dispatchPubsub(conn, req);
}
switch(req.getCommand()) {
case RedisCommand::RAFT_INFO: {
// safe, read-only request, does not need authorization
......@@ -410,6 +430,7 @@ RaftAppendEntriesResponse RaftDispatcher::appendEntries(RaftAppendEntriesRequest
//----------------------------------------------------------------------------
writeTracker.flushQueues(Formatter::moved(0, snapshot->leader));
publisher.purge(Formatter::moved(0, snapshot->leader));
if(!journal.matchEntries(req.prevIndex, req.prevTerm)) {
return {snapshot->term, journal.getLogSize(), false, "Log entry mismatch"};
......
......@@ -25,6 +25,7 @@
#define QUARKDB_RAFT_DISPATCHER_HH
#include "../Dispatcher.hh"
#include "../pubsub/Publisher.hh"
#include "RaftUtils.hh"
#include "RaftTimeouts.hh"
#include "RaftBlockedWrites.hh"
......@@ -51,6 +52,7 @@ public:
LinkStatus dispatchInfo(Connection *conn, RedisRequest &req);
virtual LinkStatus dispatch(Connection *conn, RedisRequest &req) override final;
virtual LinkStatus dispatch(Connection *conn, Transaction &transaction) override final;
LinkStatus dispatchPubsub(Connection *conn, RedisRequest &req);
RaftInfo info();
bool fetch(LogIndex index, RaftEntry &entry);
......@@ -88,6 +90,10 @@ private:
std::chrono::steady_clock::time_point lastLaggingWarning;
void warnIfLagging(LogIndex leaderLogIndex);
//----------------------------------------------------------------------------
// Publishing service
//----------------------------------------------------------------------------
Publisher publisher;
};
}
......
......@@ -40,6 +40,8 @@
#include "qclient/QSet.hh"
#include "qclient/ConnectionInitiator.hh"
#include "qclient/QHash.hh"
#include "qclient/pubsub/MessageQueue.hh"
#include "qclient/BaseSubscriber.hh"
using namespace quarkdb;
#define ASSERT_OK(msg) ASSERT_TRUE(msg.ok())
......@@ -1705,3 +1707,58 @@ TEST_F(Raft_e2e, CloneHash) {
ASSERT_REPLY(tunnel(leaderID)->exec("hclone", "not-existing", "not-existing-2"), "OK");
ASSERT_REPLY(tunnel(leaderID)->exec("exists", "not-existing", "not-existing-2"), 0);
}
bool lookForSentinelValues(qclient::MessageQueue *queue) {
bool penguinsFound = false;
bool chickensFound = false;
auto iterator = queue->begin();
for(size_t i = 0; i < queue->size(); i++) {
Message& item = iterator.item();
if(item.getPayload() == "penguins") {
penguinsFound = true;
}
if(item.getPayload() == "chickens") {
chickensFound = true;
}
iterator.next();
}
return penguinsFound && chickensFound;
}
TEST_F(Raft_e2e, pubsub) {
spinup(0); spinup(1); spinup(2);
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));
int leaderID = getLeaderID();
std::shared_ptr<qclient::MessageQueue> mq = std::make_shared<qclient::MessageQueue>();
qclient::SubscriptionOptions opts;
opts.handshake = makeQClientHandshake();
qclient::BaseSubscriber subscriber(members(), mq, std::move(opts));
ASSERT_REPLY(tunnel(leaderID)->exec("publish", "test-channel", "giraffes"), 0);
subscriber.subscribe( {"test-channel"} );
RETRY_ASSERT_TRUE(
qclient::describeRedisReply(tunnel(leaderID)->exec("publish", "test-channel", "penguins").get()) ==
"(integer) 1"
);
spindown(0); spindown(1); spindown(2);
spinup(0); spinup(1); spinup(2);
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));
leaderID = getLeaderID();
// Ensure subscriber is able to re-subscribe!
RETRY_ASSERT_TRUE(
qclient::describeRedisReply(tunnel(leaderID)->exec("publish", "test-channel", "chickens").get()) ==
"(integer) 1"
);
RETRY_ASSERT_TRUE(lookForSentinelValues(mq.get()));
}
......@@ -173,6 +173,10 @@ std::vector<RaftServer> TestCluster::nodes(int id) {
return node(id)->nodes();
}
qclient::Members TestCluster::members(int id) {
return node(id)->members();
}
qclient::QClient* TestCluster::tunnel(int id) {
return node(id)->tunnel();
}
......@@ -321,6 +325,17 @@ std::vector<RaftServer> TestNode::nodes() {
return group()->journal()->getNodes();
}
qclient::Members TestNode::members() {
qclient::Members memb;
std::vector<RaftServer> clusterNodes = this->nodes();
for(auto it = clusterNodes.begin(); it != clusterNodes.end(); it++) {
memb.push_back(it->hostname, it->port);
}
return memb;
}
Poller* TestNode::poller() {
if(pollerptr == nullptr) {
pollerptr = new Poller(myself().port, quarkdbNode());
......
......@@ -147,6 +147,7 @@ public:
RaftServer myself();
std::vector<RaftServer> nodes();
qclient::Members members();
void spinup();
void spindown();
......@@ -207,6 +208,7 @@ public:
// initialize nodes using information passed on the nodes variable, except if srv is set
TestNode* node(int id = 0, const RaftServer &srv = {});
std::vector<RaftServer> nodes(int id = 0);
qclient::Members members(int id = 0);
RaftClusterID clusterID();
template<typename... Args>
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment