diff --git a/src/Commands.cc b/src/Commands.cc index 1b45886c53e4f7b2200922a2d9f376ee50ab0374..768291c3af24fe2f402ced5b8bf324bcc9554aff 100644 --- a/src/Commands.cc +++ b/src/Commands.cc @@ -36,6 +36,7 @@ struct cmdMapInit { redis_cmd_map["monitor"] = {RedisCommand::MONITOR, CommandType::CONTROL}; redis_cmd_map["client_id"] = {RedisCommand::CLIENT_ID, CommandType::CONTROL}; redis_cmd_map["command_stats"] = {RedisCommand::COMMAND_STATS, CommandType::CONTROL}; + redis_cmd_map["activate_push_types"] = {RedisCommand::ACTIVATE_PUSH_TYPES, CommandType::CONTROL}; redis_cmd_map["auth"] = {RedisCommand::AUTH, CommandType::AUTHENTICATION}; redis_cmd_map["hmac_auth_generate_challenge"] = {RedisCommand::HMAC_AUTH_GENERATE_CHALLENGE, CommandType::AUTHENTICATION}; diff --git a/src/Commands.hh b/src/Commands.hh index 8aeaab3ff966a99d4eea8d3a5d6b1c5bcb686f49..00b3758d5dec87849bc0fc28f24bff6ddc7b7474 100644 --- a/src/Commands.hh +++ b/src/Commands.hh @@ -36,6 +36,7 @@ enum class RedisCommand { MONITOR, CLIENT_ID, COMMAND_STATS, + ACTIVATE_PUSH_TYPES, FLUSHALL, diff --git a/src/Connection.cc b/src/Connection.cc index e3b588492b7d8ef446d1218b400a2bb83bc53670..3d79ccf2f73864b784d9273de9f0e9a740ec92b4 100644 --- a/src/Connection.cc +++ b/src/Connection.cc @@ -175,6 +175,10 @@ LogIndex PendingQueue::dispatchPending(RedisDispatcher *dispatcher, LogIndex com return -1; } +void PendingQueue::activatePushTypes() { + supportsPushTypes = true; +} + size_t phantomBatchLimit = 100; void Connection::setPhantomBatchLimit(size_t newval) { @@ -342,3 +346,7 @@ void Connection::flush() { std::string Connection::describe() const { return description; } + +void Connection::activatePushTypes() { + pendingQueue->activatePushTypes(); +} diff --git a/src/Connection.hh b/src/Connection.hh index a0c348490f3a6adea7de61781452b10c54d1b587..9f9b590204d65e1bfd832aa86914a583d7dd8af9 100644 --- a/src/Connection.hh +++ b/src/Connection.hh @@ -77,6 +77,8 @@ public: bool addMessageIfAttached(const std::string &channel, RedisEncodedResponse &&raw); bool addPatternMessageIfAttached(const std::string &pattern, RedisEncodedResponse &&raw); + void activatePushTypes(); + private: LinkStatus appendResponseNoLock(RedisEncodedResponse &&raw); Connection *conn; @@ -110,6 +112,7 @@ private: LogIndex lastIndex = -1; std::queue<PendingRequest> pending; SubscriptionTracker subscriptionTracker; + bool supportsPushTypes = false; }; //------------------------------------------------------------------------------ @@ -186,6 +189,7 @@ public: Connection *conn; }; + void activatePushTypes(); static void setPhantomBatchLimit(size_t newval); private: BufferedWriter writer; diff --git a/src/QuarkDBNode.cc b/src/QuarkDBNode.cc index 13c7fe12f6c462ab42336a71f4e45eef37596e48..bb66bc89211e762a45568f973204cb907e9d6664 100644 --- a/src/QuarkDBNode.cc +++ b/src/QuarkDBNode.cc @@ -128,6 +128,10 @@ LinkStatus QuarkDBNode::dispatch(Connection *conn, RedisRequest &req) { case RedisCommand::CLIENT_ID: { return conn->status(conn->getID()); } + case RedisCommand::ACTIVATE_PUSH_TYPES: { + conn->activatePushTypes(); + return conn->ok(); + } case RedisCommand::QUARKDB_INFO: { return conn->statusVector(this->info().toVector()); }