diff --git a/property_pull.py b/property_pull.py
index cd29379b369d0a4426e0e6fc46c79d983a39431a..56507b1dde8b34e7941c86941494134d2b9de002 100644
--- a/property_pull.py
+++ b/property_pull.py
@@ -1,3 +1,5 @@
+#!spark-submit --master yarn
+
 import json
 import sys
 
@@ -11,6 +13,7 @@ from pyspark.sql import *
 from pyspark.accumulators import AccumulatorParam
 from pyspark.sql.functions import udf, col
 from pyspark.sql.types import IntegerType
+from pyspark.sql.utils import AnalysisException
 
 from multiprocessing import Process
 
@@ -86,62 +89,98 @@ def pull_histogram_pandas(_fill_name, _mode, _device, _start_time, _end_time, _f
         .keyValue("property", _property)
         .buildDataset(), sql_context)
 
-    # data['rows'] = df.count()
     df.registerTempTable("pulled_data")
 
-    # Process hg array data
-    array_dataset_hg = spark.sql("select histogram_hg_com.elements from pulled_data") \
-        .toPandas()
+    try:
+        array_dataset_hg = spark.sql("select histogram_hg_com.elements from pulled_data") \
+            .toPandas()
 
-    array_index_hg = spark.sql("select histogram_hg_ind.elements from pulled_data") \
-        .toPandas()
+        array_dataset_hg['Total'] = array_dataset_hg['elements'].apply(lambda x: 0 if len(x) == 0 else sum(x))
+        data['histogram_hg_intime'] = array_dataset_hg['Total'].values.tolist()
+    except AnalysisException:
+        print('histogram_hg not known for fill ' + str(_fill_name) + ' mode ' + _mode + ' device ' + _device)
+        array_dataset_hg = None
+        data['histogram_hg_intime'] = []
 
-    array_dataset_hg['Total'] = array_dataset_hg['elements'].apply(lambda x: 0 if len(x) == 0 else sum(x))
+    try:
+        array_index_hg = spark.sql("select histogram_hg_ind.elements from pulled_data") \
+            .toPandas()
+    except AnalysisException:
+        print('histogram_hg not known for fill ' + str(_fill_name) + ' mode ' + _mode + ' device ' + _device)
+        array_index_hg = None
 
     # Process lg array data
-    array_dataset_lg = spark.sql("select histogram_lg_com.elements from pulled_data") \
-        .toPandas()
-
-    array_index_lg = spark.sql("select histogram_lg_ind.elements from pulled_data") \
-        .toPandas()
-
-    array_dataset_lg['Total'] = array_dataset_lg['elements'].apply(lambda x: 0 if len(x) == 0 else sum(x))
-
-    # Datas #
-    data['rows'] = array_dataset_hg.shape[0]
+    try:
+        array_dataset_lg = spark.sql("select histogram_lg_com.elements from pulled_data") \
+            .toPandas()
+
+        array_dataset_lg['Total'] = array_dataset_lg['elements'].apply(lambda x: 0 if len(x) == 0 else sum(x))
+        data['histogram_lg_intime'] = array_dataset_lg['Total'].values.tolist()
+    except AnalysisException:
+        print('histogram_lg not known for fill ' + str(_fill_name) + ' mode ' + _mode + ' device ' + _device)
+        array_dataset_lg = None
+        data['histogram_lg_intime'] = []
+
+    try:
+        array_index_lg = spark.sql("select histogram_lg_ind.elements from pulled_data") \
+            .toPandas()
+    except AnalysisException:
+        print('histogram_lg not known for fill ' + str(_fill_name) + ' mode ' + _mode + ' device ' + _device)
+        array_index_lg = None
+
+    # Metadata
+    if array_dataset_hg is not None:
+        data['rows'] = array_dataset_hg.shape[0]
+    elif array_dataset_lg is not None:
+        data['rows'] = array_dataset_lg.shape[0]
+    else:
+        data['rows'] = 0
     data['startTime'] = _start_time
     data['endTime'] = _end_time
 
-    data['threshold_hg'] = spark.sql("select threshold_hg from pulled_data limit 1").toPandas().values.tolist()
-    if len(data['threshold_hg']) > 0:
-        data['threshold_hg'] = data['threshold_hg'][0]
-
-    data['threshold_lg'] = spark.sql("select threshold_lg from pulled_data limit 1").toPandas().values.tolist()
-    if len(data['threshold_lg']) > 0:
-        data['threshold_lg'] = data['threshold_lg'][0]
-
-    data['nsamples'] = spark.sql("select nsamples from pulled_data limit 1").toPandas().values.tolist()
-    if len(data['nsamples']) > 0:
-        data['nsamples'] = data['nsamples'][0]
+    try:
+        data['threshold_hg'] = spark.sql("select threshold_hg from pulled_data limit 1").toPandas().values.tolist()
+        if len(data['threshold_hg']) > 0:
+            data['threshold_hg'] = data['threshold_hg'][0]
+    except AnalysisException:
+        data['threshold_hg'] = 0
+
+    try:
+        data['threshold_lg'] = spark.sql("select threshold_lg from pulled_data limit 1").toPandas().values.tolist()
+        if len(data['threshold_lg']) > 0:
+            data['threshold_lg'] = data['threshold_lg'][0]
+    except AnalysisException:
+        data['threshold_lg'] = 0
+
+    try:
+        data['nsamples'] = spark.sql("select nsamples from pulled_data limit 1").toPandas().values.tolist()
+        if len(data['nsamples']) > 0:
+            data['nsamples'] = data['nsamples'][0]
+    except AnalysisException:
+        data['nsamples'] = 0
 
     # Timestamp #
-    time_stamp = spark.sql("select __record_timestamp__ from pulled_data") \
-        .toPandas()
-    data['timestamps'] = time_stamp['__record_timestamp__'].values.tolist()
-
-    # Intime Data #
-    data['histogram_hg_intime'] = array_dataset_hg['Total'].values.tolist()
-    data['histogram_lg_intime'] = array_dataset_lg['Total'].values.tolist()
+    try:
+        time_stamp = spark.sql("select __record_timestamp__ from pulled_data") \
+            .toPandas()
+        data['timestamps'] = time_stamp['__record_timestamp__'].values.tolist()
+    except AnalysisException:
+        data['timestamps'] = []
 
     # Histogram data #
-    data['histogram_hg'] = uncompress_pandas(array_dataset_hg.values,
-                                             array_index_hg.values,
-                                             57802, data['rows'] - 1)
-
-    data['histogram_lg'] = uncompress_pandas(array_dataset_lg.values,
-                                             array_index_lg.values,
-                                             57802, data['rows'] - 1)
+    if array_dataset_hg is None or array_index_hg is None:
+        data['histogram_hg'] = []
+    else:
+        data['histogram_hg'] = uncompress_pandas(array_dataset_hg.values,
+                                                 array_index_hg.values,
+                                                 57802, data['rows'] - 1)
+    if array_dataset_lg is None or array_index_lg is None:
+        data['histogram_lg'] = []
+    else:
+        data['histogram_lg'] = uncompress_pandas(array_dataset_lg.values,
+                                                 array_index_lg.values,
+                                                 57802, data['rows'] - 1)
 
     # Writing to db
     if _db_write:
@@ -169,64 +208,96 @@ def pull_integral_pandas(_fill_name, _mode, _device, _start_time, _end_time, _fi
 
     df.registerTempTable("pulled_data")
 
-    # Process hg Data #
-    array_dataset_hg = spark.sql("select integral_hg.elements from pulled_data") \
-        .toPandas()
-
-    array_dataset_hg['Total'] = array_dataset_hg['elements'].apply(lambda x: 0 if len(x) == 0 else sum(x))
-
-    # Process lg Data #
-    array_dataset_lg = spark.sql("select integral_lg.elements from pulled_data") \
-        .toPandas()
-
-    array_dataset_lg['Total'] = array_dataset_lg['elements'].apply(lambda x: 0 if len(x) == 0 else sum(x))
+    try:
+        array_dataset_hg = spark.sql("select integral_hg.elements from pulled_data") \
+            .toPandas()
+
+        array_dataset_hg['Total'] = array_dataset_hg['elements'].apply(lambda x: 0 if len(x) == 0 else sum(x))
+        data['integral_hg_intime'] = array_dataset_hg['Total'].values.tolist()
+
+        integral_hg = np.zeros(3564, dtype=np.int)
+        for T in array_dataset_hg.values:
+            if len(T[0]) == 3564:
+                integral_hg = np.add(integral_hg, T[0])
+        data['integral_hg'] = integral_hg.tolist()
+
+    except AnalysisException:
+        print('integral_hg not known for fill ' + str(_fill_name) + ' mode ' + _mode + ' device ' + _device)
+        array_dataset_hg = None
+        data['integral_hg_intime'] = []
+        data['integral_hg'] = []
+
+    try:
+        array_dataset_lg = spark.sql("select integral_lg.elements from pulled_data") \
+            .toPandas()
+
+        array_dataset_lg['Total'] = array_dataset_lg['elements'].apply(lambda x: 0 if len(x) == 0 else sum(x))
+
+        data['integral_lg_intime'] = array_dataset_lg['Total'].values.tolist()
+
+        integral_lg = np.zeros(3564, dtype=np.int)
+        for T in array_dataset_lg.values:
+            if len(T[0]) == 3564:
+                integral_lg = np.add(integral_lg, T[0])
+        data['integral_lg'] = integral_lg.tolist()
+    except AnalysisException:
+        array_dataset_lg = None
+        data['integral_lg_intime'] = []
+        data['integral_lg'] = []
+
+    # Metadata
+    if array_dataset_hg is not None:
+        data['rows'] = array_dataset_hg.shape[0]
+    elif array_dataset_lg is not None:
+        data['rows'] = array_dataset_lg.shape[0]
+    else:
+        data['rows'] = 0
 
-    # DATA #
-    data['rows'] = array_dataset_hg.shape[0]
     data['startTime'] = _start_time
     data['endTime'] = _end_time
 
-    data['zsthr_hg'] = spark.sql("select zsthr_hg from pulled_data limit 1").toPandas().values.tolist()
-    if len(data['zsthr_hg']) > 0:
-        data['zsthr_hg'] = data['zsthr_hg'][0]
-
-    data['zsthrave_hg'] = spark.sql("select zsthrave_hg from pulled_data limit 1").toPandas().values.tolist()
-    if len(data['zsthrave_hg']) > 0:
-        data['zsthrave_hg'] = data['zsthrave_hg'][0]
-
-    data['zsthr_lg'] = spark.sql("select zsthr_lg from pulled_data limit 1").toPandas().values.tolist()
-    if len(data['zsthr_lg']) > 0:
-        data['zsthr_lg'] = data['zsthr_lg'][0]
-
-    data['zsthrave_lg'] = spark.sql("select zsthrave_lg from pulled_data limit 1").toPandas().values.tolist()
-    if len(data['zsthrave_lg']) > 0:
-        data['zsthrave_lg'] = data['zsthrave_lg'][0]
-
-    data['baselinesub'] = spark.sql("select baselinesub from pulled_data limit 1").toPandas().values.tolist()
-    if len(data['baselinesub']) > 0:
-        data['baselinesub'] = data['baselinesub'][0]
-
-    # Intime Data #
-    data['integral_hg_intime'] = array_dataset_hg['Total'].values.tolist()
-    data['integral_lg_intime'] = array_dataset_lg['Total'].values.tolist()
+    try:
+        data['zsthr_hg'] = spark.sql("select zsthr_hg from pulled_data limit 1").toPandas().values.tolist()
+        if len(data['zsthr_hg']) > 0:
+            data['zsthr_hg'] = data['zsthr_hg'][0]
+    except AnalysisException:
+        data['zsthr_hg'] = 0
+
+    try:
+        data['zsthrave_hg'] = spark.sql("select zsthrave_hg from pulled_data limit 1").toPandas().values.tolist()
+        if len(data['zsthrave_hg']) > 0:
+            data['zsthrave_hg'] = data['zsthrave_hg'][0]
+    except AnalysisException:
+        data['zsthrave_hg'] = 0
+
+    try:
+        data['zsthr_lg'] = spark.sql("select zsthr_lg from pulled_data limit 1").toPandas().values.tolist()
+        if len(data['zsthr_lg']) > 0:
+            data['zsthr_lg'] = data['zsthr_lg'][0]
+    except AnalysisException:
+        data['zsthr_lg'] = 0
+
+    try:
+        data['zsthrave_lg'] = spark.sql("select zsthrave_lg from pulled_data limit 1").toPandas().values.tolist()
+        if len(data['zsthrave_lg']) > 0:
+            data['zsthrave_lg'] = data['zsthrave_lg'][0]
+    except AnalysisException:
+        data['zsthrave_lg'] = -1
+
+    try:
+        data['baselinesub'] = spark.sql("select baselinesub from pulled_data limit 1").toPandas().values.tolist()
+        if len(data['baselinesub']) > 0:
+            data['baselinesub'] = data['baselinesub'][0]
+    except AnalysisException:
+        data['baselinesub'] = 0
 
     # Timestamp #
-    time_stamp = spark.sql("select __record_timestamp__ from pulled_data") \
-        .toPandas()
-    data['timestamps'] = time_stamp['__record_timestamp__'].values.tolist()
-
-    # Integral Data
-    integral_hg = np.zeros(3564, dtype=np.int)
-    for T in array_dataset_hg.values:
-        if len(T[0]) == 3564:
-            integral_hg = np.add(integral_hg, T[0])
-    data['integral_hg'] = integral_hg.tolist()
-
-    integral_lg = np.zeros(3564, dtype=np.int)
-    for T in array_dataset_lg.values:
-        if len(T[0]) == 3564:
-            integral_lg = np.add(integral_lg, T[0])
-    data['integral_lg'] = integral_lg.tolist()
+    try:
+        time_stamp = spark.sql("select __record_timestamp__ from pulled_data") \
+            .toPandas()
+        data['timestamps'] = time_stamp['__record_timestamp__'].values.tolist()
+    except AnalysisException:
+        data['timestamps'] = []
 
     # Writing to db
     if _db_write:
@@ -252,55 +323,81 @@ def pull_raw_dist_pandas(_fill_name, _mode, _device, _start_time, _end_time, _fi
         .keyValue("property", _property)
         .buildDataset(), sql_context)
 
-    # data['rows'] = df.count()
     df.registerTempTable("pulled_data")
 
     # Process hg Data #
-    array_dataset_hg = spark.sql("select distribution_hg.elements from pulled_data") \
-        .toPandas()
+    try:
+        array_dataset_hg = spark.sql("select distribution_hg.elements from pulled_data") \
+            .toPandas()
+
+        result_distrib_hg = np.zeros(2048, dtype=np.int)
+        for R in array_dataset_hg.values:
+            if len(R[0]) == 2048:
+                result_distrib_hg = np.add(result_distrib_hg, R[0])
+        data['distribution_hg'] = result_distrib_hg.tolist()
+    except AnalysisException:
+        array_dataset_hg = None
+        data['distribution_hg'] = []
 
     # Process lg Data #
-    array_dataset_lg = spark.sql("select distribution_lg.elements from pulled_data") \
-        .toPandas()
+    try:
+        array_dataset_lg = spark.sql("select distribution_lg.elements from pulled_data") \
+            .toPandas()
+        result_distrib_lg = np.zeros(2048, dtype=np.int)
+        for R in array_dataset_lg.values:
+            if len(R[0]) == 2048:
+                result_distrib_lg = np.add(result_distrib_lg, R[0])
+        data['distribution_lg'] = result_distrib_lg.tolist()
+    except AnalysisException:
+        array_dataset_lg = None
+        data['distribution_lg'] = []
+
+    # Metadata
+    if array_dataset_hg is not None:
+        data['rows'] = array_dataset_hg.shape[0]
+    elif array_dataset_lg is not None:
+        data['rows'] = array_dataset_lg.shape[0]
+    else:
+        data['rows'] = 0
 
-    # DATA #
-    data['rows'] = array_dataset_hg.shape[0]
     data['startTime'] = _start_time
    data['endTime'] = _end_time
 
-    data['lsbcut_hg'] = spark.sql("select lsbcut_hg from pulled_data limit 1").toPandas().values.tolist()
-    if len(data['lsbcut_hg']) > 0:
-        data['lsbcut_hg'] = data['lsbcut_hg'][0]
-
-    data['offset_hg'] = spark.sql("select offset_hg from pulled_data limit 1").toPandas().values.tolist()
-    if len(data['offset_hg']) > 0:
-        data['offset_hg'] = data['offset_hg'][0]
-
-    data['lsbcut_lg'] = spark.sql("select lsbcut_lg from pulled_data limit 1").toPandas().values.tolist()
-    if len(data['lsbcut_lg']) > 0:
-        data['lsbcut_lg'] = data['lsbcut_lg'][0]
-
-    data['offset_lg'] = spark.sql("select offset_lg from pulled_data limit 1").toPandas().values.tolist()
-    if len(data['offset_lg']) > 0:
-        data['offset_lg'] = data['offset_lg'][0]
-
-    # Array Data #
-    result_distrib_hg = np.zeros(2048, dtype=np.int)
-    for R in array_dataset_hg.values:
-        if len(R[0]) == 2048:
-            result_distrib_hg = np.add(result_distrib_hg, R[0])
-    data['distribution_hg'] = result_distrib_hg.tolist()
-
-    result_distrib_lg = np.zeros(2048, dtype=np.int)
-    for R in array_dataset_lg.values:
-        if len(R[0]) == 2048:
-            result_distrib_lg = np.add(result_distrib_lg, R[0])
-    data['distribution_lg'] = result_distrib_lg.tolist()
+    try:
+        data['lsbcut_hg'] = spark.sql("select lsbcut_hg from pulled_data limit 1").toPandas().values.tolist()
+        if len(data['lsbcut_hg']) > 0:
+            data['lsbcut_hg'] = data['lsbcut_hg'][0]
+    except AnalysisException:
+        data['lsbcut_hg'] = 0
+
+    try:
+        data['offset_hg'] = spark.sql("select offset_hg from pulled_data limit 1").toPandas().values.tolist()
+        if len(data['offset_hg']) > 0:
+            data['offset_hg'] = data['offset_hg'][0]
+    except AnalysisException:
+        data['offset_hg'] = 0
+
+    try:
+        data['lsbcut_lg'] = spark.sql("select lsbcut_lg from pulled_data limit 1").toPandas().values.tolist()
+        if len(data['lsbcut_lg']) > 0:
+            data['lsbcut_lg'] = data['lsbcut_lg'][0]
+    except AnalysisException:
+        data['lsbcut_lg'] = 0
+
+    try:
+        data['offset_lg'] = spark.sql("select offset_lg from pulled_data limit 1").toPandas().values.tolist()
+        if len(data['offset_lg']) > 0:
+            data['offset_lg'] = data['offset_lg'][0]
+    except AnalysisException:
+        data['offset_lg'] = 0
 
     # Timestamp #
-    time_stamp = spark.sql("select __record_timestamp__ from pulled_data") \
-        .toPandas()
-    data['timestamps'] = time_stamp['__record_timestamp__'].values.tolist()
+    try:
+        time_stamp = spark.sql("select __record_timestamp__ from pulled_data") \
+            .toPandas()
+        data['timestamps'] = time_stamp['__record_timestamp__'].values.tolist()
+    except AnalysisException:
+        data['timestamps'] = []
 
     # Writing to db
     if _db_write:
@@ -326,59 +423,89 @@ def pull_integral_dist_pandas(_fill_name, _mode, _device, _start_time, _end_time
         .keyValue("property", _property)
         .buildDataset(), sql_context)
 
-    # data['rows'] = df.count()
     df.registerTempTable("pulled_data")
 
     # Process hg Data #
-    array_dataset_hg = spark.sql("select distribution_hg.elements from pulled_data") \
-        .toPandas()
+    try:
+        array_dataset_hg = spark.sql("select distribution_hg.elements from pulled_data") \
+            .toPandas()
+
+        result_distrib_hg = np.zeros(2048, dtype=np.int)
+        for R in array_dataset_hg.values:
+            if len(R[0]) == 2048:
+                result_distrib_hg = np.add(result_distrib_hg, R[0])
+        data['distribution_hg'] = result_distrib_hg.tolist()
+    except AnalysisException:
+        data['distribution_hg'] = []
+        array_dataset_hg = None
 
     # Process lg Data #
-    array_dataset_lg = spark.sql("select distribution_lg.elements from pulled_data") \
-        .toPandas()
+    try:
+        array_dataset_lg = spark.sql("select distribution_lg.elements from pulled_data") \
+            .toPandas()
+
+        result_distrib_lg = np.zeros(2048, dtype=np.int)
+        for R in array_dataset_lg.values:
+            if len(R[0]) == 2048:
+                result_distrib_lg = np.add(result_distrib_lg, R[0])
+        data['distribution_lg'] = result_distrib_lg.tolist()
+    except AnalysisException:
+        array_dataset_lg = None
+        data['distribution_lg'] = []
+
+    # Metadata #
+    if array_dataset_hg is not None:
+        data['rows'] = array_dataset_hg.shape[0]
+    elif array_dataset_lg is not None:
+        data['rows'] = array_dataset_lg.shape[0]
+    else:
+        data['rows'] = 0
 
-    # DATA #
-    data['rows'] = array_dataset_hg.shape[0]
     data['startTime'] = _start_time
     data['endTime'] = _end_time
 
-    data['lsbcut_hg'] = spark.sql("select lsbcut_hg from pulled_data limit 1").toPandas().values.tolist()
-    if len(data['lsbcut_hg']) > 0:
-        data['lsbcut_hg'] = data['lsbcut_hg'][0]
-
-    data['offset_hg'] = spark.sql("select offset_hg from pulled_data limit 1").toPandas().values.tolist()
-    if len(data['offset_hg']) > 0:
-        data['offset_hg'] = data['offset_hg'][0]
-
-    data['lsbcut_lg'] = spark.sql("select lsbcut_lg from pulled_data limit 1").toPandas().values.tolist()
-    if len(data['lsbcut_lg']) > 0:
-        data['lsbcut_lg'] = data['lsbcut_lg'][0]
-
-    data['offset_lg'] = spark.sql("select offset_lg from pulled_data limit 1").toPandas().values.tolist()
-    if len(data['offset_lg']) > 0:
-        data['offset_lg'] = data['offset_lg'][0]
-
-    data['nturns'] = spark.sql("select nturns from pulled_data limit 1").toPandas().values.tolist()
-    if len(data['nturns']) > 0:
-        data['nturns'] = data['nturns'][0]
-
-    # Array Data #
-    result_distrib_hg = np.zeros(2048, dtype=np.int)
-    for R in array_dataset_hg.values:
-        if len(R[0]) == 2048:
-            result_distrib_hg = np.add(result_distrib_hg, R[0])
-    data['distribution_hg'] = result_distrib_hg.tolist()
-
-    result_distrib_lg = np.zeros(2048, dtype=np.int)
-    for R in array_dataset_lg.values:
-        if len(R[0]) == 2048:
-            result_distrib_lg = np.add(result_distrib_lg, R[0])
-    data['distribution_lg'] = result_distrib_lg.tolist()
+    try:
+        data['lsbcut_hg'] = spark.sql("select lsbcut_hg from pulled_data limit 1").toPandas().values.tolist()
+        if len(data['lsbcut_hg']) > 0:
+            data['lsbcut_hg'] = data['lsbcut_hg'][0]
+    except AnalysisException:
+        data['lsbcut_hg'] = 0
+
+    try:
+        data['offset_hg'] = spark.sql("select offset_hg from pulled_data limit 1").toPandas().values.tolist()
+        if len(data['offset_hg']) > 0:
+            data['offset_hg'] = data['offset_hg'][0]
+    except AnalysisException:
+        data['offset_hg'] = 0
+
+    try:
+        data['lsbcut_lg'] = spark.sql("select lsbcut_lg from pulled_data limit 1").toPandas().values.tolist()
+        if len(data['lsbcut_lg']) > 0:
+            data['lsbcut_lg'] = data['lsbcut_lg'][0]
+    except AnalysisException:
+        data['lsbcut_lg'] = 0
+
+    try:
+        data['offset_lg'] = spark.sql("select offset_lg from pulled_data limit 1").toPandas().values.tolist()
+        if len(data['offset_lg']) > 0:
+            data['offset_lg'] = data['offset_lg'][0]
+    except AnalysisException:
+        data['offset_lg'] = 0
+
+    try:
+        data['nturns'] = spark.sql("select nturns from pulled_data limit 1").toPandas().values.tolist()
+        if len(data['nturns']) > 0:
+            data['nturns'] = data['nturns'][0]
+    except AnalysisException:
+        data['nturns'] = 0
 
     # Timestamp #
-    time_stamp = spark.sql("select __record_timestamp__ from pulled_data") \
-        .toPandas()
-    data['timestamps'] = time_stamp['__record_timestamp__'].values.tolist()
+    try:
+        time_stamp = spark.sql("select __record_timestamp__ from pulled_data") \
+            .toPandas()
+        data['timestamps'] = time_stamp['__record_timestamp__'].values.tolist()
+    except AnalysisException:
+        data['timestamps'] = []
 
     # Writing to db
     if _db_write:
@@ -404,65 +531,105 @@ def pull_turnloss_pandas(_fill_name, _mode, _device, _start_time, _end_time, _fi
         .keyValue("property", _property)
         .buildDataset(), sql_context)
 
-    # data['rows'] = df.count()
     df.registerTempTable("pulled_data")
 
     # Process hg Data #
-    array_dataset_hg = spark.sql("select turnloss_hg_com.elements from pulled_data") \
-        .toPandas()
-
-    array_index_hg = spark.sql("select turnloss_hg_ind.elements from pulled_data") \
-        .toPandas()
+    try:
+        array_dataset_hg = spark.sql("select turnloss_hg_com.elements from pulled_data") \
+            .toPandas()
 
-    array_dataset_hg['Total'] = array_dataset_hg['elements'].apply(lambda x: 0 if len(x) == 0 else sum(x))
+        array_dataset_hg['Total'] = array_dataset_hg['elements'].apply(lambda x: 0 if len(x) == 0 else sum(x))
+        data['turnloss_hg_intime'] = array_dataset_hg['Total'].values.tolist()
 
-    # Process lg Data #
-    array_dataset_lg = spark.sql("select turnloss_lg_com.elements from pulled_data") \
-        .toPandas()
+    except AnalysisException:
+        array_dataset_hg = None
+        data['turnloss_hg_intime'] = []
 
-    array_index_lg = spark.sql("select turnloss_lg_ind.elements from pulled_data") \
-        .toPandas()
+    try:
+        array_index_hg = spark.sql("select turnloss_hg_ind.elements from pulled_data") \
+            .toPandas()
+    except AnalysisException:
+        array_index_hg = None
 
-    array_dataset_lg['Total'] = array_dataset_lg['elements'].apply(lambda x: 0 if len(x) == 0 else sum(x))
+    # Process lg Data #
+    try:
+        array_dataset_lg = spark.sql("select turnloss_lg_com.elements from pulled_data") \
+            .toPandas()
+
+        array_dataset_lg['Total'] = array_dataset_lg['elements'].apply(lambda x: 0 if len(x) == 0 else sum(x))
+        data['turnloss_lg_intime'] = array_dataset_lg['Total'].values.tolist()
+
+    except AnalysisException:
+        array_dataset_lg = None
+        data['turnloss_lg_intime'] = []
+
+    try:
+        array_index_lg = spark.sql("select turnloss_lg_ind.elements from pulled_data") \
+            .toPandas()
+    except AnalysisException:
+        array_index_lg = None
+
+    # Metadata #
+    if array_dataset_hg is not None:
+        data['rows'] = array_dataset_hg.shape[0]
+    elif array_dataset_lg is not None:
+        data['rows'] = array_dataset_lg.shape[0]
+    else:
+        data['rows'] = 0
 
-    # DATA #
-    data['rows'] = array_dataset_hg.shape[0]
     data['startTime'] = _start_time
     data['endTime'] = _end_time
 
-    data['zsthr_hg'] = spark.sql("select zsthr_hg from pulled_data limit 1").toPandas().values.tolist()
-    if len(data['zsthr_hg']) > 0:
-        data['zsthr_hg'] = data['zsthr_hg'][0]
-
-    data['zsthr_lg'] = spark.sql("select zsthr_lg from pulled_data limit 1").toPandas().values.tolist()
-    if len(data['zsthr_lg']) > 0:
-        data['zsthr_lg'] = data['zsthr_lg'][0]
-
-    data['baselinesub'] = spark.sql("select baselinesub from pulled_data limit 1").toPandas().values.tolist()
-    if len(data['baselinesub']) > 0:
-        data['baselinesub'] = data['baselinesub'][0]
-
-    data['nturns'] = spark.sql("select nturns from pulled_data limit 1").toPandas().values.tolist()
-    if len(data['nturns']) > 0:
-        data['nturns'] = data['nturns'][0]
+    try:
+        data['zsthr_hg'] = spark.sql("select zsthr_hg from pulled_data limit 1").toPandas().values.tolist()
+        if len(data['zsthr_hg']) > 0:
+            data['zsthr_hg'] = data['zsthr_hg'][0]
+    except AnalysisException:
+        data['zsthr_hg'] = 0
+
+    try:
+        data['zsthr_lg'] = spark.sql("select zsthr_lg from pulled_data limit 1").toPandas().values.tolist()
+        if len(data['zsthr_lg']) > 0:
+            data['zsthr_lg'] = data['zsthr_lg'][0]
+    except AnalysisException:
+        data['zsthr_lg'] = 0
+
+    try:
+        data['baselinesub'] = spark.sql("select baselinesub from pulled_data limit 1").toPandas().values.tolist()
+        if len(data['baselinesub']) > 0:
+            data['baselinesub'] = data['baselinesub'][0]
+    except AnalysisException:
+        data['baselinesub'] = 0
+
+    try:
+        data['nturns'] = spark.sql("select nturns from pulled_data limit 1").toPandas().values.tolist()
+        if len(data['nturns']) > 0:
+            data['nturns'] = data['nturns'][0]
+    except AnalysisException:
+        data['nturns'] = 0
 
     # Timestamp #
-    time_stamp = spark.sql("select __record_timestamp__ from pulled_data") \
-        .toPandas()
-    data['timestamps'] = time_stamp['__record_timestamp__'].values.tolist()
-
-    # Intime Data #
-    data['turnloss_hg_intime'] = array_dataset_hg['Total'].values.tolist()
-    data['turnloss_lg_intime'] = array_dataset_lg['Total'].values.tolist()
+    try:
+        time_stamp = spark.sql("select __record_timestamp__ from pulled_data") \
+            .toPandas()
+        data['timestamps'] = time_stamp['__record_timestamp__'].values.tolist()
+    except AnalysisException:
+        data['timestamps'] = []
 
     # Turnloss Data
-    data['turnloss_hg'] = uncompress_pandas(array_dataset_hg.values,
-                                            array_index_hg.values,
-                                            data['nturns'], data['rows'])
+    if array_dataset_hg is None or array_index_hg is None:
+        data['turnloss_hg'] = []
+    else:
+        data['turnloss_hg'] = uncompress_pandas(array_dataset_hg.values,
+                                                array_index_hg.values,
+                                                data['nturns'], data['rows'])
 
-    data['turnloss_lg'] = uncompress_pandas(array_dataset_lg.values,
-                                            array_index_lg.values,
-                                            data['nturns'], data['rows'])
+    if array_dataset_lg is None or array_index_lg is None:
+        data['turnloss_lg'] = []
+    else:
+        data['turnloss_lg'] = uncompress_pandas(array_dataset_lg.values,
+                                                array_index_lg.values,
+                                                data['nturns'], data['rows'])
 
     # Writing to db
     if _db_write:
@@ -475,23 +642,25 @@ def pull_turnloss_pandas(_fill_name, _mode, _device, _start_time, _end_time, _fi
 
 def property_pull(_fill_name, _mode, _device, _start, _end, _files_write=False, _db_write=True):
-
     gcursor = mydb.cursor()
 
     gcursor.execute('select count(*) from Mode where name=%s', (_mode,))
     mode_exists = gcursor.fetchone()[0]
 
     if mode_exists == 0:
         print('Mode ' + _mode + ' unknown ')
+        return -1
 
     gcursor.execute('SELECT id FROM Device where name=%s limit 1', (_device,))
     device_id = gcursor.fetchone()[0]
 
     if device_id is None:
         print('Device ' + _device + ' unknown')
+        return -1
 
     gcursor.execute("select count(*) from Measure where device_id=%s "
-                    "and fill_mode_id in ( select id from FillMode "
+                    "and fill_mode_id=( select id from FillMode "
                    "where mode_id=(select id from Mode where name=%s) "
-                    "and fill_id=(select id from Fill where name=%s))", (device_id, _mode, _fill_name))
+                    "and fill_id=(select id from Fill where name=%s)"
+                    "and start=%s)", (device_id, _mode, _fill_name, _start))
 
     already_done = gcursor.fetchone()[0]
     if already_done != 0:
@@ -510,20 +679,32 @@
     if gcursor.fetchone()[0] == 0:
         gcursor.execute("insert into Fill(name, start, end) values (%s, %s, %s)", (_fill_name, _start, _end))
     # print(data_id)
 
-    gcursor.execute("insert into FillMode(fill_id, mode_id, start, end) "
-                    "values ((select id from Fill where name=%s),(select id from Mode where name=%s), %s, %s)",
-                    (_fill_name, _mode, _start, _end))
-    fill_mode_id = gcursor.lastrowid
+    gcursor.execute("select count(*) from FillMode "
+                    "where mode_id=(select id from Mode where name=%s) "
+                    "and fill_id=(select id from Fill where name=%s) "
+                    "and start=%s and end=%s",
+                    (_mode, _fill_name, _start, _end))
+    if gcursor.fetchone()[0] == 0:
+        gcursor.execute("insert into FillMode(fill_id, mode_id, start, end) "
+                        "values ((select id from Fill where name=%s),(select id from Mode where name=%s), %s, %s)",
+                        (_fill_name, _mode, _start, _end))
+        fill_mode_id = gcursor.lastrowid
+    else:
+        gcursor.execute("select id from FillMode where mode_id=(select id from Mode where name=%s) "
+                        "and fill_id=(select id from Fill where name=%s) "
+                        "and start=%s and end=%s",
+                        (_mode, _fill_name, _start, _end))
+        fill_mode_id = gcursor.fetchone()[0]
 
     gcursor.execute('INSERT INTO Measure(fill_mode_id, data_id, device_id)'
                     'VALUES ( %s, %s, %s )', (fill_mode_id, data_id, device_id))
 
     pull_histogram_pandas(_fill_name, _mode, _device, _start, _end, _files_write, _db_write)
-    pull_integral_pandas(_fill_name, _mode, _device, _start, _end, _files_write, _db_write)
-    pull_raw_dist_pandas(_fill_name, _mode, _device, _start, _end, _files_write, _db_write)
-    pull_integral_dist_pandas(_fill_name, _mode, _device, _start, _end, _files_write, _db_write)
-    pull_turnloss_pandas(_fill_name, _mode, _device, _start, _end, _files_write, _db_write)
+    pull_integral_pandas(_fill_name, _mode, _device, _start, _end, _files_write, _db_write)
+    pull_raw_dist_pandas(_fill_name, _mode, _device, _start, _end, _files_write, _db_write)
+    pull_integral_dist_pandas(_fill_name, _mode, _device, _start, _end, _files_write, _db_write)
+    pull_turnloss_pandas(_fill_name, _mode, _device, _start, _end, _files_write, _db_write)
 
     mydb.commit()
 
     print('Pulled done, data pushed on DB for fill '
diff --git a/routine_pull.py b/routine_pull.py
index 0c3269281c6b52caa7c574fb11c35345782d20b2..54e35f0f1c0056f70b01dcce0716fe80d33c630e 100644
--- a/routine_pull.py
+++ b/routine_pull.py
@@ -31,7 +31,8 @@ def start_pull(_fill_infos, _devices=None):
         property_pull.property_pull(_fill_infos['fillNumber'], 'ALL', iDevice,
                                     _fill_infos['startTime'].strftime("%Y-%m-%d %H:%M:%S.000"),
-                                    _fill_infos['endTime'].strftime("%Y-%m-%d %H:%M:%S.000"))
+                                    _fill_infos['endTime'].strftime("%Y-%m-%d %H:%M:%S.000"),
+                                    _files_write=False)
 
     for iMode in _fill_infos["beamModes"]:
         print(iMode['mode'])
@@ -70,7 +71,6 @@ def routine_pull(_start_fill=None, _end_fill=None):
         new_fill_number = _end_fill
         last_fll_pulled = _start_fill - 1
 
-    # Select only the first 6 devices Cause: not all device works properly with NXCALS yet
     gcursor.execute('select name from Device')
     devices_request = gcursor.fetchall()
     devices = []
@@ -80,7 +80,7 @@ def routine_pull(_start_fill=None, _end_fill=None):
 
     gcursor.close()
     mydb.close()
 
-    fill_number = 7492
+    fill_number = 7443
     fill_infos = ldb.getLHCFillData(fill_number, False)
     start_pull(fill_infos, devices)
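
The pull_* functions above all wrap each Spark SQL read of the "pulled_data" temp table in the same try/except AnalysisException guard, falling back to a default when a column does not exist for the given device and property. A minimal sketch of how that guard could be factored into shared helpers, assuming a SparkSession named spark and the same registered temp table; the helper names and defaults below are illustrative, not part of the patch:

from pyspark.sql.utils import AnalysisException


def safe_scalar(spark, column, default=0):
    # First row of `column` from pulled_data, or `default` when the column
    # is missing for this device/property (AnalysisException).
    try:
        rows = spark.sql("select " + column + " from pulled_data limit 1") \
            .toPandas().values.tolist()
        return rows[0] if len(rows) > 0 else default
    except AnalysisException:
        return default


def safe_elements(spark, column):
    # `.elements` arrays of `column` as a pandas DataFrame, or None when
    # the column is missing.
    try:
        return spark.sql("select " + column + ".elements from pulled_data") \
            .toPandas()
    except AnalysisException:
        return None

With helpers like these, each repeated block would reduce to calls such as data['threshold_hg'] = safe_scalar(spark, 'threshold_hg') and array_dataset_hg = safe_elements(spark, 'histogram_hg_com').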