diff --git a/property-pull.py b/property_pull.py
similarity index 92%
rename from property-pull.py
rename to property_pull.py
index 9c37c93a432fb4bc3d14e0c3fc79b5f21ade2aac..9c995331eb958fc2ae66500722c58d5931ec987f 100644
--- a/property-pull.py
+++ b/property_pull.py
@@ -31,14 +31,16 @@ mydb = mysql.connector.connect(
 data_id = -1


 # Sparks Variable
-conf = ''
+conf = SparkConf()
+conf.setMaster('yarn')
+conf.setAppName('property_pull.py')
-sc = ''
-sqlContext = ''
-spark = ''
+sc = SparkContext(conf=conf)
+sql_context = SQLContext(sc)
+spark = SparkSession(sc)

 # sumUDF
-sums_cols = udf(lambda arr: 0 if arr == [] else __builtins__.sum(arr), IntegerType())
+sums_cols = udf(lambda arr: 0 if arr == [] else sum(arr), IntegerType())

 '''
 #TODO -
@@ -85,7 +87,7 @@ def uncompress_pandas(_data_array, _index_array, _size, _nrows):


 # noinspection SqlResolve
-def pull_histogram_pandas(_start_time, _end_time, _device):
+def pull_histogram_pandas(_fill_name, _mode, _device, _start_time, _end_time):
     _property = 'LoggingHistogram'

     data = {}
@@ -96,7 +98,7 @@ def pull_histogram_pandas(_start_time, _end_time, _device):
              .entity()
              .keyValue("device", _device)
              .keyValue("property", _property)
-             .buildDataset(), sqlContext)
+             .buildDataset(), sql_context)

     # data['rows'] = df.count()
     df.registerTempTable("pulled_data")
@@ -108,7 +110,7 @@ def pull_histogram_pandas(_start_time, _end_time, _device):
     array_index_hg = spark.sql("select histogram_hg_ind.elements from pulled_data") \
         .toPandas()

-    array_dataset_hg['Total'] = array_dataset_hg['elements'].apply(lambda x: 0 if len(x) == 0 else __builtins__.sum(x))
+    array_dataset_hg['Total'] = array_dataset_hg['elements'].apply(lambda x: 0 if len(x) == 0 else sum(x))

     # Process lg array data
     array_dataset_lg = spark.sql("select histogram_lg_com.elements from pulled_data") \
@@ -117,11 +119,10 @@ def pull_histogram_pandas(_start_time, _end_time, _device):
     array_index_lg = spark.sql("select histogram_lg_ind.elements from pulled_data") \
         .toPandas()

-    array_dataset_lg['Total'] = array_dataset_lg['elements'].apply(lambda x: 0 if len(x) == 0 else __builtins__.sum(x))
+    array_dataset_lg['Total'] = array_dataset_lg['elements'].apply(lambda x: 0 if len(x) == 0 else sum(x))

     # Datas #
     data['rows'] = array_dataset_hg.shape[0]
-    print(data['rows'])

     data['startTime'] = _start_time
     data['endTime'] = _end_time
@@ -134,6 +135,10 @@ def pull_histogram_pandas(_start_time, _end_time, _device):
     if len(data['threshold_lg']) > 0:
         data['threshold_lg'] = data['threshold_lg'][0]

+    data['nsamples'] = spark.sql("select nsamples from pulled_data limit 1").toPandas().values.tolist()
+    if len(data['nsamples']) > 0:
+        data['nsamples'] = data['nsamples'][0]
+
     # Timestamp #
     time_stamp = spark.sql("select __record_timestamp__ from pulled_data") \
         .toPandas()
@@ -160,7 +165,7 @@ def pull_histogram_pandas(_start_time, _end_time, _device):


 # noinspection SqlResolve
-def pull_integral_pandas(_start_time, _end_time, _device):
+def pull_integral_pandas(_fill_name, _mode, _device, _start_time, _end_time):
     _property = "LoggingIntegral"

     data = {}
@@ -171,7 +176,7 @@ def pull_integral_pandas(_start_time, _end_time, _device):
              .entity()
              .keyValue("device", _device)
              .keyValue("property", _property)
-             .buildDataset(), sqlContext)
+             .buildDataset(), sql_context)

     df.registerTempTable("pulled_data")

@@ -179,13 +184,13 @@ def pull_integral_pandas(_start_time, _end_time, _device):
     array_dataset_hg = spark.sql("select integral_hg.elements from pulled_data") \
         .toPandas()

-    array_dataset_hg['Total'] = array_dataset_hg['elements'].apply(lambda x: 0 if len(x) == 0 else __builtins__.sum(x))
+    array_dataset_hg['Total'] = array_dataset_hg['elements'].apply(lambda x: 0 if len(x) == 0 else sum(x))

     # Process lg Data #
     array_dataset_lg = spark.sql("select integral_lg.elements from pulled_data") \
         .toPandas()

-    array_dataset_lg['Total'] = array_dataset_lg['elements'].apply(lambda x: 0 if len(x) == 0 else __builtins__.sum(x))
+    array_dataset_lg['Total'] = array_dataset_lg['elements'].apply(lambda x: 0 if len(x) == 0 else sum(x))

     # DATA #
     data['rows'] = array_dataset_hg.shape[0]
@@ -242,7 +247,7 @@ def pull_integral_pandas(_start_time, _end_time, _device):


 # noinspection SqlResolve
-def pull_raw_dist_pandas(_start_time, _end_time, _device):
+def pull_raw_dist_pandas(_fill_name, _mode, _device, _start_time, _end_time):
     _property = "LoggingRawDist"

     data = {}
@@ -253,7 +258,7 @@ def pull_raw_dist_pandas(_start_time, _end_time, _device):
              .entity()
              .keyValue("device", _device)
              .keyValue("property", _property)
-             .buildDataset(), sqlContext)
+             .buildDataset(), sql_context)

     # data['rows'] = df.count()
     df.registerTempTable("pulled_data")
@@ -313,7 +318,7 @@ def pull_raw_dist_pandas(_start_time, _end_time, _device):


 # noinspection SqlResolve
-def pull_integral_dist_pandas(_start_time, _end_time, _device):
+def pull_integral_dist_pandas(_fill_name, _mode, _device, _start_time, _end_time):
     _property = "LoggingIntegralDist"

     data = {}
@@ -324,7 +329,7 @@ def pull_integral_dist_pandas(_start_time, _end_time, _device):
              .entity()
              .keyValue("device", _device)
              .keyValue("property", _property)
-             .buildDataset(), sqlContext)
+             .buildDataset(), sql_context)

     # data['rows'] = df.count()
     df.registerTempTable("pulled_data")
@@ -388,7 +393,7 @@ def pull_integral_dist_pandas(_start_time, _end_time, _device):


 # noinspection SqlResolve
-def pull_turnloss_pandas(_start_time, _end_time, _device):
+def pull_turnloss_pandas(_fill_name, _mode, _device, _start_time, _end_time):
     _property = "LoggingTurnLoss"

     data = {}
@@ -399,7 +404,7 @@ def pull_turnloss_pandas(_start_time, _end_time, _device):
              .entity()
              .keyValue("device", _device)
              .keyValue("property", _property)
-             .buildDataset(), sqlContext)
+             .buildDataset(), sql_context)

     # data['rows'] = df.count()
     df.registerTempTable("pulled_data")
@@ -411,7 +416,7 @@ def pull_turnloss_pandas(_start_time, _end_time, _device):
     array_index_hg = spark.sql("select turnloss_hg_ind.elements from pulled_data") \
         .toPandas()

-    array_dataset_hg['Total'] = array_dataset_hg['elements'].apply(lambda x: 0 if len(x) == 0 else __builtins__.sum(x))
+    array_dataset_hg['Total'] = array_dataset_hg['elements'].apply(lambda x: 0 if len(x) == 0 else sum(x))

     # Process lg Data #
     array_dataset_lg = spark.sql("select turnloss_lg_com.elements from pulled_data") \
@@ -420,7 +425,7 @@ def pull_turnloss_pandas(_start_time, _end_time, _device):
     array_index_lg = spark.sql("select turnloss_lg_ind.elements from pulled_data") \
         .toPandas()

-    array_dataset_lg['Total'] = array_dataset_lg['elements'].apply(lambda x: 0 if len(x) == 0 else __builtins__.sum(x))
+    array_dataset_lg['Total'] = array_dataset_lg['elements'].apply(lambda x: 0 if len(x) == 0 else sum(x))

     # DATA #
     data['rows'] = array_dataset_hg.shape[0]
@@ -883,9 +888,44 @@ def pull_turnloss(_start_time, _end_time, _device):
     write_file(_device, _start_time, _end_time, _property, data)


-if __name__ == '__main__':
+def property_pull(_fill_name, _mode, _device, _start, _end):
+    gcursor = mydb.cursor()
+
+    gcursor.execute('SELECT id FROM device where name=%s limit 1', (_device,))
+    device_id = gcursor.fetchone()[0]
+
+    request = "select count(*) from fill where name=" + str(_fill_name) + " and device_id=" + str(device_id) +\
+              " and mode='" + _mode + "' and start='" + _start + "' and end='" + _end + "';"
+    gcursor.execute(request)
+
+    already_done = gcursor.fetchone()[0]
+    if already_done != 0:
+        print('Data for device ' + _device + ' mode ' + _mode + ' from ' + _start + ' to ' + _end + ' already pulled')
+        return -1
+
+    print('Pulling data for fill ' + str(_fill_name) + ' for device ' + _device + ' mode ' + _mode + ' from ' + _start + ' to ' + _end)
+    global data_id
+
+    gcursor.execute("insert into nxcals_blm_data.data(histogram, integral, integral_dist, raw_dist, turnloss) "
+                    "VALUES ( '{}', '{}', '{}', '{}', '{}');")
+    data_id = gcursor.lastrowid
+    # print(data_id)
+
+    gcursor.execute('INSERT INTO fill(name, device_id, mode, data_id, start, end)'
+                    'VALUES (%s, %s, %s, %s, %s, %s )',
+                    (_fill_name, device_id, _mode, data_id, _start, _end,))
+
+    pull_histogram_pandas(_fill_name, _mode, _device, _start, _end)
+    pull_integral_pandas(_fill_name, _mode, _device, _start, _end)
+    pull_raw_dist_pandas(_fill_name, _mode, _device, _start, _end)
+    pull_integral_dist_pandas(_fill_name, _mode, _device, _start, _end)
+    pull_turnloss_pandas(_fill_name, _mode, _device, _start, _end)
+
+    mydb.commit()
+    print('Pulled done, data pushed on DB')

-    _pandas = True
+
+if __name__ == '__main__':

     _fill_name = "7340"
     _mode = "STABLE"
@@ -910,10 +950,10 @@ if __name__ == '__main__':
               " format: 'yyyy-mm-dd hh:mm:ss.ms' or 'yyyy-mm-dd_hh:mm:ss.ms'")
         sys.exit(0)
     elif len(sys.argv) < 6:
-        print("usage: python property-pull.py <fillName> <device> <mode> <start> <end> [-f] \n"
+        print("usage: python property_pull.py <fillName> <device> <mode> <start> <end> [-f] \n"
              "where : \n"
              " -f also generate .dat files \n"
-              "type 'python property-pull.py -h' for more infos")
+              "type 'python property_pull.py -h' for more infos")
         sys.exit(1)
     else:
         _fill_name = sys.argv[1]
@@ -924,46 +964,4 @@ if __name__ == '__main__':
         if len(sys.argv) > 6:
             _files = sys.argv[6] == "-f"

-        print(_files)
-
-    print('Pulling data for device ' + _device + ' mode ' + _mode + ' from ' + _start + ' to ' + _end)
-
-    gcursor = mydb.cursor()
-
-    gcursor.execute('SELECT id FROM device where name=%s', (_device,))
-    device_id = gcursor.fetchone()[0]
-    # print(device_id)
-
-    gcursor.execute("insert into nxcals_blm_data.data(histogram, integral, integral_dist, raw_dist, turnloss) "
-                    "VALUES ( '{}', '{}', '{}', '{}', '{}');")
-    data_id = gcursor.lastrowid
-    # print(data_id)
-
-    gcursor.execute('INSERT INTO fill(name, device_id, mode, data_id, start, end)'
-                    'VALUES (%s, %s, %s, %s, %s, %s )',
-                    (_fill_name, device_id, _mode, data_id, _start, _end,))
-
-    conf = SparkConf()
-    conf.setMaster('yarn')
-    conf.setAppName('property-pull.py')
-    conf.set('driver.memory', '20g')
-
-    sc = SparkContext(conf=conf)
-    sqlContext = SQLContext(sc)
-    spark = SparkSession(sc)
-
-    if _pandas:
-        pull_histogram_pandas(_start, _end, _device)
-        pull_integral_pandas(_start, _end, _device)
-        pull_raw_dist_pandas(_start, _end, _device)
-        pull_integral_dist_pandas(_start, _end, _device)
-        pull_turnloss_pandas(_start, _end, _device)
-    else:
-        pull_histogram(_start, _end, _device)
-        pull_integral(_start, _end, _device)
-        pull_raw_dist(_start, _end, _device)
-        pull_integral_dist(_start, _end, _device)
-        # pull_turnloss(_start, _end, _device)
-
-    mydb.commit()
-    print('Pulled done, data pushed on DB')
+    property_pull(_fill_name, _mode, _device, _start, _end)
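For context, this patch turns the old `__main__` body into a reusable `property_pull(_fill_name, _mode, _device, _start, _end)` function. A minimal sketch of calling it from another script might look like the following; the device name and time window are placeholders (not values from the repository), and importing the module is assumed to set up the Spark context and MySQL connection declared at the top of `property_pull.py`:

```python
# Minimal usage sketch, assuming the NXCALS Spark bundle and the MySQL
# connection configured in property_pull.py are reachable from this process.
import property_pull

property_pull.property_pull(
    "7340",                      # fill name (default used in __main__)
    "STABLE",                    # beam mode (default used in __main__)
    "EXAMPLE.BLM.DEVICE",        # placeholder device name
    "2018-10-22 21:00:00.000",   # placeholder start, 'yyyy-mm-dd hh:mm:ss.ms'
    "2018-10-23 01:00:00.000",   # placeholder end, same format
)
```

The function returns -1 and skips the pull if a matching fill/device/mode/start/end row already exists, so repeated calls should not duplicate data.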