Commit 52531c46 authored by Georgios Bitzes's avatar Georgios Bitzes
Browse files

Make fsync policy setting fully functioning

parent ff9bdb50
Pipeline #1341649 passed with stages
in 58 minutes and 9 seconds
......@@ -136,6 +136,7 @@ struct cmdMapInit {
redis_cmd_map["raft_heartbeat"] = {RedisCommand::RAFT_HEARTBEAT, CommandType::RAFT};
redis_cmd_map["raft_fetch_last"] = {RedisCommand::RAFT_FETCH_LAST, CommandType::RAFT};
redis_cmd_map["raft_journal_scan"] = {RedisCommand::RAFT_JOURNAL_SCAN, CommandType::RAFT};
redis_cmd_map["raft_set_fsync_policy"] = {RedisCommand::RAFT_SET_FSYNC_POLICY, CommandType::RAFT};
redis_cmd_map["activate_stale_reads"] = {RedisCommand::ACTIVATE_STALE_READS, CommandType::RAFT};
......
......@@ -143,6 +143,7 @@ enum class RedisCommand {
RAFT_HEARTBEAT,
RAFT_FETCH_LAST,
RAFT_JOURNAL_SCAN,
RAFT_SET_FSYNC_POLICY,
ACTIVATE_STALE_READS,
......
......@@ -106,19 +106,23 @@ inline std::string fsyncPolicyToString(FsyncPolicy pol) {
}
}
inline FsyncPolicy parseFsyncPolicy(std::string_view s) {
inline bool parseFsyncPolicy(std::string_view s, FsyncPolicy &out) {
if(s == "always") {
return FsyncPolicy::kAlways;
out = FsyncPolicy::kAlways;
return true;
}
else if(s == "sync-important-updates") {
return FsyncPolicy::kSyncImportantUpdates;
}
else if(s == "async") {
return FsyncPolicy::kAsync;
if(s == "sync-important-updates") {
out = FsyncPolicy::kSyncImportantUpdates;
return true;
}
else {
qdb_throw("could not parse FsyncPolicy: " << s);
if(s == "async") {
out = FsyncPolicy::kAsync;
return true;
}
return false;
}
using RaftClusterID = std::string;
......
......@@ -166,6 +166,17 @@ LinkStatus RaftDispatcher::dispatch(Connection *conn, RedisRequest &req) {
RaftAppendEntriesResponse resp = appendEntries(std::move(dest));
return conn->vector(resp.toVector());
}
case RedisCommand::RAFT_SET_FSYNC_POLICY: {
if(req.size() != 2u) return conn->errArgs(req[0]);
FsyncPolicy policy;
if(!parseFsyncPolicy(req[1], policy)) {
return conn->err(SSTR("could not parse '" << req[1] << "', available choices: always,async,sync-important-updates"));
}
journal.setFsyncPolicy(policy);
return conn->ok();
}
case RedisCommand::RAFT_REQUEST_VOTE: {
if(!conn->raftAuthorization) return conn->err("not authorized to issue raft commands");
RaftVoteRequest votereq;
......
......@@ -136,6 +136,17 @@ void RaftJournal::obliterate(RaftClusterID newClusterID, const std::vector<RaftS
initialize();
}
void RaftJournal::initializeFsyncPolicy() {
std::string policyStr = this->get_or_die(KeyConstants::kJournal_FsyncPolicy);
FsyncPolicy tmp = FsyncPolicy::kSyncImportantUpdates;
if(!parseFsyncPolicy(policyStr, tmp)) {
qdb_warn("Invalid fsync policy in journal: " << policyStr);
}
fsyncPolicy = tmp;
}
void RaftJournal::initialize() {
currentTerm = this->get_int_or_die(KeyConstants::kJournal_CurrentTerm);
logSize = this->get_int_or_die(KeyConstants::kJournal_LogSize);
......@@ -147,7 +158,7 @@ void RaftJournal::initialize() {
membershipEpoch = this->get_int_or_die(KeyConstants::kJournal_MembershipEpoch);
members = RaftMembers(this->get_or_die(KeyConstants::kJournal_Members));
fsyncPolicy = parseFsyncPolicy(this->get_or_die(KeyConstants::kJournal_FsyncPolicy));
initializeFsyncPolicy();
if(!vote.empty() && !parseServer(vote, votedFor)) {
qdb_throw("journal corruption, cannot parse " << KeyConstants::kJournal_VotedFor << ": " << vote);
......@@ -278,7 +289,10 @@ void RaftJournal::commitBatch(rocksdb::WriteBatch &batch, LogIndex index, bool i
THROW_ON_ERROR(batch.Put(KeyConstants::kJournal_LogSize, intToBinaryString(index)));
}
rocksdb::Status st = db->Write(rocksdb::WriteOptions(), &batch);
rocksdb::WriteOptions opts;
opts.sync = shouldSync(important);
rocksdb::Status st = db->Write(opts, &batch);
if(!st.ok()) qdb_throw("unable to commit journal transaction: " << st.ToString());
if(index >= 0) logSize = index;
}
......
......@@ -48,7 +48,6 @@ public:
// should never have to be called during normal operation, only in the tests
// assumes there's no other concurrent access to the journal
void obliterate(RaftClusterID clusterID, const std::vector<RaftServer> &nodes, LogIndex startIndex, FsyncPolicy fscynPolicy);
void initialize();
bool setCurrentTerm(RaftTerm term, RaftServer vote);
bool setCommitIndex(LogIndex index);
......@@ -116,6 +115,8 @@ private:
void rawSetCommitIndex(LogIndex index);
void ensureFsyncPolicyInitialized();
bool shouldSync(bool important);
void initializeFsyncPolicy();
void initialize();
rocksdb::DB* db = nullptr;
std::string dbPath;
......
......@@ -293,3 +293,19 @@ TEST_F(Raft_Journal, T1) {
ASSERT_EQ(journal.getMembership(), originalMembership);
}
}
TEST(FsyncPolicy, Parsing) {
FsyncPolicy policy;
ASSERT_TRUE(parseFsyncPolicy("async", policy));
ASSERT_EQ(policy, FsyncPolicy::kAsync);
ASSERT_TRUE(parseFsyncPolicy("always", policy));
ASSERT_EQ(policy, FsyncPolicy::kAlways);
ASSERT_TRUE(parseFsyncPolicy("sync-important-updates", policy));
ASSERT_EQ(policy, FsyncPolicy::kSyncImportantUpdates);
ASSERT_FALSE(parseFsyncPolicy("aaaa", policy));
ASSERT_FALSE(parseFsyncPolicy("ALWAYS", policy));
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment