Commit e5c99faf authored by Georgios Bitzes's avatar Georgios Bitzes
Browse files

Show host and connection ID in MONITOR

parent 0ac6b91e
Pipeline #318133 passed with stages
in 25 minutes and 21 seconds
......@@ -34,6 +34,7 @@ struct cmdMapInit {
redis_cmd_map["ping"] = {RedisCommand::PING, CommandType::CONTROL};
redis_cmd_map["debug"] = {RedisCommand::DEBUG, CommandType::CONTROL};
redis_cmd_map["monitor"] = {RedisCommand::MONITOR, CommandType::CONTROL};
redis_cmd_map["client_id"] = {RedisCommand::CLIENT_ID, CommandType::CONTROL};
redis_cmd_map["get"] = {RedisCommand::GET, CommandType::READ};
redis_cmd_map["exists"] = {RedisCommand::EXISTS, CommandType::READ};
......
......@@ -34,6 +34,7 @@ enum class RedisCommand {
PING,
DEBUG,
MONITOR,
CLIENT_ID,
FLUSHALL,
......
......@@ -131,7 +131,7 @@ LogIndex PendingQueue::dispatchPending(RedisDispatcher *dispatcher, LogIndex com
Connection::Connection(Link *l, size_t write_batch_limit)
: writer(l), parser(l), pendingQueue(new PendingQueue(this)),
writeBatchLimit(write_batch_limit) {
writeBatchLimit(write_batch_limit), description(l->describe()), uuid(l->getID()) {
}
Connection::~Connection() {
......@@ -250,3 +250,7 @@ void Connection::setResponseBuffering(bool value) {
void Connection::flush() {
writer.flush();
}
std::string Connection::describe() const {
return description;
}
......@@ -65,6 +65,7 @@ public:
LinkStatus addPendingRequest(RedisDispatcher *dispatcher, RedisRequest &&req, LogIndex index = -1);
LogIndex dispatchPending(RedisDispatcher *dispatcher, LogIndex commitIndex);
bool appendIfAttached(RedisEncodedResponse &&raw);
std::string describe() const;
private:
LinkStatus appendResponseNoLock(RedisEncodedResponse &&raw);
Connection *conn;
......@@ -109,6 +110,8 @@ class Connection {
public:
Connection(Link *link, size_t writeBatchLimit = 1);
~Connection();
std::string describe() const;
std::string getID() const { return uuid; }
LinkStatus raw(RedisEncodedResponse &&encoded);
LinkStatus moved(int64_t shardId, const RaftServer &location);
......@@ -170,6 +173,9 @@ private:
std::shared_ptr<PendingQueue> pendingQueue;
size_t writeBatchLimit;
std::string description;
std::string uuid;
friend class PendingQueue;
};
......
......@@ -60,6 +60,7 @@ public:
// not present in XrdLink, but convenient
LinkStatus Send(const std::string &str);
std::string describe() const;
std::string getID() const { return uuid; }
// Set global connection logging config
static void setConnectionLogging(bool val);
......
......@@ -96,6 +96,9 @@ LinkStatus QuarkDBNode::dispatch(Connection *conn, RedisRequest &req) {
return conn->err(SSTR("unknown argument '" << req[1] << "'"));
}
case RedisCommand::CLIENT_ID: {
return conn->status(conn->getID());
}
case RedisCommand::QUARKDB_INFO: {
return conn->statusVector(this->info().toVector());
}
......
......@@ -116,7 +116,7 @@ LinkStatus Shard::dispatch(Connection *conn, WriteBatch &batch) {
}
for(size_t i = 0; i < batch.requests.size(); i++) {
commandMonitor.broadcast(batch.requests[i]);
commandMonitor.broadcast(conn->describe(), batch.requests[i]);
}
LinkStatus ret = dispatcher->dispatch(conn, batch);
......@@ -125,7 +125,7 @@ LinkStatus Shard::dispatch(Connection *conn, WriteBatch &batch) {
}
LinkStatus Shard::dispatch(Connection *conn, RedisRequest &req) {
commandMonitor.broadcast(req);
commandMonitor.broadcast(conn->describe(), req);
switch(req.getCommand()) {
case RedisCommand::MONITOR: {
......
......@@ -26,7 +26,6 @@
#include "XrdVersion.hh"
#include "XrdQuarkDB.hh"
#include "XrdOuc/XrdOucEnv.hh"
#include "raft/RaftDispatcher.hh"
#include "utils/ScopedAdder.hh"
#include "QuarkDBNode.hh"
......
......@@ -25,16 +25,13 @@
#define __QUARKDB_XRDQUARKDB_PROTOCOL_H__
#include "Xrd/XrdProtocol.hh"
#include "XrdSys/XrdSysError.hh"
#include "XrdSys/XrdSysPthread.hh"
#include "Xrd/XrdLink.hh"
#include "XrdOuc/XrdOucString.hh"
#include "XrdOuc/XrdOucStream.hh"
#include "Utils.hh"
#include "utils/InFlightTracker.hh"
#include "EventFD.hh"
#include "qclient/QClient.hh"
#include <atomic>
#include <qclient/QClient.hh>
class XrdLink;
namespace quarkdb {
......
......@@ -23,20 +23,21 @@
#include "CommandMonitor.hh"
#include "../Formatter.hh"
#include "../Link.hh"
using namespace quarkdb;
CommandMonitor::CommandMonitor() {
}
void CommandMonitor::broadcast(const RedisRequest &received) {
void CommandMonitor::broadcast(const std::string& linkDescription, const RedisRequest &received) {
if(!active) return;
std::lock_guard<std::mutex> lock(mtx);
auto it = monitors.begin();
while(it != monitors.end()) {
bool stillAlive = (*it)->appendIfAttached(Formatter::status(received.toPrintableString()));
bool stillAlive = (*it)->appendIfAttached(Formatter::status(SSTR(linkDescription << ": " << received.toPrintableString())));
if(!stillAlive) {
it = monitors.erase(it);
......
......@@ -33,7 +33,7 @@ class CommandMonitor {
public:
CommandMonitor();
void broadcast(const RedisRequest &received);
void broadcast(const std::string& linkDescription, const RedisRequest &received);
void addRegistration(Connection *c);
size_t size();
......
......@@ -747,6 +747,11 @@ TEST_F(Raft_e2e, monitor) {
int leaderID = getLeaderID();
// Get connection ID
redisReplyPtr connIDReply = tunnel(leaderID)->exec("client-id").get();
std::string connID(connIDReply->str, connIDReply->len);
qdb_info("Connection ID: " << connID);
// We can't use QClient for this, it can't handle the output of MONITOR
qclient::ConnectionInitiator initiator("localhost", myself(leaderID).port);
ASSERT_TRUE(initiator.ok());
......@@ -763,13 +768,17 @@ TEST_F(Raft_e2e, monitor) {
tunnel(leaderID)->exec("set", "abc", "aaaa" "\xab" "bbb");
response.clear();
RETRY_ASSERT_TRUE(reader.consume(28, response));
ASSERT_EQ(response, "+\"set\" \"abc\" \"aaaa\\xABbbb\"\r\n");
std::string expectedReply = SSTR("+ [" << connID << "]: \"set\" \"abc\" \"aaaa\\xABbbb\"\r\n");
RETRY_ASSERT_TRUE(reader.consume(expectedReply.size(), response));
ASSERT_EQ(response, expectedReply);
tunnel(leaderID)->exec("get", "abc");
response.clear();
RETRY_ASSERT_TRUE(reader.consume(14, response));
ASSERT_EQ(response, "+\"get\" \"abc\"\r\n");
expectedReply = SSTR("+ [" << connID << "]: \"get\" \"abc\"\r\n");
RETRY_ASSERT_TRUE(reader.consume(expectedReply.size(), response));
ASSERT_EQ(response, expectedReply);
}
class PingCallback : qclient::QCallback {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment