// $Id$

/*************************************************************************
 * XDAQ Components for Distributed Data Acquisition                      *
 * Copyright (C) 2000-2019, CERN.                                        *
 * All rights reserved.                                                  *
 * Authors: L. Orsini, D. Simelevicius                                   *
 *                                                                       *
 * For the licensing terms see LICENSE.                                  *
 * For the list of contributors see CREDITS.                             *
 ************************************************************************/
#include "prometheus/Application.h"
#include "prometheus/counter.h"
#include "prometheus/summary.h"

#include <algorithm>
#include <chrono>
#include <cstring>
#include <iterator>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

#ifdef HAVE_ZLIB
#include <zlib.h>
#endif

#include "prometheus/serializer.h"
#include "prometheus/text_serializer.h"

#include "cgicc/CgiDefs.h"
#include "cgicc/Cgicc.h"
#include "cgicc/HTTPHTMLHeader.h"
#include "cgicc/HTMLClasses.h"
#include "cgicc/HTTPResponseHeader.h"
#include "toolbox/task/TimerTask.h"
#include "toolbox/task/Timer.h"
#include "toolbox/task/TimerFactory.h"

#include "xdaq/ApplicationGroup.h"
#include "xdaq/ApplicationRegistry.h"
#include "xdaq/EndpointAvailableEvent.h"

#include "xgi/framework/Method.h"
#include "xcept/tools.h"


#include "toolbox/task/WorkLoopFactory.h"
#include "xdata/InfoSpaceFactory.h"
#include "xdata/exdr/Serializer.h"
#include "toolbox/Runtime.h"

#include "xdata/Table.h"
#include "xdata/exdr/Serializer.h"
#include "xdaq/ApplicationDescriptorImpl.h"

#include "toolbox/TimeVal.h"
#include "toolbox/stl.h"
#include "toolbox/utils.h"
#include "toolbox/regex.h"
#include "toolbox/BSem.h"
#include "toolbox/exception/Handler.h"

#include "xdata/exdr/Serializer.h"
#include "xdata/exdr/AutoSizeOutputStreamBuffer.h"
#include "xdata/exdr/FixedSizeInputStreamBuffer.h"
#include "xdata/exdr/FixedSizeOutputStreamBuffer.h"

#include <iostream>     // std::cout
#include <fstream>      // std::ifstream

#ifdef HAVE_ZLIB
// Forward declaration: GZipCompress() is defined further down in this file,
// but WriteResponse() needs to call it.
static std::vector<Byte> GZipCompress(const std::string& input);
#endif

/**
 * Write a metrics payload to the HTTP response.
 *
 * Sets status 200 and "Content-Type: text/plain" on the response header,
 * then writes the body. When built with HAVE_ZLIB and the request's
 * ACCEPT_ENCODING value mentions "gzip", the body is sent gzip-compressed
 * (with a matching Content-Encoding header); if compression yields nothing
 * the plain body is sent instead.
 *
 * @param in   incoming request, consulted only for ACCEPT_ENCODING
 * @param out  response object that receives headers and payload
 * @param body serialized metrics text
 * @return number of payload bytes written (compressed size when gzip was used,
 *         otherwise body.size())
 */
static std::size_t WriteResponse(xgi::Input * in, xgi::Output * out, const std::string& body)
{
	out->getHTTPResponseHeader().getStatusCode(200).getReasonPhrase(xgi::Utils::getResponsePhrase(200)).addHeader("Content-Type", "text/plain");

#ifdef HAVE_ZLIB
	// Substring match rather than equality, so that header values listing
	// several codings (e.g. "gzip, deflate") are recognised as well.
	const std::string accept_encoding = in->getenv("ACCEPT_ENCODING");
	const bool acceptsGzip = accept_encoding.find("gzip") != std::string::npos;

	if (acceptsGzip) {
		const auto compressed = GZipCompress(body);
		if (!compressed.empty()) {
			out->getHTTPResponseHeader().addHeader("Content-Encoding", "gzip");
			const auto cl = toolbox::toString("%lu", static_cast<unsigned long>(compressed.size()));
			out->getHTTPResponseHeader().addHeader("Content-Length", cl.c_str());
			// The compressed buffer holds zlib Byte elements; a stream-style write()
			// expects char data, hence the cast.
			out->write(reinterpret_cast<const char*>(compressed.data()), static_cast<std::streamsize>(compressed.size()));
			return compressed.size();
		}
	}
#endif

	const auto bcl = toolbox::toString("%lu", static_cast<unsigned long>(body.size()));
	out->getHTTPResponseHeader().addHeader("Content-Length", bcl.c_str());

	out->write(body.data(), body.size());

	//DEBUG
	// NOTE(review): this dumps every response body to stdout — looks like
	// leftover debugging output; confirm whether it should stay.
	std::cout << body.data() << std::endl;

	return body.size();
}


#ifdef HAVE_ZLIB
static std::vector<Byte> GZipCompress(const std::string& input) {
108
109
110
	auto zs = z_stream{};
	auto windowSize = 16 + MAX_WBITS;
	auto memoryLevel = 9;
111

112
113
114
115
	if (deflateInit2(&zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, windowSize,
			memoryLevel, Z_DEFAULT_STRATEGY) != Z_OK) {
		return {};
	}
116

117
118
	zs.next_in = (Bytef*)input.data();
	zs.avail_in = input.size();
119

120
121
122
	int ret;
	std::vector<Byte> output;
	output.reserve(input.size() / 2u);
123

124
125
	do {
		static const auto outputBytesPerRound = std::size_t{32768};
126

127
128
129
		zs.avail_out = outputBytesPerRound;
		output.resize(zs.total_out + zs.avail_out);
		zs.next_out = reinterpret_cast<Bytef*>(output.data() + zs.total_out);
130

131
		ret = deflate(&zs, Z_FINISH);
132

133
134
		output.resize(zs.total_out);
	} while (ret == Z_OK);
135

136
	deflateEnd(&zs);
137

138
139
140
	if (ret != Z_STREAM_END) {
		return {};
	}
141

142
	return output;
143
144
145
}
#endif

#include "xdata/Counter.h"

// Expand the XDAQ instantiator macro for prometheus::Application.
// NOTE(review): the macro is defined outside this file; presumably it provides
// the factory hook through which the XDAQ runtime creates the application — confirm.
XDAQ_INSTANTIATOR_IMPL(prometheus::Application);

/**
 * Build the Prometheus exporter application.
 *
 * Creates two registries: one for the exporter's own metrics (transferred
 * bytes, scrape count, request latencies) and one for a user-example counter.
 * Both are stored as resident collectables. The constructor then sets the
 * application icons, binds the "metrics" and "Default" HTTP callbacks, and
 * subscribes to default-value and application-context events.
 *
 * The *_family_ initializers dereference the registries created earlier in
 * the list; this relies on the member declaration order in the class header
 * (not visible here).
 *
 * @param s application stub handed over by the XDAQ framework
 */
prometheus::Application::Application (xdaq::ApplicationStub* s)
	: xdaq::Application(s),
	  xgi::framework::UIManager(this),
	  exposer_registry_(std::make_shared<Registry>()),
	  bytes_transferred_family_(
		BuildCounter()
			.Name("exposer_transferred_bytes_total")
			.Help("Transferred bytes to metrics services")
			.Register(*exposer_registry_)),
	  bytes_transferred_(bytes_transferred_family_.Add({})),
	  num_scrapes_family_(
		BuildCounter()
			.Name("exposer_scrapes_total")
			.Help("Number of times metrics were scraped")
			.Register(*exposer_registry_)),
	  num_scrapes_(num_scrapes_family_.Add({})),
	  request_latencies_family_(
		BuildSummary()
			.Name("exposer_request_latencies")
			.Help("Latencies of serving scrape requests, in microseconds")
			.Register(*exposer_registry_)),
	  request_latencies_(
		request_latencies_family_.Add(
			{},
			Summary::Quantiles{{0.5, 0.05}, {0.9, 0.01}, {0.99, 0.001}})),
	  user_exposer_registry_(std::make_shared<Registry>()),
	  counter_family_(
		BuildCounter()
			.Name("time_running_seconds_total")
			.Help("How many seconds is this server running?")
			.Labels({{"label", "value"}})
			.Register(*user_exposer_registry_)),
	  second_counter_(
		counter_family_.Add({{"another_label", "value"}, {"yet_another_label", "value"}}))
{
	// Registries that are always exported, independent of what the timer
	// callback discovers later.
	residentcollectables_.push_back(exposer_registry_);
	residentcollectables_.push_back(user_exposer_registry_);

	// Icons attached to the application descriptor.
	auto descriptor = s->getDescriptor();
	descriptor->setAttribute("icon", "/prometheus/images/prometheus_64x64.png");
	descriptor->setAttribute("icon16x16", "/prometheus/images/prometheus_16x16.ico");

	// HTTP callbacks: the scrape endpoint and the default page.
	xgi::bind(this, &prometheus::Application::metrics, "metrics");
	xgi::framework::deferredbind(this, this, &prometheus::Application::Default, "Default");

	// Be notified when default parameter values are set.
	getApplicationInfoSpace()->addListener(this, "urn:xdaq-event:setDefaultValues");

	// Receive application-context events (handled in actionPerformed(toolbox::Event&)).
	getApplicationContext()->addActionListener(this);
}

/// Destructor. The body is intentionally empty: it performs no explicit work.
prometheus::Application::~Application()
{
}
/**
 * Timer callback.
 *
 * For the "urn:prometheus:check" task it increments the example counter,
 * scans every infospace whose name matches "urn:prometheus:(.*)" for items
 * whose type name contains "prometheus", gathers the registries of those
 * items (each registry once), and finally replaces collectables_ with the
 * resident collectables followed by the ones just discovered.
 * Any other timer task is ignored.
 *
 * @param e timer event; only the name of its timer task is inspected
 */
void prometheus::Application::timeExpired(toolbox::task::TimerEvent& e)
{
	LOG4CPLUS_DEBUG (this->getApplicationLogger(), "Timer callback");

	if (e.getTimerTask()->name != "urn:prometheus:check")
	{
		return;
	}

	// Registries discovered during this scan.
	std::vector<std::weak_ptr<Collectable>> ontheflycollectables;

	// user example update
	second_counter_.Increment();

	// scan infospaces and register items
	xdata::getInfoSpaceFactory()->lock();

	auto iss = xdata::getInfoSpaceFactory()->match("urn:prometheus:(.*)");
	for (auto& space : iss)
	{
		xdata::InfoSpace * is = dynamic_cast<xdata::InfoSpace *>(space.second);
		if (is == nullptr)
		{
			// Not an InfoSpace after all; skip it instead of dereferencing a null pointer.
			continue;
		}

		is->lock();
		LOG4CPLUS_DEBUG(this->getApplicationLogger(), "found infospace: " << is->name() );

		auto items = is->match("(.*)"); // get all items
		for (auto j = items.begin(); j != items.end(); ++j)
		{
			if ( j->second->type().find("prometheus") == std::string::npos )
			{
				continue;
			}

			auto item = dynamic_cast<xdata::Collectable*>(j->second);
			if (item == nullptr)
			{
				// Type name matched but the object is not an xdata::Collectable.
				LOG4CPLUS_WARN(this->getApplicationLogger(), "item '" << j->first << "' of type '" << j->second->type() << "' is not a collectable, skipping");
				continue;
			}

			auto registry = item->registry_;

			// Several items can share one registry; keep each registry once.
			// The vector is inspected in place (no per-item copy of it).
			const bool exists = std::any_of(
				ontheflycollectables.begin(), ontheflycollectables.end(),
				[&registry](const std::weak_ptr<Collectable>& wcollectable) {
					auto collectable = wcollectable.lock();
					return collectable && collectable == registry;
				});

			if (! exists )
			{
				ontheflycollectables.push_back(registry);
			}
		}
		is->unlock();
	}

	xdata::getInfoSpaceFactory()->unlock();

	// Publish resident + discovered collectables to the scrape handler.
	// The guard releases mutex_ even if a copy throws.
	{
		std::lock_guard<decltype(mutex_)> guard(mutex_);
		collectables_.clear();
		std::copy(residentcollectables_.begin(), residentcollectables_.end(), std::back_inserter(collectables_));
		std::copy(ontheflycollectables.begin(), ontheflycollectables.end(), std::back_inserter(collectables_));
	}
}

/**
 * Infospace event handler.
 *
 * "urn:xdaq-event:setDefaultValues" is accepted and currently requires no
 * action; every other event type is reported as an error in the log.
 *
 * @param event the received infospace event
 */
void prometheus::Application::actionPerformed( xdata::Event& event)
{
	const bool isSetDefaultValues = (event.type() == "urn:xdaq-event:setDefaultValues");
	if (isSetDefaultValues)
	{
		// Nothing to do for this event at the moment.
		return;
	}

	LOG4CPLUS_ERROR (this->getApplicationLogger(), "Received unsupported event type '" << event.type() << "'");
}


/**
 * Application-context event handler.
 *
 * Logs every event at debug level. On "urn:xdaq-event:profile-loaded" it
 * creates the timer "urn:prometheus:timer" and schedules the task
 * "urn:prometheus:check" at a fixed rate, starting now, with this object as
 * the listener (the task is handled in timeExpired()).
 *
 * @param e the received event
 */
void prometheus::Application::actionPerformed(toolbox::Event& e)
{
	LOG4CPLUS_DEBUG(this->getApplicationLogger(), "Received event " << e.type());

	if ( e.type() != "urn:xdaq-event:profile-loaded")
	{
		return;
	}

	// activate a timer for user example
	toolbox::task::Timer * timer = toolbox::task::getTimerFactory()->createTimer("urn:prometheus:timer");

	toolbox::TimeInterval interval;
	toolbox::TimeVal start = toolbox::TimeVal::gettimeofday();

	// "PT5S": presumably a period of five seconds — confirm against toolbox::TimeInterval.
	interval.fromString("PT5S");

	timer->scheduleAtFixedRate( start, this, interval, 0, "urn:prometheus:check" );
}

/**
 * HTTP callback bound to "metrics": serve one scrape.
 *
 * Collects the metric families of every still-alive entry in collectables_,
 * serializes them with TextSerializer, writes the result through
 * WriteResponse(), and then updates the exporter's own metrics (request
 * latency in microseconds, bytes written, scrape count).
 *
 * @param in  incoming HTTP request
 * @param out HTTP response receiving the serialized metrics
 */
void prometheus::Application::metrics(xgi::Input * in, xgi::Output * out )
{
	// Not used further in this function; kept because the effects of
	// constructing it from the request are not visible here.
	cgicc::Cgicc cgi(in);
	const auto start_time_of_request = std::chrono::steady_clock::now();

	// collect metrics
	auto collected_metrics = std::vector<MetricFamily>{};
	{
		// Scoped guard: mutex_ is released even if Collect() or the insert
		// throws, so a failing scrape cannot leave the mutex locked.
		std::lock_guard<decltype(mutex_)> guard(mutex_);

		for (auto&& wcollectable : collectables_)
		{
			auto collectable = wcollectable.lock();
			if (!collectable) {
				// The collectable no longer exists; nothing to collect.
				continue;
			}

			auto&& metrics = collectable->Collect();
			collected_metrics.insert(collected_metrics.end(), std::make_move_iterator(metrics.begin()), std::make_move_iterator(metrics.end()));
		}
	}
	// end collect metrics

	// serialize and output of metrics to requester
	auto serializer = std::unique_ptr<Serializer>{new TextSerializer()};

	const auto bodySize = WriteResponse(in, out, serializer->Serialize(collected_metrics));

	// update prometheus plugin metrics
	const auto stop_time_of_request = std::chrono::steady_clock::now();
	const auto duration = std::chrono::duration_cast<std::chrono::microseconds>( stop_time_of_request - start_time_of_request);

	request_latencies_.Observe(duration.count());
	bytes_transferred_.Increment(bodySize);
	num_scrapes_.Increment();
}

/**
 * HTTP callback bound to "Default": emit the plugin's title as an h3 heading.
 *
 * @param in  incoming HTTP request (not used)
 * @param out HTTP response the heading is written to
 */
void prometheus::Application::Default(xgi::Input * in, xgi::Output * out )
{
	auto& page = *out;
	page << cgicc::h3("XDAQ Prometheus Plugin");
	page << std::endl;
}