#!spark-submit --master yarn
import json
import sys
import requests
from requests.exceptions import HTTPError


from cern.nxcals.pyquery.builders import *
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import *
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType
from pyspark.sql.utils import AnalysisException

from multiprocessing import Process

import numpy as np

API_HOST = 'http://test-nxdblm-api2.web.cern.ch/'

_dest_dir = './pulledData'

# Spark variables
conf = SparkConf()
conf.setMaster('yarn')
conf.setAppName('cern_nxcals_dblm_property_pull.py')

sc = SparkContext(conf=conf)
sql_context = SQLContext(sc)
spark = SparkSession(sc)

# sumUDF
sums_cols = udf(lambda arr: 0 if arr == [] else sum(arr), IntegerType())

'''
#TODO -
#   Reduce ( UDAF ? )
#   Scala UDF ? (working with python's lambda for now)
'''
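
# Illustrative usage of the sums_cols UDF (not exercised in the code below, which
# uses a pandas apply instead): given a Spark DataFrame with an array column
# 'elements', it would add a per-row total, e.g.
#   df = df.withColumn('Total', sums_cols(col('elements')))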


def write_file(_device, _start_time, _end_time, _property, data, _files_write):
    if _files_write:
        # Writing output #
        with open(_dest_dir + '/' + _device + '_' + _start_time.replace(' ', '_') + 'to'
                  + _end_time.replace(' ', '_') + '_' + _property + '.dat', 'w') as outfile:
            json.dump(data, outfile)

        print(_dest_dir + '/' + _device + '_' + _start_time.replace(' ', '_') + 'to'
              + _end_time.replace(' ', '_') + '_' + _property + '.dat' + " Pulled")
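
# write_file stores the JSON dump under _dest_dir as
# '<device>_<start>to<end>_<property>.dat', with spaces in the timestamps
# replaced by underscores. Illustrative example (defaults used in __main__):
#   ./pulledData/BLMED.06L7.B1B30_TCPA.A6L7.B1_2018-10-22_21:18:00.000to2018-10-23_11:18:00.000_<property>.dat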


''' uncompress_pandas
Apply the decompression process described in the NXCALS compression documentation:
zeros are removed from the data array, and the indexes of the non-zero values are
stored in a separate array.
Input :
    _data_array : data retrieved from NXCALS for a compressed data type
    _index_array : indexes of the non-zero data
    _size : size of the final array
    _nrows : number of rows returned by NXCALS
'''


def uncompress_pandas(_data_array, _index_array, _size, _nrows):
    print('uncompress start')
    result = np.zeros(_size, dtype=np.int64)
    for i in range(_nrows):
        tmpresult = np.zeros(_size, dtype=np.int64)
        np.put(tmpresult, _index_array[i][0], _data_array[i][0])
        result = np.add(result, tmpresult)
    print("uncompress finished")
    return result.tolist()
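
# Worked example for uncompress_pandas (illustrative values, not real NXCALS data):
# two compressed rows folded into a 10-slot array.
#   _data_array  = [[[5, 3]], [[2]]]   # non-zero values per row
#   _index_array = [[[1, 4]], [[4]]]   # positions of those values
#   uncompress_pandas(_data_array, _index_array, 10, 2)
#   -> [0, 5, 0, 0, 5, 0, 0, 0, 0, 0]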


''' check_db
Check some database information:
    - the device & mode are registered in the database
    - whether the measurement has already been done
    - if already done, check the data completeness
    - if not done yet:
        - check whether the fill & fill_mode already exist (create them if not)
        - init & create a new measure
'''


def check_db(_fill_name, _mode, _device, _start, _end):
    try:
        response = requests.get(API_HOST + 'mode',
                                params={
                                    'name': _mode
                                })
        if response.status_code == 204:
            print('Mode ' + _mode + ' unknown ')
            return False, -1

        response = requests.get(API_HOST + 'device',
                                params={
                                    'name': _device
                                })
        if response.status_code == 204:
            print('Device ' + _device + ' unknown')
            return False, -1
        device_id = response.json()['id']

        response = requests.get(API_HOST + 'measure',
                                params={
                                    'device_id': device_id,
                                    'mode_name': _mode,
                                    'fill_name': _fill_name,
                                    'start': _start
                                })

        if response.status_code != 204:
            data_id = response.json()['data_id']
            response = requests.get(API_HOST + 'data_complete', params={'id': data_id})
            if response.status_code == 200:
                if response.json()['complete']:
                    print('data are already pulled for fill ' + str(_fill_name) +
                          ' for device ' + _device +
                          ' mode ' + _mode +
                          ' from ' + _start + ' to ' + _end)
                    return False, -1
                else:
                    return True, data_id
            else:
                print('error during dataCheck')
                return False, -1

        else:
            response = requests.post(API_HOST + 'new_data')
            data_id = response.json()['id']

            response = requests.get(API_HOST + 'fill/metadata/name', params={'name': _fill_name})
            if response.status_code == 204:
                requests.post(API_HOST + 'new_fill',
                              params={
                                  'name': _fill_name,
                                  'start': _start,
                                  'end': _end
                              })

            response = requests.get(API_HOST + 'fill_mode',
                                    params={
                                        'mode_name': _mode,
                                        'fill_name': _fill_name,
                                        'start': _start,
                                        'end': _end,
                                    })
            if response.status_code == 204:
                response = requests.post(API_HOST + 'new_fill_mode',
                                         params={
                                             'fill_name': _fill_name,
                                             'mode_name': _mode,
                                             'start': _start,
                                             'end': _end,
                                         })
                fill_mode_id = response.json()['id']
            else:
                fill_mode_id = response.json()['id']

            requests.post(API_HOST + 'new_measure',
                          params={
                              'fill_mode_id': fill_mode_id,
                              'data_id': data_id,
                              'device_id': device_id
                          })
            return True, data_id
    except HTTPError as err:
        print('HTTPError: ' + str(err))
        return False, -1
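
# Illustrative call of check_db (values match the defaults used in __main__ below):
#   ok, data_id = check_db('7340', 'STABLE', 'BLMED.06L7.B1B30_TCPA.A6L7.B1',
#                          '2018-10-22 21:18:00.000', '2018-10-23 11:18:00.000')
# It returns (True, data_id) when the pull should proceed, (False, -1) otherwise.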


def pull_histogram_pandas(_fill_name, _mode,
                          _device,
                          _start_time,
                          _end_time,
                          _files_write=False,
                          _db_write=True,
                          _data_id=None):
    if (_data_id is None or _data_id == -1) and _db_write:
        ok, _data_id = check_db(_fill_name, _mode, _device, _start_time, _end_time)
        if not ok or _data_id is None or _data_id == -1:
            return -1

    data = {}

    # NXCALS property name (assumed; the original value is not shown in this file)
    _property = 'LoggingHistogram'

    df = DataFrame(KeyValuesQuery.builder(spark).system("CMW")
                   .startTime(_start_time)
                   .endTime(_end_time)
                   .entity()
                   .keyValue("device", _device)
                   .keyValue("property", _property)
                   .buildDataset(), sql_context)
    df.registerTempTable("pulled_data")
    try:
        array_dataset_hg = spark.sql("select histogram_hg_com.elements from pulled_data") \
            .toPandas()
        array_dataset_hg['Total'] = array_dataset_hg['elements'].apply(lambda x: 0 if len(x) == 0 else sum(x))
        data['histogram_hg_intime'] = array_dataset_hg['Total'].values.tolist()
    except AnalysisException:
        print('histogram_hg not known for fill ' + str(_fill_name) + ' mode ' + _mode + ' device ' + _device)
        array_dataset_hg = None
        data['histogram_hg_intime'] = []
    try:
        array_index_hg = spark.sql("select histogram_hg_ind.elements from pulled_data") \
            .toPandas()
    except AnalysisException:
        print('histogram_hg not known for fill ' + str(_fill_name) + ' mode ' + _mode + ' device ' + _device)
        array_index_hg = None
    try:
        array_dataset_lg = spark.sql("select histogram_lg_com.elements from pulled_data") \
            .toPandas()

        array_dataset_lg['Total'] = array_dataset_lg['elements'].apply(lambda x: 0 if len(x) == 0 else sum(x))
        data['histogram_lg_intime'] = array_dataset_lg['Total'].values.tolist()
    except AnalysisException:
        print('histogram_lg not known for fill ' + str(_fill_name) + ' mode ' + _mode + ' device ' + _device)
        array_dataset_lg = None
        data['histogram_lg_intime'] = []

    try:
        array_index_lg = spark.sql("select histogram_lg_ind.elements from pulled_data") \
            .toPandas()
    except AnalysisException:
        print('histogram_lg not known for fill ' + str(_fill_name) + ' mode ' + _mode + ' device ' + _device)
        array_index_lg = None

    # Metadata
    if array_dataset_hg is not None:
        data['rows'] = array_dataset_hg.shape[0]
    elif array_dataset_lg is not None:
        data['rows'] = array_dataset_lg.shape[0]
    else:
        data['rows'] = 0
    data['startTime'] = _start_time
    data['endTime'] = _end_time

    try:
        data['threshold_hg'] = spark.sql("select threshold_hg from pulled_data limit 1").toPandas().values.tolist()
        if len(data['threshold_hg']) > 0:
            data['threshold_hg'] = data['threshold_hg'][0]
    except AnalysisException:
        data['threshold_hg'] = 0

    try:
        data['threshold_lg'] = spark.sql("select threshold_lg from pulled_data limit 1").toPandas().values.tolist()
        if len(data['threshold_lg']) > 0:
            data['threshold_lg'] = data['threshold_lg'][0]
    except AnalysisException:
        data['threshold_lg'] = 0

    try:
        data['nsamples'] = spark.sql("select nsamples from pulled_data limit 1").toPandas().values.tolist()
        if len(data['nsamples']) > 0:
            data['nsamples'] = data['nsamples'][0]
    except AnalysisException:
        data['nsamples'] = 0
    try:
        time_stamp = spark.sql("select __record_timestamp__ from pulled_data") \
            .toPandas()
        data['timestamps'] = time_stamp['__record_timestamp__'].values.tolist()
    except AnalysisException:
        data['timestamps'] = []
    if array_dataset_hg is None or array_index_hg is None:
        data['histogram_hg'] = []
    else:
        data['histogram_hg'] = uncompress_pandas(array_dataset_hg.values,
                                                 array_index_hg.values,
                                                 57802, data['rows'] - 1)
    if array_dataset_lg is None or array_index_lg is None:
        data['histogram_lg'] = []
    else:
        data['histogram_lg'] = uncompress_pandas(array_dataset_lg.values,
                                                 array_index_lg.values,
                                                 57802, data['rows'] - 1)
    if _db_write:
        requests.put(API_HOST + 'histogram',
                     params={
                         'id': _data_id
                     },
                     json=data)

    write_file(_device, _start_time, _end_time, _property, data, _files_write)

    return data


def pull_integral_pandas(_fill_name,
                         _mode,
                         _device,
                         _start_time,
                         _end_time,
                         _files_write=False,
                         _db_write=True,
                         _data_id=None):
    if (_data_id is None or _data_id == -1) and _db_write:
        ok, _data_id = check_db(_fill_name, _mode, _device, _start_time, _end_time)
        if not ok or _data_id is None or _data_id == -1:
            return -1

    data = {}

    # NXCALS property name (assumed; the original value is not shown in this file)
    _property = 'LoggingIntegral'

    df = DataFrame(KeyValuesQuery.builder(spark).system("CMW")
                   .startTime(_start_time)
                   .endTime(_end_time)
                   .entity()
                   .keyValue("device", _device)
                   .keyValue("property", _property)
                   .buildDataset(), sql_context)
    df.registerTempTable("pulled_data")
    try:
        array_dataset_hg = spark.sql("select integral_hg.elements from pulled_data") \
            .toPandas()

        array_dataset_hg['Total'] = array_dataset_hg['elements'].apply(lambda x: 0 if len(x) == 0 else sum(x))
        data['integral_hg_intime'] = array_dataset_hg['Total'].values.tolist()

        integral_hg = np.zeros(3564, dtype=np.int64)
        for T in array_dataset_hg.values:
            if len(T[0]) == 3564:
                integral_hg = np.add(integral_hg, T[0])
        data['integral_hg'] = integral_hg.tolist()

    except AnalysisException:
        print('integral_hg not known for fill ' + str(_fill_name) + ' mode ' + _mode + ' device ' + _device)
        array_dataset_hg = None
        data['integral_hg_intime'] = []
        data['integral_hg'] = []

    try:
        array_dataset_lg = spark.sql("select integral_lg.elements from pulled_data") \
            .toPandas()

        array_dataset_lg['Total'] = array_dataset_lg['elements'].apply(lambda x: 0 if len(x) == 0 else sum(x))

        data['integral_lg_intime'] = array_dataset_lg['Total'].values.tolist()

        integral_lg = np.zeros(3564, dtype=np.int64)
        for T in array_dataset_lg.values:
            if len(T[0]) == 3564:
                integral_lg = np.add(integral_lg, T[0])
        data['integral_lg'] = integral_lg.tolist()
    except AnalysisException:
        array_dataset_lg = None
        data['integral_lg_intime'] = []
        data['integral_lg'] = []

    # Metadata
    if array_dataset_hg is not None:
        data['rows'] = array_dataset_hg.shape[0]
    elif array_dataset_lg is not None:
        data['rows'] = array_dataset_lg.shape[0]
    else:
        data['rows'] = 0

    data['startTime'] = _start_time
    data['endTime'] = _end_time

    try:
        data['zsthr_hg'] = spark.sql("select zsthr_hg from pulled_data limit 1").toPandas().values.tolist()
        if len(data['zsthr_hg']) > 0:
            data['zsthr_hg'] = data['zsthr_hg'][0]
    except AnalysisException:
        data['zsthr_hg'] = 0

    try:
        data['zsthrave_hg'] = spark.sql("select zsthrave_hg from pulled_data limit 1").toPandas().values.tolist()
        if len(data['zsthrave_hg']) > 0:
            data['zsthrave_hg'] = data['zsthrave_hg'][0]
    except AnalysisException:
        data['zsthrave_hg'] = 0

    try:
        data['zsthr_lg'] = spark.sql("select zsthr_lg from pulled_data limit 1").toPandas().values.tolist()
        if len(data['zsthr_lg']) > 0:
            data['zsthr_lg'] = data['zsthr_lg'][0]
    except AnalysisException:
        data['zsthr_lg'] = 0

    try:
        data['zsthrave_lg'] = spark.sql("select zsthrave_lg from pulled_data limit 1").toPandas().values.tolist()
        if len(data['zsthrave_lg']) > 0:
            data['zsthrave_lg'] = data['zsthrave_lg'][0]
    except AnalysisException:
        data['zsthrave_lg'] = -1

    try:
        data['baselinesub'] = spark.sql("select baselinesub from pulled_data limit 1").toPandas().values.tolist()
        if len(data['baselinesub']) > 0:
            data['baselinesub'] = data['baselinesub'][0]
    except AnalysisException:
        data['baselinesub'] = 0
    try:
        time_stamp = spark.sql("select __record_timestamp__ from pulled_data") \
            .toPandas()
        data['timestamps'] = time_stamp['__record_timestamp__'].values.tolist()
    except AnalysisException:
        data['timestamps'] = []
    if _db_write:
        requests.put(API_HOST + 'integral',
                     params={
                         'id': _data_id
                     },
                     json=data)

    write_file(_device, _start_time, _end_time, _property, data, _files_write)

    return data


def pull_raw_dist_pandas(_fill_name,
                         _mode,
                         _device,
                         _start_time,
                         _end_time,
                         _files_write=False,
                         _db_write=True,
                         _data_id=None):
    if (_data_id is None or _data_id == -1) and _db_write:
        ok, _data_id = check_db(_fill_name, _mode, _device, _start_time, _end_time)
        if not ok or _data_id is None or _data_id == -1:
            return -1

    data = {}

    # NXCALS property name (assumed; the original value is not shown in this file)
    _property = 'LoggingRawDist'

    df = DataFrame(KeyValuesQuery.builder(spark).system("CMW")
                   .startTime(_start_time)
                   .endTime(_end_time)
                   .entity()
                   .keyValue("device", _device)
                   .keyValue("property", _property)
                   .buildDataset(), sql_context)
    df.registerTempTable("pulled_data")
    try:
        array_dataset_hg = spark.sql("select distribution_hg.elements from pulled_data") \
            .toPandas()

        result_distrib_hg = np.zeros(2048, dtype=np.int64)
        for R in array_dataset_hg.values:
            if len(R[0]) == 2048:
                result_distrib_hg = np.add(result_distrib_hg, R[0])
        data['distribution_hg'] = result_distrib_hg.tolist()
    except AnalysisException:
        array_dataset_hg = None
        data['distribution_hg'] = []
    try:
        array_dataset_lg = spark.sql("select distribution_lg.elements from pulled_data") \
            .toPandas()
        result_distrib_lg = np.zeros(2048, dtype=np.int64)
        for R in array_dataset_lg.values:
            if len(R[0]) == 2048:
                result_distrib_lg = np.add(result_distrib_lg, R[0])
        data['distribution_lg'] = result_distrib_lg.tolist()
    except AnalysisException:
        array_dataset_lg = None
        data['distribution_lg'] = []

    # Metadata
    if array_dataset_hg is not None:
        data['rows'] = array_dataset_hg.shape[0]
    elif array_dataset_lg is not None:
        data['rows'] = array_dataset_lg.shape[0]
    else:
        data['rows'] = 0

    data['startTime'] = _start_time
    data['endTime'] = _end_time

    try:
        data['lsbcut_hg'] = spark.sql("select lsbcut_hg from pulled_data limit 1").toPandas().values.tolist()
        if len(data['lsbcut_hg']) > 0:
            data['lsbcut_hg'] = data['lsbcut_hg'][0]
    except AnalysisException:
        data['lsbcut_hg'] = 0

    try:
        data['offset_hg'] = spark.sql("select offset_hg from pulled_data limit 1").toPandas().values.tolist()
        if len(data['offset_hg']) > 0:
            data['offset_hg'] = data['offset_hg'][0]
    except AnalysisException:
        data['offset_hg'] = 0

    try:
        data['lsbcut_lg'] = spark.sql("select lsbcut_lg from pulled_data limit 1").toPandas().values.tolist()
        if len(data['lsbcut_lg']) > 0:
            data['lsbcut_lg'] = data['lsbcut_lg'][0]
    except AnalysisException:
        data['lsbcut_lg'] = 0

    try:
        data['offset_lg'] = spark.sql("select offset_lg from pulled_data limit 1").toPandas().values.tolist()
        if len(data['offset_lg']) > 0:
            data['offset_lg'] = data['offset_lg'][0]
    except AnalysisException:
        data['offset_lg'] = 0
    try:
        time_stamp = spark.sql("select __record_timestamp__ from pulled_data") \
            .toPandas()
        data['timestamps'] = time_stamp['__record_timestamp__'].values.tolist()
    except AnalysisException:
        data['timestamps'] = []
    if _db_write:
        requests.put(API_HOST + 'raw_dist',
                     params={
                         'id': _data_id
                     },
                     json=data)

    write_file(_device, _start_time, _end_time, _property, data, _files_write)

    return data


def pull_integral_dist_pandas(_fill_name,
                              _mode,
                              _device,
                              _start_time,
                              _end_time,
                              _files_write=False,
                              _db_write=True,
                              _data_id=None):
    if (_data_id is None or _data_id == -1) and _db_write:
        ok, _data_id = check_db(_fill_name, _mode, _device, _start_time, _end_time)
        if not ok or _data_id is None or _data_id == -1:
            return -1

    data = {}

    # NXCALS property name (assumed; the original value is not shown in this file)
    _property = 'LoggingIntegralDist'

    df = DataFrame(KeyValuesQuery.builder(spark).system("CMW")
                   .startTime(_start_time)
                   .endTime(_end_time)
                   .entity()
                   .keyValue("device", _device)
                   .keyValue("property", _property)
                   .buildDataset(), sql_context)
    df.registerTempTable("pulled_data")
    try:
        array_dataset_hg = spark.sql("select distribution_hg.elements from pulled_data") \
            .toPandas()

        result_distrib_hg = np.zeros(2048, dtype=np.int64)
        for R in array_dataset_hg.values:
            if len(R[0]) == 2048:
                result_distrib_hg = np.add(result_distrib_hg, R[0])
        data['distribution_hg'] = result_distrib_hg.tolist()
    except AnalysisException:
        data['distribution_hg'] = []
        array_dataset_hg = None
    try:
        array_dataset_lg = spark.sql("select distribution_lg.elements from pulled_data") \
            .toPandas()

        result_distrib_lg = np.zeros(2048, dtype=np.int64)
        for R in array_dataset_lg.values:
            if len(R[0]) == 2048:
                result_distrib_lg = np.add(result_distrib_lg, R[0])
        data['distribution_lg'] = result_distrib_lg.tolist()
    except AnalysisException:
        array_dataset_lg = None
        data['distribution_lg'] = []

    # Metadata #
    if array_dataset_hg is not None:
        data['rows'] = array_dataset_hg.shape[0]
    elif array_dataset_lg is not None:
        data['rows'] = array_dataset_lg.shape[0]
    else:
        data['rows'] = 0

    data['startTime'] = _start_time
    data['endTime'] = _end_time

    try:
        data['lsbcut_hg'] = spark.sql("select lsbcut_hg from pulled_data limit 1").toPandas().values.tolist()
        if len(data['lsbcut_hg']) > 0:
            data['lsbcut_hg'] = data['lsbcut_hg'][0]
    except AnalysisException:
        data['lsbcut_hg'] = 0

    try:
        data['offset_hg'] = spark.sql("select offset_hg from pulled_data limit 1").toPandas().values.tolist()
        if len(data['offset_hg']) > 0:
            data['offset_hg'] = data['offset_hg'][0]
    except AnalysisException:
        data['offset_hg'] = 0

    try:
        data['lsbcut_lg'] = spark.sql("select lsbcut_lg from pulled_data limit 1").toPandas().values.tolist()
        if len(data['lsbcut_lg']) > 0:
            data['lsbcut_lg'] = data['lsbcut_lg'][0]
    except AnalysisException:
        data['lsbcut_lg'] = 0

    try:
        data['offset_lg'] = spark.sql("select offset_lg from pulled_data limit 1").toPandas().values.tolist()
        if len(data['offset_lg']) > 0:
            data['offset_lg'] = data['offset_lg'][0]
    except AnalysisException:
        data['offset_lg'] = 0

    try:
        data['nturns'] = spark.sql("select nturns from pulled_data limit 1").toPandas().values.tolist()
        if len(data['nturns']) > 0:
            data['nturns'] = data['nturns'][0]
    except AnalysisException:
        data['nturns'] = 0
    try:
        time_stamp = spark.sql("select __record_timestamp__ from pulled_data") \
            .toPandas()
        data['timestamps'] = time_stamp['__record_timestamp__'].values.tolist()
    except AnalysisException:
        data['timestamps'] = []
    if _db_write:
        requests.put(API_HOST + 'integral_dist',
                     params={
                         'id': _data_id
                     },
                     json=data)

    write_file(_device, _start_time, _end_time, _property, data, _files_write)

    return data


def pull_turnloss_pandas(_fill_name,
                         _mode,
                         _device,
                         _start_time,
                         _end_time,
                         _files_write=False,
                         _db_write=True,
                         _data_id=None):
    if (_data_id is None or _data_id == -1) and _db_write:
        ok, _data_id = check_db(_fill_name, _mode, _device, _start_time, _end_time)
        if not ok or _data_id is None or _data_id == -1:
            return -1

    data = {}

    # NXCALS property name (assumed; the original value is not shown in this file)
    _property = 'LoggingTurnLoss'

    df = DataFrame(KeyValuesQuery.builder(spark).system("CMW")
                   .startTime(_start_time)
                   .endTime(_end_time)
                   .entity()
                   .keyValue("device", _device)
                   .keyValue("property", _property)
                   .buildDataset(), sql_context)
    df.registerTempTable("pulled_data")
    try:
        array_dataset_hg = spark.sql("select turnloss_hg_com.elements from pulled_data") \
            .toPandas()
        array_dataset_hg['Total'] = array_dataset_hg['elements'].apply(lambda x: 0 if len(x) == 0 else sum(x))
        data['turnloss_hg_intime'] = array_dataset_hg['Total'].values.tolist()
    except AnalysisException:
        array_dataset_hg = None
        data['turnloss_hg_intime'] = []
    try:
        array_index_hg = spark.sql("select turnloss_hg_ind.elements from pulled_data") \
            .toPandas()
    except AnalysisException:
        array_index_hg = None
    # Process lg Data #
    try:
        array_dataset_lg = spark.sql("select turnloss_lg_com.elements from pulled_data") \
            .toPandas()

        array_dataset_lg['Total'] = array_dataset_lg['elements'].apply(lambda x: 0 if len(x) == 0 else sum(x))
        data['turnloss_lg_intime'] = array_dataset_lg['Total'].values.tolist()

    except AnalysisException:
        array_dataset_lg = None
        data['turnloss_lg_intime'] = []

    try:
        array_index_lg = spark.sql("select turnloss_lg_ind.elements from pulled_data") \
            .toPandas()
    except AnalysisException:
        array_index_lg = None

    # Metadata #
    if array_dataset_hg is not None:
        data['rows'] = array_dataset_hg.shape[0]
    elif array_dataset_lg is not None:
        data['rows'] = array_dataset_lg.shape[0]
    else:
        data['rows'] = 0

    data['startTime'] = _start_time
    data['endTime'] = _end_time

    try:
        data['zsthr_hg'] = spark.sql("select zsthr_hg from pulled_data limit 1").toPandas().values.tolist()
        if len(data['zsthr_hg']) > 0:
            data['zsthr_hg'] = data['zsthr_hg'][0]
    except AnalysisException:
        data['zsthr_hg'] = 0

    try:
        data['zsthr_lg'] = spark.sql("select zsthr_lg from pulled_data limit 1").toPandas().values.tolist()
        if len(data['zsthr_lg']) > 0:
            data['zsthr_lg'] = data['zsthr_lg'][0]
    except AnalysisException:
        data['zsthr_lg'] = 0

    try:
        data['baselinesub'] = spark.sql("select baselinesub from pulled_data limit 1").toPandas().values.tolist()
        if len(data['baselinesub']) > 0:
            data['baselinesub'] = data['baselinesub'][0]
    except AnalysisException:
        data['baselinesub'] = 0

    try:
        data['nturns'] = spark.sql("select nturns from pulled_data limit 1").toPandas().values.tolist()
        if len(data['nturns']) > 0:
            data['nturns'] = data['nturns'][0]
    except AnalysisException:
        data['nturns'] = 0
    try:
        time_stamp = spark.sql("select __record_timestamp__ from pulled_data") \
            .toPandas()
        data['timestamps'] = time_stamp['__record_timestamp__'].values.tolist()
    except AnalysisException:
        data['timestamps'] = []
    if array_dataset_hg is None or array_index_hg is None:
        data['turnloss_hg'] = []
    else:
        data['turnloss_hg'] = uncompress_pandas(array_dataset_hg.values,
                                                array_index_hg.values,
                                                data['nturns'], data['rows'])

    if array_dataset_lg is None or array_index_lg is None:
        data['turnloss_lg'] = []
    else:
        data['turnloss_lg'] = uncompress_pandas(array_dataset_lg.values,
                                                array_index_lg.values,
                                                data['nturns'], data['rows'])

    if _db_write:
        requests.put(API_HOST + 'turnloss',
                     params={
                         'id': _data_id
                     },
                     json=data)

    write_file(_device, _start_time, _end_time, _property, data, _files_write)

    return data


def property_pull(_fill_name, _mode, _device, _start, _end, _files_write=False, _db_write=True):
    ok, data_id = check_db(_fill_name, _mode, _device, _start, _end)
    if not ok or data_id is None or data_id == -1:
        return -1

    print('Pulling data for fill '
          + str(_fill_name) + ' for device ' + _device + ' mode ' + _mode + ' from ' + _start + ' to ' + _end)

    pull_histogram_pandas(_fill_name, _mode, _device, _start, _end, _files_write, _db_write, data_id)
    pull_integral_pandas(_fill_name, _mode, _device, _start, _end, _files_write, _db_write, data_id)
    pull_raw_dist_pandas(_fill_name, _mode, _device, _start, _end, _files_write, _db_write, data_id)
    pull_integral_dist_pandas(_fill_name, _mode, _device, _start, _end, _files_write, _db_write, data_id)
    pull_turnloss_pandas(_fill_name, _mode, _device, _start, _end, _files_write, _db_write, data_id)

    print('Pulled done, data pushed on DB for fill '
          + str(_fill_name) + ' for device ' + _device + ' mode ' + _mode + ' from ' + _start + ' to ' + _end)


if __name__ == '__main__':
    _fill_name = "7340"
    _mode = "STABLE"
    _start = "2018-10-22 21:18:00.000"
    _end = "2018-10-23 11:18:00.000"
    _device = "BLMED.06L7.B1B30_TCPA.A6L7.B1"

    if len(sys.argv) > 1 and sys.argv[1] == '-h':
        print("     --   HELP    -- \n"
              "     --   TODO    -- \n"
              "-args- \n"
              "     <fillName> : the name of the fill you are pulling, a fill is a complete experience. "
              "In one fill there is multiples device, modes, time measurements ... "
              "So you'll need to do multiples pulling for the same fill \n"
              "     <device> : the name of the device you want to pull. If you want to pull multiples device at once,"
              " enter devices in quotes, separated by spaces \n"
              " ex: '<device1> <device2> <device3> ...' \n\n"
              "     <mode> : the name of the mode you want to pull. \n"
              " ex: STABLE, ALL, START, BEAMDUMP ... \n\n"
              "     <start> : the start time of the pull you want to do. \n"
              " format: 'yyyy-mm-dd hh:mm:ss.ms' or 'yyyy-mm-dd_hh:mm:ss.ms'")
        sys.exit(0)
    elif len(sys.argv) < 6:
        print("usage: python cern_nxcals_dblm_property_pull.py <fillName> <device> <mode> <start> <end> [-f] \n"
              "where : \n"
              "     -f  also generate .dat files \n"
              "type 'python cern_nxcals_dblm_property_pull.py -h' for more infos")
        sys.exit(1)
    else:
        _fill_name = sys.argv[1]
        _device = sys.argv[2]
        _mode = sys.argv[3]
        _start = sys.argv[4]
        _end = sys.argv[5]
        _files = False
        if len(sys.argv) > 6:
            _files = sys.argv[6] == "-f"

    property_pull(_fill_name, _mode, _device, _start, _end, _files)
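
    # Example invocation (illustrative), following the usage string above and the
    # shebang at the top of the file:
    #   spark-submit --master yarn property_pull.py 7340 'BLMED.06L7.B1B30_TCPA.A6L7.B1' \
    #       STABLE '2018-10-22 21:18:00.000' '2018-10-23 11:18:00.000' -f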