Commit b6798192 authored by Georgios Bitzes's avatar Georgios Bitzes
Browse files

Implement deque trimming method, only in state machine for now

parent 09babd28
Pipeline #486507 passed with stages
in 30 minutes and 47 seconds
Subproject commit 7e410bd8fc4d1a6e5f782792209f2e6d24e0d06a
Subproject commit b71d76c06850b45a86758e3ac6d1052364ded3b6
......@@ -83,6 +83,7 @@ struct cmdMapInit {
redis_cmd_map["deque_pop_front"] = {RedisCommand::DEQUE_POP_FRONT, CommandType::WRITE};
redis_cmd_map["deque_push_back"] = {RedisCommand::DEQUE_PUSH_BACK, CommandType::WRITE};
redis_cmd_map["deque_pop_back"] = {RedisCommand::DEQUE_POP_BACK, CommandType::WRITE};
redis_cmd_map["deque_trim_front"] = {RedisCommand::DEQUE_TRIM_FRONT, CommandType::WRITE};
redis_cmd_map["config_set"] = {RedisCommand::CONFIG_SET, CommandType::WRITE};
redis_cmd_map["lhset"] = {RedisCommand::LHSET, CommandType::WRITE};
redis_cmd_map["lhdel"] = {RedisCommand::LHDEL, CommandType::WRITE};
......
......@@ -88,6 +88,7 @@ enum class RedisCommand {
DEQUE_POP_FRONT,
DEQUE_PUSH_BACK,
DEQUE_POP_BACK,
DEQUE_TRIM_FRONT,
DEQUE_LEN,
RAW_SCAN,
......
......@@ -1055,6 +1055,31 @@ rocksdb::Status StateMachine::dequePopBack(StagingArea &stagingArea, const std::
return this->dequePop(stagingArea, Direction::kRight, key, item);
}
rocksdb::Status StateMachine::dequeTrimFront(StagingArea &stagingArea, const std::string &key, uint64_t maxToKeep, int64_t &itemsRemoved) {
WriteOperation operation(stagingArea, key, KeyType::kDeque);
if(!operation.valid()) return wrong_type();
KeyDescriptor &descriptor = operation.descriptor();
int64_t toRemove = descriptor.getSize() - maxToKeep;
if(toRemove <= 0) {
operation.cancel();
itemsRemoved = 0;
return rocksdb::Status::OK();
}
for(uint64_t nextToEliminate = descriptor.getStartIndex()+1; nextToEliminate <=
descriptor.getStartIndex() + toRemove; nextToEliminate++) {
qdb_assert(operation.deleteField(unsignedIntToBinaryString(nextToEliminate)));
}
itemsRemoved = toRemove;
descriptor.setStartIndex(descriptor.getStartIndex() + toRemove);
qdb_assert(descriptor.getEndIndex() - descriptor.getStartIndex() - 1 == maxToKeep);
return operation.finalize(descriptor.getEndIndex() - descriptor.getStartIndex() - 1);
}
void StateMachine::advanceClock(StagingArea &stagingArea, ClockValue newValue) {
// Assert we're not setting the clock back..
ClockValue prevValue;
......@@ -1780,3 +1805,7 @@ rocksdb::Status StateMachine::lease_get(const std::string &key, ClockValue clock
rocksdb::Status StateMachine::lease_release(const std::string &key, ClockValue clockUpdate, LogIndex index) {
CHAIN(index, lease_release, key, clockUpdate);
}
rocksdb::Status StateMachine::dequeTrimFront(const std::string &key, uint64_t maxToKeep, int64_t &itemsRemoved, LogIndex index) {
CHAIN(index, dequeTrimFront, key, maxToKeep, itemsRemoved);
}
\ No newline at end of file
......@@ -87,6 +87,7 @@ public:
rocksdb::Status dequePushBack(StagingArea &stagingArea, const std::string &key, const VecIterator &start, const VecIterator &end, int64_t &length);
rocksdb::Status dequePopFront(StagingArea &stagingArea, const std::string &key, std::string &item);
rocksdb::Status dequePopBack(StagingArea &stagingArea, const std::string &key, std::string &item);
rocksdb::Status dequeTrimFront(StagingArea &stagingArea, const std::string &key, uint64_t maxToKeep, int64_t &itemsRemoved);
void advanceClock(StagingArea &stagingArea, ClockValue newValue);
LeaseAcquisitionStatus lease_acquire(StagingArea &stagingArea, const std::string &key, const std::string &value, ClockValue clockUpdate, uint64_t duration, LeaseInfo &info);
......@@ -150,6 +151,7 @@ public:
rocksdb::Status dequePushBack(const std::string &key, const VecIterator &start, const VecIterator &end, int64_t &length, LogIndex index = 0);
rocksdb::Status dequePopFront(const std::string &key, std::string &item, LogIndex index = 0);
rocksdb::Status dequePopBack(const std::string &key, std::string &item, LogIndex index = 0);
rocksdb::Status dequeTrimFront(const std::string &key, uint64_t maxToKeep, int64_t &itemsRemoved, LogIndex index = 0);
rocksdb::Status dequeLen(const std::string &key, size_t &len);
rocksdb::Status lhset(const std::string &key, const std::string &field, const std::string &hint, const std::string &value, bool &fieldcreated, LogIndex index = 0);
rocksdb::Status lhlen(const std::string &key, size_t &len);
......
......@@ -402,6 +402,45 @@ TEST_F(State_Machine, DequeOperations) {
ASSERT_NOTFOUND(stateMachine()->dequePopFront("my_list", item));
}
TEST_F(State_Machine, DequeTrimming) {
std::vector<std::string> vec = {"1", "2", "3", "4", "5", "6", "7"};
int64_t length;
ASSERT_OK(stateMachine()->dequePushBack("my-deque", vec.begin(), vec.end(), length));
ASSERT_EQ(length, 7);
ASSERT_OK(stateMachine()->dequeTrimFront("my-deque", 50, length));
ASSERT_EQ(length, 0);
size_t len;
ASSERT_OK(stateMachine()->dequeLen("my-deque", len));
ASSERT_EQ(len, 7u);
ASSERT_OK(stateMachine()->dequeTrimFront("my-deque", 5, length));
ASSERT_EQ(length, 2);
ASSERT_OK(stateMachine()->dequeLen("my-deque", len));
ASSERT_EQ(len, 5u);
std::string item;
ASSERT_OK(stateMachine()->dequePopFront("my-deque", item));
ASSERT_EQ(item, "3");
ASSERT_OK(stateMachine()->dequePopFront("my-deque", item));
ASSERT_EQ(item, "4");
ASSERT_OK(stateMachine()->dequePopFront("my-deque", item));
ASSERT_EQ(item, "5");
ASSERT_OK(stateMachine()->dequePopFront("my-deque", item));
ASSERT_EQ(item, "6");
ASSERT_OK(stateMachine()->dequePopFront("my-deque", item));
ASSERT_EQ(item, "7");
ASSERT_NOTFOUND(stateMachine()->dequePopFront("my-deque", item));
}
TEST_F(State_Machine, DequeOperations2) {
std::vector<std::string> vec = {"item1", "item2", "item3", "item4"};
int64_t length;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment