Commit 6161f9ac authored by Luciano Orsini's avatar Luciano Orsini
Browse files

references #110: support for fire/revoke of prometheus items

parent bf7dc8ab
......@@ -15,6 +15,7 @@
#include <memory>
#include <vector>
#include <mutex>
#include "prometheus/counter.h"
#include "prometheus/registry.h"
......@@ -74,7 +75,10 @@ namespace prometheus
std::string uri_;
std::vector<std::weak_ptr<Collectable>> residentcollectables_;
std::vector<std::weak_ptr<Collectable>> collectables_;
std::mutex mutex_;
// resident monitorables
std::shared_ptr<Registry> exposer_registry_;
......@@ -93,6 +97,7 @@ namespace prometheus
Family<Counter> & counter_family_;
Counter & second_counter_;
};
}
#endif
......@@ -56,6 +56,9 @@ namespace prometheus
xdata::Summary summary_;
xdata::Histogram histogram_;
std::string ispaceURN_;
};
}
......
......@@ -69,33 +69,18 @@
static std::size_t WriteResponse(xgi::Input * in, xgi::Output * out, const std::string& body)
{
//cgicc::Cgicc cgi(in);
//const cgicc::CgiEnvironment& env = cgi.getEnvironment();
//mg_printf(conn,
// "HTTP/1.1 200 OK\r\n"
// "Content-Type: text/plain\r\n");
//out->getHTTPResponseHeader(env.getServerProtocol(), 200 ,"OK").addHeader("Content-Type", "text/plain");
out->getHTTPResponseHeader().addHeader("Content-Type", "text/plain");
#ifdef HAVE_ZLIB
out->getHTTPResponseHeader().getHTTPVersion("1.1").getStatusCode(200).getReasonPhrase(xgi::Utils::getResponsePhrase(200)).addHeader("Content-Type", "text/plain");
#ifdef HAVE_ZLIB
//auto acceptsGzip = IsEncodingAccepted(conn, "gzip");
auto acceptsGzip = false;
auto accept_encoding = in->getenv("ACCEPT_ENCODING");
if (accept_encoding == "gzip" ) acceptsGzip = true;
/*auto accept_encoding = mg_get_header(conn, "Accept-Encoding");
* if (!accept_encoding) {
* return false;
* }
* return std::strstr(accept_encoding, encoding) != nullptr;
* */
if (acceptsGzip) {
auto compressed = GZipCompress(body);
if (!compressed.empty()) {
//mg_printf(conn,
// "Content-Encoding: gzip\r\n"
// "Content-Length: %lu\r\n\r\n",
// static_cast<unsigned long>(compressed.size()));
//mg_write(conn, compressed.data(), compressed.size());
out->getHTTPResponseHeader().addHeader("Content-Encoding", "gzip");
auto cl = toolbox::toString("%lu",static_cast<unsigned long>(compressed.size()));
out->getHTTPResponseHeader().addHeader("Content-Length", cl.c_str());
......@@ -104,9 +89,7 @@ static std::size_t WriteResponse(xgi::Input * in, xgi::Output * out, const std::
}
}
#endif
//mg_printf(conn, "Content-Length: %lu\r\n\r\n",
// //static_cast<unsigned long>(body.size()));
// //mg_write(conn, body.data(), body.size());
auto bcl = toolbox::toString("%lu",static_cast<unsigned long>(body.size()));
out->getHTTPResponseHeader().addHeader("Content-Length", bcl.c_str());
......@@ -180,9 +163,10 @@ prometheus::Application::Application (xdaq::ApplicationStub* s)
{
collectables_.push_back(exposer_registry_);
collectables_.push_back(user_exposer_registry_);
//collectables_.push_back(exposer_registry_);
//collectables_.push_back(user_exposer_registry_);
residentcollectables_.push_back(exposer_registry_);
residentcollectables_.push_back(user_exposer_registry_);
s->getDescriptor()->setAttribute("icon", "/prometheus/images/prometheus_64x64.png");
s->getDescriptor()->setAttribute("icon16x16", "/prometheus/images/prometheus_16x16.ico");
......@@ -210,6 +194,10 @@ void prometheus::Application::timeExpired(toolbox::task::TimerEvent& e)
//std::cout << "timer expired " << e.type() << std::endl;
if (e.getTimerTask()->name == "urn:prometheus:check" )
{
std::vector<std::weak_ptr<Collectable>> ontheflycollectables;
//std::cout << "going to process statistics" << std::endl;
// user example update
second_counter_.Increment();
......@@ -235,8 +223,8 @@ void prometheus::Application::timeExpired(toolbox::task::TimerEvent& e)
///
auto exists = [this,registry,j]() {
for (auto&& wcollectable : collectables_)
auto exists = [this,registry,j, ontheflycollectables]() {
for (auto&& wcollectable : ontheflycollectables)
{
auto collectable = wcollectable.lock();
if (!collectable) {
......@@ -256,7 +244,7 @@ void prometheus::Application::timeExpired(toolbox::task::TimerEvent& e)
if (! exists )
{
//std::cout << "---register item: " << j->first << " of type: " << j->second->type() << std::endl;
collectables_.push_back(registry);
ontheflycollectables.push_back(registry);
}
......@@ -266,6 +254,13 @@ void prometheus::Application::timeExpired(toolbox::task::TimerEvent& e)
}
xdata::getInfoSpaceFactory()->unlock();
// retrieved all available collectables and makes them available to prometheus
mutex_.lock();
collectables_.clear();
std::copy(residentcollectables_.begin(), residentcollectables_.end(), back_inserter(collectables_));
std::copy(ontheflycollectables.begin(), ontheflycollectables.end(), back_inserter(collectables_));
mutex_.unlock();
return;
}
......@@ -315,6 +310,8 @@ void prometheus::Application::metrics(xgi::Input * in, xgi::Output * out )
// collect metrics
auto collected_metrics = std::vector<MetricFamily>{};
mutex_.lock();
for (auto&& wcollectable : collectables_)
{
......@@ -328,6 +325,8 @@ void prometheus::Application::metrics(xgi::Input * in, xgi::Output * out )
collected_metrics.insert(collected_metrics.end(), std::make_move_iterator(metrics.begin()), std::make_move_iterator(metrics.end()));
}
// end collect metrics
mutex_.unlock();
// serialize and output of metrics to requester
auto serializer = std::unique_ptr<Serializer>{new TextSerializer()};
......
......@@ -39,7 +39,8 @@ prometheus::Example::Example (xdaq::ApplicationStub* s)
// create a qualified infospace to contain prometheus objects
toolbox::net::URN urn = this->createQualifiedInfoSpace("prometheus");
xdata::InfoSpace * is = xdata::getInfoSpaceFactory()->get(urn.toString());
ispaceURN_ = urn.toString();
xdata::InfoSpace * is = xdata::getInfoSpaceFactory()->get(ispaceURN_);
// publish counter
is->lock();
......@@ -58,6 +59,9 @@ prometheus::Example::Example (xdaq::ApplicationStub* s)
timer->scheduleAtFixedRate( start, this,interval , 0, "urn:prometheus:watchdog" );
toolbox::TimeInterval period(20.0);
timer->scheduleAtFixedRate( start, this, period , 0, "urn:prometheus:admin" );
xgi::framework::deferredbind(this, this, &prometheus::Example::Default, "Default");
}
......@@ -86,8 +90,33 @@ void prometheus::Example::timeExpired(toolbox::task::TimerEvent& e)
auto h = histogram_({},xdata::Histogram::BucketBoundaries{0, 1, 2});
h.Observe(g.Value());
return;
}
static bool shallremove = true;
if (e.getTimerTask()->name == "urn:prometheus:admin" )
{
xdata::InfoSpace * is = xdata::getInfoSpaceFactory()->get(ispaceURN_);
if ( shallremove )
{
std::cout << "revoke histogram from exporting " << std::endl;
is->lock();
is->fireItemRevoked(prometheus::Example::MY_HISTOGRAM);
is->unlock();
shallremove = false;
}
else
{
std::cout << "fire histogram from exporting " << std::endl;
is->lock();
is->fireItemAvailable(prometheus::Example::MY_HISTOGRAM, &histogram_);
is->unlock();
shallremove = true;
}
}
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment