Commit 1f187207 authored by Antonin Dvorak's avatar Antonin Dvorak
Browse files

sqlite retries

parent b344ec7c
......@@ -11,6 +11,7 @@ import pandas as pd
from adcern.publisher import generate_result
from etl.spark_etl.utils import read_json_config
from adcern.publisher import create_id_for_hostrgroups_and_plugins_start_end
from adcern.sqlite3_backend import modify_db
from sklearn.preprocessing import RobustScaler
import sqlite3
......@@ -364,25 +365,22 @@ class BaseOutlierAnalyser(ABC):
"end_window": str(int(end_window)),
"noramlization_id": str(normalization_id)})
# connect to the db
conn_score = sqlite3.connect(score_folder + '/scores.db', timeout=30)
with conn_score:
c = conn_score.cursor()
# ensure the table is there
c.execute('''CREATE TABLE IF NOT EXISTS scores
(hostgroup text, hostname text, algorithm text,
score real, end_window int, noramlization_id text,
PRIMARY KEY (hostgroup, hostname, algorithm,
end_window, noramlization_id))''')
# add row by row
num_rows = len(df)
for i in range(num_rows):
# Try inserting the row
row = df.iloc[i]
c.execute('''INSERT OR IGNORE INTO scores
VALUES (?, ?, ?, ?, ?, ?)''', row)
conn_score.commit()
conn_score = sqlite3.connect(score_folder + '/scores.db', timeout=120)
modify_db(conn_score, '''CREATE TABLE IF NOT EXISTS scores
(hostgroup text, hostname text, algorithm text,
score real, end_window int, noramlization_id text,
PRIMARY KEY (hostgroup, hostname, algorithm,
end_window, noramlization_id))''')
# add row by row
num_rows = len(df)
for i in range(num_rows):
# Try inserting the row
row = df.iloc[i]
modify_db(conn_score, '''INSERT OR IGNORE INTO scores
VALUES (?, ?, ?, ?, ?, ?)''', row)
conn_score.close()
......
......@@ -22,6 +22,7 @@ from hashlib import blake2b
os.environ['PYSPARK_PYTHON'] = sys.executable
from etl.spark_etl.utils import read_json_config # noqa
from adcern.publisher import create_id_for_hostrgroups_and_plugins_start_end # noqa
from adcern.sqlite3_backend import modify_db
# TRADITIONAL
# from pyod.models.iforest import IForest # noqa
......@@ -227,27 +228,23 @@ def save_scores_local_sqlite(analyser,
# connect to the db
conn_score = \
sqlite3.connect(score_folder
+ '/scores_' + algorithm_name + '.db', timeout=30)
with conn_score:
c = conn_score.cursor()
# ensure the table is there
c.execute('''CREATE TABLE IF NOT EXISTS scores
(hostgroup text, hostname text, algorithm text,
score real, end_window int, noramlization_id text,
PRIMARY KEY (hostgroup, hostname, algorithm, end_window,
noramlization_id))''')
conn_score.commit()
# add row by row
num_rows = len(df)
for i in range(num_rows):
# Try inserting the row
row = df.iloc[i]
c.execute('''INSERT OR IGNORE INTO scores
VALUES (?, ?, ?, ?, ?, ?)''', row)
conn_score.commit()
conn_score.commit()
+ '/scores_' + algorithm_name + '.db', timeout=120)
# ensure the table is there
modify_db(conn_score, '''CREATE TABLE IF NOT EXISTS scores
(hostgroup text, hostname text, algorithm text,
score real, end_window int, noramlization_id text,
PRIMARY KEY (hostgroup, hostname, algorithm, end_window,
noramlization_id))''')
# add row by row
num_rows = len(df)
for i in range(num_rows):
# Try inserting the row
row = df.iloc[i]
modify_db(conn_score, '''INSERT OR IGNORE INTO scores
VALUES (?, ?, ?, ?, ?, ?)''', row)
conn_score.close()
......@@ -641,7 +638,7 @@ def analysis(module_name, class_name, alias_name, hyperparameters,
with open(file_path_config_train) as json_file:
data_dict_train = json.load(json_file)
# connect to the db
conn = sqlite3.connect(folder_training_time + '/time.db')
conn = sqlite3.connect(folder_training_time + '/time.db', timeout = 120)
c = conn.cursor()
# ensure the table is there
c.execute('''CREATE TABLE IF NOT EXISTS time
......
......@@ -17,6 +17,7 @@ import sklearn.metrics
from miscellanea.experiment_comparison import plot_roc # noqa
from sklearn.metrics import precision_recall_curve # noqa
import sklearn
from adcern.sqlite3_backend import modify_db
# example of start and end of week
......@@ -357,16 +358,14 @@ def score_benchmark(folder_scores, hostgroup,
aucs_weeks = []
# connect to the db
conn_score = sqlite3.connect(labels_folder + '/week_metrics.db',
timeout=30)
with conn_score:
c = conn_score.cursor()
# ensure the table is there
c.execute('''CREATE TABLE IF NOT EXISTS auc
(hostgroup text, algorithm text, family text,
auc_score real, week_index int, end_week int,
PRIMARY KEY
(hostgroup, algorithm, end_week))''')
conn_score.commit()
timeout=120)
# ensure the table is there
modify_db(conn_score, '''CREATE TABLE IF NOT EXISTS auc
(hostgroup text, algorithm text, family text,
auc_score real, week_index int, end_week int,
PRIMARY KEY
(hostgroup, algorithm, end_week))''')
# FOR EVERY WEEK -------------------------------------------------------
for w in sorted(weeks_available):
print("WEEK: ", w, end=" - ")
......@@ -385,10 +384,9 @@ def score_benchmark(folder_scores, hostgroup,
# ax=dict_ax_rocs[algo_name], alpha=0.2)
print("AUC: ", roc_auc)
aucs_weeks.append(roc_auc)
c.execute('''INSERT OR IGNORE INTO auc
modify_db(conn_score, '''INSERT OR IGNORE INTO auc
VALUES (?, ?, ?, ?, ?, ?)''',
(hostgroup, algo_name, family, roc_auc, int(w), end_week))
conn_score.commit()
conn_score.close()
# # CUMULATIVE QUANTITIES
......@@ -457,7 +455,7 @@ def score_benchmark(folder_scores, hostgroup,
def visualize_auc(hostgroup, input_folder, output_folder):
"""Visualize the AUC results form the selected algo."""
# read the database of scores in a pandas DF
conn = sqlite3.connect(input_folder + "/week_metrics.db")
conn = sqlite3.connect(input_folder + "/week_metrics.db", timeout = 120)
df_week_auc = pd.read_sql_query(
"SELECT * FROM auc WHERE hostgroup='{}'".format(hostgroup), conn)
conn.close()
......
import sqlite3
import time
def modify_db(conn,
              query,
              *args):
    """Wrapper for modifications (INSERT, UPDATE, DELETE or REPLACE) of a Sqlite3 database.

    Executes the given query and commits it; if the database is locked by
    another process, the commit is retried up to 10 times (1s apart) before
    giving up silently (best-effort, matching the original intent).

    Params
    ------
    conn: sqlite3.Connection
        existing sqlite3 connection to use
    query: str
        query to run
    args: tuple
        additional arguments for the execute call, optional
    """
    # sqlite3 cursors are NOT context managers, so close explicitly.
    c = conn.cursor()
    try:
        if args:
            c.execute(query, args)
        else:
            c.execute(query)
        # retry commit - db might be locked by a different process
        for _attempt in range(10):
            try:
                conn.commit()
                break  # commit succeeded: stop retrying
            except sqlite3.OperationalError:
                # typically "database is locked" - wait and retry
                print("Commit to sqlite unsuccessful, retrying....")
                time.sleep(1)
    finally:
        c.close()
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment