Commit 430e73df authored by Georgios Bitzes's avatar Georgios Bitzes
Browse files

Provide the correct number of MOVED / ERR responses for phantom transactions

parent fb51d646
Pipeline #444957 passed with stages
in 30 minutes and 9 seconds
......@@ -3,6 +3,16 @@ All notable changes to this project will be documented in this file.
## Unreleased
### Changed
- Refactoring of transactions, we no longer pack / unpack a transaction into a single request within the same node, saving
CPU cycles.
### Fixed
- In certain cases, such as when redirecting or reporting unavailability for pipelined writes, fewer
responses might be provided than expected, causing the client connection to hang. This did not affect
QClient when redirects are active, as it would shut the connection down and retry upon reception of
the first such response.
## 0.2.8 (2018-07-04)
### Added
- Support for leases, which can be used as locks with timeouts, allowing QuarkDB to serve as a distributed lock manager.
......
......@@ -36,7 +36,7 @@ LinkStatus PendingQueue::flushPending(const RedisEncodedResponse &msg) {
conn->writer.send(std::move(pending.front().rawResp.val));
}
else {
conn->writer.send(std::string(msg.val));
conn->writer.send(Formatter::multiply(msg, pending.front().tx.expectedResponses() ).val);
}
}
pending.pop();
......
......@@ -273,7 +273,7 @@ LinkStatus RaftDispatcher::service(Connection *conn, Transaction &tx) {
RaftStateSnapshotPtr snapshot = state.getSnapshot();
if(snapshot->status != RaftStatus::LEADER) {
if(snapshot->leader.empty()) {
return conn->err("unavailable");
return conn->raw(Formatter::multiply(Formatter::err("unavailable"), tx.expectedResponses()));
}
if(conn->raftStaleReads && !tx.containsWrites()) {
......@@ -282,7 +282,7 @@ LinkStatus RaftDispatcher::service(Connection *conn, Transaction &tx) {
}
// Redirect.
return conn->moved(0, snapshot->leader);
return conn->raw(Formatter::multiply(Formatter::moved(0, snapshot->leader), tx.expectedResponses()));
}
// What happens if I was just elected as leader, but my state machine is
......
......@@ -623,7 +623,21 @@ TEST_F(Raft_e2e, test_many_redis_commands) {
ASSERT_REPLY(l13, "OK");
ASSERT_REPLY(l14, "ACQUIRED");
// Ensure the followers return the correct number of responses on MOVED for
// pipelined writes.
int follower1 = (leaderID+1) % 3;
std::vector<std::future<redisReplyPtr>> moved;
for(size_t i = 0; i < 10; i++) {
moved.emplace_back(tunnel(follower1)->exec("set", "abc", "123"));
}
for(size_t i = 0; i < 10; i++) {
ASSERT_REPLY(moved[i], SSTR("MOVED 0 " << myself(leaderID).toString()));
}
// Make sure the connection did not hang.
ASSERT_REPLY(tunnel(follower1)->exec("ping", "zxcvbnm"), "zxcvbnm");
}
TEST_F(Raft_e2e, replication_with_trimmed_journal) {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment