Commit 5ca8c8f0 authored by Georgios Bitzes's avatar Georgios Bitzes
Browse files

Fix subscribing to multiple channels at once when push types are active

parent 0ddcce48
Pipeline #1349392 passed with stages
in 67 minutes and 59 seconds
......@@ -2,6 +2,11 @@
## Unreleased
### Bug fixes
- Fixed ability to subscribe to multiple channels with one command, when push types
are active. Previously, the server would erroneously send one "OK" response per
channel subscribed, breaking QClient.
### New features
- Possibility to choose between three different journal fsync policies through
``RAFT-SET-FSYNC-POLICY`` command.
......
......@@ -49,33 +49,21 @@ LinkStatus PendingQueue::flushPending(const RedisEncodedResponse &msg) {
void PendingQueue::subscribe(const std::string &item) {
std::scoped_lock lock(mtx);
subscriptionTracker.addChannel(item);
if(supportsPushTypes) {
appendIfAttachedNoLock(Formatter::ok());
}
}
void PendingQueue::psubscribe(const std::string &item) {
std::scoped_lock lock(mtx);
subscriptionTracker.addPattern(item);
if(supportsPushTypes) {
appendIfAttachedNoLock(Formatter::ok());
}
}
void PendingQueue::unsubscribe(const std::string &item) {
std::scoped_lock lock(mtx);
subscriptionTracker.removeChannel(item);
if(supportsPushTypes) {
appendIfAttachedNoLock(Formatter::ok());
}
}
void PendingQueue::punsubscribe(const std::string &item) {
std::scoped_lock lock(mtx);
subscriptionTracker.removePattern(item);
if(supportsPushTypes) {
appendIfAttachedNoLock(Formatter::ok());
}
}
bool PendingQueue::addMessageIfAttached(const std::string &channel, std::string_view payload) {
......
......@@ -180,6 +180,10 @@ public:
return pendingQueue;
}
bool hasPushTypesActive() {
return pendingQueue->hasPushTypesActive();
}
bool isLocalhost() const {
return localhost;
}
......
......@@ -144,6 +144,10 @@ LinkStatus Publisher::dispatch(Connection *conn, RedisRequest &req) {
}
}
if(conn->hasPushTypesActive()) {
conn->ok();
}
return retval;
}
case RedisCommand::PSUBSCRIBE: {
......@@ -158,6 +162,10 @@ LinkStatus Publisher::dispatch(Connection *conn, RedisRequest &req) {
}
}
if(conn->hasPushTypesActive()) {
conn->ok();
}
return retval;
}
case RedisCommand::UNSUBSCRIBE: {
......@@ -172,6 +180,10 @@ LinkStatus Publisher::dispatch(Connection *conn, RedisRequest &req) {
}
}
if(conn->hasPushTypesActive()) {
conn->ok();
}
return retval;
}
case RedisCommand::PUNSUBSCRIBE: {
......@@ -186,6 +198,10 @@ LinkStatus Publisher::dispatch(Connection *conn, RedisRequest &req) {
}
}
if(conn->hasPushTypesActive()) {
conn->ok();
}
return retval;
}
case RedisCommand::PUBLISH: {
......
......@@ -2070,6 +2070,42 @@ TEST_F(Raft_e2e, pubsub) {
ASSERT_EQ(mq->size(), 0u);
}
class ReconnectionCounter : public ReconnectionListener {
public:
virtual void notifyConnectionLost(int64_t epoch, int errc,
const std::string &msg) override { }
virtual void notifyConnectionEstablished(int64_t epoch) override {
lastEpoch = epoch;
}
int64_t getEpoch() const {
return lastEpoch;
}
private:
int64_t lastEpoch = 0u;
};
TEST_F(Raft_e2e, MultiSubscribeWithPushtypes) {
spinup(0); spinup(1); spinup(2);
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));
int leaderID = getLeaderID();
std::shared_ptr<ReconnectionCounter> listener = std::make_shared<ReconnectionCounter>();
tunnel(leaderID)->attachListener(listener.get());
ASSERT_REPLY_DESCRIBE(tunnel(leaderID)->exec("activate-push-types").get(), "OK");
ASSERT_EQ(listener->getEpoch(), 1u);
ASSERT_REPLY_DESCRIBE(tunnel(leaderID)->exec("subscribe", "a", "b").get(), "OK");
ASSERT_REPLY_DESCRIBE(tunnel(leaderID)->exec("ping").get(), "PONG");
ASSERT_EQ(listener->getEpoch(), 1u);
ASSERT_TRUE(tunnel(leaderID)->detachListener(listener.get()));
listener.reset();
}
TEST_F(Raft_e2e, SharedDeque) {
spinup(0); spinup(1); spinup(2);
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment