Commit f9864aef authored by Georgios Bitzes's avatar Georgios Bitzes
Browse files

Implement redis commands hsetnx and hincrbyfloat

parent 4ebf1e6c
......@@ -53,7 +53,9 @@ struct cmdMapInit {
redis_cmd_map["set"] = {RedisCommand::SET, CommandType::WRITE};
redis_cmd_map["del"] = {RedisCommand::DEL, CommandType::WRITE};
redis_cmd_map["hset"] = {RedisCommand::HSET, CommandType::WRITE};
redis_cmd_map["hsetnx"] = {RedisCommand::HSETNX, CommandType::WRITE};
redis_cmd_map["hincrby"] = {RedisCommand::HINCRBY, CommandType::WRITE};
redis_cmd_map["hincrbyfloat"] = {RedisCommand::HINCRBYFLOAT, CommandType::WRITE};
redis_cmd_map["hdel"] = {RedisCommand::HDEL, CommandType::WRITE};
redis_cmd_map["sadd"] = {RedisCommand::SADD, CommandType::WRITE};
redis_cmd_map["srem"] = {RedisCommand::SREM, CommandType::WRITE};
......
......@@ -50,6 +50,8 @@ enum class RedisCommand {
HLEN,
HVALS,
HSCAN,
HSETNX,
HINCRBYFLOAT,
SADD,
SISMEMBER,
......
......@@ -121,6 +121,12 @@ LinkStatus RedisDispatcher::dispatch(Connection *conn, RedisRequest &request, Re
if(existed.ok()) return conn->integer(0);
return conn->integer(1);
}
case RedisCommand::HSETNX: {
if(request.size() != 4) return errArgs(conn, request, commit);
bool outcome = store.hsetnx(request[1], request[2], request[3], commit);
return conn->integer(outcome);
}
case RedisCommand::HEXISTS: {
if(request.size() != 3) return errArgs(conn, request, commit);
rocksdb::Status st = store.hexists(request[1], request[2]);
......@@ -149,6 +155,13 @@ LinkStatus RedisDispatcher::dispatch(Connection *conn, RedisRequest &request, Re
if(!st.ok()) return conn->fromStatus(st);
return conn->integer(ret);
}
case RedisCommand::HINCRBYFLOAT: {
if(request.size() != 4) return errArgs(conn, request, commit);
double ret = 0;
rocksdb::Status st = store.hincrbyfloat(request[1], request[2], request[3], ret, commit);
if(!st.ok()) return conn->fromStatus(st);
return conn->string(std::to_string(ret));
}
case RedisCommand::HDEL: {
if(request.size() <= 2) return errArgs(conn, request, commit);
int64_t count = 0;
......
......@@ -59,7 +59,7 @@ std::string Formatter::integer(int64_t number) {
std::string Formatter::fromStatus(const rocksdb::Status &status) {
if(status.ok()) return Formatter::ok();
return Formatter::err(status.ToString());
return Formatter::err(SSTR("ERR " << status.ToString()));
}
std::string Formatter::vector(const std::vector<std::string> &vec) {
......
......@@ -189,6 +189,23 @@ rocksdb::Status RocksDB::hset(const std::string &key, const std::string &field,
return rocksdb::Status::OK();
}
bool RocksDB::hsetnx(const std::string &key, const std::string &field, const std::string &value, LogIndex index) {
std::string tkey = translate_key(kHash, key, field);
TransactionPtr tx = startTransaction();
std::string tmp;
rocksdb::Status st = tx->GetForUpdate(rocksdb::ReadOptions(), tkey, &tmp);
ASSERT_OK_OR_NOTFOUND(st);
if(st.IsNotFound()) {
THROW_ON_ERROR(tx->Put(tkey, value));
}
commitTransaction(tx, index);
return st.IsNotFound();
}
rocksdb::Status RocksDB::hincrby(const std::string &key, const std::string &field, const std::string &incrby, int64_t &result, LogIndex index) {
std::string tkey = translate_key(kHash, key, field);
......@@ -217,6 +234,34 @@ rocksdb::Status RocksDB::hincrby(const std::string &key, const std::string &fiel
return rocksdb::Status::OK();
}
rocksdb::Status RocksDB::hincrbyfloat(const std::string &key, const std::string &field, const std::string &incrby, double &result, LogIndex index) {
std::string tkey = translate_key(kHash, key, field);
TransactionPtr tx = startTransaction();
double incrbyDouble;
if(!my_strtod(incrby, incrbyDouble)) {
commitTransaction(tx, index);
return rocksdb::Status::InvalidArgument("value is not a float or out of range");
}
std::string value;
rocksdb::Status st = tx->GetForUpdate(rocksdb::ReadOptions(), tkey, &value);
ASSERT_OK_OR_NOTFOUND(st);
result = 0;
if(st.ok() && !my_strtod(value, result)) {
commitTransaction(tx, index);
return rocksdb::Status::InvalidArgument("hash value is not a float");
}
result += incrbyDouble;
THROW_ON_ERROR(tx->Put(tkey, std::to_string(result)));
commitTransaction(tx, index);
return rocksdb::Status::OK();
}
rocksdb::Status RocksDB::hdel(const std::string &key, const VecIterator &start, const VecIterator &end, int64_t &removed, LogIndex index) {
removed = 0;
TransactionPtr tx = startTransaction();
......
......@@ -53,7 +53,9 @@ public:
rocksdb::Status hkeys(const std::string &key, std::vector<std::string> &keys);
rocksdb::Status hgetall(const std::string &key, std::vector<std::string> &res);
rocksdb::Status hset(const std::string &key, const std::string &field, const std::string &value, LogIndex index = 0);
bool hsetnx(const std::string &key, const std::string &field, const std::string &value, LogIndex index = 0);
rocksdb::Status hincrby(const std::string &key, const std::string &field, const std::string &incrby, int64_t &result, LogIndex index = 0);
rocksdb::Status hincrbyfloat(const std::string &key, const std::string &field, const std::string &incrby, double &result, LogIndex index = 0);
rocksdb::Status hdel(const std::string &key, const VecIterator &start, const VecIterator &end, int64_t &removed, LogIndex index = 0);
rocksdb::Status hlen(const std::string &key, size_t &len);
rocksdb::Status hscan(const std::string &key, const std::string &cursor, size_t count, std::string &newcursor, std::vector<std::string> &results);
......
......@@ -25,6 +25,7 @@
#include <endian.h>
#include "Utils.hh"
#include <memory.h>
#include <math.h>
#include <sys/stat.h>
namespace quarkdb {
......@@ -55,6 +56,16 @@ bool my_strtoll(const std::string &str, int64_t &ret) {
return true;
}
bool my_strtod(const std::string &str, double &ret) {
char *endptr = NULL;
ret = strtod(str.c_str(), &endptr);
if(endptr != str.c_str() + str.size() || ret == HUGE_VAL || ret == -HUGE_VAL) {
return false;
}
return true;
}
int64_t binaryStringToInt(const char* buff) {
int64_t result;
memcpy(&result, buff, sizeof(result));
......
......@@ -75,6 +75,7 @@ std::string chopPath(const std::string &path);
bool mkpath(const std::string &path, mode_t mode, std::string &err);
void mkpath_or_die(const std::string &path, mode_t mode);
bool my_strtoll(const std::string &str, int64_t &ret);
bool my_strtod(const std::string &str, double &ret);
std::vector<std::string> split(std::string data, std::string token);
bool startswith(const std::string &str, const std::string &prefix);
bool parseServer(const std::string &str, RaftServer &srv);
......
......@@ -115,6 +115,45 @@ TEST_F(Rocks_DB, test_hincrby) {
ASSERT_EQ(result, -24);
}
TEST_F(Rocks_DB, test_hsetnx) {
ASSERT_EQ(rocksdb()->getLastApplied(), 0);
ASSERT_TRUE(rocksdb()->hsetnx("myhash", "field", "v1", 1));
ASSERT_EQ(rocksdb()->getLastApplied(), 1);
ASSERT_FALSE(rocksdb()->hsetnx("myhash", "field", "v2", 2));
ASSERT_EQ(rocksdb()->getLastApplied(), 2);
std::string value;
ASSERT_OK(rocksdb()->hget("myhash", "field", value));
ASSERT_EQ(value, "v1");
}
TEST_F(Rocks_DB, test_hincrbyfloat) {
ASSERT_EQ(rocksdb()->getLastApplied(), 0);
double result;
ASSERT_OK(rocksdb()->hincrbyfloat("myhash", "field", "0.5", result, 1));
ASSERT_EQ(rocksdb()->getLastApplied(), 1);
ASSERT_EQ(result, 0.5);
std::string tmp;
ASSERT_OK(rocksdb()->hget("myhash", "field", tmp));
ASSERT_EQ(tmp, "0.500000");
ASSERT_OK(rocksdb()->hincrbyfloat("myhash", "field", "0.3", result, 2));
ASSERT_EQ(rocksdb()->getLastApplied(), 2);
ASSERT_OK(rocksdb()->hget("myhash", "field", tmp));
ASSERT_EQ(tmp, "0.800000");
ASSERT_EQ(result, 0.8);
ASSERT_OK(rocksdb()->hset("myhash", "field2", "not-a-float", 3));
rocksdb::Status st = rocksdb()->hincrbyfloat("myhash", "field2", "0.1", result, 4);
ASSERT_EQ(st.ToString(), "Invalid argument: hash value is not a float");
ASSERT_EQ(rocksdb()->getLastApplied(), 4);
}
TEST_F(Rocks_DB, basic_sanity) {
std::string buffer;
std::vector<std::string> vec, vec2;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment