Commit f808307d authored by Georgios Bitzes's avatar Georgios Bitzes
Browse files

Rename MultiOp to Transaction

parent 1483091b
Pipeline #417602 passed with stages
in 29 minutes and 53 seconds
......@@ -59,8 +59,8 @@ add_library(XrdQuarkDB SHARED
redis/Authenticator.cc redis/Authenticator.hh
redis/CommandMonitor.cc redis/CommandMonitor.hh
redis/MultiHandler.cc redis/MultiHandler.hh
redis/MultiOp.cc redis/MultiOp.hh
redis/RedisEncodedResponse.hh
redis/Transaction.cc redis/Transaction.hh
storage/ConsistencyScanner.cc storage/ConsistencyScanner.hh
storage/KeyConstants.cc storage/KeyConstants.hh
......
......@@ -92,8 +92,13 @@ struct cmdMapInit {
redis_cmd_map["exec"] = {RedisCommand::EXEC, CommandType::CONTROL};
redis_cmd_map["discard"] = {RedisCommand::DISCARD, CommandType::CONTROL};
redis_cmd_map["multi"] = {RedisCommand::MULTI, CommandType::CONTROL};
redis_cmd_map["multiop_read"] = {RedisCommand::MULTIOP_READ, CommandType::READ};
redis_cmd_map["multiop_readwrite"] = {RedisCommand::MULTIOP_READWRITE, CommandType::WRITE};
redis_cmd_map["tx_readonly"] = {RedisCommand::TX_READONLY, CommandType::READ};
redis_cmd_map["tx_readwrite"] = {RedisCommand::TX_READWRITE, CommandType::WRITE};
// These have been retained for compatibility, to ensure old raft journal
// entries can still be processed correctly. TODO: Remove after a couple of releases.
redis_cmd_map["multiop_read"] = {RedisCommand::TX_READONLY, CommandType::READ};
redis_cmd_map["multiop_readwrite"] = {RedisCommand::TX_READWRITE, CommandType::WRITE};
redis_cmd_map["raft_handshake"] = {RedisCommand::RAFT_HANDSHAKE, CommandType::RAFT};
redis_cmd_map["raft_append_entries"] = {RedisCommand::RAFT_APPEND_ENTRIES, CommandType::RAFT};
......
......@@ -21,8 +21,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.*
************************************************************************/
#ifndef __QUARKDB_COMMANDS_H__
#define __QUARKDB_COMMANDS_H__
#ifndef QUARKDB_COMMANDS_H
#define QUARKDB_COMMANDS_H
#include <map>
......@@ -95,8 +95,8 @@ enum class RedisCommand {
EXEC,
DISCARD,
MULTI,
MULTIOP_READ,
MULTIOP_READWRITE,
TX_READONLY,
TX_READWRITE,
CONFIG_GET,
CONFIG_SET,
......@@ -136,7 +136,6 @@ enum class CommandType {
READ,
WRITE,
MULTIOP,
CONTROL,
RAFT,
QUARKDB,
......
......@@ -237,7 +237,7 @@ LinkStatus Connection::processRequests(Dispatcher *dispatcher, const InFlightTra
continue;
}
if(currentRequest.getCommand() == RedisCommand::MULTIOP_READWRITE) {
if(currentRequest.getCommand() == RedisCommand::TX_READWRITE) {
multiHandler.finalizePhantomTransaction(dispatcher, this);
dispatcher->dispatch(this, currentRequest);
continue;
......
......@@ -24,7 +24,7 @@
#include "storage/StagingArea.hh"
#include "utils/CommandParsing.hh"
#include "utils/ParseUtils.hh"
#include "redis/MultiOp.hh"
#include "redis/Transaction.hh"
#include "redis/ArrayResponseBuilder.hh"
#include "StateMachine.hh"
#include "Dispatcher.hh"
......@@ -59,19 +59,19 @@ RedisEncodedResponse RedisDispatcher::dispatchingError(RedisRequest &request, Lo
return Formatter::err(msg);
}
RedisEncodedResponse RedisDispatcher::dispatch(MultiOp &multiOp, LogIndex commit) {
StagingArea stagingArea(store, !multiOp.containsWrites());
ArrayResponseBuilder builder(multiOp.size(), multiOp.isPhantom());
RedisEncodedResponse RedisDispatcher::dispatch(Transaction &transaction, LogIndex commit) {
StagingArea stagingArea(store, !transaction.containsWrites());
ArrayResponseBuilder builder(transaction.size(), transaction.isPhantom());
for(size_t i = 0; i < multiOp.size(); i++) {
builder.push_back(dispatchReadWrite(stagingArea, multiOp[i]));
for(size_t i = 0; i < transaction.size(); i++) {
builder.push_back(dispatchReadWrite(stagingArea, transaction[i]));
}
if(multiOp.containsWrites()) {
if(transaction.containsWrites()) {
stagingArea.commit(commit);
}
store.getRequestCounter().account(multiOp);
store.getRequestCounter().account(transaction);
return builder.buildResponse();
}
......@@ -554,26 +554,26 @@ RedisEncodedResponse RedisDispatcher::dispatchRead(StagingArea &stagingArea, Red
}
}
RedisEncodedResponse RedisDispatcher::handleMultiOp(RedisRequest &request, LogIndex commit) {
MultiOp multiOp;
RedisEncodedResponse RedisDispatcher::handleTransaction(RedisRequest &request, LogIndex commit) {
Transaction transaction;
qdb_assert(request.size() == 3);
qdb_assert(multiOp.deserialize(request[1]));
qdb_assert(request.getCommand() == RedisCommand::MULTIOP_READ || request.getCommand() == RedisCommand::MULTIOP_READWRITE);
qdb_assert(transaction.deserialize(request[1]));
qdb_assert(request.getCommand() == RedisCommand::TX_READONLY || request.getCommand() == RedisCommand::TX_READWRITE);
if(request.getCommand() == RedisCommand::MULTIOP_READ) {
qdb_assert(!multiOp.containsWrites());
if(request.getCommand() == RedisCommand::TX_READONLY) {
qdb_assert(!transaction.containsWrites());
}
else {
qdb_assert(multiOp.containsWrites());
qdb_assert(transaction.containsWrites());
}
qdb_assert(request[2] == "phantom" || request[2] == "real");
bool phantom = false;
if(request[2] == "phantom") phantom = true;
multiOp.setPhantom(phantom);
transaction.setPhantom(phantom);
return dispatch(multiOp, commit);
return dispatch(transaction, commit);
}
RedisEncodedResponse RedisDispatcher::dispatch(RedisRequest &request, LogIndex commit) {
......@@ -599,9 +599,9 @@ RedisEncodedResponse RedisDispatcher::dispatch(RedisRequest &request, LogIndex c
return dispatchingError(request, commit);
}
// MultiOp, encoded as single RedisRequest?
if(request.getCommand() == RedisCommand::MULTIOP_READ || request.getCommand() == RedisCommand::MULTIOP_READWRITE) {
return handleMultiOp(request, commit);
// Transaction, encoded as single RedisRequest?
if(request.getCommand() == RedisCommand::TX_READONLY || request.getCommand() == RedisCommand::TX_READWRITE) {
return handleTransaction(request, commit);
}
StagingArea stagingArea(store, request.getCommandType() == CommandType::READ);
......
......@@ -31,7 +31,7 @@
namespace quarkdb {
class MultiOp;
class Transaction;
class Dispatcher {
public:
......@@ -50,9 +50,9 @@ public:
virtual LinkStatus dispatch(Connection *conn, RedisRequest &req) override final;
RedisEncodedResponse dispatch(RedisRequest &req, LogIndex commit);
RedisEncodedResponse dispatch(MultiOp &multiOp, LogIndex commit);
RedisEncodedResponse dispatch(Transaction &transaction, LogIndex commit);
private:
RedisEncodedResponse handleMultiOp(RedisRequest &req, LogIndex commit);
RedisEncodedResponse handleTransaction(RedisRequest &req, LogIndex commit);
RedisEncodedResponse dispatchReadWrite(StagingArea &stagingArea, RedisRequest &req);
RedisEncodedResponse dispatchRead(StagingArea &stagingArea, RedisRequest &req);
RedisEncodedResponse dispatchWrite(StagingArea &stagingArea, RedisRequest &req);
......
......@@ -26,7 +26,7 @@
#include "Common.hh"
#include "Formatter.hh"
#include "redis/ArrayResponseBuilder.hh"
#include "redis/MultiOp.hh"
#include "redis/Transaction.hh"
using namespace quarkdb;
RedisEncodedResponse Formatter::moved(int64_t shardId, const RaftServer &location) {
......@@ -115,15 +115,15 @@ RedisEncodedResponse Formatter::simpleRedisRequest(const RedisRequest &req) {
}
RedisEncodedResponse Formatter::redisRequest(const RedisRequest &req) {
if(req.getCommand() == RedisCommand::MULTIOP_READWRITE || req.getCommand() == RedisCommand::MULTIOP_READ) {
MultiOp multiOp;
multiOp.deserialize(req[1]);
if(req.getCommand() == RedisCommand::TX_READWRITE || req.getCommand() == RedisCommand::TX_READONLY) {
Transaction transaction;
transaction.deserialize(req[1]);
ArrayResponseBuilder builder(multiOp.size() + 1);
ArrayResponseBuilder builder(transaction.size() + 1);
builder.push_back(Formatter::string(req[0]));
for(size_t i = 0; i < multiOp.size(); i++) {
builder.push_back(simpleRedisRequest(multiOp[i]));
for(size_t i = 0; i < transaction.size(); i++) {
builder.push_back(simpleRedisRequest(transaction[i]));
}
return builder.buildResponse();
......
......@@ -45,7 +45,7 @@ struct QuarkDBInfo {
std::vector<std::string> toVector() const;
};
class Shard; class ShardDirectory; class MultiOp;
class Shard; class ShardDirectory;
class QuarkDBNode : public Dispatcher {
public:
......
......@@ -22,7 +22,7 @@
************************************************************************/
#include "RedisRequest.hh"
#include "redis/MultiOp.hh"
#include "redis/Transaction.hh"
#include "utils/StringUtils.hh"
#include "utils/Macros.hh"
using namespace quarkdb;
......@@ -45,10 +45,10 @@ void RedisRequest::parseCommand() {
}
std::string RedisRequest::toPrintableString() const {
if(this->getCommand() == RedisCommand::MULTIOP_READ || this->getCommand() == RedisCommand::MULTIOP_READWRITE) {
MultiOp multiOp;
multiOp.fromRedisRequest(*this);
return multiOp.toPrintableString();
if(this->getCommand() == RedisCommand::TX_READONLY || this->getCommand() == RedisCommand::TX_READWRITE) {
Transaction transaction;
transaction.fromRedisRequest(*this);
return transaction.toPrintableString();
}
std::stringstream ss;
......
......@@ -26,7 +26,6 @@
#include "ShardDirectory.hh"
#include "raft/RaftGroup.hh"
#include "raft/RaftDispatcher.hh"
#include "redis/MultiOp.hh"
#include "utils/ScopedAdder.hh"
using namespace quarkdb;
......
......@@ -23,7 +23,6 @@
#include "RecoveryDispatcher.hh"
#include "../Formatter.hh"
#include "../redis/MultiOp.hh"
using namespace quarkdb;
RecoveryDispatcher::RecoveryDispatcher(RecoveryEditor &ed) : editor(ed) {
......
......@@ -30,8 +30,6 @@
namespace quarkdb {
class MultiOp;
class RecoveryDispatcher : public Dispatcher {
public:
RecoveryDispatcher(RecoveryEditor &editor);
......
......@@ -55,9 +55,9 @@ void CommandMonitor::broadcast(const std::string& linkDescription, const RedisRe
return broadcast(linkDescription, req.toPrintableString());
}
void CommandMonitor::broadcast(const std::string& linkDescription, const MultiOp& multiOp) {
void CommandMonitor::broadcast(const std::string& linkDescription, const Transaction& transaction) {
if(!active) return;
return broadcast(linkDescription, multiOp.toPrintableString());
return broadcast(linkDescription, transaction.toPrintableString());
}
void CommandMonitor::addRegistration(Connection *c) {
......
......@@ -29,21 +29,21 @@
namespace quarkdb {
class MultiOp;
class Transaction;
class CommandMonitor {
public:
CommandMonitor();
void broadcast(const std::string& linkDescription, const RedisRequest& received);
void broadcast(const std::string& linkDescription, const MultiOp& multiOp);
void broadcast(const std::string& linkDescription, const Transaction& transaction);
void addRegistration(Connection *c);
size_t size();
private:
void broadcast(const std::string& linkDescription, const std::string& printableString);
std::atomic<int64_t> active {false};
std::mutex mtx;
......
......@@ -36,11 +36,11 @@ bool MultiHandler::active() const {
void MultiHandler::activatePhantom() {
if(activated) {
qdb_assert(multiOp.isPhantom());
qdb_assert(transaction.isPhantom());
}
else {
activated = true;
multiOp.setPhantom(true);
transaction.setPhantom(true);
}
}
......@@ -48,7 +48,7 @@ LinkStatus MultiHandler::process(Dispatcher *dispatcher, Connection *conn, Redis
qdb_assert(activated || req.getCommand() == RedisCommand::MULTI);
if(req.getCommand() == RedisCommand::DISCARD) {
multiOp.clear();
transaction.clear();
activated = false;
return conn->ok();
}
......@@ -63,21 +63,21 @@ LinkStatus MultiHandler::process(Dispatcher *dispatcher, Connection *conn, Redis
}
activated = true;
multiOp.setPhantom(false);
transaction.setPhantom(false);
return conn->ok();
}
if(req.getCommand() == RedisCommand::EXEC) {
// Empty multi-exec block?
if(multiOp.empty()) {
qdb_assert(!multiOp.isPhantom());
if(transaction.empty()) {
qdb_assert(!transaction.isPhantom());
activated = false;
return conn->vector( {} );
}
RedisRequest fused = multiOp.toRedisRequest();
RedisRequest fused = transaction.toRedisRequest();
multiOp.clear();
transaction.clear();
activated = false;
return dispatcher->dispatch(conn, fused);
......@@ -88,8 +88,8 @@ LinkStatus MultiHandler::process(Dispatcher *dispatcher, Connection *conn, Redis
}
// Queue
multiOp.push_back(req);
if(!multiOp.isPhantom()) {
transaction.push_back(req);
if(!transaction.isPhantom()) {
return conn->status("QUEUED");
}
......@@ -97,13 +97,13 @@ LinkStatus MultiHandler::process(Dispatcher *dispatcher, Connection *conn, Redis
}
LinkStatus MultiHandler::finalizePhantomTransaction(Dispatcher *dispatcher, Connection *conn) {
if(!activated || !multiOp.isPhantom()) return 0;
if(multiOp.empty()) return 0;
if(!activated || !transaction.isPhantom()) return 0;
if(transaction.empty()) return 0;
RedisRequest req {"EXEC"};
return process(dispatcher, conn, req);
}
size_t MultiHandler::size() const {
return multiOp.size();
return transaction.size();
}
......@@ -24,7 +24,7 @@
#ifndef QUARKDB_REDIS_MULTIHANDLER_H
#define QUARKDB_REDIS_MULTIHANDLER_H
#include "MultiOp.hh"
#include "Transaction.hh"
#include "RedisEncodedResponse.hh"
namespace quarkdb {
......@@ -40,11 +40,11 @@ public:
LinkStatus process(Dispatcher *dispatcher, Connection *conn, RedisRequest &req);
void activatePhantom();
size_t size() const;
bool isPhantom() const { return activated && multiOp.isPhantom(); }
bool isPhantom() const { return activated && transaction.isPhantom(); }
LinkStatus finalizePhantomTransaction(Dispatcher *dispatcher, Connection *conn);
private:
MultiOp multiOp;
Transaction transaction;
bool activated = false;
};
......
// ----------------------------------------------------------------------
// File: MultiOp.cc
// File: Transaction.cc
// Author: Georgios Bitzes - CERN
// ----------------------------------------------------------------------
......@@ -21,16 +21,16 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.*
************************************************************************/
#include "MultiOp.hh"
#include "Transaction.hh"
#include "../utils/Macros.hh"
#include "../utils/IntToBinaryString.hh"
using namespace quarkdb;
MultiOp::MultiOp() {}
Transaction::Transaction() {}
MultiOp::~MultiOp() {}
Transaction::~Transaction() {}
void MultiOp::push_back(const RedisRequest &req) {
void Transaction::push_back(const RedisRequest &req) {
requests.push_back(req);
checkLastCommandForWrites();
}
......@@ -43,7 +43,7 @@ void serializeRequestToString(std::stringstream &ss, const RedisRequest &req) {
}
}
std::string MultiOp::serialize() const {
std::string Transaction::serialize() const {
std::stringstream ss;
ss << intToBinaryString(requests.size());
......@@ -54,7 +54,7 @@ std::string MultiOp::serialize() const {
return ss.str();
}
void MultiOp::checkLastCommandForWrites() {
void Transaction::checkLastCommandForWrites() {
RedisRequest &lastreq = requests.back();
qdb_assert(lastreq.getCommandType() == CommandType::READ || lastreq.getCommandType() == CommandType::WRITE);
......@@ -63,7 +63,7 @@ void MultiOp::checkLastCommandForWrites() {
}
}
bool MultiOp::deserialize(const std::string &src) {
bool Transaction::deserialize(const std::string &src) {
qdb_assert(requests.empty());
if(src.empty()) return false;
......@@ -90,19 +90,19 @@ bool MultiOp::deserialize(const std::string &src) {
return true;
}
void MultiOp::clear() {
void Transaction::clear() {
requests.clear();
}
std::string MultiOp::getFusedCommand() const {
std::string Transaction::getFusedCommand() const {
if(hasWrites) {
return "MULTIOP_READWRITE";
return "TX_READWRITE";
}
return "MULTIOP_READ";
return "TX_READONLY";
}
RedisRequest MultiOp::toRedisRequest() const {
RedisRequest Transaction::toRedisRequest() const {
if(phantom && requests.size() == 1) {
return requests[0];
}
......@@ -121,8 +121,8 @@ RedisRequest MultiOp::toRedisRequest() const {
return req;
}
void MultiOp::fromRedisRequest(const RedisRequest &req) {
qdb_assert(req.getCommand() == RedisCommand::MULTIOP_READ || req.getCommand() == RedisCommand::MULTIOP_READWRITE);
void Transaction::fromRedisRequest(const RedisRequest &req) {
qdb_assert(req.getCommand() == RedisCommand::TX_READONLY || req.getCommand() == RedisCommand::TX_READWRITE);
qdb_assert(req.size() == 3);
qdb_assert(deserialize(req[1]));
......@@ -138,16 +138,16 @@ void MultiOp::fromRedisRequest(const RedisRequest &req) {
}
std::string MultiOp::multiOpTypeInString() const {
std::string Transaction::typeInString() const {
if(phantom) {
return "phantom";
}
return "real";
}
std::string MultiOp::toPrintableString() const {
std::string Transaction::toPrintableString() const {
std::stringstream ss;
ss << getFusedCommand() << " (" << multiOpTypeInString() << "), size " << requests.size() << std::endl;
ss << getFusedCommand() << " (" << typeInString() << "), size " << requests.size() << std::endl;
for(size_t i = 0; i < requests.size(); i++) {
ss << " --- " << i+1 << ") " << requests[i].toPrintableString();
if(i != requests.size()-1) ss << std::endl;
......
// ----------------------------------------------------------------------
// File: MultiOp.hh
// File: Transaction.hh
// Author: Georgios Bitzes - CERN
// ----------------------------------------------------------------------
......@@ -21,17 +21,17 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.*
************************************************************************/
#ifndef QUARKDB_REDIS_MULTIOP_H
#define QUARKDB_REDIS_MULTIOP_H
#ifndef QUARKDB_REDIS_TRANSACTION_H
#define QUARKDB_REDIS_TRANSACTION_H
#include "../RedisRequest.hh"
namespace quarkdb {
class MultiOp {
class Transaction {
public:
MultiOp();
~MultiOp();
Transaction();
~Transaction();
void push_back(const RedisRequest &req);
bool containsWrites() const {
......@@ -49,7 +49,7 @@ public:
return requests[i];
}
bool operator==(const MultiOp &rhs) const {
bool operator==(const Transaction &rhs) const {
return requests == rhs.requests;
}
......@@ -83,7 +83,7 @@ public:
std::string toPrintableString() const;
//----------------------------------------------------------------------------
// How many responses is the client to this MultiOp expecting?
// How many responses is the client to this transaction expecting?
// - size() if this is a phantom transaction. The client cannot possibly
// know we're batching the requests in the background, and will be utterly
// confused if we provide fewer responses than actual requests sent.
......@@ -104,7 +104,7 @@ private:
bool hasWrites = false;
bool phantom = false;
std::vector<RedisRequest> requests;
std::string multiOpTypeInString() const;
std::string typeInString() const;
};
}
......
......@@ -24,7 +24,7 @@
#include "../Utils.hh"
#include "RequestCounter.hh"
#include "../Commands.hh"
#include "../redis/MultiOp.hh"
#include "../redis/Transaction.hh"
using namespace quarkdb;
RequestCounter::RequestCounter(std::chrono::seconds intv)
......@@ -39,11 +39,11 @@ void RequestCounter::account(const RedisRequest &req) {
}
}
void RequestCounter::account(const MultiOp &multiOp) {
void RequestCounter::account(const Transaction &transaction) {
batches++;
for(size_t i = 0; i < multiOp.size(); i++) {
account(multiOp[i]);
for(size_t i = 0; i < transaction.size(); i++) {
account(transaction[i]);
}
}
......
......@@ -29,7 +29,7 @@
namespace quarkdb {
class MultiOp;
class Transaction;
class RedisRequest;
//------------------------------------------------------------------------------
......@@ -42,7 +42,7 @@ public:
RequestCounter(std::chrono::seconds interval);
void account(const RedisRequest &req);
void account(const MultiOp &multiOp);
void account(const Transaction &transaction);
void mainThread(ThreadAssistant &assistant);
void setReportingStatus(bool val);
......