diff --git a/integration-tests.sh b/integration-tests.sh index 18c27879afbaa1b7d9cc3e09a2effa7a3dc23a89..c57d6a9f7ce89a5c357bb11208a89502de0df2df 100755 --- a/integration-tests.sh +++ b/integration-tests.sh @@ -112,7 +112,7 @@ if [ "$INSTALL" -eq 1 ] && [ "$PYTHON_TESTS" -eq 1 ]; then fi # needed, because otherwise ansible complains and won't use ansible.cfg from world-writable dir -if [ "$CI_ENV" ]; then +if [ "${CI_ENV:-}" ]; then chmod 744 ansible fi diff --git a/integration-tests/src/python-integration-test/python/nxcals/integrationtests/pytimber/test_pytimber.py b/integration-tests/src/python-integration-test/python/nxcals/integrationtests/pytimber/test_pytimber.py index 35f4d59e7d772b5d21ea5728d83c05df3a4f884f..960df39b4f8a1480891d1fb5feb30053eedfe2d0 100644 --- a/integration-tests/src/python-integration-test/python/nxcals/integrationtests/pytimber/test_pytimber.py +++ b/integration-tests/src/python-integration-test/python/nxcals/integrationtests/pytimber/test_pytimber.py @@ -1,7 +1,7 @@ import math import logging -from datetime import datetime, timezone, timedelta -from typing import Any, Iterable, Optional, Dict, List, Union +from datetime import datetime, timezone +from typing import Any, Iterable, Optional, Dict, List import numpy as np import pytest @@ -40,7 +40,6 @@ from pytimber import ( Fundamentals, LoggingDB, SparkLoggingDB, - AGGREGATION_TIMESTAMP_FIELD, AGGREGATION_VALUE_FIELD, ) from pytimber.stats import ( @@ -258,7 +257,7 @@ class TestAligned: master_data = ldb.getVariable(master_var_name, START_TIME, END_TIME) assert result[TIMESTAMPS_KEY].tolist() == master_data[0].tolist() - def test_get_aligned_vectors(self, ldb: LoggingDB) -> None: + def test_get_aligned_vectors_aligned_to_scalar(self, ldb: LoggingDB) -> None: master_var_name = NUMERIC_VARIABLE_NAME pattern_or_list = [master_var_name, DOUBLE_VECTOR_NUMERIC_VARIABLE_NAME] @@ -267,6 +266,15 @@ class TestAligned: assert np.array_equal(result[master_var_name], master_data_values) + def test_get_aligned_vectors_aligned_to_vector(self, ldb: LoggingDB) -> None: + master_var_name = INT_VECTOR_NUMERIC_VARIABLE_NAME + pattern_or_list = [master_var_name, DOUBLE_VECTOR_NUMERIC_VARIABLE_NAME] + + result = ldb.getAligned(pattern_or_list, START_TIME, END_TIME) + (_, master_data_values) = ldb.getVariable(master_var_name, START_TIME, END_TIME) + + assert np.array_equal(result[master_var_name], master_data_values) + @pytest.mark.skip(reason="no way of currently testing this") def test_on_real_data(self, ldb: LoggingDB) -> None: t1 = "2023-08-16 00:00:00.000" diff --git a/python/pytimber/pytimber/data.py b/python/pytimber/pytimber/data.py index ec9c730e1495905ffcfc297c417b985e4788c7f3..9da77b6fada93c466ee84febf065941c9d8c2d7c 100644 --- a/python/pytimber/pytimber/data.py +++ b/python/pytimber/pytimber/data.py @@ -13,7 +13,7 @@ import pandas as pd from numpy import ndarray from nxcals.api.extraction.data.builders import DataQuery from pandas.core.series import Series -from pyspark.sql import DataFrame, SparkSession +from pyspark.sql import DataFrame, SparkSession, Column from .fundamentals import FundamentalsManager, FundamentalsType from .utils import ( @@ -185,51 +185,62 @@ class NumpyDataConverter(DataConverter[VariableQueryResultType]): def convert_from_get_aligned( self, - dataset_dict: VariableDataFrameResultType, + dataset_dict: Dict[str, VariableDataFrameResultType], master_name: str, variables: List[Any], - ) -> Dict[str, VariableQueryResultType]: + ) -> Dict[str, ValueArray[Any, Any]]: if not dataset_dict or dataset_dict[TIMESTAMPS_KEY].isEmpty(): return {} pandas_df_timestamps = dataset_dict[TIMESTAMPS_KEY].select(TIMESTAMP).toPandas() ret = { - TIMESTAMPS_KEY: DataManager._timestamps_to_datetime_np_array( + TIMESTAMPS_KEY: NumpyDataConverter._timestamps_to_datetime_np_array( pandas_df_timestamps[TIMESTAMP], True ), - master_name: dataset_dict[master_name] - .select( - f"{VALUE}.elements", - f"{VALUE}.dimensions", - ) - .toPandas() - .toNumpy() - if is_value_array(dataset_dict[master_name].schema, VALUE) - else dataset_dict[master_name].select(VALUE).toPandas()[VALUE].to_numpy(), + master_name: NumpyDataConverter._convert_values_from_spark( + dataset_dict[master_name], VALUE + ), } for var in variables: if master_name != var.getVariableName(): + variable_ds = dataset_dict[var.getVariableName()] ret[var.getVariableName()] = ( - self._filter_by_variable(dataset_dict, var) - .select( - f"{AGGREGATION_VALUE_FIELD}.elements", - f"{AGGREGATION_VALUE_FIELD}.dimensions", + NumpyDataConverter._convert_values_from_spark( + variable_ds, AGGREGATION_VALUE_FIELD ) - .toPandas() - .to_numpy() - if is_value_array( - self._filter_by_variable(dataset_dict, var).schema, - AGGREGATION_VALUE_FIELD, - ) - else self._filter_by_variable(dataset_dict, var) - .select(AGGREGATION_VALUE_FIELD) - .toPandas()[AGGREGATION_VALUE_FIELD] - .to_numpy() ) return ret + @staticmethod + def _convert_values_from_spark( + ds: DataFrame, column_name: str + ) -> ValueArray[Any, Any]: + if is_value_array(ds.schema, column_name): + return NumpyDataConverter._convert_array_from_spark(ds, column_name) + else: + return NumpyDataConverter._convert_scalars_from_spark(ds, column_name) + + @staticmethod + def _convert_scalars_from_spark( + ds: DataFrame, column_name: str + ) -> ValueArray[Any, Any]: + return ds.select(column_name).toPandas()[column_name].to_numpy() + + @staticmethod + def _convert_array_from_spark( + ds: DataFrame, column_name: str + ) -> ValueArray[Any, Any]: + return NumpyDataConverter._extract_array_values( + ds.select( + f"{column_name}.elements", + f"{column_name}.dimensions", + ) + .toPandas() + .to_numpy() + ) + def convert_from_get_scaled( self, dataset_dict: Dict[str, VariableDataFrameResultType], unix_time: bool ) -> Dict[str, VariableQueryResultType]: @@ -239,7 +250,7 @@ class NumpyDataConverter(DataConverter[VariableQueryResultType]): ts_val = dataset_dict[var_name].toPandas() ret[var_name] = ( - DataManager._timestamps_to_datetime_np_array( + NumpyDataConverter._timestamps_to_datetime_np_array( ts_val.timestamp, unix_time ), ts_val.value.to_numpy(), @@ -283,24 +294,62 @@ class NumpyDataConverter(DataConverter[VariableQueryResultType]): if not variable_ds.empty: if is_array: - values = DataManager._extract_array_values( + values = NumpyDataConverter._extract_array_values( variable_ds[["elements", "dimensions"]].to_numpy() ) else: values = variable_ds[VALUE].to_numpy() result[variable_name] = ( - DataManager._timestamps_to_datetime_np_array( + NumpyDataConverter._timestamps_to_datetime_np_array( variable_ds[TIMESTAMP], unix_time ), values, ) return result - def _filter_by_variable( - self, data_dict: VariableDataFrameResultType, variable: Any - ) -> VariableDataFrameResultType: - return data_dict[variable.getVariableName()] + @staticmethod + def _timestamps_to_datetime_np_array( + raw_timestamps: Series, unix_time: bool + ) -> ValueArray[Any, Any]: + with warnings.catch_warnings(): + warnings.simplefilter("ignore", FutureWarning) + datetimes_as_np_array = np.array( + pd.to_datetime(raw_timestamps, unit="ns", utc=True).dt.to_pydatetime() + ) + + if unix_time: + timestamps = np.array([x.timestamp() for x in datetimes_as_np_array]) + else: + timestamps = datetimes_as_np_array + return timestamps + + @staticmethod + def _extract_array_values( + raw_values: ndarray[Any, Any], + ) -> Union[ndarray[Any, Any], List[ndarray[Any, Any]]]: + if len(raw_values) == 0: + return [] + + dims = len(raw_values[0][1]) + + if dims == 1: + return raw_values[:, 0] + elif dims == 2: + return [ + NumpyDataConverter._extract_matrix_value(value) for value in raw_values + ] + + raise ValueError( + f"Max 2D arrays are supported, obtained {dims}D array", + ) + + @staticmethod + def _extract_matrix_value(value: ndarray[Any, Any]) -> ndarray[Any, Any]: + data = value[0] + rows, cols = value[1] + + return np.reshape(data, [rows, cols]) class DataManager: @@ -351,16 +400,12 @@ class DataManager: def _get_aligned_as_dataset( self, - pattern: PatternOrList, + master_name: str, + variables: List[Any], t1: PyTimberTimeType, t2: PyTimberTimeType, fundamental: Optional[FundamentalsType] = None, - master: Optional[str] = None, ) -> Dict[str, VariableDataFrameResultType]: - master_name, variables = self._get_master_and_variables_for( - pattern, - master, - ) master_ds = self._get_dataset_for( master_name, t1, @@ -432,22 +477,6 @@ class DataManager: # ############################### private stuff ############################################# - @staticmethod - def _timestamps_to_datetime_np_array( - raw_timestamps: Series, unix_time: bool - ) -> ndarray[Any, Any]: - with warnings.catch_warnings(): - warnings.simplefilter("ignore", FutureWarning) - datetimes_as_np_array = np.array( - pd.to_datetime(raw_timestamps, unit="ns", utc=True).dt.to_pydatetime() - ) - - if unix_time: - timestamps = np.array([x.timestamp() for x in datetimes_as_np_array]) - else: - timestamps = datetimes_as_np_array - return timestamps - def _get_dataset_for( self, pattern: PatternOrList, @@ -485,31 +514,6 @@ class DataManager: return ds - @staticmethod - def _extract_array_values( - raw_values: ndarray[Any, Any], - ) -> Union[ndarray[Any, Any], List[ndarray[Any, Any]]]: - if len(raw_values) == 0: - return [] - - dims = len(raw_values[0][1]) - - if dims == 1: - return raw_values[:, 0] - elif dims == 2: - return [DataManager._extract_matrix_value(value) for value in raw_values] - - raise ValueError( - f"Max 2D arrays are supported, obtained {dims}D array", - ) - - @staticmethod - def _extract_matrix_value(value: ndarray[Any, Any]) -> ndarray[Any, Any]: - data = value[0] - rows, cols = value[1] - - return np.reshape(data, [rows, cols]) - def _is_valid_time_window( self, fundamental: Optional[FundamentalsType] = None, diff --git a/python/pytimber/pytimber/pytimber.py b/python/pytimber/pytimber/pytimber.py index e7b267839a49fb7b8eaece03445db080172a6839..69dede2b7f60ceaf2d9270bd677f24bcc2691c64 100644 --- a/python/pytimber/pytimber/pytimber.py +++ b/python/pytimber/pytimber/pytimber.py @@ -721,7 +721,7 @@ class AbstractLoggingDB(Generic[T]): return self.data_converter.convert_from_get_aligned( self._data_manager._get_aligned_as_dataset( - pattern, t1, t2, fundamental, master + master_name, variables, t1, t2, fundamental ), master_name, variables, diff --git a/python/pytimber/pytimber/tests/test_data.py b/python/pytimber/pytimber/tests/test_data.py index 05ca706760d9cc5ad8b2e1aa855c66986c0a7c1b..3ab4ab823fa4d33dfba00aac6cdd5187569831ce 100644 --- a/python/pytimber/pytimber/tests/test_data.py +++ b/python/pytimber/pytimber/tests/test_data.py @@ -2,7 +2,7 @@ from typing import Any import pytest import numpy as np -from pytimber.data import DataManager +from pytimber.data import NumpyDataConverter from numpy import ndarray @@ -17,7 +17,7 @@ class TestData: def test_extract_matrix_value( self, value: ndarray[Any, Any], expected_output: ndarray[Any, Any] ) -> None: - result = DataManager._extract_matrix_value(value) + result = NumpyDataConverter._extract_matrix_value(value) assert np.array_equal(result, expected_output) @pytest.mark.parametrize( @@ -33,7 +33,7 @@ class TestData: is_value_array: bool, expected_output: ndarray[Any, Any], ) -> None: - result = DataManager._extract_array_values(raw_values) + result = NumpyDataConverter._extract_array_values(raw_values) assert np.array_equal(result, expected_output) @pytest.mark.parametrize( @@ -81,7 +81,7 @@ class TestData: is_value_array: bool, expected_output: ndarray[Any, Any], ) -> None: - result = DataManager._extract_array_values(raw_values) + result = NumpyDataConverter._extract_array_values(raw_values) import numpy.testing as npt