From ccd5705f503db4771115ac9538a28dcb4400e413 Mon Sep 17 00:00:00 2001 From: Simon Mazenoux <simon.mazenoux@cern.ch> Date: Fri, 6 Dec 2024 10:33:16 +0100 Subject: [PATCH] big refactor --- .gitlab-ci.yml | 1 + lhcsmqh/analyses/__init__.py | 3 +- lhcsmqh/analyses/commons.py | 488 ++++++++++-------- lhcsmqh/analyses/quench_heater_ccc.py | 264 ++-------- .../quench_heater_voltage_analysis.py | 65 +-- .../quench_heater_voltage_current_analysis.py | 140 +---- lhcsmqh/output/_common.py | 126 ++--- lhcsmqh/output/quench_heater_ccc_output.py | 30 +- .../quench_heater_voltage_current_output.py | 10 +- .../output/quench_heater_voltage_output.py | 12 +- pyproject.toml | 3 +- test/unit/test_output.py | 30 +- 12 files changed, 463 insertions(+), 709 deletions(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 6a03d74..46b3e2e 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -50,6 +50,7 @@ notebook_tests: stage: test image: registry.cern.ch/acc/acc-py_el9_openjdk11:pro artifacts: + when: always paths: - reports/ script: diff --git a/lhcsmqh/analyses/__init__.py b/lhcsmqh/analyses/__init__.py index fb55a46..1630177 100644 --- a/lhcsmqh/analyses/__init__.py +++ b/lhcsmqh/analyses/__init__.py @@ -1,4 +1,4 @@ -from .commons import Event, Result, VoltageCurrentEvent, VoltageCurrentResult, VoltageEvent, VoltageResult +from .commons import Event, VoltageCurrentEvent, VoltageCurrentResult, VoltageEvent, VoltageResult from .quench_heater_ccc import QHCCCAnalysis from .quench_heater_voltage_analysis import QuenchHeaterVoltageAnalysis from .quench_heater_voltage_current_analysis import QuenchHeaterVoltageCurrentAnalysis @@ -8,7 +8,6 @@ __all__ = [ "QHCCCAnalysis", "QuenchHeaterVoltageAnalysis", "QuenchHeaterVoltageCurrentAnalysis", - "Result", "VoltageCurrentEvent", "VoltageCurrentResult", "VoltageEvent", diff --git a/lhcsmqh/analyses/commons.py b/lhcsmqh/analyses/commons.py index e8d1e31..79dcbbc 100644 --- a/lhcsmqh/analyses/commons.py +++ b/lhcsmqh/analyses/commons.py @@ -4,11 +4,12 @@ from __future__ import annotations import datetime from collections.abc import Sequence -from dataclasses import dataclass +from dataclasses import dataclass, fields from typing import Any import numpy as np import pandas as pd +from lhcsmapi import reference from lhcsmapi.analysis import comparison # type: ignore from lhcsmapi.analysis import features_helper as utility_features from lhcsmapi.api import processing, query, resolver @@ -18,21 +19,14 @@ from lhcsmapi.pyedsl.dbsignal.SignalIndexConversion import SignalIndexConversion from lhcsmapi.signal_analysis import features as signal_analysis # type: ignore from lhcsmapi.signal_analysis import functions as signal_analysis_functions from lhcsmapi.Time import Time +from pyspark.sql import SparkSession -def _check_list_dfs(first: list[pd.DataFrame] | None, second: list[pd.DataFrame] | None): - if first is None and second is None: - return True +def _check_list_dfs(first: list[pd.DataFrame], second: list[pd.DataFrame]) -> bool: + return len(first) == len(second) and all(f.equals(s) for f, s in zip(first, second)) - return ( - first is not None - and second is not None - and len(first) == len(second) - and all(f.equals(s) for f, s in zip(first, second)) - ) - -@dataclass +@dataclass(frozen=True) class Event: """Base class for a discharge event.""" @@ -41,9 +35,11 @@ class Event: circuit_name: str def __eq__(self, other: Any) -> bool: + if not isinstance(other, Event): + return False + return ( - isinstance(other, Event) - and self.source == other.source + self.source == other.source and self.circuit_type == other.circuit_type and self.circuit_name == other.circuit_name ) @@ -55,86 +51,78 @@ class VoltageSignals: u_hds: list[pd.DataFrame] def __eq__(self, other: Any) -> bool: - return ( - isinstance(other, VoltageSignals) - and self.timestamp == other.timestamp - and _check_list_dfs(self.u_hds, other.u_hds) - ) - + if not isinstance(other, VoltageSignals): + return False -@dataclass(frozen=True) -class VoltageCurrentSignals(VoltageSignals): - i_hds: list[pd.DataFrame] - r_hds: list[pd.DataFrame] + return self.timestamp == other.timestamp and _check_list_dfs(self.u_hds, other.u_hds) - def __eq__(self, other: Any) -> bool: - return ( - isinstance(other, VoltageCurrentSignals) - and super().__eq__(other) - and _check_list_dfs(self.i_hds, other.i_hds) - and _check_list_dfs(self.r_hds, other.r_hds) - ) - -@dataclass +@dataclass(frozen=True) class VoltageEvent(Event): signals: VoltageSignals reference_signals: VoltageSignals def __eq__(self, other: Any) -> bool: + if not isinstance(other, VoltageEvent): + return False + return ( super().__eq__(other) - and isinstance(other, VoltageEvent) and self.signals == other.signals and self.reference_signals == other.reference_signals ) -@dataclass +@dataclass(frozen=True) +class VoltageCurrentSignals: + timestamp: int + u_hds_dfs: list[pd.DataFrame] + u_hds_sync_dfs: list[pd.DataFrame] + u_hds_decay_dfs: list[pd.DataFrame] + u_hds_decay_sync_dfs: list[pd.DataFrame] + i_hds_dfs: list[pd.DataFrame] + i_hds_sync_dfs: list[pd.DataFrame] + i_hds_decay_dfs: list[pd.DataFrame] + i_hds_decay_sync_dfs: list[pd.DataFrame] + r_hds: list[pd.DataFrame] + + def __eq__(self, other: Any) -> bool: + if not isinstance(other, VoltageCurrentSignals): + return False + + return all( + ( + getattr(self, field.name) == getattr(other, field.name) + if isinstance(getattr(self, field.name), int) + else _check_list_dfs(getattr(self, field.name), getattr(other, field.name)) + ) + for field in fields(self) + ) + + +@dataclass(frozen=True) class VoltageCurrentEvent(Event): signals: VoltageCurrentSignals reference_signals: VoltageCurrentSignals - def __eq__(self, other) -> bool: + def __eq__(self, other: Any) -> bool: + if not isinstance(other, VoltageCurrentEvent): + return False + return ( super().__eq__(other) - and isinstance(other, VoltageCurrentEvent) and self.signals == other.signals and self.reference_signals == other.reference_signals ) -def _check_df(first: pd.DataFrame | None, second: pd.DataFrame | None): - return (first is None and second is None) or (first is not None and second is not None and first.equals(second)) - - -@dataclass -class Result: - source: str - timestamp: int - first_last_u_comp: pd.DataFrame | None - tau_u_comp: pd.DataFrame | None - - def __eq__(self, other) -> bool: - return ( - isinstance(other, Result) - and self.source == other.source - and self.timestamp == other.timestamp - and _check_df(self.first_last_u_comp, other.first_last_u_comp) - and _check_df(self.tau_u_comp, other.tau_u_comp) - ) - - def __bool__(self) -> bool: - return ( - self.first_last_u_comp is None - or self.tau_u_comp is None - or bool(self.first_last_u_comp["result"].all() and self.tau_u_comp["result"].all()) - ) +def repr_dataframe(df: pd.DataFrame) -> str: + return f"pd.DataFrame({df.to_dict()})" -@dataclass -class VoltageResult(Result): +@dataclass(frozen=True) +class VoltageResult: """QH Analysis result for IPD/IPQ/IT circuits. Args: @@ -148,25 +136,35 @@ class VoltageResult(Result): whether the values are inside ranges specified by the reference. """ + source: str + timestamp: int + first_last_u_comp: pd.DataFrame + tau_u_comp: pd.DataFrame + def __eq__(self, other: Any) -> bool: - return super().__eq__(other) and isinstance(other, VoltageResult) + if not isinstance(other, VoltageResult): + return False - def __repr__(self) -> str: - first_last_u_comp = ( - f"pd.DataFrame({self.first_last_u_comp.to_dict()})" if self.first_last_u_comp is not None else "None" + return ( + self.source == other.source + and self.timestamp == other.timestamp + and self.first_last_u_comp.equals(other.first_last_u_comp) + and self.tau_u_comp.equals(other.tau_u_comp) ) - tau_u_comp = f"pd.DataFrame({self.tau_u_comp.to_dict()})" if self.tau_u_comp is not None else "None" + + def __repr__(self) -> str: return ( f"{self.__class__.__name__}(source={self.source!r}, timestamp={self.timestamp}, " - f"first_last_u_comp={first_last_u_comp}, tau_u_comp={tau_u_comp})" + f"first_last_u_comp={repr_dataframe(self.first_last_u_comp)}, " + f"tau_u_comp={repr_dataframe(self.tau_u_comp)})" ) def __bool__(self) -> bool: - return super().__bool__() + return bool(self.first_last_u_comp["result"].all() and self.tau_u_comp["result"].all()) -@dataclass -class VoltageCurrentResult(Result): +@dataclass(frozen=True) +class VoltageCurrentResult(VoltageResult): """QH Analysis result for RB/RQ circuits. Args: @@ -186,46 +184,38 @@ class VoltageCurrentResult(Result): by the reference. """ - first_r_comp: pd.DataFrame | None - capacitance_comp: pd.DataFrame | None + first_r_comp: pd.DataFrame + capacitance_comp: pd.DataFrame def __eq__(self, other: Any) -> bool: + if not isinstance(other, VoltageCurrentResult): + return False + return ( super().__eq__(other) - and isinstance(other, VoltageCurrentResult) - and _check_df(self.first_r_comp, other.first_r_comp) - and _check_df(self.capacitance_comp, other.capacitance_comp) + and self.first_r_comp.equals(other.first_r_comp) + and self.capacitance_comp.equals(other.capacitance_comp) ) def __repr__(self) -> str: - first_last_u_comp = ( - f"pd.DataFrame({self.first_last_u_comp.to_dict()})" if self.first_last_u_comp is not None else "None" - ) - tau_u_comp = f"pd.DataFrame({self.tau_u_comp.to_dict()})" if self.tau_u_comp is not None else "None" - first_r_comp = f"pd.DataFrame({self.first_r_comp.to_dict()})" if self.first_r_comp is not None else "None" - capacitance_comp = ( - f"pd.DataFrame({self.capacitance_comp.to_dict()})" if self.capacitance_comp is not None else "None" - ) return ( f"{self.__class__.__name__}(source={self.source!r}, timestamp={self.timestamp}, " - f"first_last_u_comp={first_last_u_comp}, tau_u_comp={tau_u_comp}, " - f"first_r_comp={first_r_comp}, capacitance_comp={capacitance_comp})" + f"first_last_u_comp={repr_dataframe(self.first_last_u_comp)}, " + f"tau_u_comp={repr_dataframe(self.tau_u_comp)}, " + f"first_r_comp={repr_dataframe(self.first_r_comp)}, " + f"capacitance_comp={repr_dataframe(self.capacitance_comp)})" ) def __bool__(self) -> bool: - return ( - self.first_r_comp is None - or self.capacitance_comp is None - or bool(super().__bool__() and self.first_r_comp["result"].all() and self.capacitance_comp["result"].all()) - ) + return bool(super().__bool__() and self.first_r_comp["result"].all() and self.capacitance_comp["result"].all()) -def filter_median(hds_dfs, window=3) -> list[pd.DataFrame]: +def filter_median(hds_dfs: list[pd.DataFrame], window: int = 3) -> list[pd.DataFrame]: return [hds_df.rolling(window=window, min_periods=1).median() for hds_df in hds_dfs] -def get_start_index_with_current_mean_std(i_hds_df: pd.DataFrame, index_increment: int, mean_start_value: int) -> int: - def has_window_mean_std(x: np.ndarray, mean=50, std=0.1) -> bool: +def get_start_index_with_current_mean_std(i_hds_df: pd.DataFrame, index_increment: int, mean_start_value: float) -> int: + def has_window_mean_std(x: np.ndarray, mean: float = 50, std: float = 0.1) -> bool: return (x.mean() >= mean) and (x.std() <= std) with_mean_std_df = ( @@ -242,7 +232,9 @@ def get_start_index_with_current_mean_std(i_hds_df: pd.DataFrame, index_incremen return with_mean_std_df.index.get_loc(mask.index[0]) # type: ignore -def get_decay_start_index(i_hds_dfs: list[pd.DataFrame], mean_start_value=50, index_increment=20) -> int: +def get_decay_start_index( + i_hds_dfs: list[pd.DataFrame], mean_start_value: float = 50, index_increment: int = 20 +) -> int: decay_start_index = 0 for i_hds_df in i_hds_dfs: i_decay_start_index = get_start_index_with_current_mean_std(i_hds_df, index_increment, mean_start_value) @@ -251,7 +243,7 @@ def get_decay_start_index(i_hds_dfs: list[pd.DataFrame], mean_start_value=50, in return decay_start_index -def extract_decay(hds_dfs, index_decay_start): +def extract_decay(hds_dfs: list[pd.DataFrame], index_decay_start: int) -> list[pd.DataFrame]: if index_decay_start == 0: return hds_dfs @@ -286,10 +278,50 @@ def calculate_resistance(u_hds_dfs: list[pd.DataFrame], i_hds_dfs: list[pd.DataF return r_hds_dfs -def calculate_capacitance(tau_df, first_r_df): - def get_transposed_with_feature_name(df: pd.DataFrame): +def calculate_capacitance(tau_df: pd.DataFrame, first_r_df: pd.DataFrame) -> pd.DataFrame: + """Calculates capacitance values for Quench Heater circuits using tau and resistance measurements. + + Uses the RC circuit relationship: C = tau/R, where: + - tau is the time constant from voltage decay curves + - R is the initial resistance measurement + + Args: + tau_df: DataFrame containing time constants (tau) calculated from voltage decay analysis. + Expected columns format: "<circuit>:U_HDS_<number>:tau_charge" + first_r_df: DataFrame containing initial resistance measurements. + Expected columns format: "<circuit>:R_HDS_<number>:first20mean" + + Returns: + DataFrame with calculated capacitance values. + Column format: "<circuit>_C_HDS_<number>:capacitance" + """ + + def get_transposed_with_feature_name(df: pd.DataFrame) -> pd.DataFrame: + """Helper function to transpose DataFrame and rename indices for capacitance calculation. + + Args: + df: Input DataFrame with voltage or resistance measurements + + Returns: + Transposed DataFrame with renamed indices following capacitance naming convention + """ + def get_feature_name(old_name: str) -> str: - return old_name.split(":")[0] + "_C_HDS_" + old_name.split(":")[-2].split("_")[-1] + ":capacitance" + """Converts signal names to capacitance feature names. + + Args: + old_name: Original signal name from voltage or resistance measurements + + Returns: Capacitance feature name + + Examples: + >>> get_feature_name("MQXFA.A2R1:U_HDS_1:tau_charge") + "MQXFA.A2R1_C_HDS_1:capacitance" + """ + circuit_name = old_name.split(":")[0] + heater_id = old_name.split(":")[-2].split("_")[-1] # e.g., "1" from "U_HDS_1" or "R_HDS_1" + + return f"{circuit_name}_C_HDS_{heater_id}:capacitance" transposed = df.T transposed.index = transposed.index.map(get_feature_name) @@ -298,98 +330,135 @@ def calculate_capacitance(tau_df, first_r_df): tau_df_transposed = get_transposed_with_feature_name(tau_df.filter(regex="(.*):U_(.*)")) first_r_df_transposed = get_transposed_with_feature_name(first_r_df) + # Calculate capacitance values using C = tau/R tau_df_transposed.iloc[:, 0] = tau_df_transposed.iloc[:, 0] / first_r_df_transposed.iloc[:, 0] return tau_df_transposed.T -def analyze_single_qh_voltage_current_with_ref( +def query_voltage_current_signals( circuit_type: str, - discharge_level: int, - source_qh: str, - timestamp_qh: int, - u_hds_dfs: list[pd.DataFrame], - i_hds_dfs: list[pd.DataFrame], - timestamp_ref: int, - u_hds_ref_dfs: list[pd.DataFrame], - i_hds_ref_dfs: list[pd.DataFrame], - current_offset, - mean_start_value=50, -) -> tuple[ - pd.DataFrame, - pd.DataFrame, - pd.DataFrame, - pd.DataFrame, - list[pd.DataFrame], - list[pd.DataFrame], - list[pd.DataFrame], - list[pd.DataFrame], - list[pd.DataFrame], - list[pd.DataFrame], -]: - # Extract decay + circuit_name: str, + source: str, + timestamp: int, + current_offset: float, + mean_start_value: float = 50, +) -> VoltageCurrentSignals: + u_hds_dfs = query_single_qh_event_pm(circuit_type, circuit_name, source, timestamp, "U_HDS") + i_hds_dfs = query_single_qh_event_pm(circuit_type, circuit_name, source, timestamp, "I_HDS") + u_hds_decay_dfs, i_hds_decay_dfs = preprocess_voltage_current( u_hds_dfs, i_hds_dfs, current_offset, mean_start_value ) - u_hds_decay_ref_dfs, i_hds_decay_ref_dfs = preprocess_voltage_current( - u_hds_ref_dfs, i_hds_ref_dfs, current_offset, mean_start_value - ) # Synchronize time of the raw signal from PM to 0 # # For plotting i_index_to_sync = i_hds_decay_dfs[0].index[0] - i_index_to_sync_ref = i_hds_decay_ref_dfs[0].index[0] u_index_to_sync = u_hds_decay_dfs[0].index[0] - u_index_to_sync_ref = u_hds_decay_ref_dfs[0].index[0] - u_hds_sync_dfs = [SignalIndexConversion.synchronize_df(hds_df, u_index_to_sync) for hds_df in u_hds_dfs] - i_hds_sync_dfs = [SignalIndexConversion.synchronize_df(hds_df, i_index_to_sync) for hds_df in i_hds_dfs] - - u_hds_ref_sync_dfs = [SignalIndexConversion.synchronize_df(hds_df, u_index_to_sync_ref) for hds_df in u_hds_ref_dfs] - i_hds_ref_sync_dfs = [SignalIndexConversion.synchronize_df(hds_df, i_index_to_sync_ref) for hds_df in i_hds_ref_dfs] + u_hds_sync_dfs: list[pd.DataFrame] = [ + SignalIndexConversion.synchronize_df(hds_df, u_index_to_sync) for hds_df in u_hds_dfs # type: ignore + ] + i_hds_sync_dfs: list[pd.DataFrame] = [ + SignalIndexConversion.synchronize_df(hds_df, i_index_to_sync) for hds_df in i_hds_dfs # type: ignore + ] # # For feature engineering u_hds_decay_sync_dfs = SignalIndexConversion.synchronize_dfs(u_hds_decay_dfs) i_hds_decay_sync_dfs = SignalIndexConversion.synchronize_dfs(i_hds_decay_dfs) - u_hds_decay_ref_sync_dfs = SignalIndexConversion.synchronize_dfs(u_hds_decay_ref_dfs) - i_hds_decay_ref_sync_dfs = SignalIndexConversion.synchronize_dfs(i_hds_decay_ref_dfs) - # Calculate resistance r_hds_dfs = calculate_resistance(u_hds_decay_sync_dfs, i_hds_decay_sync_dfs) - r_hds_ref_dfs = calculate_resistance(u_hds_decay_ref_sync_dfs, i_hds_decay_ref_sync_dfs) + return VoltageCurrentSignals( + timestamp=timestamp, + u_hds_dfs=u_hds_dfs, + u_hds_sync_dfs=u_hds_sync_dfs, + u_hds_decay_dfs=u_hds_decay_dfs, + u_hds_decay_sync_dfs=u_hds_decay_sync_dfs, + i_hds_dfs=i_hds_dfs, + i_hds_sync_dfs=i_hds_sync_dfs, + i_hds_decay_dfs=i_hds_decay_dfs, + i_hds_decay_sync_dfs=i_hds_decay_sync_dfs, + r_hds=r_hds_dfs, + ) + + +def query_voltage_current_event( + circuit_type: str, circuit_name: str, source: str, timestamp: int, current_offset: float, mean_start_value: float +) -> VoltageCurrentEvent: + signals = query_voltage_current_signals( + circuit_type, circuit_name, source, timestamp, current_offset, mean_start_value + ) + reference_signals = query_voltage_current_signals( + circuit_type, + circuit_name, + source, + reference.get_quench_heater_reference_discharge( + signal_metadata.get_circuit_type_for_circuit_name(circuit_name), source, timestamp + ), + current_offset, + mean_start_value, + ) + return VoltageCurrentEvent( + source=source, + circuit_type=circuit_type, + circuit_name=circuit_name, + signals=signals, + reference_signals=reference_signals, + ) + +def analyze_voltage_current_event( + voltage_current_event: VoltageCurrentEvent, discharge_level: int +) -> VoltageCurrentResult: # Calculate features first_last_u_df = signal_analysis.calculate_features( - u_hds_decay_sync_dfs, [utility_features.first, utility_features.last20mean], timestamp_qh + voltage_current_event.signals.u_hds_decay_sync_dfs, + [utility_features.first, utility_features.last20mean], + voltage_current_event.signals.timestamp, ) first_last_u_comp_df = comparison.compare_features_to_reference( - first_last_u_df, circuit_type, "QH", wildcard={"CELL": source_qh}, nominal_voltage=discharge_level + first_last_u_df, + voltage_current_event.circuit_type, + "QH", + wildcard={"CELL": voltage_current_event.source}, + nominal_voltage=discharge_level, ) - first_r_df = signal_analysis.calculate_features(r_hds_dfs, utility_features.first20mean, timestamp_qh) - first_r_ref_df = signal_analysis.calculate_features(r_hds_ref_dfs, utility_features.first20mean, timestamp_ref) + first_r_df = signal_analysis.calculate_features( + voltage_current_event.signals.r_hds, utility_features.first20mean, voltage_current_event.signals.timestamp + ) + first_r_ref_df = signal_analysis.calculate_features( + voltage_current_event.reference_signals.r_hds, + utility_features.first20mean, + voltage_current_event.reference_signals.timestamp, + ) first_r_comp_df = comparison.compare_difference_of_features_to_reference( first_r_df, first_r_ref_df, - circuit_type, + voltage_current_event.circuit_type, "QH", - wildcard={"CELL": source_qh}, + wildcard={"CELL": voltage_current_event.source}, nominal_voltage=discharge_level, precision=2, ) tau_df = signal_analysis.calculate_features( - u_hds_decay_sync_dfs + i_hds_decay_sync_dfs, signal_analysis_functions.tau_charge, timestamp_qh + voltage_current_event.signals.u_hds_decay_sync_dfs + voltage_current_event.signals.i_hds_decay_sync_dfs, + signal_analysis_functions.tau_charge, + voltage_current_event.signals.timestamp, ) tau_ref_df = signal_analysis.calculate_features( - u_hds_decay_ref_sync_dfs + i_hds_decay_ref_sync_dfs, signal_analysis_functions.tau_charge, timestamp_qh + voltage_current_event.reference_signals.u_hds_decay_sync_dfs + + voltage_current_event.reference_signals.i_hds_decay_sync_dfs, + signal_analysis_functions.tau_charge, + voltage_current_event.reference_signals.timestamp, ) tau_comp_df = comparison.compare_difference_of_features_to_reference( tau_df, tau_ref_df, - circuit_type, + voltage_current_event.circuit_type, "QH", - wildcard={"CELL": source_qh}, + wildcard={"CELL": voltage_current_event.source}, nominal_voltage=discharge_level, precision=3, ) @@ -397,24 +466,20 @@ def analyze_single_qh_voltage_current_with_ref( capacitance_df = calculate_capacitance(tau_df, first_r_df) capacitance_ref_df = calculate_capacitance(tau_ref_df, first_r_ref_df) capacitance_comp_df = comparison.compare_difference_of_features_to_reference( - capacitance_df, capacitance_ref_df, circuit_type, "QH", precision=3 + capacitance_df, capacitance_ref_df, voltage_current_event.circuit_type, "QH", precision=3 ) - return ( - first_last_u_comp_df, - first_r_comp_df, - tau_comp_df, - capacitance_comp_df, - r_hds_dfs, - r_hds_ref_dfs, - u_hds_sync_dfs, - i_hds_sync_dfs, - u_hds_ref_sync_dfs, - i_hds_ref_sync_dfs, + return VoltageCurrentResult( + source=voltage_current_event.source, + timestamp=voltage_current_event.signals.timestamp, + first_last_u_comp=first_last_u_comp_df, + first_r_comp=first_r_comp_df, + tau_u_comp=tau_comp_df, + capacitance_comp=capacitance_comp_df, ) -def find_source_timestamp_qh(circuit_type, circuit_name, start_time, stop_time): +def find_source_timestamp_qh(circuit_type: str, circuit_name: str, start_time: int, stop_time: int) -> pd.DataFrame: pm_params = resolver.get_params_for_pm_events( circuit_type=circuit_type, circuit_name=circuit_name, @@ -431,7 +496,7 @@ def find_source_timestamp_qh(circuit_type, circuit_name, start_time, stop_time): processing.EventProcessing(events) .filter_source(circuit_type, circuit_name, "QH") .sort_values(by=["timestamp", "source"]) - .drop_duplicates(column=["source", "timestamp"]) + .drop_duplicates(column=["source", "timestamp"]) # type: ignore .get_dataframe() ) events["circuit_type"] = circuit_type @@ -440,7 +505,9 @@ def find_source_timestamp_qh(circuit_type, circuit_name, start_time, stop_time): return events -def query_single_qh_event_pm(circuit_type, circuit_name, source, timestamp, signals): +def query_single_qh_event_pm( + circuit_type: str, circuit_name: str, source: str, timestamp: int, signals: str +) -> list[pd.DataFrame]: pm_params = resolver.get_params_for_pm_signals( circuit_type, circuit_name, "QH", timestamp, signals=signals, wildcard={"CELL": source} ) @@ -457,19 +524,9 @@ def query_single_qh_event_pm(circuit_type, circuit_name, source, timestamp, sign ) -def query_qh_pm(circuit_type, circuit_name, events, signals, is_ref): - signals_dfs = [] - - for _, row in events.iterrows(): - source = row["source"] - timestamp = row["reference_timestamp"] if is_ref is True else row["timestamp"] - - signals_dfs.append(query_single_qh_event_pm(circuit_type, circuit_name, source, timestamp, signals)) - - return signals_dfs - - -def query_single_qh_event_nxcals(spark, circuit_type, circuit_name, source, timestamp, duration, signals): +def query_single_qh_event_nxcals( + spark: SparkSession, circuit_type: str, circuit_name: str, source: str, timestamp: int, duration: int, signals: str +) -> list[pd.DataFrame]: nxcals_params = resolver.get_params_for_nxcals( circuit_type, circuit_name, "QH", timestamp, duration, signals=signals, wildcard={"MAGNET": source} ) @@ -477,7 +534,7 @@ def query_single_qh_event_nxcals(spark, circuit_type, circuit_name, source, time return query.query_nxcals_by_variables(spark, nxcals_params) -def _extract_voltage_decay(u_hds_dfs): +def _extract_voltage_decay(u_hds_dfs: list[pd.DataFrame]) -> list[pd.DataFrame]: u_hds_decay_dfs = [] for u_hds_df in u_hds_dfs: max_value = u_hds_df.max().values[0] @@ -501,60 +558,85 @@ def fill_value_at_location_0_with_latest_preceding(df: pd.DataFrame) -> pd.DataF return df.sort_index() -def analyze_single_qh_voltage_with_ref( - u_dfs: list[pd.DataFrame], - u_dfs_ref: list[pd.DataFrame], - circuit_name: str, - timestamp_event: int, - timestamp_ref: int, - discharge_level: float, -) -> tuple[pd.DataFrame, pd.DataFrame]: - meta_circuit_type = signal_metadata.get_circuit_type_for_circuit_name(circuit_name) +def query_voltage_signals(circuit_type: str, circuit_name: str, source: str, timestamp: int) -> VoltageSignals: + return VoltageSignals(timestamp, query_single_qh_event_pm(circuit_type, circuit_name, source, timestamp, "U_HDS")) + + +def query_voltage_event(circuit_type: str, circuit_name: str, source: str, timestamp: int) -> VoltageEvent | None: + voltage_signals = query_voltage_signals(circuit_type, circuit_name, source, timestamp) + if all(df.empty for df in voltage_signals.u_hds): + # Because QH pm data dumps have the same class name as other systems (e.g QDS, LEADS), + # it is impossible to distinguish between them when querying pm data headers. + # The only way to know is to check if u_hds is empty or not. If it is empty, it is not a QH event. + return None + + return VoltageEvent( + source, + circuit_type, + circuit_name, + voltage_signals, + query_voltage_signals( + circuit_type, + circuit_name, + source, + reference.get_quench_heater_reference_discharge( + signal_metadata.get_circuit_type_for_circuit_name(circuit_name), source, timestamp + ), + ), + ) + + +def analyze_voltage_event(voltage_event: VoltageEvent, discharge_level: float) -> VoltageResult: + u_dfs = voltage_event.signals.u_hds + u_dfs_ref = voltage_event.reference_signals.u_hds + + meta_circuit_type = signal_metadata.get_circuit_type_for_circuit_name(voltage_event.circuit_name) # For the IPQ, IPD and IT circuits the sampling rate changes at PM timestamp (0 here). # Sometimes the change is too slow and hence we want to treat the previous point as the 'first' one of the decay # see https://its.cern.ch/jira/browse/SIGMON-629 if ( - signal_metadata.is_ipd(circuit_name) - or signal_metadata.is_ipq(circuit_name) - or signal_metadata.is_inner_triplet(circuit_name) + signal_metadata.is_ipd(voltage_event.circuit_name) + or signal_metadata.is_ipq(voltage_event.circuit_name) + or signal_metadata.is_inner_triplet(voltage_event.circuit_name) ): - for i in range(len(u_dfs)): - u_dfs[i] = fill_value_at_location_0_with_latest_preceding(u_dfs[i]) - - for i in range(len(u_dfs_ref)): - u_dfs_ref[i] = fill_value_at_location_0_with_latest_preceding(u_dfs_ref[i]) + u_dfs = [fill_value_at_location_0_with_latest_preceding(u_df) for u_df in u_dfs] + u_dfs_ref = [fill_value_at_location_0_with_latest_preceding(u_df) for u_df in u_dfs_ref] u_hds_decay_dfs = _extract_voltage_decay(u_dfs) u_hds_decay_dfs_ref = _extract_voltage_decay(u_dfs_ref) first_last_u_df = signal_analysis.calculate_features( - u_hds_decay_dfs, [utility_features.first, utility_features.last20mean], timestamp_event + u_hds_decay_dfs, [utility_features.first, utility_features.last20mean], voltage_event.signals.timestamp ) first_last_u_comp_df = comparison.compare_features_to_reference( - first_last_u_df, meta_circuit_type, "QH", wildcard={"CIRCUIT": circuit_name}, nominal_voltage=discharge_level + first_last_u_df, + meta_circuit_type, + "QH", + wildcard={"CIRCUIT": voltage_event.circuit_name}, + nominal_voltage=discharge_level, ) # tau charge is calculated always from 0 (which is the PM timestamp) u_hds_decay_dfs = [df.loc[0:, :] for df in u_hds_decay_dfs] u_hds_decay_dfs_ref = [df.loc[0:, :] for df in u_hds_decay_dfs_ref] tau_u_df = signal_analysis.calculate_features( - u_hds_decay_dfs, signal_analysis_functions.tau_charge, timestamp_event + u_hds_decay_dfs, signal_analysis_functions.tau_charge, voltage_event.signals.timestamp ) tau_u_df_ref = signal_analysis.calculate_features( - u_hds_decay_dfs_ref, signal_analysis_functions.tau_charge, timestamp_ref + u_hds_decay_dfs_ref, signal_analysis_functions.tau_charge, voltage_event.reference_signals.timestamp ) tau_u_comp_df = comparison.compare_difference_of_features_to_reference( tau_u_df, tau_u_df_ref, meta_circuit_type, "QH", - wildcard={"CIRCUIT": circuit_name}, + wildcard={"CIRCUIT": voltage_event.circuit_name}, nominal_voltage=discharge_level, precision=3, ) - return first_last_u_comp_df, tau_u_comp_df + return VoltageResult(voltage_event.source, voltage_event.signals.timestamp, first_last_u_comp_df, tau_u_comp_df) DETAILED_CIRCUIT_TYPES_MAP = { @@ -571,7 +653,7 @@ def arguments_check( supported_types: Sequence[GenericCircuitType], start_time: int | str | datetime.datetime, stop_time: int | str | datetime.datetime, -): +) -> None: """Throws ValueError if the provided arguments are not consistent.""" if Time.to_unix_timestamp(start_time) >= Time.to_unix_timestamp(stop_time): raise ValueError("stop_time must be strictly grater than start_time.") diff --git a/lhcsmqh/analyses/quench_heater_ccc.py b/lhcsmqh/analyses/quench_heater_ccc.py index b1bed23..5ea5717 100644 --- a/lhcsmqh/analyses/quench_heater_ccc.py +++ b/lhcsmqh/analyses/quench_heater_ccc.py @@ -5,7 +5,6 @@ from __future__ import annotations import datetime import pandas as pd -from lhcsmapi import reference from lhcsmapi.api import analysis from lhcsmapi.metadata import signal_metadata from lhcsmapi.metadata.signal_metadata import GenericCircuitType @@ -18,6 +17,7 @@ class QHCCCAnalysis(analysis.Analysis): """QH Analysis for CCC""" _NOMINAL_VOLTAGE = 900 + _MEAN_START_VALUE = 50 def __init__( self, @@ -46,18 +46,12 @@ class QHCCCAnalysis(analysis.Analysis): if initial_charge_check: self._charge_check_level = 800 - self.voltage_current_events: list[commons.VoltageCurrentEvent] = [] - self.voltage_current_results: list[commons.VoltageCurrentResult] = [] - self.voltage_events: list[commons.VoltageEvent] = [] - self.voltage_results: list[commons.VoltageResult] = [] - self.low_charge_events: pd.DataFrame | None = None - @property - def initial_charge_check(self): + def initial_charge_check(self) -> bool: """Returns whether if the initial charge check has been enabled.""" return self._initial_charge_check - def _get_qh_max_charge_from_nxcals(self, source: str, circuit_name: str, timestamp: int): + def _get_qh_max_charge_from_nxcals(self, source: str, circuit_name: str, timestamp: int) -> float: meta_circuit_type = signal_metadata.get_circuit_type_for_circuit_name(circuit_name) source = f"MQ.{source}" if meta_circuit_type == "RQ" else f"MB.{source}" @@ -75,11 +69,12 @@ class QHCCCAnalysis(analysis.Analysis): return qh_max_charge - def query(self): - source_timestamp_qds_df = pd.DataFrame() - + def query(self) -> None: + self.low_charge_events = pd.DataFrame() + self.voltage_events: list[commons.VoltageEvent] = [] + self.voltage_current_events: list[commons.VoltageCurrentEvent] = [] for circuit_type, detailed_circuit_types in commons.DETAILED_CIRCUIT_TYPES_MAP.items(): - circuit_names = signal_metadata.get_circuit_names(detailed_circuit_types) + circuit_names = signal_metadata.get_circuit_names(list(detailed_circuit_types)) if circuit_type == GenericCircuitType.RQ: circuit_names = circuit_names[0:7] # RQFs and RQDs are the same from the powering point of view @@ -91,201 +86,54 @@ class QHCCCAnalysis(analysis.Analysis): ) if not source_timestamp_qds_df_i.empty: - source_timestamp_qds_df_i["circuit_type"] = circuit_type - source_timestamp_qds_df_i["circuit_name"] = circuit_name - source_timestamp_qds_df = pd.concat([source_timestamp_qds_df, source_timestamp_qds_df_i]) - - if not source_timestamp_qds_df.empty: - source_timestamp_qds_df["datetime"] = source_timestamp_qds_df.apply( - lambda row: Time.to_string(row["timestamp"]), axis=1 - ) - source_timestamp_qds_df = source_timestamp_qds_df[ - ["source", "timestamp", "datetime", "circuit_type", "circuit_name"] - ] - source_timestamp_qds_df["reference_timestamp"] = source_timestamp_qds_df.apply( - lambda row: reference.get_quench_heater_reference_discharge( - signal_metadata.get_circuit_type_for_circuit_name(row["circuit_name"]), - row["source"], - row["timestamp"], - ), - axis=1, - ) - - if self._initial_charge_check: - source_timestamp_qds_df["max_charge"] = source_timestamp_qds_df.apply( - lambda row: self._get_qh_max_charge_from_nxcals(row.source, row.circuit_name, row.timestamp), axis=1 - ) - - self._source_timestamp_qds_df = source_timestamp_qds_df - - if not self._source_timestamp_qds_df.empty: - self._groups = source_timestamp_qds_df.groupby("circuit_name") - else: - self._groups = [] - - self._u_signals_dfs = {} - self._u_signals_dfs_ref = {} - self._i_signals_dfs = {} - self._i_signals_dfs_ref = {} - - for circuit_name, source_timestamp_df_i in self._groups: - circuit_type = source_timestamp_df_i["circuit_type"].values[0] - meta_circuit_type = signal_metadata.get_circuit_type_for_circuit_name(circuit_name) - - self._u_signals_dfs[circuit_name] = commons.query_qh_pm( - meta_circuit_type, circuit_name, source_timestamp_df_i, "U_HDS", False - ) - - self._u_signals_dfs_ref[circuit_name] = commons.query_qh_pm( - meta_circuit_type, circuit_name, source_timestamp_df_i, "U_HDS", True - ) - - # Current signals is only useful for RB and RQ circuit types - if circuit_type in ["RB", "RQ"]: - self._i_signals_dfs[circuit_name] = commons.query_qh_pm( - meta_circuit_type, circuit_name, source_timestamp_df_i, "I_HDS", False - ) - - self._i_signals_dfs_ref[circuit_name] = commons.query_qh_pm( - meta_circuit_type, circuit_name, source_timestamp_df_i, "I_HDS", True - ) - - def _analyze_voltage(self, circuit_type, circuit_name, source_timestamp_df_i) -> list[commons.VoltageResult]: - result = [] - u_signals = self._u_signals_dfs[circuit_name] - u_signals_ref = self._u_signals_dfs_ref[circuit_name] - - for i, row in source_timestamp_df_i.iterrows(): - source = row["source"] - timestamp = row["timestamp"] - reference_timestamp = row["reference_timestamp"] - if u_signals[i]: - self.voltage_events.append( - commons.VoltageEvent( - source, - circuit_type, - circuit_name, - commons.VoltageSignals(timestamp, u_signals[i]), - commons.VoltageSignals(reference_timestamp, u_signals_ref[i]), - ) - ) - - first_last_u_comp_df, tau_u_comp_df = commons.analyze_single_qh_voltage_with_ref( - u_signals[i], u_signals_ref[i], circuit_name, timestamp, reference_timestamp, self._NOMINAL_VOLTAGE - ) - - result.append(commons.VoltageResult(source, timestamp, first_last_u_comp_df, tau_u_comp_df)) - else: - self.voltage_events.append( - commons.VoltageEvent( - source, - circuit_type, - circuit_name, - commons.VoltageSignals(timestamp, []), - commons.VoltageSignals(reference_timestamp, []), - ) - ) - - result.append(commons.VoltageResult(source, timestamp, None, None)) - - return result - - def _analyze_voltage_current(self, circuit_type, circuit_name, source_timestamp_df_i): - result = [] - - u_signals = self._u_signals_dfs[circuit_name] - u_signals_ref = self._u_signals_dfs_ref[circuit_name] - i_signals = self._i_signals_dfs[circuit_name] - i_signals_ref = self._i_signals_dfs_ref[circuit_name] - - for i, row in source_timestamp_df_i.iterrows(): - source = row["source"] - timestamp = row["timestamp"] - reference_timestamp = row["reference_timestamp"] - if u_signals[i] and i_signals[i]: - ( - first_last_u_comp_df, - first_r_comp_df, - tau_comp_df, - capacitance_comp_df, - r_hds_dfs, - r_hds_ref_dfs, - u_sync_signals, - i_sync_signals, - u_sync_signals_ref, - i_sync_signals_ref, - ) = commons.analyze_single_qh_voltage_current_with_ref( - circuit_type, - self._NOMINAL_VOLTAGE, - source, - timestamp, - u_signals[i], - i_signals[i], - reference_timestamp, - u_signals_ref[i], - i_signals_ref[i], - 0.085 if circuit_type == "RB" else 0.025, - ) - self.voltage_current_events.append( - commons.VoltageCurrentEvent( - source, - circuit_type, - circuit_name, - commons.VoltageCurrentSignals(timestamp, u_sync_signals, i_sync_signals, r_hds_dfs), - commons.VoltageCurrentSignals( - reference_timestamp, u_sync_signals_ref, i_sync_signals_ref, r_hds_ref_dfs - ), - ) - ) - - result.append( - commons.VoltageCurrentResult( - row["source"], - row["timestamp"], - first_last_u_comp_df, - tau_comp_df, - first_r_comp_df, - capacitance_comp_df, - ) - ) - else: - result.append(commons.VoltageCurrentResult(source, timestamp, None, None, None, None)) - self.voltage_current_events.append( - commons.VoltageCurrentEvent( - source, - circuit_type, - circuit_name, - commons.VoltageCurrentSignals(timestamp, [], [], []), - commons.VoltageCurrentSignals(reference_timestamp, [], [], []), - ) - ) - - return result - - def analyze(self): - if self._initial_charge_check and "max_charge" in self._source_timestamp_qds_df.columns: - self.low_charge_events = self._source_timestamp_qds_df[ - self._source_timestamp_qds_df["max_charge"] < self._charge_check_level - ] - - # We only analyze high charge events - self._source_timestamp_qds_df = self._source_timestamp_qds_df[ - self._source_timestamp_qds_df["max_charge"] >= self._charge_check_level - ] - - if not self._source_timestamp_qds_df.empty: - for circuit_name, source_timestamp_df_i in self._groups: - circuit_type = source_timestamp_df_i["circuit_type"].values[0] - - if circuit_type in ("RB", "RQ"): - self.voltage_current_results.extend( - self._analyze_voltage_current(circuit_type, circuit_name, source_timestamp_df_i) - ) - else: - self.voltage_results.extend( - self._analyze_voltage(circuit_type, circuit_name, source_timestamp_df_i) - ) - + for source, timestamp in source_timestamp_qds_df_i[["source", "timestamp"]].values: + charge_check = True + if self._initial_charge_check is True: + max_charge = self._get_qh_max_charge_from_nxcals(source, circuit_name, timestamp) + if max_charge < self._charge_check_level: + self.low_charge_events = pd.concat( + ( + self.low_charge_events, + pd.DataFrame( + {"source": [source], "timestamp": [timestamp], "max_charge": [max_charge]} + ), + ), + ignore_index=True, + ) + charge_check = False + + if charge_check is True: + if circuit_type in (GenericCircuitType.RB, GenericCircuitType.RQ): + current_offset = 0.085 if circuit_type == GenericCircuitType.RB else 0.025 + self.voltage_current_events.append( + commons.query_voltage_current_event( + meta_circuit_type, + circuit_name, + source, + timestamp, + current_offset, + self._MEAN_START_VALUE, + ) + ) + else: + voltage_event = commons.query_voltage_event( + meta_circuit_type, circuit_name, source, timestamp + ) + if voltage_event is None: + self._logger.warning( + f"No Quench Heater Discharges in {source} at {Time.to_string_short(timestamp)}," + " skipping." + ) + else: + self.voltage_events.append(voltage_event) + + def analyze(self) -> None: + self.voltage_results = [ + commons.analyze_voltage_event(event, self._NOMINAL_VOLTAGE) for event in self.voltage_events + ] + self.voltage_current_results = [ + commons.analyze_voltage_current_event(event, self._NOMINAL_VOLTAGE) for event in self.voltage_current_events + ] self.events_number = len(self.voltage_current_events) + len(self.voltage_events) def get_analysis_output(self) -> bool: diff --git a/lhcsmqh/analyses/quench_heater_voltage_analysis.py b/lhcsmqh/analyses/quench_heater_voltage_analysis.py index fc1d3ea..6a27216 100644 --- a/lhcsmqh/analyses/quench_heater_voltage_analysis.py +++ b/lhcsmqh/analyses/quench_heater_voltage_analysis.py @@ -5,7 +5,6 @@ from __future__ import annotations import datetime import pandas as pd -from lhcsmapi import reference from lhcsmapi.api import analysis from lhcsmapi.metadata import signal_metadata from lhcsmapi.metadata.signal_metadata import GenericCircuitType @@ -24,7 +23,7 @@ class QuenchHeaterVoltageAnalysis(analysis.Analysis): discharge_level: int, start_time: int | str | datetime.datetime, stop_time: int | str | datetime.datetime, - ): + ) -> None: """Instantiates a QuenchHeaterVoltageAnalysis class. Args: @@ -46,8 +45,6 @@ class QuenchHeaterVoltageAnalysis(analysis.Analysis): self._start_time = Time.to_unix_timestamp(start_time) self._stop_time = Time.to_unix_timestamp(stop_time) self._discharge_level = discharge_level - self.voltage_events: list[commons.VoltageEvent] = [] - self.voltage_results: list[commons.VoltageResult] = [] super().__init__(identifier) def search_discharges(self) -> pd.DataFrame: @@ -55,56 +52,22 @@ class QuenchHeaterVoltageAnalysis(analysis.Analysis): self._circuit_type, self._circuit_name, self._start_time, self._stop_time ) - def query(self, discharges=None): + def query(self, discharges: pd.DataFrame | None = None) -> None: self._events = discharges if discharges is not None else self.search_discharges() - self._events["reference_timestamp"] = self._events.apply( - lambda row: reference.get_quench_heater_reference_discharge( - signal_metadata.get_circuit_type_for_circuit_name(self._circuit_name), row["source"], row["timestamp"] - ), - axis=1, - ) - - self._signals_dfs = commons.query_qh_pm(self._circuit_type, self._circuit_name, self._events, "U_HDS", False) - self._signals_dfs_ref = commons.query_qh_pm(self._circuit_type, self._circuit_name, self._events, "U_HDS", True) - - def analyze(self): - for index, (u_dfs, u_dfs_ref) in enumerate(zip(self._signals_dfs, self._signals_dfs_ref)): - # logging here? - # the notebook prints some timestamp - - source_event = self._events.at[index, "source"] - timestamp_event = self._events.at[index, "timestamp"] - reference_timestamp = self._events.at[index, "reference_timestamp"] - - if not u_dfs: - self.voltage_events.append( - commons.VoltageEvent( - source_event, - self._circuit_type, - self._circuit_name, - commons.VoltageSignals(timestamp_event, []), - commons.VoltageSignals(reference_timestamp, []), - ) + self.voltage_events: list[commons.VoltageEvent] = [] + for source, timestamp in self._events[["source", "timestamp"]].values: + voltage_event = commons.query_voltage_event(self._circuit_type, self._circuit_name, source, timestamp) + if voltage_event is None: + self._logger.warning( + f"No Quench Heater Discharges in {source} at {Time.to_string_short(timestamp)}, skipping." ) - self.voltage_results.append(commons.VoltageResult(source_event, timestamp_event, None, None)) - continue + else: + self.voltage_events.append(voltage_event) - first_last_u_comp_df, tau_u_comp_df = commons.analyze_single_qh_voltage_with_ref( - u_dfs, u_dfs_ref, self._circuit_name, timestamp_event, reference_timestamp, self._discharge_level - ) - - self.voltage_events.append( - commons.VoltageEvent( - source_event, - self._circuit_type, - self._circuit_name, - commons.VoltageSignals(timestamp_event, u_dfs), - commons.VoltageSignals(reference_timestamp, u_dfs_ref), - ) - ) - self.voltage_results.append( - commons.VoltageResult(source_event, timestamp_event, first_last_u_comp_df, tau_u_comp_df) - ) + def analyze(self) -> None: + self.voltage_results = [ + commons.analyze_voltage_event(event, self._discharge_level) for event in self.voltage_events + ] def get_analysis_output(self) -> bool: return all(self.voltage_results) diff --git a/lhcsmqh/analyses/quench_heater_voltage_current_analysis.py b/lhcsmqh/analyses/quench_heater_voltage_current_analysis.py index 0fcbf70..15b40ab 100644 --- a/lhcsmqh/analyses/quench_heater_voltage_current_analysis.py +++ b/lhcsmqh/analyses/quench_heater_voltage_current_analysis.py @@ -6,7 +6,6 @@ import datetime import multiprocessing import pandas as pd -from lhcsmapi import reference from lhcsmapi.api import analysis from lhcsmapi.metadata import signal_metadata from lhcsmapi.metadata.signal_metadata import GenericCircuitType @@ -18,6 +17,8 @@ from lhcsmqh.analyses import commons class QuenchHeaterVoltageCurrentAnalysis(analysis.Analysis): """Analysis class for QH RB/RQ circuits""" + _CURRENT_OFFSET = 0.085 + def __init__( self, identifier: str, @@ -25,7 +26,7 @@ class QuenchHeaterVoltageCurrentAnalysis(analysis.Analysis): discharge_level: int, start_time: int | str | datetime.datetime, stop_time: int | str | datetime.datetime, - ): + ) -> None: """Instantiates a QuenchHeaterVoltageCurrentAnalysis class. Args: @@ -43,10 +44,6 @@ class QuenchHeaterVoltageCurrentAnalysis(analysis.Analysis): self._start_time = Time.to_unix_timestamp(start_time) self._stop_time = Time.to_unix_timestamp(stop_time) self._discharge_level = discharge_level - self._signals_dfs = None - self._signals_dfs_ref = None - self.voltage_current_events: list[commons.VoltageCurrentEvent] = [] - self.voltage_current_results: list[commons.VoltageCurrentResult] = [] super().__init__(identifier) @@ -55,127 +52,22 @@ class QuenchHeaterVoltageCurrentAnalysis(analysis.Analysis): self._circuit_type, self._circuit_name, self._start_time, self._stop_time ) - def query(self, discharges=None): + def query(self, discharges: pd.DataFrame | None = None) -> None: self._events = discharges if discharges is not None else self.search_discharges() - self._events["reference_timestamp"] = self._events.apply( - lambda row: reference.get_quench_heater_reference_discharge( - signal_metadata.get_circuit_type_for_circuit_name(self._circuit_name), row["source"], row["timestamp"] - ), - axis=1, - ) - - self._clean_query_results() - with multiprocessing.Pool(8) as pool: - qh_source_timestamp_dfs = pool.starmap( - _query_task, - zip( - [self._circuit_type] * len(self._events), - [self._circuit_name] * len(self._events), - self._events["source"].values, - self._events["timestamp"].values, - ), - ) - - qh_source_timestamp_dfs_ref = pool.starmap( - _query_task, - zip( - [self._circuit_type] * len(self._events), - [self._circuit_name] * len(self._events), - self._events["source"].values, - self._events["reference_timestamp"].values, - ), - ) - - self._signals_dfs = {(source, timestamp): dfs for (source, timestamp, dfs) in qh_source_timestamp_dfs} - self._signals_dfs_ref = {(source, timestamp): dfs for (source, timestamp, dfs) in qh_source_timestamp_dfs_ref} - - def analyze(self): - - for _, row in self._events.iterrows(): - source = row["source"] - timestamp = row["timestamp"] - reference_timestamp = row["reference_timestamp"] - u_hds_dfs = list(filter(lambda df: "U_HDS" in df.columns.values[0], self._signals_dfs[(source, timestamp)])) - i_hds_dfs = list(filter(lambda df: "I_HDS" in df.columns.values[0], self._signals_dfs[(source, timestamp)])) - - if not u_hds_dfs or not i_hds_dfs: - self.voltage_current_events.append( - commons.VoltageCurrentEvent( - source, - self._circuit_type, - self._circuit_name, - commons.VoltageCurrentSignals(timestamp, [], [], []), - commons.VoltageCurrentSignals(reference_timestamp, [], [], []), - ) - ) - self.voltage_current_results.append( - commons.VoltageCurrentResult(source, timestamp, None, None, None, None) - ) - continue - - u_hds_dfs_ref = list( - filter(lambda df: "U_HDS" in df.columns.values[0], self._signals_dfs_ref[(source, reference_timestamp)]) - ) - i_hds_dfs_ref = list( - filter(lambda df: "I_HDS" in df.columns.values[0], self._signals_dfs_ref[(source, reference_timestamp)]) - ) - - mean_start_value = 15 if self._discharge_level < 450 else 50 - - ( - first_last_u_comp_df, - first_r_comp_df, - tau_comp_df, - capacitance_comp_df, - r_hds_dfs, - r_hds_ref_dfs, - u_sync_signals, - i_sync_signals, - u_sync_signals_ref, - i_sync_signals_ref, - ) = commons.analyze_single_qh_voltage_current_with_ref( - self._circuit_type, - self._discharge_level, - source, - timestamp, - u_hds_dfs, - i_hds_dfs, - reference_timestamp, - u_hds_dfs_ref, - i_hds_dfs_ref, - current_offset=0.085, - mean_start_value=mean_start_value, + mean_start_value = 15 if self._discharge_level < 450 else 50 + with multiprocessing.Pool() as pool: + self.voltage_current_events = pool.starmap( + commons.query_voltage_current_event, + [ + (self._circuit_type, self._circuit_name, source, timestamp, self._CURRENT_OFFSET, mean_start_value) + for source, timestamp in self._events[["source", "timestamp"]].values + ], ) - self.voltage_current_events.append( - commons.VoltageCurrentEvent( - source, - self._circuit_type, - self._circuit_name, - commons.VoltageCurrentSignals(timestamp, u_sync_signals, i_sync_signals, r_hds_dfs), - commons.VoltageCurrentSignals( - reference_timestamp, u_sync_signals_ref, i_sync_signals_ref, r_hds_ref_dfs - ), - ) - ) - self.voltage_current_results.append( - commons.VoltageCurrentResult( - source, timestamp, first_last_u_comp_df, tau_comp_df, first_r_comp_df, capacitance_comp_df - ) - ) + def analyze(self) -> None: + self.voltage_current_results = [ + commons.analyze_voltage_current_event(event, self._discharge_level) for event in self.voltage_current_events + ] def get_analysis_output(self) -> bool: return all(self.voltage_current_results) - - def _clean_query_results(self): - self.voltage_current_events = [] - self.voltage_current_results = [] - if self._signals_dfs is not None: - del self._signals_dfs - if self._signals_dfs_ref is not None: - del self._signals_dfs_ref - - -def _query_task(circuit_type, circuit_name, source, timestamp): - result = commons.query_single_qh_event_pm(circuit_type, circuit_name, source, timestamp, ["U_HDS", "I_HDS"]) - return source, timestamp, result diff --git a/lhcsmqh/output/_common.py b/lhcsmqh/output/_common.py index 1432e1d..9ba7378 100644 --- a/lhcsmqh/output/_common.py +++ b/lhcsmqh/output/_common.py @@ -3,7 +3,7 @@ from __future__ import annotations from collections.abc import Sequence -from typing import Callable +from typing import Any, Callable import numpy as np import pandas as pd @@ -18,7 +18,8 @@ SUMMARY_TABLE_ID = "Table" def get_summary_output( - events: Sequence[analyses.VoltageEvent | analyses.VoltageCurrentEvent], results: Sequence[analyses.Result] + events: Sequence[analyses.VoltageEvent | analyses.VoltageCurrentEvent], + results: Sequence[analyses.VoltageResult | analyses.VoltageCurrentResult], ) -> list[output_types.Output]: df_result = pd.DataFrame( @@ -36,9 +37,13 @@ def get_summary_output( def get_summary_with_links_output( - events: Sequence[analyses.VoltageEvent | analyses.VoltageCurrentEvent], results: Sequence[analyses.Result] + events: Sequence[analyses.VoltageEvent | analyses.VoltageCurrentEvent], + results: Sequence[analyses.VoltageResult | analyses.VoltageCurrentResult], ) -> list[output_types.Output]: - def get_row(event: analyses.VoltageEvent | analyses.VoltageCurrentEvent, result): + def get_row( + event: analyses.VoltageEvent | analyses.VoltageCurrentEvent, + result: analyses.VoltageResult | analyses.VoltageCurrentResult, + ) -> list[Any]: return [ event.source, int(event.signals.timestamp), @@ -60,13 +65,8 @@ def get_voltage_output(event: analyses.VoltageEvent, result: analyses.VoltageRes datetime_string = Time.Time.to_string_short(event.signals.timestamp) reference_datetime_string = Time.Time.to_string_short(event.reference_signals.timestamp) - if not event.signals.u_hds or not event.reference_signals.u_hds: - output_list.append( - output_types.HTMLOutput(f"No Quench Heater Discharges in {event.source} at {datetime_string}") - ) - return output_list - def plot_figure(title, log_scale=False): + def plot_figure(title: str, log_scale: bool = False) -> plt.Figure: fig, ax = plt.subplots(1, 1, figsize=(15, 7)) ax.set_title(title) @@ -101,21 +101,20 @@ def get_voltage_output(event: analyses.VoltageEvent, result: analyses.VoltageRes first_last_u_comp = result.first_last_u_comp tau_u_comp = result.tau_u_comp - if first_last_u_comp is not None and tau_u_comp is not None: - html = ( - first_last_u_comp.style.set_table_attributes("style='display:inline;vertical-align:top'") - .set_caption("Comparison of the initial and final voltage") - .apply(lambda s: first_last_u_comp["result"].map(row_color_dct)) - .format(precision=1) - .to_html() - + tau_u_comp.style.set_table_attributes("style='display:inline;vertical-align:top'") - .set_caption("Comparison of the discharge characteristic time to the reference") - .apply(lambda s: tau_u_comp["result"].map(row_color_dct)) - .format(precision=3) - .to_html() - ) + html = ( + first_last_u_comp.style.set_table_attributes("style='display:inline;vertical-align:top'") + .set_caption("Comparison of the initial and final voltage") + .apply(lambda s: first_last_u_comp["result"].map(row_color_dct)) + .format(precision=1) + .to_html() + + tau_u_comp.style.set_table_attributes("style='display:inline;vertical-align:top'") + .set_caption("Comparison of the discharge characteristic time to the reference") + .apply(lambda s: tau_u_comp["result"].map(row_color_dct)) + .format(precision=3) + .to_html() + ) - output_list.append(output_types.HTMLOutput(html)) + output_list.append(output_types.HTMLOutput(html)) text = ( f"The QH Discharges are labeled: {bool(result)}." @@ -134,22 +133,12 @@ def get_voltage_current_output( datetime_string = Time.Time.to_string_short(event.signals.timestamp) reference_datetime_string = Time.Time.to_string_short(event.reference_signals.timestamp) - if ( - not event.signals.i_hds - or not event.reference_signals.i_hds - or not event.signals.u_hds - or not event.reference_signals.u_hds - ): - output_list.append( - output_types.HTMLOutput(f"No Quench Heater Discharges in {event.source} at {datetime_string}") - ) - return output_list title = f"Magnet: {event.source}, Time Stamp: {datetime_string}, U_HDS(t), I_HDS(t), R_HDS(t)" fig, ax = plt.subplots(1, 3, figsize=(20, 7)) - for u_hds, u_hds_ref in zip(event.signals.u_hds, event.reference_signals.u_hds): + for u_hds, u_hds_ref in zip(event.signals.u_hds_sync_dfs, event.reference_signals.u_hds_sync_dfs): u_hds.plot(ax=ax[0]) u_hds_ref.plot(ax=ax[0], style="--") @@ -158,7 +147,7 @@ def get_voltage_current_output( ax[0].set_xlabel("time, [s]", fontsize=FONT_SIZE) ax[0].set_ylabel("U_HDS, [V]", fontsize=FONT_SIZE) - for i_hds, i_hds_ref in zip(event.signals.i_hds, event.reference_signals.i_hds): + for i_hds, i_hds_ref in zip(event.signals.i_hds_sync_dfs, event.reference_signals.i_hds_sync_dfs): i_hds.plot(ax=ax[1], title=title) i_hds_ref.plot(ax=ax[1], style="--") @@ -184,36 +173,30 @@ def get_voltage_current_output( first_r_comp = result.first_r_comp tau_u_comp = result.tau_u_comp capacitance_comp = result.capacitance_comp - if ( - first_last_u_comp is not None - and first_r_comp is not None - and tau_u_comp is not None - and capacitance_comp is not None - ): - html = ( - first_last_u_comp.style.set_table_attributes("style='display:inline;vertical-align:top'") - .set_caption("Comparison of the initial and final voltage") - .apply(lambda s: first_last_u_comp["result"].map(row_color_dct)) - .format(precision=1) - .to_html() - + first_r_comp.style.set_table_attributes("style='display:inline;vertical-align:top'") - .set_caption("Comparison of the initial resistance to the reference") - .apply(lambda s: first_r_comp["result"].map(row_color_dct)) - .format(precision=2) - .to_html() - + tau_u_comp.style.set_table_attributes("style='display:inline;vertical-align:top'") - .set_caption("Comparison of the discharge characteristic time to the reference") - .apply(lambda s: tau_u_comp["result"].map(row_color_dct)) - .format(precision=3) - .to_html() - + capacitance_comp.style.set_table_attributes("style='display:inline;vertical-align:top'") - .set_caption("Comparison of the estimated capacitance to the reference") - .apply(lambda s: capacitance_comp["result"].map(row_color_dct)) - .format(precision=3) - .to_html() - ) + html = ( + first_last_u_comp.style.set_table_attributes("style='display:inline;vertical-align:top'") + .set_caption("Comparison of the initial and final voltage") + .apply(lambda s: first_last_u_comp["result"].map(row_color_dct)) + .format(precision=1) + .to_html() + + first_r_comp.style.set_table_attributes("style='display:inline;vertical-align:top'") + .set_caption("Comparison of the initial resistance to the reference") + .apply(lambda s: first_r_comp["result"].map(row_color_dct)) + .format(precision=2) + .to_html() + + tau_u_comp.style.set_table_attributes("style='display:inline;vertical-align:top'") + .set_caption("Comparison of the discharge characteristic time to the reference") + .apply(lambda s: tau_u_comp["result"].map(row_color_dct)) + .format(precision=3) + .to_html() + + capacitance_comp.style.set_table_attributes("style='display:inline;vertical-align:top'") + .set_caption("Comparison of the estimated capacitance to the reference") + .apply(lambda s: capacitance_comp["result"].map(row_color_dct)) + .format(precision=3) + .to_html() + ) - output_list.append(output_types.HTMLOutput(html)) + output_list.append(output_types.HTMLOutput(html)) text = ( f"The QH Discharges are labeled: {bool(result)}." @@ -229,14 +212,7 @@ def get_output_wrapper( get_analysis_result: Callable[[], bool], events: Sequence[analyses.VoltageEvent | analyses.VoltageCurrentEvent], results: Sequence[analyses.VoltageResult | analyses.VoltageCurrentResult], - add_outputs: Callable[ - [ - list[output_types.Output], - Sequence[analyses.VoltageEvent | analyses.VoltageCurrentEvent], - Sequence[analyses.VoltageResult | analyses.VoltageCurrentResult], - ], - None, - ], + add_outputs: Callable[[list[output_types.Output], Sequence[Any], Sequence[Any]], None], ) -> list[output_types.Output]: outputs: list[output_types.Output] = [] @@ -254,7 +230,3 @@ def get_output_wrapper( outputs.append(output_types.HTMLOutput(f"Overall signature = <h2>{get_analysis_result()}</h2>")) return outputs - - -def map_two_arg_func(two_arg_func, first, second): - return [single for f, s in zip(first, second) for single in two_arg_func(f, s)] diff --git a/lhcsmqh/output/quench_heater_ccc_output.py b/lhcsmqh/output/quench_heater_ccc_output.py index 4013eab..89f0acb 100644 --- a/lhcsmqh/output/quench_heater_ccc_output.py +++ b/lhcsmqh/output/quench_heater_ccc_output.py @@ -25,12 +25,13 @@ def get_summary(analysis: QHCCCAnalysis) -> list[output_types.Output]: Returns: A list of Output instances with only one HTMLOutput.""" - events: list[commons.VoltageEvent | commons.VoltageCurrentEvent] = ( - analysis.voltage_events + analysis.voltage_current_events - ) - results: list[commons.VoltageResult | commons.VoltageCurrentResult] = ( - analysis.voltage_results + analysis.voltage_current_results - ) + events: list[commons.VoltageEvent | commons.VoltageCurrentEvent] = [] + events.extend(analysis.voltage_events) + events.extend(analysis.voltage_current_events) + + results: list[commons.VoltageResult | commons.VoltageCurrentResult] = [] + results.extend(analysis.voltage_results) + results.extend(analysis.voltage_current_results) return output_common.get_summary_output(events, results) @@ -45,7 +46,7 @@ def get_output(analysis: QHCCCAnalysis) -> list[output_types.Output]: output_list: list[output_types.Output] = [] - if analysis.initial_charge_check and analysis.low_charge_events is not None: + if analysis.initial_charge_check and not analysis.low_charge_events.empty: output_list.append(output_types.TextOutput("Low level charge events:")) output_list.append(output_types.HTMLOutput(analysis.low_charge_events.to_html())) @@ -58,14 +59,13 @@ def get_output(analysis: QHCCCAnalysis) -> list[output_types.Output]: ) return output_list - output_list.extend( - output_common.map_two_arg_func( - output_common.get_voltage_output, analysis.voltage_events, analysis.voltage_results - ) - + output_common.map_two_arg_func( - output_common.get_voltage_current_output, analysis.voltage_current_events, analysis.voltage_current_results - ) - ) + for voltage_event, voltage_result in zip(analysis.voltage_events, analysis.voltage_results): + output_list.extend(output_common.get_voltage_output(voltage_event, voltage_result)) + + for voltage_current_event, voltage_current_result in zip( + analysis.voltage_current_events, analysis.voltage_current_results + ): + output_list.extend(output_common.get_voltage_current_output(voltage_current_event, voltage_current_result)) signature = ( '<span style="background-color: #3DCF1A">    Passed' diff --git a/lhcsmqh/output/quench_heater_voltage_current_output.py b/lhcsmqh/output/quench_heater_voltage_current_output.py index 95db764..a95c17c 100644 --- a/lhcsmqh/output/quench_heater_voltage_current_output.py +++ b/lhcsmqh/output/quench_heater_voltage_current_output.py @@ -1,8 +1,10 @@ """Module containing functions presenting the results of the QHDA for RB/RQ circuits.""" +from collections.abc import Sequence + from lhcsmapi.api.analysis.output import output_types -from lhcsmqh.analyses import QuenchHeaterVoltageCurrentAnalysis +from lhcsmqh.analyses import QuenchHeaterVoltageCurrentAnalysis, commons from . import _common as output_common @@ -37,7 +39,11 @@ def get_output(analysis: QuenchHeaterVoltageCurrentAnalysis) -> list[output_type Returns: A list of Output instances.""" - def add_outputs(outputs, events, results): + def add_outputs( + outputs: list[output_types.Output], + events: Sequence[commons.VoltageCurrentEvent], + results: Sequence[commons.VoltageCurrentResult], + ) -> None: for event, result in zip(events, results): outputs.append( output_types.HTMLOutput( diff --git a/lhcsmqh/output/quench_heater_voltage_output.py b/lhcsmqh/output/quench_heater_voltage_output.py index 05e6335..9b7c38c 100644 --- a/lhcsmqh/output/quench_heater_voltage_output.py +++ b/lhcsmqh/output/quench_heater_voltage_output.py @@ -1,8 +1,12 @@ """Module containing functions presenting the results of the QHDA for IPQ/IPD/IT circuits.""" +from __future__ import annotations + +from collections.abc import Sequence + from lhcsmapi.api.analysis.output import output_types -from lhcsmqh.analyses import QuenchHeaterVoltageAnalysis +from lhcsmqh.analyses import QuenchHeaterVoltageAnalysis, commons from . import _common as output_common @@ -35,7 +39,11 @@ def get_output(analysis: QuenchHeaterVoltageAnalysis) -> list[output_types.Outpu Returns: A list of Output instances.""" - def add_outputs_with_links(outputs, events, results): + def add_outputs_with_links( + outputs: list[output_types.Output], + events: Sequence[commons.VoltageEvent], + results: Sequence[commons.VoltageResult], + ) -> None: for event, result in zip(events, results): outputs.append( output_types.HTMLOutput( diff --git a/pyproject.toml b/pyproject.toml index a4a66f5..0b6aa25 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -59,8 +59,7 @@ select = [ warn_unused_configs = true warn_redundant_casts = true warn_no_return = true -# TODO: Uncomment the following line when refactoring is done -# check_untyped_defs = true +disallow_untyped_defs = true [tool.pytest.ini_options] addopts = ["--import-mode=importlib"] diff --git a/test/unit/test_output.py b/test/unit/test_output.py index fc9b65d..cd204cc 100644 --- a/test/unit/test_output.py +++ b/test/unit/test_output.py @@ -109,34 +109,22 @@ voltage_current_events = [ source="C30R7", circuit_type=GenericCircuitType.RB, circuit_name="RB.A78", - signals=commons.VoltageCurrentSignals( - timestamp=1658393366712000000, u_hds=MagicMock(), i_hds=MagicMock(), r_hds=MagicMock() - ), - reference_signals=commons.VoltageCurrentSignals( - timestamp=1658393366712000000, u_hds=MagicMock(), i_hds=MagicMock(), r_hds=MagicMock() - ), + signals=MagicMock(timestamp=1658393366712000000), + reference_signals=MagicMock(timestamp=1658393366712000000), ), commons.VoltageCurrentEvent( source="A30R7", circuit_type=GenericCircuitType.RB, circuit_name="RB.A78", - signals=commons.VoltageCurrentSignals( - timestamp=1658393366713000000, u_hds=MagicMock(), i_hds=MagicMock(), r_hds=MagicMock() - ), - reference_signals=commons.VoltageCurrentSignals( - timestamp=1658393366713000000, u_hds=MagicMock(), i_hds=MagicMock(), r_hds=MagicMock() - ), + signals=MagicMock(timestamp=1658393366713000000), + reference_signals=MagicMock(timestamp=1658393366713000000), ), commons.VoltageCurrentEvent( source="B31R7", circuit_type=GenericCircuitType.RB, circuit_name="RQD10.L8", - signals=commons.VoltageCurrentSignals( - timestamp=1658393366713000000, u_hds=MagicMock(), i_hds=MagicMock(), r_hds=MagicMock() - ), - reference_signals=commons.VoltageCurrentSignals( - timestamp=1658393366713000000, u_hds=MagicMock(), i_hds=MagicMock(), r_hds=MagicMock() - ), + signals=MagicMock(timestamp=1658393366713000000), + reference_signals=MagicMock(timestamp=1658393366713000000), ), ] @@ -667,11 +655,7 @@ def test_qh_voltage_current_output(): ( [ commons.VoltageCurrentEvent( - "C30R7", - GenericCircuitType.RB, - "RB.A78", - commons.VoltageCurrentSignals(0, [], [], []), - commons.VoltageCurrentSignals(0, [], [], []), + "C30R7", GenericCircuitType.RB, "RB.A78", MagicMock(timestamp=0), MagicMock(timestamp=0) ) ], [ -- GitLab