Skip to content
Snippets Groups Projects
Commit 466ab41e authored by Mario Gonzalez Carpintero's avatar Mario Gonzalez Carpintero Committed by Elisabetta Pianori
Browse files

Revert "added libDataSink source code"

This reverts commit 03e8d0be.

Deleting the source code for libDataSink (added manually) to merge the branch in the right way
parent 754affb4
Branches
Tags
No related merge requests found
...@@ -7,3 +7,6 @@ ...@@ -7,3 +7,6 @@
[submodule "src/exts/pybind11_json"] [submodule "src/exts/pybind11_json"]
path = src/exts/pybind11_json path = src/exts/pybind11_json
url = https://github.com/pybind/pybind11_json.git url = https://github.com/pybind/pybind11_json.git
[submodule "src/exts/influxdb-cpp"]
path = src/exts/influxdb-cpp
url = https://github.com/orca-zhang/influxdb-cpp.git
...@@ -17,8 +17,9 @@ set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_CURRENT_SOURCE_DIR}/cmake/") ...@@ -17,8 +17,9 @@ set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_CURRENT_SOURCE_DIR}/cmake/")
set(CMAKE_CXX_STANDARD 11) set(CMAKE_CXX_STANDARD 11)
set(CMAKE_MACOSX_RPATH 1) set(CMAKE_MACOSX_RPATH 1)
# Allow internal or external JSON library # Allow internal or external third-party libraries
option(USE_EXTERNAL_JSON "Use an external nlohmann JSON library" OFF) option(USE_EXTERNAL_JSON "Use an external nlohmann JSON library" OFF)
option(USE_EXTERNAL_INFLUXDBCPP "Use an external influxdb-cpp library" OFF)
option(USE_PYTHON "Make python bindings" OFF) option(USE_PYTHON "Make python bindings" OFF)
......
# - Try to find influxdb-cpp
# https://github.com/orca-zhang/influxdb-cpp
#
# Once done this will define
# INFLUXDBCPP_FOUND - System has influxdb-cpp
# INFLUXDBCPP_INCLUDE_DIR - The influxdb-cpp include directories
FIND_PATH(INFLUXDBCPP_INCLUDE_DIR influxdb.hpp
HINTS /usr/include)
# FIND_LIBRARY(INFLUXDBCPP_LIBRARY NAMES mpsse libinfluxdbcpp
# HINTS /usr/lib64 )
INCLUDE(FindPackageHandleStandardArgs)
# handle the QUIETLY and REQUIRED arguments and set INFLUXDBCPP_FOUND to TRUE
# if all listed variables are TRUE
FIND_PACKAGE_HANDLE_STANDARD_ARGS(influxdbcpp DEFAULT_MSG
INFLUXDBCPP_INCLUDE_DIR)
MARK_AS_ADVANCED(INFLUXDBCPP_INCLUDE_DIR)
...@@ -47,6 +47,12 @@ else() ...@@ -47,6 +47,12 @@ else()
set(JSON_LIBS "nlohmann_json::nlohmann_json") set(JSON_LIBS "nlohmann_json::nlohmann_json")
endif() endif()
# Find influxdb-cpp or use integrated one
if (NOT USE_EXTERNAL_INFLUXDBCPP)
SET(INFLUXDBCPP_INCLUDE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/exts/influxdb-cpp)
endif()
find_package(influxdbcpp)
# Find libgpib # Find libgpib
find_package ( libgpib ) find_package ( libgpib )
......
...@@ -6,10 +6,18 @@ ...@@ -6,10 +6,18 @@
"datasinks":{ "datasinks":{
"Console":{ "Console":{
"sinktype": "ConsoleSink" "sinktype": "ConsoleSink"
}, },
"File":{ "File":{
"sinktype": "CSVSink", "sinktype": "CSVSink",
"directory": "myOutputData" "directory": "myOutputData"
},
"db": {
"sinktype": "InfluxDBSink",
"host": "127.0.0.1",
"port": 8086,
"database": "dcsDB",
"username": "userName",
"password": "password"
} }
}, },
"devices": { "devices": {
......
Subproject commit a7888875f39133f36340913a0d6aa6660c496307
...@@ -6,6 +6,19 @@ target_sources(DataSink ...@@ -6,6 +6,19 @@ target_sources(DataSink
CSVSink.cpp CSVSink.cpp
DataSinkRegistry.cpp DataSinkRegistry.cpp
) )
target_link_libraries(DataSink PRIVATE Utils) target_link_libraries(DataSink PRIVATE Utils)
target_link_libraries(DataSink PUBLIC ${JSON_LIBS}) target_link_libraries(DataSink PUBLIC ${JSON_LIBS})
target_include_directories(DataSink PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}) target_include_directories(DataSink PUBLIC ${CMAKE_CURRENT_SOURCE_DIR})
# Add optional influxdb datasink
if( ${INFLUXDBCPP_FOUND} )
target_sources(DataSink
PRIVATE
InfluxDBSink.cpp
)
target_include_directories (DataSink PRIVATE ${INFLUXDBCPP_INCLUDE_DIR})
else()
message("skipping InfluxDBSink due to missing influxdb-cpp")
endif()
#include "InfluxDBSink.h"
#include <influxdb.hpp>
#include <nlohmann/json.hpp>
#include "DataSinkRegistry.h"
REGISTER_DATASINK(InfluxDBSink)
InfluxDBSink::InfluxDBSink(const std::string& name)
: IDataSink(name)
{
}
void InfluxDBSink::setConfiguration(const nlohmann::json& config){
// Read the DB configuration (port, host and database) from 'config'
if (config.find("port") == config.end() ||
config.find("host") == config.end() ||
config.find("database") == config.end() )
{
throw std::runtime_error(
"Please specify 'port' [int], 'host' [str] and 'database' [str] in the \n" \
"json configuration file under the correspondent sink. See an example \n" \
"in 'src/configs/input-hw.json'"
);
}
else
{
m_port = config["port"];
m_host = config["host"];
m_dbName = config["database"];
}
// Read also the username and password, if provided
if (config.find("username") != config.end() &&
config.find("password") != config.end() )
{
m_usr = config["username"];
m_pwd = config["password"];
}
// Initialize the influxdb_cpp object
m_si = std::make_shared<influxdb_cpp::server_info>(influxdb_cpp::server_info(m_host, m_port, m_dbName, m_usr, m_pwd));
// Check whether the provided database exists
m_retval = influxdb_cpp::query(m_resp, "show databases", *m_si);
errorCheck(m_retval, m_resp);
if (m_resp.find(m_dbName) == std::string::npos){
throw std::runtime_error("No database \"" + m_dbName + "\" found on the specified server");
}
}
void InfluxDBSink::startMeasurement(const std::string& measurement, std::chrono::time_point<std::chrono::system_clock> time)
{
m_measName = measurement;
// Convert the timestamp to ms
m_timestamp = std::chrono::duration_cast<std::chrono::milliseconds>(time.time_since_epoch()).count();
}
void InfluxDBSink::setTag(const std::string& name, const std::string& value){
m_tagsString[name] = value;
}
void InfluxDBSink::setTag(const std::string& name, const int8_t value){
setTag(name, std::to_string(value));
}
void InfluxDBSink::setTag(const std::string& name, const int32_t value){
setTag(name, std::to_string(value));
}
void InfluxDBSink::setTag(const std::string& name, const int64_t value){
setTag(name, std::to_string(value));
}
void InfluxDBSink::setTag(const std::string& name, const double value){
setTag(name, std::to_string(value));
}
void InfluxDBSink::setField(const std::string& name, const std::string& value){
m_fieldsString[name] = value;
}
void InfluxDBSink::setField(const std::string& name, const int8_t value){
m_fieldsInt[name] = value;
}
void InfluxDBSink::setField(const std::string& name, const int32_t value){
m_fieldsInt[name] = value;
}
void InfluxDBSink::setField(const std::string& name, const int64_t value){
m_fieldsInt[name] = value;
}
void InfluxDBSink::setField(const std::string& name, const double value){
m_fieldsDouble[name] = value;
}
void InfluxDBSink::setPrecision(int precision)
{
m_prec = precision;
}
void InfluxDBSink::recordPoint()
{
// Create InfluxDB object and useful casts
influxdb_cpp::builder builder;
influxdb_cpp::detail::tag_caller & tag_caller =static_cast<influxdb_cpp::detail::tag_caller &>(builder);
influxdb_cpp::detail::field_caller& field_caller=static_cast<influxdb_cpp::detail::field_caller&>(builder);
// Set the measurement name
builder.meas(m_measName);
// Add tags to the InfluxDB object
for (const std::pair<std::string, std::string>& x: m_tagsString)
tag_caller.tag(x.first, x.second);
bool fieldIsInitialized = false;
// Add fields to the InfluxDB object
for (const std::pair<std::string, double >& x: m_fieldsDouble)
{
if(!fieldIsInitialized)
{
tag_caller .field(x.first, x.second);
fieldIsInitialized = true;
}
else
{
field_caller.field(x.first, x.second);
}
}
for (const std::pair<std::string, int32_t >& x: m_fieldsInt )
{
if(!fieldIsInitialized)
{
tag_caller .field(x.first, x.second);
fieldIsInitialized = true;
}
else
{
field_caller.field(x.first, x.second);
}
}
for (const std::pair<std::string, std::string>& x: m_fieldsString)
{
if(!fieldIsInitialized)
{
tag_caller .field(x.first, x.second);
fieldIsInitialized = true;
}
else
{
field_caller.field(x.first, x.second);
}
}
// Need fields to upload
if(!fieldIsInitialized)
throw std::runtime_error("Unable to record InfluxDB point with no fields.");
// Add the timestamp
field_caller.timestamp(m_timestamp);
// Upload everything and store the command result in &m_resp.
m_retval = field_caller.post_http(*m_si, &m_resp);
errorCheck(m_retval, m_resp);
}
void InfluxDBSink::endMeasurement()
{
m_fieldsString.clear();
m_fieldsInt.clear();
m_fieldsDouble.clear();
}
void InfluxDBSink::query(std::string& m_resp, const std::string& query)
{
m_retval = influxdb_cpp::query(m_resp, query, *m_si);
errorCheck(m_retval, m_resp);
}
void InfluxDBSink::errorCheck(int m_retval, std::string& m_resp)
{
if (m_retval != 0){
throw std::runtime_error("Received an error from InfluxDB: " + m_resp);
}
}
#ifndef INFLUXDBSINK_H
#define INFLUXDBSINK_H
#include "IDataSink.h"
#include <unordered_map>
// Forward declarte InfluxDB handle
namespace influxdb_cpp
{ struct server_info; };
//! \brief Data sink that prints to the console
/**
* Allows communication between labRemote and InfluxDB based on "influxdb.h", a
* third-party InfluxDB library for C++
* (https://github.com/orca-zhang/influxdb-cpp).
*
* Uploads data points to an existing database in InfluxDB. A new measurement
* (data table) will be created if the specified one doesn't exist in the
* database.
*
* The connectivity parameters (host, port and database name) need to be
* specified in the configuration file.
*/
class InfluxDBSink : public IDataSink
{
public:
//! \brief Initializes the InfluxDB sink
/**
* Initializes an influxDB sink. The sink is identified by "name", and it can
* be used to upload data to different measurements inside the same database.
*/
InfluxDBSink(const std::string& name);
//! \brief Reads the DB configuration from a JSON object.
/**
* See an example in `src/configs/input-hw.json`.
*
* Valid keys:
* - `host`: IP address of InfluxDB (hostname not supported)
* - `port`: Port to connect to
* - `database`: Name of database to use
* - `username`: [optional] Username to connect to the server
* - `password`: [optional] Password to connect to the server
*
* * \param config JSON configuration
*/
virtual void setConfiguration(const nlohmann::json& config);
//! \brief Starts a measurement
/**
* If the measurement (data table) doesn't exist, it will be created.
*/
virtual void startMeasurement(const std::string& measurement, std::chrono::time_point<std::chrono::system_clock> time);
virtual void setTag(const std::string& name, const std::string& value);
virtual void setTag(const std::string& name, double value);
virtual void setTag(const std::string& name, int8_t value);
virtual void setTag(const std::string& name, int32_t value);
virtual void setTag(const std::string& name, int64_t value);
virtual void setField(const std::string& name, const std::string& value);
virtual void setField(const std::string& name, int32_t value);
virtual void setField(const std::string& name, double value);
virtual void setField(const std::string& name, int8_t value);
virtual void setField(const std::string& name, int64_t value);
//! \brief Changes the precision of `double` values
/**
* Changes the precision of `double` values
*
* \param precision Number of digits past the decimal point.
*/
void setPrecision(int precision);
//! \brief Uploads a single row of data to influxDB
/**
* The timestamp will be uploaded with milliseconds precision.
*/
virtual void recordPoint();
//! \brief Cleans up the local variables
virtual void endMeasurement();
private:
// Error handling
int m_retval;
std::string m_resp;
//! \brief Checks for errors coming from InfluxDB
/**
* Checks for errors coming from InfluxDB
* \param retval the return value to be checked
* \param resp the response from InfluxDB
*/
void errorCheck(int retval, std::string& resp);
// the influxdb_cpp object to be filled with data
std::shared_ptr<influxdb_cpp::server_info> m_si;
// measurement configuration
std::string m_host;
int m_port;
std::string m_dbName;
std::string m_usr = "";
std::string m_pwd = "";
std::string m_measName;
unsigned long long m_timestamp;
// Fields and Tags
std::unordered_map<std::string, std::string> m_fieldsString;
std::unordered_map<std::string, int64_t > m_fieldsInt;
std::unordered_map<std::string, double > m_fieldsDouble;
std::unordered_map<std::string, std::string> m_tagsString;
// Precision of "double" values in fields
int m_prec = 2;
//! \brief Sends any command to InflxDB and gets its answer
/**
* Sends any command to be run in InfluxDB's console.
* \param query the command
* \param resp the answer
*/
void query(std::string& resp, const std::string& query);
};
#endif // INFLUXDBSINK_H
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment