Commit 9348e213 authored by Luciano Orsini's avatar Luciano Orsini
Browse files

references #110: prometheus through infospace

parent 29fe94bf
......@@ -46,7 +46,10 @@ Sources=\
ckms_quantiles.cc \
registry.cc \
summary.cc \
time_window_quantiles.cc
time_window_quantiles.cc \
xdata/Counter.cc \
Example.cc
#
# Include directories
......
// $Id$
/*************************************************************************
* XDAQ Components for Distributed Data Acquisition *
* Copyright (C) 2000-2019, CERN. *
* All rights reserved. *
* Authors: L. Orsini, D. Simelevicius *
* *
* For the licensing terms see LICENSE. *
* For the list of contributors see CREDITS. *
*************************************************************************/
#ifndef _prometheus_Example_h_
#define _prometheus_Example_h_
#include "xdaq/Application.h"
#include "xgi/framework/UIManager.h"
#include "toolbox/ActionListener.h"
#include "xdata/ActionListener.h"
#include "toolbox/task/TimerListener.h"
#include "xdata/Counter.h"
namespace prometheus
{
// Example of a prometheus counter in XDAQ
//
class Example : public xdaq::Application, public xgi::framework::UIManager, public toolbox::ActionListener, public xdata::ActionListener, public toolbox::task::TimerListener
{
public:
XDAQ_INSTANTIATOR();
Example(xdaq::ApplicationStub* s);
~Example();
void actionPerformed ( xdata::Event& e );
void actionPerformed(toolbox::Event& e);
void timeExpired(toolbox::task::TimerEvent& e);
void Default(xgi::Input * in, xgi::Output * out );
protected:
xdata::Counter * counter_;
};
}
#endif
// $Id$
/*************************************************************************
* XDAQ Components for Distributed Data Acquisition *
* Copyright (C) 2000-2019, CERN. *
* All rights reserved. *
* Authors: L. Orsini, D. Simelevicius *
* *
* For the licensing terms see LICENSE. *
* For the list of contributors see CREDITS. *
*************************************************************************/
#ifndef _xdata_Collectable_h_
#define _xdata_Collectable_h_
#include <string>
#include <map>
#include "prometheus/registry.h"
#include "xdata/Serializable.h"
namespace xdata
{
class Collectable : public xdata::Serializable
{
public:
Collectable(std::shared_ptr<prometheus::Registry> registry): registry_(registry)
{
}
std::shared_ptr<prometheus::Registry> registry_;
};
}
#endif
// $Id$
/*************************************************************************
* XDAQ Components for Distributed Data Acquisition *
* Copyright (C) 2000-2019, CERN. *
* All rights reserved. *
* Authors: L. Orsini, D. Simelevicius *
* *
* For the licensing terms see LICENSE. *
* For the list of contributors see CREDITS. *
*************************************************************************/
#ifndef _xdata_Counter_h_
#define _xdata_Counter_h_
#include <string>
#include <map>
#include "prometheus/counter.h"
#include "prometheus/registry.h"
#include "xdata/Collectable.h"
#include "xdata/Serializable.h"
namespace xdata
{
// e.g. Counter("exposer_transferred_bytes_total","Transferred bytes to metrics services", {{"label", "value"}})
class Counter: public Collectable
{
public:
Counter(const std::string & name, const std::string & help, const std::map<std::string, std::string>& labels);
std::string type() const;
void setValue (const xdata::Serializable& s);
int equals(const xdata::Serializable & s) const;
std::string toString() const;
void fromString(const std::string& value);
prometheus::Counter & add (const std::string & name, const std::map<std::string, std::string>& labels);
prometheus::Counter & operator[](const std::string & name);
prometheus::Family<prometheus::Counter> & family_;
std::map<std::string, prometheus::Counter *> counters_;
};
class CounterElement
{
public:
CounterElement(prometheus::Counter & c): reference_(c) {}
prometheus::Counter & reference_;
};
}
#endif
......@@ -15,6 +15,8 @@
#include <cstring>
#include <iterator>
#include <algorithm>
#include <vector>
#ifdef HAVE_ZLIB
#include <zlib.h>
......@@ -68,111 +70,137 @@
static std::size_t WriteResponse(xgi::Input * in, xgi::Output * out, const std::string& body)
{
//cgicc::Cgicc cgi(in);
//const cgicc::CgiEnvironment& env = cgi.getEnvironment();
//const cgicc::CgiEnvironment& env = cgi.getEnvironment();
//mg_printf(conn,
// "HTTP/1.1 200 OK\r\n"
// "Content-Type: text/plain\r\n");
// "HTTP/1.1 200 OK\r\n"
// "Content-Type: text/plain\r\n");
//out->getHTTPResponseHeader(env.getServerProtocol(), 200 ,"OK").addHeader("Content-Type", "text/plain");
out->getHTTPResponseHeader().addHeader("Content-Type", "text/plain");
#ifdef HAVE_ZLIB
//auto acceptsGzip = IsEncodingAccepted(conn, "gzip");
auto acceptsGzip = false;
auto accept_encoding = in->getenv("ACCEPT_ENCODING");
if (accept_encoding == "gzip" ) acceptsGzip = true;
/*auto accept_encoding = mg_get_header(conn, "Accept-Encoding");
* if (!accept_encoding) {
* return false;
* }
* return std::strstr(accept_encoding, encoding) != nullptr;
* */
if (acceptsGzip) {
auto compressed = GZipCompress(body);
if (!compressed.empty()) {
//mg_printf(conn,
// "Content-Encoding: gzip\r\n"
// "Content-Length: %lu\r\n\r\n",
// static_cast<unsigned long>(compressed.size()));
//mg_write(conn, compressed.data(), compressed.size());
out->getHTTPResponseHeader().addHeader("Content-Encoding", "gzip");
auto cl = toolbox::toString("%lu",static_cast<unsigned long>(compressed.size()));
out->getHTTPResponseHeader().addHeader("Content-Length", cl.c_str());
out->write(compressed.data(), compressed.size());
return compressed.size());
}
}
//auto acceptsGzip = IsEncodingAccepted(conn, "gzip");
auto acceptsGzip = false;
auto accept_encoding = in->getenv("ACCEPT_ENCODING");
if (accept_encoding == "gzip" ) acceptsGzip = true;
/*auto accept_encoding = mg_get_header(conn, "Accept-Encoding");
* if (!accept_encoding) {
* return false;
* }
* return std::strstr(accept_encoding, encoding) != nullptr;
* */
if (acceptsGzip) {
auto compressed = GZipCompress(body);
if (!compressed.empty()) {
//mg_printf(conn,
// "Content-Encoding: gzip\r\n"
// "Content-Length: %lu\r\n\r\n",
// static_cast<unsigned long>(compressed.size()));
//mg_write(conn, compressed.data(), compressed.size());
out->getHTTPResponseHeader().addHeader("Content-Encoding", "gzip");
auto cl = toolbox::toString("%lu",static_cast<unsigned long>(compressed.size()));
out->getHTTPResponseHeader().addHeader("Content-Length", cl.c_str());
out->write(compressed.data(), compressed.size());
return compressed.size());
}
}
#endif
//mg_printf(conn, "Content-Length: %lu\r\n\r\n",
// //static_cast<unsigned long>(body.size()));
// //mg_write(conn, body.data(), body.size());
auto bcl = toolbox::toString("%lu",static_cast<unsigned long>(body.size()));
out->getHTTPResponseHeader().addHeader("Content-Length", bcl.c_str());
out->write(body.data(), body.size());
std::cout << body.data() << std::endl;
return body.size();
//mg_printf(conn, "Content-Length: %lu\r\n\r\n",
// //static_cast<unsigned long>(body.size()));
// //mg_write(conn, body.data(), body.size());
auto bcl = toolbox::toString("%lu",static_cast<unsigned long>(body.size()));
out->getHTTPResponseHeader().addHeader("Content-Length", bcl.c_str());
out->write(body.data(), body.size());
std::cout << body.data() << std::endl;
return body.size();
//end write response
}
#ifdef HAVE_ZLIB
static std::vector<Byte> GZipCompress(const std::string& input) {
auto zs = z_stream{};
auto windowSize = 16 + MAX_WBITS;
auto memoryLevel = 9;
auto zs = z_stream{};
auto windowSize = 16 + MAX_WBITS;
auto memoryLevel = 9;
if (deflateInit2(&zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, windowSize,
memoryLevel, Z_DEFAULT_STRATEGY) != Z_OK) {
return {};
}
if (deflateInit2(&zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, windowSize,
memoryLevel, Z_DEFAULT_STRATEGY) != Z_OK) {
return {};
}
zs.next_in = (Bytef*)input.data();
zs.avail_in = input.size();
zs.next_in = (Bytef*)input.data();
zs.avail_in = input.size();
int ret;
std::vector<Byte> output;
output.reserve(input.size() / 2u);
int ret;
std::vector<Byte> output;
output.reserve(input.size() / 2u);
do {
static const auto outputBytesPerRound = std::size_t{32768};
do {
static const auto outputBytesPerRound = std::size_t{32768};
zs.avail_out = outputBytesPerRound;
output.resize(zs.total_out + zs.avail_out);
zs.next_out = reinterpret_cast<Bytef*>(output.data() + zs.total_out);
zs.avail_out = outputBytesPerRound;
output.resize(zs.total_out + zs.avail_out);
zs.next_out = reinterpret_cast<Bytef*>(output.data() + zs.total_out);
ret = deflate(&zs, Z_FINISH);
ret = deflate(&zs, Z_FINISH);
output.resize(zs.total_out);
} while (ret == Z_OK);
output.resize(zs.total_out);
} while (ret == Z_OK);
deflateEnd(&zs);
deflateEnd(&zs);
if (ret != Z_STREAM_END) {
return {};
}
if (ret != Z_STREAM_END) {
return {};
}
return output;
return output;
}
#endif
#include "xdata/Counter.h"
XDAQ_INSTANTIATOR_IMPL(prometheus::Application);
prometheus::Application::Application (xdaq::ApplicationStub* s)
: xdaq::Application(s), xgi::framework::UIManager(this),
exposer_registry_(std::make_shared<Registry>()),
bytes_transferred_family_(BuildCounter().Name("exposer_transferred_bytes_total").Help("Transferred bytes to metrics services").Register(*exposer_registry_)),
bytes_transferred_(bytes_transferred_family_.Add({})),
num_scrapes_family_(BuildCounter().Name("exposer_scrapes_total") .Help("Number of times metrics were scraped") .Register(*exposer_registry_)),
num_scrapes_(num_scrapes_family_.Add({})),
request_latencies_family_(BuildSummary().Name("exposer_request_latencies") .Help("Latencies of serving scrape requests, in microseconds") .Register(*exposer_registry_)),
request_latencies_(request_latencies_family_.Add( {}, Summary::Quantiles{{0.5, 0.05}, {0.9, 0.01}, {0.99, 0.001}})),
user_exposer_registry_(std::make_shared<Registry>()),
counter_family_(BuildCounter().Name("time_running_seconds_total").Help("How many seconds is this server running?").Labels({{"label", "value"}}).Register(*user_exposer_registry_)),
second_counter_(counter_family_.Add( {{"another_label", "value"}, {"yet_another_label", "value"}}))
: xdaq::Application(s), xgi::framework::UIManager(this),
exposer_registry_(std::make_shared<Registry>()),
bytes_transferred_family_(BuildCounter().Name("exposer_transferred_bytes_total").Help("Transferred bytes to metrics services").Register(*exposer_registry_)),
bytes_transferred_(bytes_transferred_family_.Add({})),
num_scrapes_family_(BuildCounter().Name("exposer_scrapes_total") .Help("Number of times metrics were scraped") .Register(*exposer_registry_)),
num_scrapes_(num_scrapes_family_.Add({})),
request_latencies_family_(BuildSummary().Name("exposer_request_latencies") .Help("Latencies of serving scrape requests, in microseconds") .Register(*exposer_registry_)),
request_latencies_(request_latencies_family_.Add( {}, Summary::Quantiles{{0.5, 0.05}, {0.9, 0.01}, {0.99, 0.001}})),
user_exposer_registry_(std::make_shared<Registry>()),
counter_family_(BuildCounter().Name("time_running_seconds_total").Help("How many seconds is this server running?").Labels({{"label", "value"}}).Register(*user_exposer_registry_)),
second_counter_(counter_family_.Add( {{"another_label", "value"}, {"yet_another_label", "value"}}))
{
collectables_.push_back(exposer_registry_);
collectables_.push_back(user_exposer_registry_);
collectables_.push_back(exposer_registry_);
collectables_.push_back(user_exposer_registry_);
// alternative API for XDAQ
//auto tmp = new xdata::Counter("another_time_running_seconds_total", "Another How many seconds is this server running?", {{"label", "value"}});
//auto * element = new prometheus::serializable::CounterElement(tmp->family_.Add({{"my_another_label", "value"}, {"my_yet_another_label", "value"}}));
//element = new prometheus::serializable::CounterElement(tmp->family_.Add({{"your_another_label", "value"}, {"your_yet_another_label", "value"}}));
/*auto * aptr = &(tmp->family_.Add({{"my_another_label", "value"}, {"my_yet_another_label", "value"}}));
aptr->Increment();
tmp->family_.Add({{"your_another_label", "value"}, {"your_yet_another_label", "value"}});
tmp->add("my_xdata", {{"xdata_label", "value"}, {"yet_another_xdata_label", "value"}});
tmp->add("my_xdata2", {});
(*tmp)["my_xdata"].Increment();
(*tmp)["my_xdata"].Increment();
(*tmp)["my_xdata"].Increment();
(*tmp)["my_xdata2"].Increment();
(*tmp)["my_xdata2"].Increment(39);
//element->reference_.Increment();
*/
//collectables_.push_back(tmp->registry_);
......@@ -184,10 +212,11 @@ prometheus::Application::Application (xdaq::ApplicationStub* s)
xgi::bind(this, &prometheus::Application::metrics, "metrics");
xgi::framework::deferredbind(this, this, &prometheus::Application::Default, "Default");
// Bind setting of default parameters
getApplicationInfoSpace()->addListener(this, "urn:xdaq-event:setDefaultValues");
getApplicationContext()->addActionListener(this); // attach to endpoint available events
getApplicationContext()->addActionListener(this);
}
prometheus::Application::~Application()
......@@ -205,9 +234,60 @@ void prometheus::Application::timeExpired(toolbox::task::TimerEvent& e)
// user example update
second_counter_.Increment();
// scan infospaces and register items
xdata::getInfoSpaceFactory()->lock();
std::vector<xdata::InfoSpace*> spaces;
auto iss = xdata::getInfoSpaceFactory()->match("urn:prometheus:(.*)");
for (auto it = iss.begin(); it != iss.end(); ++it)
{
xdata::InfoSpace * is = dynamic_cast<xdata::InfoSpace *>((*it).second);
is->lock();
std::cout << "---found infospace: " << is->name() << std::endl;
auto items = is->match("(.*)"); // get all items
for (auto j = items.begin(); j != items.end(); j++)
{
if ( j->second->type().find("prometheus") != std::string::npos )
{
auto registry = dynamic_cast<xdata::Collectable*>(j->second)->registry_;
///
auto exists = [this,registry]() {
for (auto&& wcollectable : collectables_)
{
auto collectable = wcollectable.lock();
if (!collectable) {
continue;
}
if ( collectable == registry ) // not sure it compares the actual fired item
{
std::cout << "---found collectable" << std::endl;
return true;
}
}
return false;
}();
if (! exists )
{
std::cout << "---register item: " << j->first << " of type: " << j->second->type() << std::endl;
collectables_.push_back(registry);
}
}
}
is->unlock();
}
xdata::getInfoSpaceFactory()->unlock();
return;
}
}
......@@ -216,7 +296,7 @@ void prometheus::Application::actionPerformed( xdata::Event& event)
if (event.type() == "urn:xdaq-event:setDefaultValues")
{
// activate a timer for user example
toolbox::task::Timer * timer = 0;
timer = toolbox::task::getTimerFactory()->createTimer("urn:prometheus:timer");
......@@ -253,29 +333,29 @@ void prometheus::Application::metrics(xgi::Input * in, xgi::Output * out )
// collect metrics
auto collected_metrics = std::vector<MetricFamily>{};
for (auto&& wcollectable : collectables_)
for (auto&& wcollectable : collectables_)
{
auto collectable = wcollectable.lock();
if (!collectable) {
continue;
}
auto collectable = wcollectable.lock();
if (!collectable) {
continue;
}
auto&& metrics = collectable->Collect();
std::cout << "collecatble insert" << std::endl;
collected_metrics.insert(collected_metrics.end(), std::make_move_iterator(metrics.begin()), std::make_move_iterator(metrics.end()));
}
std::cout << "collectable insert" << std::endl;
collected_metrics.insert(collected_metrics.end(), std::make_move_iterator(metrics.begin()), std::make_move_iterator(metrics.end()));
}
// end collect metrics
// serialize and output of metrics to requester
auto serializer = std::unique_ptr<Serializer>{new TextSerializer()};
auto serializer = std::unique_ptr<Serializer>{new TextSerializer()};
auto bodySize = WriteResponse(in, out, serializer->Serialize(collected_metrics));
// update prometheus plugin metrics
auto stop_time_of_request = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::microseconds>( stop_time_of_request - start_time_of_request);
// update prometheus plugin metrics
auto stop_time_of_request = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::microseconds>( stop_time_of_request - start_time_of_request);
request_latencies_.Observe(duration.count());
bytes_transferred_.Increment(bodySize);
......@@ -286,6 +366,6 @@ void prometheus::Application::metrics(xgi::Input * in, xgi::Output * out )
void prometheus::Application::Default(xgi::Input * in, xgi::Output * out )
{
*out << cgicc::h3("Smart Life Access Server Hub") << std::endl;
*out << cgicc::h3("XDAQ Prometheus Plugin") << std::endl;
}
// $Id$
/*************************************************************************
* XDAQ Components for Distributed Data Acquisition *
* Copyright (C) 2000-2019, CERN. *
* All rights reserved. *
* Authors: L. Orsini, D. Simelevicius *
* *
* For the licensing terms see LICENSE. *
* For the list of contributors see CREDITS. *
************************************************************************/
#include "prometheus/Example.h"
#include "cgicc/HTMLClasses.h"
#include "xdata/InfoSpaceFactory.h"
#include "toolbox/task/TimerFactory.h"
#include "xgi/framework/Method.h"
XDAQ_INSTANTIATOR_IMPL(prometheus::Example);
prometheus::Example::Example (xdaq::ApplicationStub* s)
: xdaq::Application(s), xgi::framework::UIManager(this)
{
// create a prometheus object ( e.g. xdata::Counter )
counter_ = new xdata::Counter("my_counter_total", "How many times it ticks?", {{"my_counter_total_label", "value"}});
counter_->add("my_counter", {{"my_example_label", "value"}});
xgi::framework::deferredbind(this, this, &prometheus::Example::Default, "Default");
//
// Bind setting of default parameters
getApplicationInfoSpace()->addListener(this, "urn:xdaq-event:setDefaultValues");
getApplicationContext()->addActionListener(this);
}
prometheus::Example::~Example()
{
}
void prometheus::Example::timeExpired(toolbox::task::TimerEvent& e)
{
LOG4CPLUS_DEBUG (this->getApplicationLogger(), "Timer callback");
//std::cout << "timer expired " << e.type() << std::endl;
if (e.getTimerTask()->name == "urn:prometheus:watchdog" )
{
// user example update
(*counter_)["my_counter"].Increment();
return;
}
}
void prometheus::Example::actionPerformed(toolbox::Event& e)
{
LOG4CPLUS_DEBUG(this->getApplicationLogger(), "Received event " << e.type());
if ( e.type() == "urn:xdaq-event:profile-loaded")
{
std::cout << "got event " << e.type() << std::endl;
}
}
void prometheus::Example::actionPerformed( xdata::Event& event)
{