Commit 33d3d866 authored by Georgios Bitzes's avatar Georgios Bitzes
Browse files

Add command to trigger manual compaction

parent 2b595d48
Pipeline #333653 failed with stages
in 20 minutes and 13 seconds
......@@ -104,6 +104,7 @@ struct cmdMapInit {
redis_cmd_map["quarkdb_cancel_resilvering"] = {RedisCommand::QUARKDB_CANCEL_RESILVERING, CommandType::QUARKDB};
redis_cmd_map["quarkdb_bulkload_finalize"] = {RedisCommand::QUARKDB_BULKLOAD_FINALIZE, CommandType::QUARKDB};
redis_cmd_map["quarkdb_invalid_command"] = {RedisCommand::QUARKDB_INVALID_COMMAND, CommandType::QUARKDB};
redis_cmd_map["quarkdb_manual_compaction"] = {RedisCommand::QUARKDB_MANUAL_COMPACTION, CommandType::QUARKDB};
redis_cmd_map["recovery_info"] = {RedisCommand::RECOVERY_INFO, CommandType::QUARKDB};
}
......
......@@ -108,6 +108,7 @@ enum class RedisCommand {
QUARKDB_CANCEL_RESILVERING,
QUARKDB_BULKLOAD_FINALIZE,
QUARKDB_INVALID_COMMAND, // used in tests
QUARKDB_MANUAL_COMPACTION,
RECOVERY_INFO
};
......
......@@ -46,6 +46,7 @@ void Shard::attach() {
else if(mode == Mode::raft) {
raftGroup = new RaftGroup(*shardDirectory, myself, timeouts);
dispatcher = static_cast<Dispatcher*>(raftGroup->dispatcher());
stateMachine = shardDirectory->getStateMachine();
}
else if(mode == Mode::bulkload) {
stateMachine = shardDirectory->getStateMachineForBulkload();
......@@ -175,6 +176,15 @@ LinkStatus Shard::dispatch(Connection *conn, RedisRequest &req) {
stateMachine->finalizeBulkload();
return conn->ok();
}
case RedisCommand::QUARKDB_MANUAL_COMPACTION: {
if(req.size() != 1) return conn->errArgs(req[0]);
InFlightRegistration registration(inFlightTracker);
if(!registration.ok()) {
return conn->err("unavailable");
}
return conn->fromStatus(stateMachine->manualCompaction());
}
default: {
if(req.getCommandType() == CommandType::QUARKDB) {
qdb_critical("Unable to dispatch command '" << req[0] << "' of type QUARKDB");
......
......@@ -977,9 +977,17 @@ rocksdb::Status StateMachine::noop(LogIndex index) {
return stagingArea.commit(index);
}
rocksdb::Status StateMachine::manualCompaction() {
qdb_event("Triggering manual compaction..");
rocksdb::CompactRangeOptions opts;
opts.bottommost_level_compaction = rocksdb::BottommostLevelCompaction::kForce;
return db->CompactRange(opts, nullptr, nullptr);
}
void StateMachine::finalizeBulkload() {
qdb_event("Finalizing bulkload, issuing manual compaction...");
THROW_ON_ERROR(db->CompactRange(rocksdb::CompactRangeOptions(), nullptr, nullptr));
THROW_ON_ERROR(manualCompaction());
qdb_event("Manual compaction was successful. Building key descriptors...");
KeyDescriptorBuilder builder(*this);
THROW_ON_ERROR(db->Put(rocksdb::WriteOptions(), KeyConstants::kStateMachine_InBulkload, boolToString(false)));
......
......@@ -160,6 +160,7 @@ public:
return bulkLoad;
}
rocksdb::Status manualCompaction();
void finalizeBulkload();
IteratorPtr getRawIterator();
void commitBatch(rocksdb::WriteBatch &batch);
......
......@@ -895,6 +895,7 @@ TEST_F(Raft_e2e, smove) {
ASSERT_REPLY(tunnel(leaderID)->exec("smembers", "set1"), make_vec("i2", "i3", "i4", "i5"));
ASSERT_REPLY(tunnel(leaderID)->exec("smembers", "set2"), make_vec("i1", "t1", "t2", "t3", "t4", "t5"));
ASSERT_REPLY(tunnel(leaderID)->exec("quarkdb-manual-compaction"), "OK");
}
TEST_F(Raft_e2e, sscan) {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment