Skip to content
Snippets Groups Projects

Draft: added support for influxdb tags in DCS uploads

Closed Derek August Hamersly requested to merge feat/add-tags-to-dcs into devel
3 files
+ 91
59
Compare changes
  • Side-by-side
  • Inline
Files
3
@@ -21,12 +21,14 @@ import logging
logger = logging.getLogger('Log')
db_logging.setLog()
#defaults
remote_host = "127.0.0.1"
remote_port = "8086"
dbname = ""
username = ""
password = ""
influxpath = "" #in url, servername.edu/INFLUXPATH
use_ssl = False
class dcsDataClient:
def __init__(self):
@@ -40,10 +42,13 @@ class dcsDataClient:
try:
# Try to connect without authentication
self.client=InfluxDBClient(host=remote_host,
port=remote_port,
database=dbname,
username=username,
password=password)
port=remote_port,
database=dbname,
ssl=use_ssl,
verify_ssl=use_ssl,
path=influxpath,
username=username,
password=password)
database_list=self.client.get_list_database()
@@ -62,6 +67,9 @@ class dcsDataClient:
self.client=InfluxDBClient(host=remote_host,
port=remote_port,
database=dbname,
ssl=use_ssl,
verify_ssl=use_ssl,
path=influxpath,
username=username,
password=password)
@@ -73,12 +81,12 @@ class dcsDataClient:
# If not, let the user know what's happening and exit.
logger.error("Received error from InfluxDB: {}".format(e))
logger.warn("Please specify the db connectivity parameters "
logger.warning("Please specify the db connectivity parameters "
"\"database\", and optionally also \"username\", "
"\"host\" and \"port\" in the .\"influxdb_cfg\" section in"
"the file passed with --dcs_config or -d (-F if calling from dbAccessor)")
logger.warn("See an example in {}/localdb/configs/influxdb_connectivity.json".format(current_DIR))
logger.warning("See an example in {}/localdb/configs/influxdb_connectivity.json".format(current_DIR))
sys.exit(1)
@@ -94,10 +102,14 @@ class dcsDataClient:
self.runStart=start
self.runEnd=end
def getEvent(self,measName):
def getEvent(self,measName,tags={}):
if self.db_exist :
if {'name':measName} in self.meas_list :
return self.client.query('SELECT * FROM \"{0}\" WHERE time >= {1} AND time <= {2}'.format(measName,self.runStart,self.runEnd))
query_string = "SELECT * FROM \"{0}\" WHERE time >= {1} AND time <= {2}".format(measName,self.runStart,self.runEnd)
if len(tags) > 0:
for tag in tags:
query_string += " and {0}='{1}'".format(tag,tags[tag])
return self.client.query(query_string)
else :
#error_massage('measurement',measName)
return None
@@ -125,7 +137,7 @@ def error_message(key_type,name ):
def loadDBConnectivity(dcs_config_fname):
global remote_host, remote_port, dbname, username
global remote_host, remote_port, dbname, username, influxpath, use_ssl
with open(dcs_config_fname,'r') as dcs_config_f :
dcs_config=json.load(dcs_config_f)
@@ -141,14 +153,13 @@ def loadDBConnectivity(dcs_config_fname):
"\"host\" and \"port\" in the .\"influxdb_cfg\" section in \"{}\"".format(dcs_config_fname))
sys.exit(1)
if "host" in dcs_config["influxdb_cfg"]:
remote_host = dcs_config["influxdb_cfg"]["host"]
if "port" in dcs_config["influxdb_cfg"]:
remote_port = dcs_config["influxdb_cfg"]["port"]
if "username" in dcs_config["influxdb_cfg"]:
username = dcs_config["influxdb_cfg"]["username"]
dbname = dcs_config["influxdb_cfg"]["database"]
cfg = dcs_config["influxdb_cfg"]
remote_host = cfg.get("host",remote_host)
remote_port = cfg.get("port",remote_port)
username = cfg.get("username",username)
influxpath = cfg.get("influx-path",influxpath)
use_ssl = cfg.get("use-ssl",False)
dbname = cfg.get("database",dbname)
def getData(client, dcs_config_fname):
output_jsonName='{}/dcsDataInfo.json'.format(output_DIR)
@@ -166,7 +177,7 @@ def getData(client, dcs_config_fname):
measurement=dcsList.get('measurement')
now_ts=time_module.time()
datFileName='{}/{}{}.dat'.format(output_DIR,measurement,now_ts)
data_list=client.getEvent(measurement)
data_list=client.getEvent(measurement) #ignore tags first
#chip='OU078B_chip1'
if data_list is None :
error_message("measurement",measurement)
@@ -177,13 +188,16 @@ def getData(client, dcs_config_fname):
mode_text='mode null '
setting_text='setting null '
keyList=[]
tagsList=[]
for dcs in dcsList['dcsList']:
key=dcs.get('key')
tags=dcs.get('tags',{})
num=0
for i_key in keyList :
if key==i_key :
num+=1
keyList.append(key)
tagsList.append(tags)
key_text+=str(key)+' '
num_text+=str(num)+' '
mode_text+='null '
@@ -196,6 +210,9 @@ def getData(client, dcs_config_fname):
#dat_f.write(mode_text+'\n')
dat_f.write(setting_text+'\n')
#filter by tags
data_list = client.getEvent(measurement,tags)
for data in data_list[measurement] :
time=data.get('time')
unixtime=datetime.datetime.strptime(time[0:19], '%Y-%m-%dT%H:%M:%S').replace(tzinfo=pytz.utc).timestamp()
Loading