Commit 60e255fd authored by Michal Maciejewski's avatar Michal Maciejewski
Browse files

Refactored QHDA analysis notebook

parent 7bf769a0
This source diff could not be displayed because it is too large. You can view the blob instead.
This diff is collapsed.
from scipy.optimize import leastsq
import numpy as np
import pandas as pd
from scipy import signal as s
from lhcsmapi.dbsignal.SignalAnalysis import SignalAnalysis
def tau_charge(df):
exponent = 1
slice_squared_df = df.pow(exponent)
return ExpSignalAnalysis.calculate_characteristic_time_in_seconds_with_integral(slice_squared_df, exponent)
def tau_energy(df):
exponent = 2
slice_squared_df = df.pow(exponent)
return ExpSignalAnalysis.calculate_characteristic_time_in_seconds_with_integral(slice_squared_df, exponent)
def tau_lin_reg(df):
return calculate_time_constant_from_log(df)
def calculate_time_constant_from_log(df):
right_exp = lambda p, x: -p[1] * x + p[0]
err = lambda p, x, y: (right_exp(p, x) - y)
time = df.index
rate = np.log(df.values)
v0 = [0., time.min()]
out = leastsq(err, v0, args=(time, rate))
v = out[0] # fit parameters out []
time_constant = 1 / v[1]
return time_constant
def tau_exp_fit(df):
return calculate_time_constant_with_optimization(df)
def calculate_time_constant_with_optimization(df):
right_exp = lambda p, x: p[0] * np.exp(-p[2] * (x - p[1]))
err = lambda p, x, y: (right_exp(p, x) - y)
time = df.index
rate = df.values
v0 = [0., 0., 0.]
out = leastsq(err, v0, args=(time, rate))
v = out[0] # fit parameters out []
time_constant = 1 / v[2]
return time_constant
class ExpSignalAnalysis(object):
__name_to_function = {"tau_charge": tau_charge,
"tau_energy": tau_energy,
"tau_lin_reg": tau_lin_reg,
"tau_exp_fit": tau_exp_fit}
@staticmethod
def calculate_char_time(series, t_start_decay, t_end_decay):
# ToDo: possible duplicate of tau_energy - to be fixed with the DSL
index_start_decay = series.index.get_loc(t_start_decay, method='nearest')
index_end_decay = series.index.get_loc(t_end_decay, method='nearest')
df_decay = series.iloc[index_start_decay:index_end_decay] - series.min()
df_decay_squared = df_decay.pow(2)
df_decay_tau = ExpSignalAnalysis.calculate_characteristic_time_in_seconds_with_integral(
df_decay_squared, exponent=2)
return df_decay_tau[0]
@staticmethod
def calculate_characteristic_time_in_seconds_with_integral(exp_decay_in_sec, exponent):
return exponent * np.trapz(exp_decay_in_sec, x=exp_decay_in_sec.index, axis=0) / (
exp_decay_in_sec.iloc[0] - exp_decay_in_sec.iloc[-1])
@staticmethod
def get_characteristic_time_of_exponential_decay(df, function_names_to_apply):
"""**returns dataframe with the time constants, calculated with the chosen methods**
Function returns a dataframe with time constant according to the input method chosen for the passed DataFrame object
Note that passed DataFrame object should just contain the decay, the functions getWindowWithDecayOnly or
getWindowWithGivenMeanAndStd can be used for filtering
:type df: DataFrame
:type function_names_to_apply: list
:return: DataFrame
- Example :
from source.signals.SignalAnalysis import SignalAnalysis
import pandas as pd
import numpy as np
x = np.arange(100)*0.001
timeConstant = 0.8
df = pd.DataFrame(data = np.exp(-x/timeConstant), index = x, columns = ["Voltage_1"])
functionNamesToApply = ["tau_charge", "tau_energy", "tau_lin_reg", "eFit"]
SignalAnalysis.getDecayTimeConstant(df, functionNamesToApply)
>>> Voltage_1
>>>tau_energy 0.8
>>>tau_energy 0.8
>>>tau_lin_reg 0.8
>>>eFit 0.8
"""
functions_to_apply = [ExpSignalAnalysis.__name_to_function[x] for x in function_names_to_apply]
if isinstance(df, pd.DataFrame) and not df.empty:
return df.aggregate(functions_to_apply)
else:
return df
@staticmethod
def calculate_change_in_characteristic_of_exponential_decay(origin_series, window=100):
"""Method returns the change in the characteristic of the exponential decay over time as a series
"""
sampling_time = (origin_series.index[1] - origin_series.index[0])
diff_series = origin_series.diff(periods=window) / sampling_time / window
tau_series = -origin_series.div(diff_series)
return tau_series
@staticmethod
def find_start_index_of_exponential_decay(exp_df):
# find start index based on current
if any(['I_HDS' in col for col in exp_df.columns]):
i_decay_start_index = ExpSignalAnalysis.get_window_with_given_mean_and_std(exp_df.filter(regex='I_HDS_1'))
else:
decay_start_index = 0
if decay_start_index != 0:
return (decay_start_index, 'current')
# find start index based on voltage correlation to an ideal decay
# find start index based on voltage decrease
decay_start_index = ExpSignalAnalysis.get_window_with_deviation_from_initial_value(exp_df.filter(regex='U_HDS_1'))
if decay_start_index != 0:
return (decay_start_index, 'voltage')
# if not found, then take the full dataframe
return (0, 'failed')
@staticmethod
def get_df_with_decay_only(exp_df):
"""" Method looks for beginning of the decay and returns DF with only the decay
"""
decay_start_index, source = ExpSignalAnalysis.find_start_index_of_exponential_decay(exp_df)
return exp_df.iloc[(decay_start_index + 1):len(exp_df)]
@staticmethod
def has_window_mean_std(x, mean=5, std=0.1):
return (x.mean() >= mean) and (x.std() <= std)
# mean=50 => mean=5 to be able to analyse low charge discharges, by Zinur
@staticmethod
def get_window_with_given_mean_and_std(origin_series, index_increment=20):
"""Method returns a series within a given mean and standard deviation
"""
origin_series_short = origin_series[origin_series.index < int(len(origin_series) / 10)]
t_start = origin_series_short.rolling(index_increment).apply(ExpSignalAnalysis.has_window_mean_std, raw=True). \
shift(-index_increment + 1)
t_start = t_start.dropna()
if len(t_start[t_start == 1]) == 0:
return 0
else:
return origin_series_short.index.get_loc(t_start[t_start == 1].index[0])
@staticmethod
def get_window_with_deviation_from_initial_value(origin_series, initial_value=0, allowed_deviation=0.998):
"""Method applies low pass filter to series
"""
if initial_value == 0:
initial_value = np.mean(origin_series.iloc[0:19].rolling(window=3).median()).values[0]
# get df with initial values
filtered_series = SignalAnalysis.apply_frequency_filter(origin_series)
initial_series = filtered_series[filtered_series.values > initial_value * allowed_deviation]
if len(initial_series) > 0:
return np.flatnonzero(origin_series.index == initial_series.index[-1])[0]
else:
return 0
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment