From 7433c3325c5cb1c70f808b3b5eaf2341a28b64b7 Mon Sep 17 00:00:00 2001 From: Antonin Dvorak <antonin.dvorak@cern.ch> Date: Wed, 21 Apr 2021 21:44:29 +0200 Subject: [PATCH] sqlite retries --- adcern/analyser.py | 36 ++++++++++++++--------------- adcern/cmd/data_mining.py | 41 ++++++++++++++++------------------ adcern/cmd/elaborate_scores.py | 24 +++++++++----------- adcern/sqlite3_backend.py | 36 +++++++++++++++++++++++++++++ 4 files changed, 83 insertions(+), 54 deletions(-) create mode 100644 adcern/sqlite3_backend.py diff --git a/adcern/analyser.py b/adcern/analyser.py index aff30897..af02e792 100644 --- a/adcern/analyser.py +++ b/adcern/analyser.py @@ -11,6 +11,7 @@ import pandas as pd from adcern.publisher import generate_result from etl.spark_etl.utils import read_json_config from adcern.publisher import create_id_for_hostrgroups_and_plugins_start_end +from adcern.sqlite3_backend import modify_db from sklearn.preprocessing import RobustScaler import sqlite3 @@ -364,25 +365,22 @@ class BaseOutlierAnalyser(ABC): "end_window": str(int(end_window)), "noramlization_id": str(normalization_id)}) # connect to the db - conn_score = sqlite3.connect(score_folder + '/scores.db', timeout=30) - with conn_score: - c = conn_score.cursor() - # ensure the table is there - c.execute('''CREATE TABLE IF NOT EXISTS scores - (hostgroup text, hostname text, algorithm text, - score real, end_window int, noramlization_id text, - PRIMARY KEY (hostgroup, hostname, algorithm, - end_window, noramlization_id))''') - - # add row by row - num_rows = len(df) - for i in range(num_rows): - # Try inserting the row - row = df.iloc[i] - c.execute('''INSERT OR IGNORE INTO scores - VALUES (?, ?, ?, ?, ?, ?)''', row) - - conn_score.commit() + conn_score = sqlite3.connect(score_folder + '/scores.db', timeout=120) + + modify_db(conn_score, '''CREATE TABLE IF NOT EXISTS scores + (hostgroup text, hostname text, algorithm text, + score real, end_window int, noramlization_id text, + PRIMARY KEY (hostgroup, 
hostname, algorithm, + end_window, noramlization_id))''') + + # add row by row + num_rows = len(df) + for i in range(num_rows): + # Try inserting the row + row = df.iloc[i] + modify_db(conn_score, '''INSERT OR IGNORE INTO scores + VALUES (?, ?, ?, ?, ?, ?)''', row) + conn_score.close() diff --git a/adcern/cmd/data_mining.py b/adcern/cmd/data_mining.py index 5b1e57f7..dfc8c56d 100755 --- a/adcern/cmd/data_mining.py +++ b/adcern/cmd/data_mining.py @@ -22,6 +22,7 @@ from hashlib import blake2b os.environ['PYSPARK_PYTHON'] = sys.executable from etl.spark_etl.utils import read_json_config # noqa from adcern.publisher import create_id_for_hostrgroups_and_plugins_start_end # noqa +from adcern.sqlite3_backend import modify_db # TRADITIONAL # from pyod.models.iforest import IForest # noqa @@ -227,27 +228,23 @@ def save_scores_local_sqlite(analyser, # connect to the db conn_score = \ sqlite3.connect(score_folder - + '/scores_' + algorithm_name + '.db', timeout=30) - with conn_score: - c = conn_score.cursor() - # ensure the table is there - c.execute('''CREATE TABLE IF NOT EXISTS scores - (hostgroup text, hostname text, algorithm text, - score real, end_window int, noramlization_id text, - PRIMARY KEY (hostgroup, hostname, algorithm, end_window, - noramlization_id))''') - conn_score.commit() - - # add row by row - num_rows = len(df) - for i in range(num_rows): - # Try inserting the row - row = df.iloc[i] - c.execute('''INSERT OR IGNORE INTO scores - VALUES (?, ?, ?, ?, ?, ?)''', row) - conn_score.commit() - - conn_score.commit() + + '/scores_' + algorithm_name + '.db', timeout=120) + + # ensure the table is there + modify_db(conn_score, '''CREATE TABLE IF NOT EXISTS scores + (hostgroup text, hostname text, algorithm text, + score real, end_window int, noramlization_id text, + PRIMARY KEY (hostgroup, hostname, algorithm, end_window, + noramlization_id))''') + + # add row by row + num_rows = len(df) + for i in range(num_rows): + # Try inserting the row + row = df.iloc[i] + 
modify_db(conn_score, '''INSERT OR IGNORE INTO scores + VALUES (?, ?, ?, ?, ?, ?)''', row) + conn_score.close() @@ -641,7 +638,7 @@ def analysis(module_name, class_name, alias_name, hyperparameters, with open(file_path_config_train) as json_file: data_dict_train = json.load(json_file) # connect to the db - conn = sqlite3.connect(folder_training_time + '/time.db') + conn = sqlite3.connect(folder_training_time + '/time.db', timeout = 120) c = conn.cursor() # ensure the table is there c.execute('''CREATE TABLE IF NOT EXISTS time diff --git a/adcern/cmd/elaborate_scores.py b/adcern/cmd/elaborate_scores.py index 3b62eb04..1941fae5 100755 --- a/adcern/cmd/elaborate_scores.py +++ b/adcern/cmd/elaborate_scores.py @@ -17,6 +17,7 @@ import sklearn.metrics from miscellanea.experiment_comparison import plot_roc # noqa from sklearn.metrics import precision_recall_curve # noqa import sklearn +from adcern.sqlite3_backend import modify_db # example of start and end of week @@ -357,16 +358,14 @@ def score_benchmark(folder_scores, hostgroup, aucs_weeks = [] # connect to the db conn_score = sqlite3.connect(labels_folder + '/week_metrics.db', - timeout=30) - with conn_score: - c = conn_score.cursor() - # ensure the table is there - c.execute('''CREATE TABLE IF NOT EXISTS auc - (hostgroup text, algorithm text, family text, - auc_score real, week_index int, end_week int, - PRIMARY KEY - (hostgroup, algorithm, end_week))''') - conn_score.commit() + timeout=120) + # ensure the table is there + modify_db(conn_score, '''CREATE TABLE IF NOT EXISTS auc + (hostgroup text, algorithm text, family text, + auc_score real, week_index int, end_week int, + PRIMARY KEY + (hostgroup, algorithm, end_week))''') + # FOR EVERY WEEK ------------------------------------------------------- for w in sorted(weeks_available): print("WEEK: ", w, end=" - ") @@ -385,10 +384,9 @@ def score_benchmark(folder_scores, hostgroup, # ax=dict_ax_rocs[algo_name], alpha=0.2) print("AUC: ", roc_auc) aucs_weeks.append(roc_auc) 
import sqlite3
import time


def modify_db(conn, query, *args):
    """Execute a modifying query (INSERT, UPDATE, DELETE, REPLACE or DDL)
    on an sqlite3 connection and commit it, retrying the commit when the
    database is locked by another process.

    Params
    ------
    conn: sqlite3.Connection
        existing sqlite3 connection to use
    query: str
        query to run
    args: tuple
        optional parameters for the query; callers pass a single
        sequence of bind values (e.g. a row), which is forwarded
        unchanged to ``Cursor.execute``

    Raises
    ------
    sqlite3.OperationalError
        if the commit still fails after all retries (e.g. the database
        stays locked), so callers know the data was not persisted
    """
    # NOTE: sqlite3.Cursor is NOT a context manager, so we must close it
    # explicitly in a finally block instead of using `with conn.cursor()`.
    c = conn.cursor()
    try:
        # Unpack args so that a caller-supplied parameter sequence is
        # passed directly as the bindings: execute(query) when no args,
        # execute(query, row) when one sequence is given.
        c.execute(query, *args)

        # Retry the commit - the db might be locked by a different process.
        attempts = 10
        for attempt in range(attempts):
            try:
                conn.commit()
                break  # committed successfully - stop retrying
            except sqlite3.OperationalError:
                # "database is locked" surfaces as OperationalError
                print("Commit to sqlite unsuccessful, retrying....")
                if attempt == attempts - 1:
                    raise  # out of retries - propagate so data loss is visible
                time.sleep(1)
    finally:
        c.close()