Commit b4c4eb1f authored by Domenico Giordano

Merge branch 'qa' into 'master'

Qa

See merge request cloud-infrastructure/data-analytics!13
parents 8f6ce176 92d9ecbe
secrets/*~
grafana_token.txt~
grafana_token.txt
/secrets/
.ipynb_checkpoints
*/.ipynb_checkpoints/*
@@ -27,9 +27,23 @@ job_build_tox_image:
changes:
- docker-images/tox/*
job_build_jupyter_image:
stage: build-images
before_script:
- export DOCKERFILE=$CI_PROJECT_DIR/docker-images/jupyter/Dockerfile
- export CONTEXT=$CI_PROJECT_DIR
- export DESTINATIONS="--destination $CI_REGISTRY_IMAGE/jupyter:latest --destination $CI_REGISTRY_IMAGE/jupyter:${CI_COMMIT_SHA:0:8}"
<<: *template_build_image
only:
refs:
- master
- qa
changes:
- docker-images/jupyter/*
pep8:
stage: test
image: gitlab-registry.cern.ch/cloud-infrastructure/cci-scripts/tox
image: gitlab-registry.cern.ch/cloud-infrastructure/data-analytics/tox
except:
- tags
script:
@@ -38,7 +52,7 @@ pep8:
coverage:
stage: test
image: gitlab-registry.cern.ch/cloud-infrastructure/cci-scripts/tox
image: gitlab-registry.cern.ch/cloud-infrastructure/data-analytics/tox
except:
- tags
script:
@@ -47,3 +61,21 @@ coverage:
artifacts:
paths:
- cover
job_run_notebook_grafana_etl:
stage: test
image: gitlab-registry.cern.ch/cloud-infrastructure/data-analytics/jupyter:latest
only:
refs:
- qa
script:
- echo $GRAFANA_KEY > $CI_PROJECT_DIR/grafana_etl/tests/secrets/grafana_token.txt
- ls -l
- pwd
- cd $CI_PROJECT_DIR/grafana_etl/tests/
- jupyter nbconvert --to notebook --execute test_ETL.ipynb
artifacts:
paths:
- $CI_PROJECT_DIR/grafana_etl/tests/test_ETL.nbconvert.ipynb
expire_in: 1 week
when: always
\ No newline at end of file
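For reference, a minimal local equivalent of what job_run_notebook_grafana_etl does, sketched with the nbconvert Python API (nbformat and nbconvert ship with the jupyter/scipy-notebook base image; the paths are the ones used in the job above):

``` python
# Execute the test notebook and save the executed copy, mirroring
# `jupyter nbconvert --to notebook --execute test_ETL.ipynb`.
import nbformat
from nbconvert.preprocessors import ExecutePreprocessor

nb = nbformat.read('grafana_etl/tests/test_ETL.ipynb', as_version=4)
ExecutePreprocessor(timeout=600).preprocess(
    nb, {'metadata': {'path': 'grafana_etl/tests/'}})
nbformat.write(nb, 'grafana_etl/tests/test_ETL.nbconvert.ipynb')
```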
FROM jupyter/scipy-notebook:latest
COPY . /to_be_installed
RUN cd /to_be_installed ; pip install .
ENTRYPOINT []
CMD []
\ No newline at end of file
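This image extends jupyter/scipy-notebook and pip-installs the repository itself, so the grafana_etl package is importable inside the CI notebook run; the empty ENTRYPOINT and CMD presumably let the GitLab runner execute the job's script directly instead of starting a notebook server.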
@@ -50,7 +50,7 @@ class ETLDriver(object):
try:
self.etl[section] = ETL(section, opts)
except Exception as e:
logger.error(e.message)
logger.error(e)
def perform(self):
"""Perform actions.
@@ -116,7 +116,7 @@ class ETL(object):
setattr(self, var_name, self.getObject(opts[var_name]))
return True
except Exception as e:
logger.error(e.message)
logger.error(e)
setattr(self, var_name, None)
return False
@@ -129,7 +129,11 @@ class ETL(object):
is module name; ESLogHandler is class name
"""
try:
module, class_name = opts['class'].split('.')[0:2]
module, class_name = opts['class'].rsplit('.', 1)[0:2]
logger.info(
"Request for module %s and class_name %s"
% (module, class_name)
)
m = __import__(module)
if hasattr(m, class_name):
objectClass = getattr(m, class_name)
@@ -138,7 +142,7 @@ class ETL(object):
print("no class %s in module %s" % (class_name, module))
return None
except Exception as e:
logger.error(e.message)
logger.error(e)
return None
def perform(self):
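For comparison, the same dynamic lookup can be written with importlib, which (unlike bare __import__) returns the leaf submodule for dotted paths such as grafana_etl.GHandler.ESLogHandler. This is only a sketch, not the module's code:

``` python
import importlib

def load_class(dotted_name):
    # 'grafana_etl.GHandler.ESLogHandler' ->
    # ('grafana_etl.GHandler', 'ESLogHandler'),
    # matching the rsplit('.', 1) used above.
    module, class_name = dotted_name.rsplit('.', 1)
    return getattr(importlib.import_module(module), class_name)
```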
from __future__ import print_function # compatibility reasons
from functools import reduce
from grafana_etl import mylogger
import os
import pandas as pd
from pandasticsearch import Agg
import requests
@@ -52,7 +53,13 @@ class BaseHandler(object):
def gettoken(self):
"""Load token for authentication."""
try:
with open(self.args['token'], 'r') as f:
token_file = self.args['token']
if not os.path.isfile(token_file):
# file path is relative to installation
token_file = os.path.join(os.path.dirname(__file__),
token_file)
logger.info('Opening token file ' + token_file)
with open(token_file, 'r') as f:
self.token = f.read().splitlines()[0]
except Exception:
self.token = None
@@ -128,7 +135,13 @@ class ESLogHandler(BaseHandler):
valid_args = ['url', 'token', 'request', 'name']
def singlerequest(self, query, params):
with open(params['file'], 'r') as fin:
request_file = params['file']
if not os.path.isfile(request_file):
# file path is relative to installation
request_file = os.path.join(os.path.dirname(__file__),
request_file)
logger.info('Opening request file ' + request_file)
with open(request_file, 'r') as fin:
encoded_data = fin.read()
fin.close()
@@ -136,21 +149,22 @@ class ESLogHandler(BaseHandler):
for k, v in params['args'].items():
encoded_data = encoded_data.replace('__insert_%s__' % k, v)
headers = {'Authorization': 'Bearer %s' % self.token}
headers = {'Authorization': 'Bearer %s' % self.token,
'Content-Type': 'application/json'}
url = self.args['url']
try:
res = requests.post(url, headers=headers, json=encoded_data)
# logger.debug('Response status: %i' % res.status_code)
# logger.debug('Response content: %s' % res._content)
res = requests.post(url, headers=headers, data=encoded_data)
jres = res.json()
if 'status' in jres.keys() and jres['status'] != 200:
raise Exception('response != 200')
self.jres[query] = jres
except Exception as e:
logger.error(e)
logger.error('Response status: %i' % res.status_code)
logger.error('Response content: %s' % res._content)
logger.error('encoded_data: %s' % encoded_data)
logger.error('response: %s' % res.json())
logger.error('response: %s' % res)
def getdata(self):
"""Retrieve data from elasticsearch endpoint."""
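A note on the json= to data= change above: requests serializes the json= argument with json.dumps, which would re-quote the already-serialized template string; posting it unchanged via data=, with an explicit Content-Type header, sends the raw multi-line _msearch payload as-is.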
from grafana_etl.GHandler import ESLogHandler # noqa: F401
from grafana_etl.GHandler import InfluxDBHandler # noqa: F401
from grafana_etl.TSAnalyser import RollingStatAnalyser # noqa: F401
{"search_type":"query_then_fetch","ignore_unavailable":true,"index":["monit_prod_openstack_logs_generic*","monit_prod_openstack_logs_generic*"]}
{"search_type":"query_then_fetch","ignore_unavailable":true,"index":["monit_private_openstack_logs_*","monit_private_openstack_logs_*"]}
{"size":0,"query":{"bool":{"filter":[{"range":{"metadata.timestamp":{"gte":"__insert_gte__","lte":"__insert_lte__","format":"epoch_millis"}}},{"query_string":{"analyze_wildcard":true,"query":"__insert_query__"}}]}},"aggs":{"__insert_agg_field__":{"terms":{"field":"__insert_agg_field__","size":50,"order":{"_term":"desc"},"min_doc_count":1},"aggs":{"epoch":{"date_histogram":{"interval":"__insert_interval__","field":"metadata.timestamp","min_doc_count":1,"extended_bounds":{"min":"__insert_gte__","max":"__insert_lte__"},"format":"epoch_millis"},"aggs":__insert_aggs_metric__}}}}}
Openstack_Log_count: &etl_default
handler: &es_default
class: GHandler.ESLogHandler
url: https://monit-grafana.cern.ch/api/datasources/proxy/27/_msearch
token: ../secrets/grafana_token.txt
request:
logs:
file: ../requests/request_agg_es.tmpl
metrics: ['num']
tags: ['data.log_level']
args:
gte: now-12h/h
lte: now-10m/m
interval: 10m
#query: '!data.log_level: INFO AND !data.log_level: ERROR'
query: 'data.log_level: ERROR OR data.log_level: error'
#agg_field: 'metadata.hostgroup'
agg_field: 'data.log_level'
aggs_metric: '{"num":{"cardinality":{"field":"metadata._id"}}}'
analysers:
ARollingStatAnalyser:
class: TSAnalyser.RollingStatAnalyser
strategy: 'quantile'
roll_win: 20
shift_win: 1
th: .1
Rally_Error_Count:
handler:
<<: *es_default
request:
count_fails:
file: ../requests/request_agg_es.tmpl
metrics: ['Nfail']
tags: ['data.source']
args:
gte: now-24h/h
lte: now-10m/m
interval: 10m
query: 'data.source: rallytester AND data.succeed: [0 TO 1]'
agg_field: 'data.source'
aggs_metric: '{"Nfail":{"sum":{"field":"data.succeed","script":{"inline":"_value*-1+1"}}}}'
fail_cat:
file: ../requests/request_agg_es.tmpl
metrics: ['Num']
tags: ['data.short_msg']
args:
gte: now-24h/h
lte: now-10m/m
interval: 10m
query: 'data.source:rallytester AND data.log_level:ERROR AND metadata.environment:rally_master'
agg_field: 'data.short_msg'
aggs_metric: '{"Num":{"cardinality":{"field":"metadata._id"}}}'
analysers:
ARollingStatAnalyser:
class: TSAnalyser.RollingStatAnalyser
strategy: 'quantile'
roll_win: 20
shift_win: 1
th: .1
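Note on the Nfail metric above: data.succeed is 0 or 1, and the inline script _value*-1+1 maps 1 to 0 and 0 to 1, so the sum counts failed rally tests rather than successes.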
# Test Influx
# Access two data sources (cpu usage and load), extract the time series following the
# query definitions (q), and put the results in a single pandas dataframe table
# (a usage sketch follows the block below)
test_influx:
handler:
class: grafana_etl.GHandler.InfluxDBHandler
url: https://monit-grafana.cern.ch/api/datasources
token: secrets/grafana_token.txt
db: monit_production_collectd
request:
cpu_system:
id: 7885
epoch: 'ms'
q: SELECT mean("mean_value") FROM one_week."cpu_percent" WHERE "type_instance" = 'system' AND "submitter_hostgroup" =~ /^.*level2\\/.*\\/gva_shared_014$/ AND time >= now()-4h and time <= now() GROUP BY time(30m), "host", "submitter_hostgroup";
load:
id: 7889
epoch: 'ms'
q: SELECT mean("mean_value") as amean FROM one_week."load" WHERE ("value_instance" = 'longterm' AND "submitter_hostgroup" =~ /^.*level2\\/.*\\/gva_shared_014$/ ) AND time >= now()-4h and time <= now() GROUP BY time(30m),"host" , "submitter_hostgroup";
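A brief usage sketch for this section, following the test notebook below (the config path assumes the working directory is grafana_etl/tests):

``` python
from grafana_etl import ETLDriver

etl = ETLDriver.ETLDriver(config_name='etc/test_grafana_etl.yaml')
etl.etl['test_influx'].perform()          # runs the two InfluxDB queries
df = etl.etl['test_influx'].handler.pdf   # combined pandas DataFrame
```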
# Test ElasticSearch
# Execute 2 separate queries, extract the time series and put the results in a single pandas dataframe table
# following a query similar to the one Grafana issues to ES
test_es:
handler:
class: grafana_etl.GHandler.ESLogHandler
url: https://monit-grafana.cern.ch/api/datasources/proxy/27/_msearch
token: secrets/grafana_token.txt
request: # The two requests
# First query: count number of failing rally tests ({"sum":{"field":"data.succeed","script":{"inline":"_value*-1+1"}})
# Aggregated by data.source
count_fails:
file: requests/request_agg_es.tmpl
metrics: ['Nfail'] # Name of the metric to be identified in the pandas column
tags: ['data.source'] # Aggregation by tags in the pandas columns
args: # Arguments of the ES query
gte: now-24h/h
lte: now-10m/m
interval: 10m
query: 'data.source: rallytester AND data.succeed: [0 TO 1]'
agg_field: 'data.source'
aggs_metric: '{"Nfail":{"sum":{"field":"data.succeed","script":{"inline":"_value*-1+1"}}}}'
# Second query: count number of distinct failing rally tests ({"cardinality":{"field":"metadata._id"})
# aggregated by data.short_msg
# in the last 24 hours
fail_cat:
file: requests/request_agg_es.tmpl
metrics: ['Num']
tags: ['data.short_msg']
args:
gte: now-24h/h
lte: now-10m/m
interval: 10m
query: 'data.source:rallytester AND data.log_level:ERROR AND metadata.environment:rally_master'
agg_field: 'data.short_msg'
aggs_metric: '{"Num":{"cardinality":{"field":"metadata._id"}}}'
analysers:
ARollingStatAnalyser:
class: grafana_etl.TSAnalyser.RollingStatAnalyser
strategy: 'quantile'
roll_win: 20
shift_win: 1
th: .1
alpha: 1.2
%% Cell type:markdown id: tags:
# ETL Driver
%% Cell type:code id: tags:
``` python
%matplotlib inline
import matplotlib
import matplotlib.pyplot as plt
from grafana_etl import ETLDriver
```
%% Cell type:code id: tags:
``` python
ETL = ETLDriver.ETLDriver(config_name='etc/test_grafana_etl.yaml')
```
%% Cell type:code id: tags:
``` python
ETL.cfgs
```
%% Cell type:code id: tags:
``` python
ETL.etl
```
%% Cell type:code id: tags:
``` python
test_influx = ETL.etl['test_influx']
```
%% Cell type:code id: tags:
``` python
test_influx.perform()
```
%% Cell type:code id: tags:
``` python
#test_influx.handler.plot()
```
%% Cell type:code id: tags:
``` python
test_influx.handler.pdf
```
%% Cell type:code id: tags:
``` python
test_es = ETL.etl['test_es']
```
%% Cell type:code id: tags:
``` python
test_es.perform()
```
%% Cell type:code id: tags:
``` python
test_es.handler.pdf
```
%% Cell type:code id: tags:
``` python
test_es.handler.plot()
```
%% Cell type:code id: tags:
``` python
import requests
```
%% Cell type:code id: tags:
``` python
ETL.perform()
```
%% Cell type:code id: tags:
``` python
```
from grafana_etl import ETLDriver
from grafana_etl import GHandler
from grafana_etl import mylogger
import os
@@ -7,16 +8,33 @@ import yaml
logger = mylogger.getLogger()
class TestConf(unittest.TestCase):
class TestETL(unittest.TestCase):
def setUp(self):
head, _ = os.path.split(__file__)
self.path = os.path.normpath(
os.path.join(head, 'etc/test_grafana_ES.yaml'))
def test_succeed_read_conf(self):
with open(self.path) as f:
test_conf = f.read()
cfgs = yaml.safe_load(test_conf)
g = GHandler.ESLogHandler(**cfgs['Openstack_Log_count']['handler'])
print("THIS IS " + __file__ + " and " + __package__)
filepath, _ = os.path.split(__file__)
self.configfile = os.path.normpath(
os.path.join(filepath, 'etc/test_grafana_etl.yaml'))
print("THIS IS DIR %s " % os.getcwd())
# Change dir in order to find request and secret dirs
os.chdir(os.path.join(filepath, '..'))
print("THIS IS new DIR %s" % os.getcwd())
def test_succeed_create_ESLogHandler(self):
with open(self.configfile) as f:
configuration = f.read()
cfg = yaml.safe_load(configuration)
g = GHandler.ESLogHandler(**cfg['test_es']['handler'])
self.assertIsNotNone(g)
def test_succeed_create_InfluxDBHandler(self):
with open(self.configfile) as f:
configuration = f.read()
cfg = yaml.safe_load(configuration)
g = GHandler.InfluxDBHandler(**cfg['test_influx']['handler'])
self.assertIsNotNone(g)
def test_succeed_create_ETLDriver(self):
d = ETLDriver.ETLDriver(config_name=self.configfile)
self.assertIsNotNone(d)
import logging
import unittest
class TestDefault(unittest.TestCase):
def setUp(self):
logging.disable(logging.CRITICAL)
def test_default_args(self):
pass
@@ -7,3 +7,4 @@ pandasticsearch
requests
scipy
six
pyyaml
\ No newline at end of file