Commits (26)
stages:
- build
fedora-29:
fedora-31:
stage: build
image: fedora:29
image: fedora:31
script:
- dnf install -y git python rpm-build which dnf-plugins-core
- ci/gitlab-build.sh
- mkdir ${CI_JOB_NAME}
- cp -r /root/rpmbuild/RPMS build/SRPMS ${CI_JOB_NAME}
artifacts:
paths:
- "$CI_JOB_NAME"
fedora-30:
stage: build
image: fedora:30
script:
- dnf install -y git python rpm-build which dnf-plugins-core
- dnf install -y git python rpm-build which dnf-plugins-core libuuid-devel
- ci/gitlab-build.sh
- mkdir ${CI_JOB_NAME}
- cp -r /root/rpmbuild/RPMS build/SRPMS ${CI_JOB_NAME}
......
......@@ -16,6 +16,7 @@ option(PACKAGEONLY "Build without dependencies" OFF)
if (NOT PACKAGEONLY)
find_package(GTest)
find_package(OpenSSL REQUIRED)
find_package(uuid REQUIRED)
else ()
message(STATUS "Runing CMake in package only mode.")
endif()
......@@ -78,6 +79,10 @@ add_library(qclient STATIC
src/reader/reader.cc
src/reader/sds.cc
src/shared/BinarySerializer.cc
src/shared/Communicator.cc
src/shared/CommunicatorListener.cc
src/shared/PendingRequestVault.cc
src/shared/SharedDeque.cc
src/shared/SharedHash.cc
src/shared/SharedManager.cc
......@@ -114,12 +119,14 @@ if (ATOMIC_FOUND)
target_link_libraries(qclient PUBLIC
fmt
ATOMIC::ATOMIC
${UUID_LIBRARIES}
${OPENSSL_LIBRARIES}
${FOLLY_LIBRARIES})
else ()
target_link_libraries(qclient PUBLIC
target_link_libraries(qclient PUBLIC
fmt
${UUID_LIBRARIES}
${OPENSSL_LIBRARIES}
${FOLLY_LIBRARIES})
endif()
......
# Try to find uuid
# Once done, this will define
#
# UUID_FOUND - system has uuid
# UUID_INCLUDE_DIRS - uuid include directories
# UUID_LIBRARIES - libraries needed to use uuid
include(FindPackageHandleStandardArgs)
if(UUID_INCLUDE_DIRS AND UUID_LIBRARIES)
set(UUID_FIND_QUIETLY TRUE)
else()
find_path(
UUID_INCLUDE_DIR
NAMES uuid.h
HINTS ${UUID_ROOT_DIR}
PATH_SUFFIXES include uuid)
find_library(
UUID_LIBRARY
NAMES uuid
HINTS ${UUID_ROOT_DIR}
PATH_SUFFIXES ${LIBRARY_PATH_PREFIX})
set(UUID_INCLUDE_DIRS ${UUID_INCLUDE_DIR})
set(UUID_LIBRARIES ${UUID_LIBRARY})
find_package_handle_standard_args(
uuid DEFAULT_MSG UUID_LIBRARY UUID_INCLUDE_DIR)
mark_as_advanced(UUID_INCLUDE_DIR UUID_LIBRARY)
endif()
#!/usr/bin/env python
#!/usr/bin/env python3
################################################################################
## Script to generate version numbers from git tags. ##
......@@ -45,10 +45,7 @@ class SoftwareVersion:
if self.patch == None: assert self.miniPatch == None
def toString(self):
ret = "{0}.{1}".format(self.major, self.minor)
if self.patch or self.miniPatch:
ret += ".{0}".format(self.patch)
ret = "{0}.{1}.{2}".format(self.major, self.minor, self.patch)
if self.miniPatch:
ret += ".{0}".format(self.miniPatch)
......@@ -156,7 +153,7 @@ def applyTemplate(templateContent, replacements):
return newContent
def sh(cmd):
return subprocess.check_output(cmd, shell=True, stderr=subprocess.STDOUT)
return subprocess.check_output(cmd, shell=True, stderr=subprocess.STDOUT).decode(sys.stdout.encoding)
def getFile(filename):
try:
......
//------------------------------------------------------------------------------
// File: LastNMap.hh
// Author: Georgios Bitzes - CERN
//------------------------------------------------------------------------------
/************************************************************************
* qclient - A simple redis C++ client with support for redirects *
* Copyright (C) 2020 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see <http://www.gnu.org/licenses/>.*
************************************************************************/
#ifndef QCLIENT_LAST_N_MAP_HH
#define QCLIENT_LAST_N_MAP_HH
#include "RingBuffer.hh"
#include <map>
#include <mutex>
namespace qclient {
//------------------------------------------------------------------------------
// A simple data structure to hold the "last N" elements put into it.
// Thread-safe.
//------------------------------------------------------------------------------
template<typename K, typename V>
class LastNMap {
public:
//----------------------------------------------------------------------------
// A simple data structure to hold the "last N" elements put into it.
// Thread-safe.
//----------------------------------------------------------------------------
LastNMap(size_t n) : mRingBuffer(n) {}
//----------------------------------------------------------------------------
// Does the given element exist?
//----------------------------------------------------------------------------
bool query(const K& key, V& out) const {
std::unique_lock<std::mutex> lock(mMutex);
auto it = mContents.find(key);
if(it == mContents.end()) {
return false;
}
out = it->second.value;
return true;
}
//----------------------------------------------------------------------------
// Emplace
//----------------------------------------------------------------------------
void insert(const K &k, const V &v) {
std::unique_lock<std::mutex> lock(mMutex);
if(mRingBuffer.hasRolledOver()) {
auto it = mContents.find(mRingBuffer.getNextToEvict());
if(it != mContents.end()) {
it->second.count--;
if(it->second.count == 0) {
mContents.erase(it);
}
}
}
mRingBuffer.emplace_back(k);
mContents[k].count++;
mContents[k].value = v;
}
private:
struct InternalItem {
uint32_t count = 0;
V value;
};
RingBuffer<K> mRingBuffer;
std::map<K, InternalItem> mContents;
mutable std::mutex mMutex;
};
}
#endif
//------------------------------------------------------------------------------
// File: LastNSet.hh
// Author: Georgios Bitzes - CERN
//------------------------------------------------------------------------------
/************************************************************************
* qclient - A simple redis C++ client with support for redirects *
* Copyright (C) 2020 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see <http://www.gnu.org/licenses/>.*
************************************************************************/
#ifndef QCLIENT_LAST_N_SET_HH
#define QCLIENT_LAST_N_SET_HH
#include "RingBuffer.hh"
#include <map>
#include <mutex>
namespace qclient {
//------------------------------------------------------------------------------
// A simple data structure to hold the "last N" elements put into it.
// Thread-safe.
//------------------------------------------------------------------------------
template<typename T>
class LastNSet {
public:
//----------------------------------------------------------------------------
// A simple data structure to hold the "last N" elements put into it.
// Thread-safe.
//----------------------------------------------------------------------------
LastNSet(size_t n) : mRingBuffer(n) {}
//----------------------------------------------------------------------------
// Does the given element exist in the set?
//----------------------------------------------------------------------------
bool query(const T& elem) const {
std::unique_lock<std::mutex> lock(mMutex);
return mSet.find(elem) != mSet.end();
}
//----------------------------------------------------------------------------
// Emplace
//----------------------------------------------------------------------------
template<typename... Args>
void emplace(Args&&... args) {
std::unique_lock<std::mutex> lock(mMutex);
if(mRingBuffer.hasRolledOver()) {
auto it = mSet.find(mRingBuffer.getNextToEvict());
if(it != mSet.end()) {
it->second--;
if(it->second == 0) {
mSet.erase(it);
}
}
}
T item = T(std::forward<Args>(args)...);
mRingBuffer.emplace_back(item);
mSet[item]++;
}
private:
RingBuffer<T> mRingBuffer;
std::map<T, uint32_t> mSet;
mutable std::mutex mMutex;
};
}
#endif
//------------------------------------------------------------------------------
// File: RingBuffer.hh
// Author: Georgios Bitzes - CERN
//------------------------------------------------------------------------------
/************************************************************************
* qclient - A simple redis C++ client with support for redirects *
* Copyright (C) 2020 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see <http://www.gnu.org/licenses/>.*
************************************************************************/
#ifndef QCLIENT_RING_BUFFER_HH
#define QCLIENT_RING_BUFFER_HH
#include <vector>
namespace qclient {
//------------------------------------------------------------------------------
// A simple, fixed-sized ring buffer. Not thread-safe!
//------------------------------------------------------------------------------
template<typename T>
class RingBuffer {
public:
//----------------------------------------------------------------------------
// Constructor
//----------------------------------------------------------------------------
RingBuffer(size_t n) {
mRing.resize(n);
mNextToEvict = 0;
mRollover = false;
}
//----------------------------------------------------------------------------
// Emplace back
//----------------------------------------------------------------------------
template<typename... Args>
void emplace_back(Args&&... args) {
mRing[mNextToEvict] = T(std::forward<Args>(args)...);
mNextToEvict++;
if(mNextToEvict >= mRing.size()) {
mNextToEvict = 0;
mRollover = true;
}
}
//----------------------------------------------------------------------------
// Access next-to-evict
//----------------------------------------------------------------------------
T& getNextToEvict() {
return mRing[mNextToEvict];
}
//----------------------------------------------------------------------------
// Has rolled over?
//----------------------------------------------------------------------------
bool hasRolledOver() const {
return mRollover;
}
private:
std::vector<T> mRing;
size_t mNextToEvict;
bool mRollover;
};
}
#endif
//------------------------------------------------------------------------------
// File: Communicator.hh
// Author: Georgios Bitzes - CERN
//------------------------------------------------------------------------------
/************************************************************************
* qclient - A simple redis C++ client with support for redirects *
* Copyright (C) 2019 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see <http://www.gnu.org/licenses/>.*
************************************************************************/
#ifndef QCLIENT_SHARED_COMMUNICATOR_HH
#define QCLIENT_SHARED_COMMUNICATOR_HH
#include "qclient/shared/PendingRequestVault.hh"
#include "qclient/AssistedThread.hh"
namespace qclient {
class Subscriber;
class Subscription;
class QClient;
class Message;
class SteadyClock;
//------------------------------------------------------------------------------
// Convenience class for point-to-point request / response messaging between
// two clients with QuarkDB acting as the middleman.
//
// Implements proper retries, backoff, and timeouts. Requires an ACK from
// the other side, which can optionally send a status code and a message.
//
// We need this for legacy reasons, if you're designing a system from scratch
// I'm not sure how reasonable doing this would be. It could be useful for
// very infrequent messages.
//
// For high volume messages, direct point-to-point with a TCP connection would
// always be better than this contraption.
//
// The Communicator class is used for sending messages only. To receive them
// from the other side, use CommunicatorListener.
//------------------------------------------------------------------------------
class Communicator {
public:
//----------------------------------------------------------------------------
// Convenience class for point-to-point request / response messaging
//----------------------------------------------------------------------------
Communicator(Subscriber* subscriber, const std::string &channel, SteadyClock* clock = nullptr,
std::chrono::milliseconds mRetryInterval = std::chrono::seconds(10),
std::chrono::seconds mHardDeadline = std::chrono::seconds(60)
);
//----------------------------------------------------------------------------
// Destructor
//----------------------------------------------------------------------------
~Communicator();
//----------------------------------------------------------------------------
// Issue a request on the given channel, retrieve assigned ID
//----------------------------------------------------------------------------
std::future<CommunicatorReply> issue(const std::string &contents, std::string &id);
//----------------------------------------------------------------------------
// Issue a request on the given channel
//----------------------------------------------------------------------------
std::future<CommunicatorReply> issue(const std::string &contents);
//----------------------------------------------------------------------------
// Run next-to-retry pass
//
// Return value:
// - False: Nothing to retry
// - True: We have something to retry
//----------------------------------------------------------------------------
bool runNextToRetry(std::string &channel, std::string &contents, std::string &id);
//----------------------------------------------------------------------------
// Get time to sleep until next retry
//----------------------------------------------------------------------------
std::chrono::milliseconds getSleepUntilRetry() const;
private:
//----------------------------------------------------------------------------
// Cleanup and retry thread
//----------------------------------------------------------------------------
void backgroundThread(ThreadAssistant &assistant);
//----------------------------------------------------------------------------
// Process incoming message
//----------------------------------------------------------------------------
void processIncoming(Message &&msg);
Subscriber* mSubscriber;
std::string mChannel;
SteadyClock *mClock;
QClient* mQcl;
PendingRequestVault mPendingVault;
std::unique_ptr<Subscription> mSubscription;
std::chrono::milliseconds mRetryInterval;
std::chrono::milliseconds mHardDeadline;
AssistedThread mThread;
};
}
#endif
//------------------------------------------------------------------------------
// File: CommunicatorListener.hh
// Author: Georgios Bitzes - CERN
//------------------------------------------------------------------------------
/************************************************************************
* qclient - A simple redis C++ client with support for redirects *
* Copyright (C) 2020 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see <http://www.gnu.org/licenses/>.*
************************************************************************/
#ifndef QCLIENT_SHARED_COMMUNICATOR_LISTENER_HH
#define QCLIENT_SHARED_COMMUNICATOR_LISTENER_HH
#include "qclient/shared/PendingRequestVault.hh"
#include "qclient/queueing/AttachableQueue.hh"
#include "qclient/queueing/LastNSet.hh"
#include "qclient/queueing/LastNMap.hh"
#include <string>
#include <memory>
namespace qclient {
class Subscriber;
class Subscription;
class Message;
class CommunicatorListener;
class QClient;
//------------------------------------------------------------------------------
// CommunicatorRequest
//------------------------------------------------------------------------------
class CommunicatorRequest {
public:
//----------------------------------------------------------------------------
// Constructor
//----------------------------------------------------------------------------
CommunicatorRequest(CommunicatorListener *listener, const std::string &uuid,
const std::string &contents);
//----------------------------------------------------------------------------
// Get request ID
//----------------------------------------------------------------------------
std::string getID() const;
//----------------------------------------------------------------------------
// Get contents
//----------------------------------------------------------------------------
std::string getContents() const;
//----------------------------------------------------------------------------
// Send reply
//----------------------------------------------------------------------------
void sendReply(int64_t status, const std::string &contents);
private:
CommunicatorListener *mListener;
std::string mUuid;
std::string mContents;
};
//------------------------------------------------------------------------------
// Convenience class to receive messages sent by Communicator.
//------------------------------------------------------------------------------
class CommunicatorListener : public qclient::AttachableQueue<CommunicatorRequest, 100> {
public:
//----------------------------------------------------------------------------
// Constructor
//----------------------------------------------------------------------------
CommunicatorListener(Subscriber *subscriber, const std::string &channel);
//----------------------------------------------------------------------------
// Destructor
//----------------------------------------------------------------------------
~CommunicatorListener();
//----------------------------------------------------------------------------
// Send reply
//----------------------------------------------------------------------------
void sendReply(int64_t status, const std::string &uuid, const std::string &contents);
private:
//----------------------------------------------------------------------------
// Process incoming message
//----------------------------------------------------------------------------
void processIncoming(Message &&msg);
Subscriber *mSubscriber;
QClient *mQcl;
std::string mChannel;
std::unique_ptr<Subscription> mSubscription;
LastNSet<std::string> mAlreadyReceived;
LastNMap<std::string, CommunicatorReply> mCachedReplies;
};
}
#endif
//------------------------------------------------------------------------------
// File: PendingRequestVault.hh
// Author: Georgios Bitzes - CERN
//------------------------------------------------------------------------------
/************************************************************************
* qclient - A simple redis C++ client with support for redirects *
* Copyright (C) 2020 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see <http://www.gnu.org/licenses/>.*
************************************************************************/
#ifndef QCLIENT_SHARED_PENDING_REQUEST_VAULT_HH
#define QCLIENT_SHARED_PENDING_REQUEST_VAULT_HH
#include <string>
#include <future>
#include <map>
#include <chrono>
#include <list>
#include <shared_mutex>
#include <condition_variable>
namespace qclient {
struct CommunicatorReply {
int64_t status;
std::string contents;
};
//------------------------------------------------------------------------------
// Structure to keep track of current pending requests, and provide easy access
// to ones that need to be retried.
//
// Operations:
// - Insert a new request, provide a UUID and std::future<Reply>
//------------------------------------------------------------------------------
class PendingRequestVault {
public:
using RequestID = std::string;
struct Item {
Item() {}
Item(const RequestID &reqid) : id(reqid) {}
std::chrono::steady_clock::time_point start;
std::chrono::steady_clock::time_point lastRetry;
RequestID id;
std::string channel;
std::string contents;
std::promise<CommunicatorReply> promise;
std::list<RequestID>::iterator listIter;
};
//----------------------------------------------------------------------------
// Constructor
//----------------------------------------------------------------------------
PendingRequestVault();
//----------------------------------------------------------------------------
// Destructor
//----------------------------------------------------------------------------
~PendingRequestVault();
//----------------------------------------------------------------------------
// Insert pending request
//----------------------------------------------------------------------------
struct InsertOutcome {
RequestID id;
std::future<CommunicatorReply> fut;
};
InsertOutcome insert(const std::string &channel, const std::string &contents,
std::chrono::steady_clock::time_point timepoint);
//----------------------------------------------------------------------------
// Satisfy pending request
//----------------------------------------------------------------------------
bool satisfy(const RequestID &id, CommunicatorReply &&reply);
//----------------------------------------------------------------------------
// Get current pending requests
//----------------------------------------------------------------------------
size_t size() const;
//----------------------------------------------------------------------------
// Get earliest retry
// - Return value False: Vault is empty
// - Return value True: tp is filled with earliest lastRetry time_point
//----------------------------------------------------------------------------
bool getEarliestRetry(std::chrono::steady_clock::time_point &tp);
//----------------------------------------------------------------------------
// Block until there's an item in the queue
//----------------------------------------------------------------------------
void blockUntilNonEmpty();
//----------------------------------------------------------------------------
// Set blocking mode
//----------------------------------------------------------------------------
void setBlockingMode(bool val);
//----------------------------------------------------------------------------
// Expire any items which were submitted past the deadline.
// Only the original submission time counts here, not the retries.
//----------------------------------------------------------------------------
size_t expire(std::chrono::steady_clock::time_point deadline);
//----------------------------------------------------------------------------
// Retry front item, if it exists
//----------------------------------------------------------------------------
bool retryFrontItem(std::chrono::steady_clock::time_point now,
std::string &channel, std::string &contents, std::string &id);
private:
//----------------------------------------------------------------------------
// Drop front item
//----------------------------------------------------------------------------
void dropFront();
using PendingRequestMap = std::map<RequestID, Item>;
PendingRequestMap mPendingRequests;
std::list<RequestID> mNextToRetry;
bool mBlockingMode {true};
mutable std::mutex mMutex;
std::condition_variable mCV;
};
}
#endif
// ----------------------------------------------------------------------
// File: SteadyClock.hh
// Author: Georgios Bitzes - CERN
// ----------------------------------------------------------------------
/************************************************************************
* qclient - A simple redis C++ client with support for redirects *
* Copyright (C) 2020 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see <http://www.gnu.org/licenses/>.*
************************************************************************/
#ifndef QCLIENT_UTILS_STEADY_CLOCK_HH
#define QCLIENT_UTILS_STEADY_CLOCK_HH
#include <chrono>
#include <mutex>
namespace qclient {
//------------------------------------------------------------------------------
//! A clock which behaves similarly to std::chrono::steady_clock, but can be
//! faked. During faking, you can advance time manually.
//------------------------------------------------------------------------------
class SteadyClock
{
public:
//----------------------------------------------------------------------------
//! Constructor: Specify whether we're faking time, or not.
//----------------------------------------------------------------------------
SteadyClock(bool fake_) : mFake(fake_) {}
//----------------------------------------------------------------------------
//! Static now function - it's also possible to pass a nullptr
//----------------------------------------------------------------------------
static std::chrono::steady_clock::time_point now(SteadyClock* clock)
{
if (clock == nullptr) {
return std::chrono::steady_clock::now();
}
return clock->getTime();
}
//----------------------------------------------------------------------------
//! Get current time.
//----------------------------------------------------------------------------
std::chrono::steady_clock::time_point getTime() const
{
if (mFake) {
std::lock_guard<std::mutex> lock(mtx);
return fakeTimepoint;
}
return std::chrono::steady_clock::now();
}
//----------------------------------------------------------------------------
//! Advance current time - only call if you're faking the clock, otherwise
//! has no effect...
//----------------------------------------------------------------------------
template<typename T>
void advance(T duration)
{
std::lock_guard<std::mutex> lock(mtx);
fakeTimepoint += duration;
}
//----------------------------------------------------------------------------
//! Utility function to convert a time_point to seconds since epoch
//----------------------------------------------------------------------------
static std::chrono::seconds secondsSinceEpoch(
std::chrono::steady_clock::time_point point)
{
return std::chrono::duration_cast<std::chrono::seconds>(
point.time_since_epoch());
}
//----------------------------------------------------------------------------
//! Check if this is a "fake" clock
//----------------------------------------------------------------------------
inline bool isFake() const
{
return mFake;
}
private:
bool mFake;
mutable std::mutex mtx;
std::chrono::steady_clock::time_point fakeTimepoint;
};
}
#endif
......@@ -229,9 +229,10 @@ bool ConnectionCore::consumeResponse(redisReplyPtr &&reply) {
if(listener) {
Message msg;
if(!MessageParser::parse(std::move(reply), msg)) {
//------------------------------------------------------------------------
//----------------------------------------------------------------------
// Parse error, doesn't look like a valid pub/sub message
//------------------------------------------------------------------------
//----------------------------------------------------------------------
QCLIENT_LOG(logger, LogLevel::kWarn, "Unable to parse incoming PUSH type message: " << qclient::describeRedisReply(reply));
return false;
}
......@@ -256,6 +257,7 @@ bool ConnectionCore::consumeResponse(redisReplyPtr &&reply) {
//------------------------------------------------------------------------
// Parse error, doesn't look like a valid pub/sub message
//------------------------------------------------------------------------
QCLIENT_LOG(logger, LogLevel::kWarn, "Unable to parse incoming message while connection is in PUB/SUB mode: " << qclient::describeRedisReply(reply));
return false;
}
......
// ----------------------------------------------------------------------
// File: Uuid.hh
// Author: Georgios Bitzes - CERN
// ----------------------------------------------------------------------
/************************************************************************
* qclient - A simple redis C++ client with support for redirects *
* Copyright (C) 2020 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see <http://www.gnu.org/licenses/>.*
************************************************************************/
#ifndef QCLIENT_UUID_HH
#define QCLIENT_UUID_HH
#include <uuid/uuid.h>
namespace qclient {
inline std::string generateUuid() {
char buffer[64];
uuid_t uuid;
uuid_generate_random(uuid);
uuid_unparse(uuid, buffer);
return std::string(buffer);
}
}
#endif
......@@ -213,7 +213,11 @@ std::unique_ptr<Subscription> Subscriber::subscribe(const std::string &channel)
// Get underlying QClient - lifetime tied to this object
//------------------------------------------------------------------------------
qclient::QClient* Subscriber::getQcl() {
return base->getQcl();
if(base) {
return base->getQcl();
}
return nullptr;
}
}
//------------------------------------------------------------------------------
// File: BinarySerializer.cc
// Author: Georgios Bitzes - CERN
//------------------------------------------------------------------------------
/************************************************************************
* qclient - A simple redis C++ client with support for redirects *
* Copyright (C) 2020 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see <http://www.gnu.org/licenses/>.*
************************************************************************/
#include "BinarySerializer.hh"
#include <string.h>
#include <iostream>
#ifdef __APPLE__
#include <libkern/OSByteOrder.h>
#define htobe16(x) OSSwapHostToBigInt16(x)
#define htole16(x) OSSwapHostToLittleInt16(x)
#define be16toh(x) OSSwapBigToHostInt16(x)
#define le16toh(x) OSSwapLittleToHostInt16(x)
#define htobe32(x) OSSwapHostToBigInt32(x)
#define htole32(x) OSSwapHostToLittleInt32(x)
#define be32toh(x) OSSwapBigToHostInt32(x)
#define le32toh(x) OSSwapLittleToHostInt32(x)
#define htobe64(x) OSSwapHostToBigInt64(x)
#define htole64(x) OSSwapHostToLittleInt64(x)
#define be64toh(x) OSSwapBigToHostInt64(x)
#define le64toh(x) OSSwapLittleToHostInt64(x)
#endif
namespace qclient {
static void intToBinaryString(int64_t num, char* buff) {
int64_t be = htobe64(num);
memcpy(buff, &be, sizeof(be));
}
static int64_t binaryStringToInt(const char* buff) {
int64_t result;
memcpy(&result, buff, sizeof(result));
return be64toh(result);
}
//------------------------------------------------------------------------------
//! Constructor
//------------------------------------------------------------------------------
BinarySerializer::BinarySerializer(std::string &trg, size_t size)
: target(trg) {
target.resize(size);
currentPosition = 0;
}
//------------------------------------------------------------------------------
// Get current position for write
//------------------------------------------------------------------------------
char* BinarySerializer::pos() {
return ((char*) target.data()) + currentPosition;
}
//------------------------------------------------------------------------------
// Append int64_t
//------------------------------------------------------------------------------
void BinarySerializer::appendInt64(int64_t num) {
intToBinaryString(num, pos());
currentPosition += sizeof(int64_t);
}
//------------------------------------------------------------------------------
// Append raw bytes
//------------------------------------------------------------------------------
void BinarySerializer::appendBytes(const char* source, size_t len) {
memcpy(pos(), source, len);
currentPosition += len;
}
//------------------------------------------------------------------------------
// Append string, including the length
//------------------------------------------------------------------------------
void BinarySerializer::appendString(const std::string &str) {
appendInt64(str.size());
appendBytes(str.data(), str.size());
}
//------------------------------------------------------------------------------
// Get size remaining
//------------------------------------------------------------------------------
int64_t BinarySerializer::getRemaining() const {
return target.size() - currentPosition;
}
//------------------------------------------------------------------------------
//! Constructor
//------------------------------------------------------------------------------
BinaryDeserializer::BinaryDeserializer(const std::string &src)
: source(src), currentPosition(0) {}
//------------------------------------------------------------------------------
//! Fetch int64_t
//------------------------------------------------------------------------------
bool BinaryDeserializer::consumeInt64(int64_t &out) {
if(!canConsume(8)) {
return false;
}
out = binaryStringToInt(source.data()+currentPosition);
currentPosition += 8;
return true;
}
//------------------------------------------------------------------------------
//! Consume that many raw bytes
//------------------------------------------------------------------------------
bool BinaryDeserializer::consumeRawBytes(std::string &str, size_t sz) {
if(!canConsume(sz)) {
return false;
}
str.resize(sz);
memcpy( (char*) str.data(), source.data()+currentPosition, sz);
currentPosition += sz;
return true;
}
//------------------------------------------------------------------------------
//! Fetch string
//------------------------------------------------------------------------------
bool BinaryDeserializer::consumeString(std::string &str) {
int64_t sz = 0;
if(!consumeInt64(sz)) {
return false;
}
return consumeRawBytes(str, sz);
}
//------------------------------------------------------------------------------
//! Get number of bytes left
//------------------------------------------------------------------------------
size_t BinaryDeserializer::bytesLeft() const {
return (source.size() - currentPosition);
}
//------------------------------------------------------------------------------
//! Check if it's possible to consume N bytes
//------------------------------------------------------------------------------
bool BinaryDeserializer::canConsume(size_t b) const {
return (source.size() - currentPosition) >= b;
}
}
//------------------------------------------------------------------------------
// File: BinarySerializer.hh
// Author: Georgios Bitzes - CERN
//------------------------------------------------------------------------------
/************************************************************************
* qclient - A simple redis C++ client with support for redirects *
* Copyright (C) 2020 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see <http://www.gnu.org/licenses/>.*
************************************************************************/
#ifndef QCLIENT_SHARED_BINARY_SERIALIZER_HH
#define QCLIENT_SHARED_BINARY_SERIALIZER_HH
#include <map>
#include <string>
namespace qclient {
//------------------------------------------------------------------------------
//! Helper class for efficiently serializing messages
//------------------------------------------------------------------------------
class BinarySerializer {
public:
//----------------------------------------------------------------------------
//! Constructor
//----------------------------------------------------------------------------
BinarySerializer(std::string &target, size_t size);
//----------------------------------------------------------------------------
//! Append int64_t
//----------------------------------------------------------------------------
void appendInt64(int64_t num);
//----------------------------------------------------------------------------
//! Append raw bytes
//----------------------------------------------------------------------------
void appendBytes(const char* source, size_t len);
//----------------------------------------------------------------------------
//! Append string, including the length
//----------------------------------------------------------------------------
void appendString(const std::string &str);
//----------------------------------------------------------------------------
//! Get size remaining
//----------------------------------------------------------------------------
int64_t getRemaining() const;
private:
//----------------------------------------------------------------------------
//! Get current position for write
//----------------------------------------------------------------------------
char* pos();
std::string &target;
size_t currentPosition;
};
//------------------------------------------------------------------------------
//! Helper class for efficiently de-serializing messages
//------------------------------------------------------------------------------
class BinaryDeserializer {
public:
//----------------------------------------------------------------------------
//! Constructor
//----------------------------------------------------------------------------
BinaryDeserializer(const std::string &source);
//----------------------------------------------------------------------------
//! Consume int64_t
//----------------------------------------------------------------------------
bool consumeInt64(int64_t &out);
//----------------------------------------------------------------------------
//! Consume string, including its length
//----------------------------------------------------------------------------
bool consumeString(std::string &str);
//----------------------------------------------------------------------------
//! Consume that many raw bytes
//----------------------------------------------------------------------------
bool consumeRawBytes(std::string &str, size_t sz);
//----------------------------------------------------------------------------
//! Get number of bytes left
//----------------------------------------------------------------------------
size_t bytesLeft() const;
private:
//----------------------------------------------------------------------------
//! Check if there's enough space to extract N bytes
//----------------------------------------------------------------------------
bool canConsume(size_t b) const;
const std::string &source;
size_t currentPosition;
};
}
#endif
\ No newline at end of file
//------------------------------------------------------------------------------
// File: Communicator.cc
// Author: Georgios Bitzes - CERN
//------------------------------------------------------------------------------
/************************************************************************
* qclient - A simple redis C++ client with support for redirects *
* Copyright (C) 2020 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see <http://www.gnu.org/licenses/>.*
************************************************************************/
#include "qclient/shared/Communicator.hh"
#include "qclient/pubsub/Subscriber.hh"
#include "qclient/pubsub/Message.hh"
#include "SharedSerialization.hh"
#include "qclient/SSTR.hh"
#include "qclient/utils/Macros.hh"
#include "qclient/utils/SteadyClock.hh"
#include "qclient/Debug.hh"
namespace qclient {
//------------------------------------------------------------------------------
// Convenience class for point-to-point request / response messaging
//------------------------------------------------------------------------------
Communicator::Communicator(Subscriber* subscriber, const std::string &channel,
SteadyClock* clock, std::chrono::milliseconds retryInterval,
std::chrono::seconds deadline)
: mSubscriber(subscriber), mChannel(channel), mClock(clock), mQcl(mSubscriber->getQcl()),
mRetryInterval(retryInterval), mHardDeadline(deadline) {
mSubscription = mSubscriber->subscribe(mChannel);
using namespace std::placeholders;
mSubscription->attachCallback(std::bind(&Communicator::processIncoming, this, _1));
if(!mClock || !mClock->isFake()) {
mThread.reset(&Communicator::backgroundThread, this);
}
}
//------------------------------------------------------------------------------
// Destructor
//------------------------------------------------------------------------------
Communicator::~Communicator() {
mPendingVault.setBlockingMode(false);
}
//------------------------------------------------------------------------------
// Cleanup and retry thread
//------------------------------------------------------------------------------
void Communicator::backgroundThread(ThreadAssistant &assistant) {
while(!assistant.terminationRequested()) {
std::chrono::steady_clock::time_point earliestRetry;
if(!mPendingVault.getEarliestRetry(earliestRetry)) {
// Pending vault empty, sleep
mPendingVault.blockUntilNonEmpty();
continue;
}
std::chrono::steady_clock::time_point now = SteadyClock::now(mClock);
if(earliestRetry+mRetryInterval > now) {
// Not there yet, need to wait a bit more
std::chrono::milliseconds nextRetryIn = std::chrono::duration_cast<std::chrono::milliseconds>(now - (earliestRetry+mRetryInterval));
assistant.wait_for(nextRetryIn);
continue;
}
std::string channel, contents, id;
if(runNextToRetry(channel, contents, id) && mQcl) {
// Go
mQcl->exec("PUBLISH", channel, serializeCommunicatorRequest(id, contents));
}
}
}
//------------------------------------------------------------------------------
// Issue a request on the given channel
//------------------------------------------------------------------------------
std::future<CommunicatorReply> Communicator::issue(const std::string &contents) {
std::string unused;
return issue(contents, unused);
}
//------------------------------------------------------------------------------
// Issue a request on the given channel, retrieve ID too
//------------------------------------------------------------------------------
std::future<CommunicatorReply> Communicator::issue(const std::string &contents, std::string &id) {
PendingRequestVault::InsertOutcome outcome = mPendingVault.insert(mChannel,
contents, SteadyClock::now(mClock));
id = outcome.id;
if(mQcl) {
mQcl->exec("PUBLISH", mChannel, serializeCommunicatorRequest(outcome.id, contents));
}
return std::move(outcome.fut);
}
//------------------------------------------------------------------------------
// Run next-to-retry pass
//
// Return value:
// - False: Nothing to retry
// - True: We have something to retry
//------------------------------------------------------------------------------
bool Communicator::runNextToRetry(std::string &channel, std::string &contents, std::string &id) {
mPendingVault.expire(SteadyClock::now(mClock) - mHardDeadline);
std::chrono::steady_clock::time_point earliestRetry;
if(!mPendingVault.getEarliestRetry(earliestRetry)) {
// Empty, nothing to retry
return false;
}
// Are we at least mRetryInterval ahead of last retry?
if(earliestRetry+mRetryInterval > SteadyClock::now(mClock)) {
return false;
}
// Let's do it
return mPendingVault.retryFrontItem(SteadyClock::now(mClock), channel, contents, id);
}
//------------------------------------------------------------------------------
// Process incoming message
//------------------------------------------------------------------------------
void Communicator::processIncoming(Message &&msg) {
if(msg.getMessageType() != MessageType::kMessage) return;
if(msg.getChannel() != mChannel) return;
std::string uuid;
CommunicatorReply reply;
if(parseCommunicatorReply(msg.getPayload(), reply, uuid)) {
mPendingVault.satisfy(uuid, std::move(reply));
}
}
}
//------------------------------------------------------------------------------
// File: CommunicatorListener.cc
// Author: Georgios Bitzes - CERN
//------------------------------------------------------------------------------
/************************************************************************
* qclient - A simple redis C++ client with support for redirects *
* Copyright (C) 2020 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see <http://www.gnu.org/licenses/>.*
************************************************************************/
#include "qclient/shared/CommunicatorListener.hh"
#include "qclient/pubsub/Subscriber.hh"
#include "qclient/pubsub/Message.hh"
#include "qclient/shared/PendingRequestVault.hh"
#include "shared/SharedSerialization.hh"
namespace qclient {
//------------------------------------------------------------------------------
// Constructor
//------------------------------------------------------------------------------
CommunicatorRequest::CommunicatorRequest(CommunicatorListener *listener, const std::string &uuid,
const std::string &contents) : mListener(listener), mUuid(uuid), mContents(contents) {}
//------------------------------------------------------------------------------
// Get request ID
//------------------------------------------------------------------------------
std::string CommunicatorRequest::getID() const {
return mUuid;
}
//------------------------------------------------------------------------------
// Get contents
//------------------------------------------------------------------------------
std::string CommunicatorRequest::getContents() const {
return mContents;
}
//------------------------------------------------------------------------------
// Send reply
//------------------------------------------------------------------------------
void CommunicatorRequest::sendReply(int64_t status, const std::string &contents) {
if(mListener) {
mListener->sendReply(status, mUuid, contents);
}
}
//------------------------------------------------------------------------------
// Constructor
//------------------------------------------------------------------------------
CommunicatorListener::CommunicatorListener(Subscriber *subscriber, const std::string &channel)
: mSubscriber(subscriber), mQcl(mSubscriber->getQcl()), mChannel(channel),
mAlreadyReceived(1000), mCachedReplies(1000) {
mSubscription = mSubscriber->subscribe(mChannel);
using namespace std::placeholders;
mSubscription->attachCallback(std::bind(&CommunicatorListener::processIncoming, this, _1));
}
//------------