Commit b8b49c27 authored by Georgios Bitzes
Browse files

Replace raft-checkpoint with quarkdb-checkpoint

This creates a complete DB directory, from which a full node
can be spun up directly.
parent 1d2fe9be
Pipeline #594900 passed with stages
in 57 minutes and 34 seconds
......@@ -118,7 +118,6 @@ struct cmdMapInit {
redis_cmd_map["raft_leader_info"] = {RedisCommand::RAFT_LEADER_INFO, CommandType::RAFT};
redis_cmd_map["raft_request_vote"] = {RedisCommand::RAFT_REQUEST_VOTE, CommandType::RAFT};
redis_cmd_map["raft_fetch"] = {RedisCommand::RAFT_FETCH, CommandType::RAFT};
redis_cmd_map["raft_checkpoint"] = {RedisCommand::RAFT_CHECKPOINT, CommandType::RAFT};
redis_cmd_map["raft_attempt_coup"] = {RedisCommand::RAFT_ATTEMPT_COUP, CommandType::RAFT};
redis_cmd_map["raft_add_observer"] = {RedisCommand::RAFT_ADD_OBSERVER, CommandType::RAFT};
redis_cmd_map["raft_remove_member"] = {RedisCommand::RAFT_REMOVE_MEMBER, CommandType::RAFT};
......@@ -140,6 +139,10 @@ struct cmdMapInit {
redis_cmd_map["quarkdb_level_stats"] = {RedisCommand::QUARKDB_LEVEL_STATS, CommandType::QUARKDB};
redis_cmd_map["quarkdb_compression_stats"] = {RedisCommand::QUARKDB_COMPRESSION_STATS, CommandType::QUARKDB};
redis_cmd_map["quarkdb_version"] = {RedisCommand::QUARKDB_VERSION, CommandType::QUARKDB};
redis_cmd_map["quarkdb_checkpoint"] = {RedisCommand::QUARKDB_CHECKPOINT, CommandType::QUARKDB};
// Compatibility: Keep raft_checkpoint, make identical to quarkdb_checkpoint.
// Maybe remove in a few versions.
redis_cmd_map["raft_checkpoint"] = {RedisCommand::QUARKDB_CHECKPOINT, CommandType::QUARKDB};
redis_cmd_map["recovery_info"] = {RedisCommand::RECOVERY_INFO, CommandType::RECOVERY};
redis_cmd_map["recovery_set"] = {RedisCommand::RECOVERY_SET, CommandType::RECOVERY};
......
......@@ -124,7 +124,6 @@ enum class RedisCommand {
RAFT_LEADER_INFO,
RAFT_REQUEST_VOTE,
RAFT_FETCH,
RAFT_CHECKPOINT,
RAFT_ATTEMPT_COUP,
RAFT_ADD_OBSERVER,
RAFT_REMOVE_MEMBER,
......@@ -146,6 +145,7 @@ enum class RedisCommand {
QUARKDB_LEVEL_STATS,
QUARKDB_COMPRESSION_STATS,
QUARKDB_VERSION,
QUARKDB_CHECKPOINT,
RECOVERY_GET,
RECOVERY_SET,
......
......@@ -133,6 +133,16 @@ LinkStatus QuarkDBNode::dispatch(Connection *conn, RedisRequest &req) {
case RedisCommand::QUARKDB_VERSION: {
return conn->string(VERSION_FULL_STRING);
}
case RedisCommand::QUARKDB_CHECKPOINT: {
if(req.size() != 2) return conn->errArgs(req[0]);
std::string err = shardDirectory->checkpoint(req[1]);
if(!err.empty()) {
return conn->err(err);
}
return conn->ok();
}
case RedisCommand::CONVERT_STRING_TO_INT:
case RedisCommand::CONVERT_INT_TO_STRING: {
return conn->raw(handleConversion(req));
......
......@@ -21,12 +21,14 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.*
************************************************************************/
#include "utils/FileUtils.hh"
#include "ShardDirectory.hh"
#include "utils/FileUtils.hh"
#include "StateMachine.hh"
#include "utils/FileDescriptor.hh"
#include "raft/RaftJournal.hh"
#include <sys/stat.h>
using namespace quarkdb;
ShardSnapshot::ShardSnapshot(const std::string &path_)
......@@ -180,17 +182,17 @@ std::unique_ptr<ShardSnapshot> ShardDirectory::takeSnapshot(const SnapshotID &id
return nullptr;
}
std::string journalCheckpoint = pathJoin(snapshotDirectory, "raft-journal");
rocksdb::Status st = getRaftJournal()->checkpoint(journalCheckpoint);
std::string smCheckpoint = pathJoin(snapshotDirectory, "state-machine");
rocksdb::Status st = getStateMachine()->checkpoint(smCheckpoint);
if(!st.ok()) {
qdb_critical("cannot create journal checkpoint in " << journalCheckpoint << ": " << st.ToString());
qdb_critical("cannot create state machine checkpoint in " << smCheckpoint << ": " << st.ToString());
return nullptr;
}
std::string smCheckpoint = pathJoin(snapshotDirectory, "state-machine");
st = getStateMachine()->checkpoint(smCheckpoint);
std::string journalCheckpoint = pathJoin(snapshotDirectory, "raft-journal");
st = getRaftJournal()->checkpoint(journalCheckpoint);
if(!st.ok()) {
qdb_critical("cannot create state machine checkpoint in " << smCheckpoint << ": " << st.ToString());
qdb_critical("cannot create journal checkpoint in " << journalCheckpoint << ": " << st.ToString());
return nullptr;
}
......@@ -275,3 +277,48 @@ std::string ShardDirectory::getTempSnapshot(const SnapshotID &id) {
const ResilveringHistory& ShardDirectory::getResilveringHistory() const {
return resilveringHistory;
}
std::string ShardDirectory::checkpoint(const std::string &path) {
if(mkdir(path.c_str(), S_IRWXU) != 0) {
return SSTR("Could not mkdir " << path << ": " << errno << " (" << strerror(errno) << ")");
}
std::string checkpointCurrent = pathJoin(path, "current");
if(mkdir(checkpointCurrent.c_str(), S_IRWXU) != 0) {
return SSTR("Could not mkdir " << checkpointCurrent << ": " << errno << " (" << strerror(errno) << ")");
}
std::string smCheckpoint = pathJoin(checkpointCurrent, "state-machine");
rocksdb::Status st = getStateMachine()->checkpoint(smCheckpoint);
if(!st.ok()) {
std::string err = SSTR("Could not create state machine checkpoint in " << smCheckpoint << ": " << st.ToString());
qdb_critical(err);
return err;
}
// TODO, switch to if(configuration.getMode() == Mode::raft)
if(journalptr) {
std::string journalCheckpoint = pathJoin(checkpointCurrent, "raft-journal");
rocksdb::Status st = getRaftJournal()->checkpoint(journalCheckpoint);
if(!st.ok()) {
std::string err = SSTR("Could not create journal checkpoint in " << journalCheckpoint << ": " << st.ToString());
qdb_critical(err);
return err;
}
}
std::string resilvHist = pathJoin(path, "RESILVERING-HISTORY");
std::string err;
if(!write_file(resilvHist, resilveringHistory.serialize(), err)) {
qdb_critical(err);
return err;
}
std::string shardIdentPath = pathJoin(path, "SHARD-ID");
if(!write_file(shardIdentPath, shardID, err)) {
qdb_critical(err);
return err;
}
return {}; // success
}
......@@ -76,9 +76,12 @@ public:
bool resilveringStart(const ResilveringEventID &id, std::string &err);
bool resilveringCopy(const ResilveringEventID &id, const std::string &filename, const std::string &contents, std::string &err);
bool resilveringFinish(const ResilveringEventID &id, std::string &err);
const ResilveringHistory& getResilveringHistory() const;
// empty string in case of success - otherwise contains the error
// TODO: replace with proper status object
std::string checkpoint(const std::string &path);
private:
void parseResilveringHistory();
void storeResilveringHistory();
......
......@@ -54,7 +54,7 @@ StandaloneDispatcher::StandaloneDispatcher(StateMachine &sm)
LinkStatus StandaloneDispatcher::dispatch(Connection *conn, RedisRequest &req) {
// Show a user-friendly error message for raft-info, instead of
// "internal dispatching error" less scary message for "raft-info"
// "internal dispatching error"
if(req.getCommandType() == CommandType::RAFT) {
qdb_warn("Received command " << req[0] << ", even though raft is not active");
......
......@@ -165,16 +165,6 @@ LinkStatus RaftDispatcher::dispatch(Connection *conn, RedisRequest &req) {
conn->raftAuthorization = true;
return conn->ok();
}
case RedisCommand::RAFT_CHECKPOINT: {
if(req.size() != 2) return conn->errArgs(req[0]);
std::string err;
if(!checkpoint(req[1], err)) {
return conn->err(err);
}
return conn->ok();
}
case RedisCommand::RAFT_ATTEMPT_COUP: {
RaftStateSnapshotPtr snapshot = state.getSnapshot();
......@@ -612,24 +602,3 @@ bool RaftDispatcher::fetch(LogIndex index, RaftEntry &entry) {
rocksdb::Status st = journal.fetch(index, entry);
return st.ok();
}
bool RaftDispatcher::checkpoint(const std::string &path, std::string &err) {
if(mkdir(path.c_str(), 0775) != 0) {
err = SSTR("Error when creating directory '" << path << "', errno: " << errno);
return false;
}
rocksdb::Status st = stateMachine.checkpoint(SSTR(path << "/state-machine"));
if(!st.ok()) {
err = st.ToString();
return false;
}
st = journal.checkpoint(SSTR(path << "/raft-journal"));
if(!st.ok()) {
err = st.ToString();
return false;
}
return true;
}
......@@ -54,7 +54,6 @@ public:
RaftInfo info();
bool fetch(LogIndex index, RaftEntry &entry);
bool checkpoint(const std::string &path, std::string &err);
RaftHeartbeatResponse heartbeat(const RaftHeartbeatRequest &req);
RaftAppendEntriesResponse appendEntries(RaftAppendEntriesRequest &&req);
......
......@@ -27,6 +27,7 @@
#include "raft/RaftCommitTracker.hh"
#include "raft/RaftConfig.hh"
#include "raft/RaftContactDetails.hh"
#include "ShardDirectory.hh"
#include "Version.hh"
#include "Poller.hh"
#include "Configuration.hh"
......@@ -158,11 +159,11 @@ TEST_F(Raft_e2e, simultaneous_clients) {
// Before taking a checkpoint, ensure node #0 is caught up
RETRY_ASSERT_TRUE(stateMachine(0)->getLastApplied() == stateMachine(leaderID)->getLastApplied());
ASSERT_TRUE(dispatcher()->checkpoint(checkpointPath, err));
ASSERT_FALSE(dispatcher()->checkpoint(checkpointPath, err)); // exists already
ASSERT_TRUE(shardDirectory()->checkpoint(checkpointPath).empty());
ASSERT_FALSE(shardDirectory()->checkpoint(checkpointPath).empty()); // exists already
// pretty expensive to open two extra databases, but necessary
StateMachine checkpointSM(SSTR(checkpointPath << "/state-machine"));
StateMachine checkpointSM(SSTR(checkpointPath << "/current/state-machine"));
std::string tmp;
ASSERT_OK(checkpointSM.get("client3", tmp));
......@@ -174,7 +175,7 @@ TEST_F(Raft_e2e, simultaneous_clients) {
// TODO: verify checkpointSM last applied, once atomic commits are implemented
// ensure the checkpoint journal is identical to the original
RaftJournal checkpointJournal(SSTR(checkpointPath << "/raft-journal"));
RaftJournal checkpointJournal(SSTR(checkpointPath << "/current/raft-journal"));
ASSERT_EQ(checkpointJournal.getLogSize(), journal()->getLogSize());
for(LogIndex i = 0; i < journal()->getLogSize(); i++) {
RaftEntry entry1, entry2;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment