Commit 4a3b1ba9 authored by Georgios Bitzes's avatar Georgios Bitzes
Browse files

Ensure only writes can make it into the raft journal

Fixes EOS-2072, thanks to Herve Rousseau for the bug report
parent e816276c
Pipeline #223137 passed with stages
in 22 minutes and 40 seconds
......@@ -81,9 +81,9 @@ struct cmdMapInit {
redis_cmd_map["raft_remove_member"] = {RedisCommand::RAFT_REMOVE_MEMBER, CommandType::RAFT};
redis_cmd_map["raft_promote_observer"] = {RedisCommand::RAFT_PROMOTE_OBSERVER, CommandType::RAFT};
redis_cmd_map["raft_heartbeat"] = {RedisCommand::RAFT_HEARTBEAT, CommandType::RAFT};
redis_cmd_map["raft_invalid_command"] = {RedisCommand::RAFT_INVALID_COMMAND, CommandType::RAFT};
redis_cmd_map["quarkdb_info"] = {RedisCommand::QUARKDB_INFO, CommandType::QUARKDB};
redis_cmd_map["quarkdb_stats"] = {RedisCommand::QUARKDB_STATS, CommandType::QUARKDB};
redis_cmd_map["quarkdb_detach"] = {RedisCommand::QUARKDB_DETACH, CommandType::QUARKDB};
redis_cmd_map["quarkdb_attach"] = {RedisCommand::QUARKDB_ATTACH, CommandType::QUARKDB};
redis_cmd_map["quarkdb_start_resilvering"] = {RedisCommand::QUARKDB_START_RESILVERING, CommandType::QUARKDB};
......@@ -91,6 +91,7 @@ struct cmdMapInit {
redis_cmd_map["quarkdb_resilvering_copy_file"] = {RedisCommand::QUARKDB_RESILVERING_COPY_FILE, CommandType::QUARKDB};
redis_cmd_map["quarkdb_cancel_resilvering"] = {RedisCommand::QUARKDB_CANCEL_RESILVERING, CommandType::QUARKDB};
redis_cmd_map["quarkdb_bulkload_finalize"] = {RedisCommand::QUARKDB_BULKLOAD_FINALIZE, CommandType::QUARKDB};
redis_cmd_map["quarkdb_invalid_command"] = {RedisCommand::QUARKDB_INVALID_COMMAND, CommandType::QUARKDB};
}
} cmd_map_init;
......@@ -85,16 +85,17 @@ enum class RedisCommand {
RAFT_REMOVE_MEMBER,
RAFT_PROMOTE_OBSERVER,
RAFT_HEARTBEAT,
RAFT_INVALID_COMMAND, // used in tests
QUARKDB_INFO,
QUARKDB_STATS,
QUARKDB_DETACH,
QUARKDB_ATTACH,
QUARKDB_START_RESILVERING,
QUARKDB_FINISH_RESILVERING,
QUARKDB_RESILVERING_COPY_FILE,
QUARKDB_CANCEL_RESILVERING,
QUARKDB_BULKLOAD_FINALIZE
QUARKDB_BULKLOAD_FINALIZE,
QUARKDB_INVALID_COMMAND // used in tests
};
enum class CommandType {
......
......@@ -91,6 +91,11 @@ LinkStatus QuarkDBNode::dispatch(Connection *conn, RedisRequest &req) {
return conn->vector(this->info().toVector());
}
default: {
if(req.getCommandType() == CommandType::QUARKDB) {
qdb_critical("Unable to dispatch command '" << req[0] << "' of type QUARKDB");
return conn->err(SSTR("internal dispatching error"));
}
return shard->dispatch(conn, req);
}
}
......
......@@ -35,12 +35,18 @@ void setStacktraceOnError(bool val) {
stacktraceOnError = val;
}
std::string errorStacktrace() {
if(stacktraceOnError) {
return SSTR(" ----- " << getStacktrace());
std::string errorStacktrace(bool crash) {
if(!stacktraceOnError) {
return "";
}
return "";
std::string suffixMessage;
if(!crash) {
suffixMessage = " ----- The above stacktrace does NOT signify a crash! It's used to show the location of a serious error.";
}
return SSTR(" ----- " << getStacktrace() << suffixMessage);
}
std::mutex logMutex;
......
......@@ -45,7 +45,7 @@ namespace quarkdb {
void setStacktraceOnError(bool val);
// Returns a stacktrace if 'stacktrace-on-error' is enabled, empty otherwise.
std::string errorStacktrace();
std::string errorStacktrace(bool crash);
#define DISALLOW_COPY_AND_ASSIGN(TypeName) \
TypeName(const TypeName&) = delete; \
......@@ -65,7 +65,7 @@ extern std::mutex logMutex;
// temporary solution for now
#define qdb_log(message) ___log(message)
#define qdb_event(message) ___log("EVENT: " << message)
#define qdb_critical(message) ___log("CRITICAL: " << message << quarkdb::errorStacktrace())
#define qdb_critical(message) ___log("CRITICAL: " << message << quarkdb::errorStacktrace(false))
#define qdb_warn(message) ___log("WARNING: " << message)
#define qdb_error(message) ___log("ERROR: " << message)
......@@ -73,8 +73,8 @@ extern std::mutex logMutex;
#define qdb_debug(message) if(false) { ___log(message); }
// a serious error has occured signifying a bug in the program logic
#define qdb_throw(message) throw FatalException(SSTR(message << quarkdb::errorStacktrace()))
#define qdb_assert(condition) if(!((condition))) throw FatalException(SSTR("assertion violation, condition is not true: " << #condition << quarkdb::errorStacktrace()))
#define qdb_throw(message) throw FatalException(SSTR(message << quarkdb::errorStacktrace(true)))
#define qdb_assert(condition) if(!((condition))) throw FatalException(SSTR("assertion violation, condition is not true: " << #condition << quarkdb::errorStacktrace(true)))
bool my_strtoll(const std::string &str, int64_t &ret);
bool my_strtod(const std::string &str, double &ret);
......
......@@ -233,11 +233,17 @@ LinkStatus RaftDispatcher::service(Connection *conn, RedisRequest &req) {
// If we've made it this far, the state machine should be all caught-up
// by now. Proceed to service this request.
qdb_assert(snapshot.leadershipMarker <= stateMachine.getLastApplied());
}
}
return conn->addPendingRequest(&redisDispatcher, std::move(req));
}
// At this point, the received command *must* be a write - verify!
if(req.getCommandType() != CommandType::WRITE) {
qdb_critical("RaftDispatcher: unable to dispatch non-write command: " << req[0]);
return conn->err("internal dispatching error");
}
// send request to the write tracker
std::lock_guard<std::mutex> lock(raftCommand);
......
......@@ -296,6 +296,7 @@ TEST_F(Raft_e2e, test_many_redis_commands) {
futures.emplace_back(tunnel(leaderID)->exec("sismember", "myhash", "b"));
futures.emplace_back(tunnel(leaderID)->exec("scard", "myhash"));
futures.emplace_back(tunnel(leaderID)->exec("scard", "does-not-exist"));
futures.emplace_back(tunnel(leaderID)->exec("raft_invalid_command"));
size_t count = 0;
ASSERT_REPLY(futures[count++], 1);
......@@ -320,6 +321,7 @@ TEST_F(Raft_e2e, test_many_redis_commands) {
ASSERT_REPLY(futures[count++], "ERR Invalid argument: WRONGTYPE Operation against a key holding the wrong kind of value");
ASSERT_REPLY(futures[count++], "ERR Invalid argument: WRONGTYPE Operation against a key holding the wrong kind of value");
ASSERT_REPLY(futures[count++], 0);
ASSERT_REPLY(futures[count++], "ERR internal dispatching error");
futures.clear();
futures.emplace_back(tunnel(leaderID)->exec("set", "mystring", "asdf"));
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment