diff --git a/cern-metadata-api/src/test/java/cern/nxcals/api/custom/extraction/metadata/SnapshotServiceImplTest.java b/cern-metadata-api/src/test/java/cern/nxcals/api/custom/extraction/metadata/SnapshotServiceImplTest.java index 319006a3e7bd2647556247be61cbdab2b28fedd5..240bc0a73a75c847756953e397bc05ae0ef23a70 100644 --- a/cern-metadata-api/src/test/java/cern/nxcals/api/custom/extraction/metadata/SnapshotServiceImplTest.java +++ b/cern-metadata-api/src/test/java/cern/nxcals/api/custom/extraction/metadata/SnapshotServiceImplTest.java @@ -167,7 +167,6 @@ class SnapshotServiceImplTest { } - @Test void getVariables() { //given diff --git a/common/src/main/java/cern/nxcals/api/custom/domain/Snapshot.java b/common/src/main/java/cern/nxcals/api/custom/domain/Snapshot.java index f67eebd145ccd89f52ca58f92e8eeb3994416521..9c47eca188537a1487b0dd5fcb55fb4d29677306 100644 --- a/common/src/main/java/cern/nxcals/api/custom/domain/Snapshot.java +++ b/common/src/main/java/cern/nxcals/api/custom/domain/Snapshot.java @@ -21,7 +21,6 @@ import lombok.NonNull; import lombok.ToString; import lombok.Value; -import java.time.Duration; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; @@ -272,6 +271,7 @@ public class Snapshot extends BaseGroup { static TimeDefinition dynamic(int duration, LoggingTimeZone timezone, PriorTime priorTime, DynamicTimeUnit timeUnit) { + return new DynamicTimeDefinition(timeUnit, timezone, priorTime, duration); } } @@ -354,7 +354,33 @@ public class Snapshot extends BaseGroup { LocalDateTime refTime = LocalDateTime.now(zoneId); LocalDateTime t2 = priorTime.withRespectTo(refTime); - LocalDateTime t1 = t2.minus(Duration.of(duration, ChronoUnit.valueOf(timeUnit.name()))); + LocalDateTime t1; + + switch (ChronoUnit.valueOf(timeUnit.name())) { + case SECONDS: + t1 = t2.minusSeconds(duration); + break; + case MINUTES: + t1 = t2.minusMinutes(duration); + break; + case HOURS: + t1 = t2.minusHours(duration); + break; + case DAYS: + t1 = t2.minusDays(duration); + break; + case WEEKS: + t1 = t2.minusWeeks(duration); + break; + case MONTHS: + t1 = t2.minusMonths(duration); + break; + case YEARS: + t1 = t2.minusYears(duration); + break; + default: + throw new IllegalArgumentException("Unsupported time unit: " + timeUnit); + } return TimeWindow.between(TimeUtils.getNanosFromLocal(t1, zoneId), TimeUtils.getNanosFromLocal(t2, zoneId)); } diff --git a/common/src/test/java/cern/nxcals/api/custom/domain/SnapshotTest.java b/common/src/test/java/cern/nxcals/api/custom/domain/SnapshotTest.java index 5d7f8dbea92a7666cc0444a3617eb415e69036e3..84717d9f80c49d6fdd41e45945eb964c73c5410e 100644 --- a/common/src/test/java/cern/nxcals/api/custom/domain/SnapshotTest.java +++ b/common/src/test/java/cern/nxcals/api/custom/domain/SnapshotTest.java @@ -14,12 +14,12 @@ import org.junit.jupiter.params.provider.MethodSource; import org.mockito.MockedStatic; import org.mockito.Mockito; -import java.time.Duration; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.ZonedDateTime; import java.time.temporal.ChronoUnit; +import java.util.TimeZone; import java.util.stream.Stream; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -49,16 +49,23 @@ class SnapshotTest { private static final int FILL_NUMBER = 8775; static Stream<Arguments> dataForDynamicTimeWindowTest() { + + return Stream.of( + Arguments.of(LoggingTimeZone.UTC, DynamicTimeUnit.SECONDS, 2, ChronoUnit.SECONDS), + Arguments.of(LoggingTimeZone.LOCAL, DynamicTimeUnit.MINUTES, 3, ChronoUnit.MINUTES), + Arguments.of(LoggingTimeZone.UTC, DynamicTimeUnit.HOURS, 5, ChronoUnit.HOURS), Arguments.of(LoggingTimeZone.LOCAL, DynamicTimeUnit.DAYS, 7, ChronoUnit.DAYS), - Arguments.of(LoggingTimeZone.UTC, DynamicTimeUnit.HOURS, 5, ChronoUnit.HOURS) + Arguments.of(LoggingTimeZone.UTC, DynamicTimeUnit.WEEKS, 8, ChronoUnit.WEEKS), + Arguments.of(LoggingTimeZone.LOCAL, DynamicTimeUnit.MONTHS, 9, ChronoUnit.MONTHS), + Arguments.of(LoggingTimeZone.UTC, DynamicTimeUnit.YEARS, 1, ChronoUnit.YEARS) ); } @ParameterizedTest @MethodSource("dataForDynamicTimeWindowTest") void shouldCalculateDynamicTimeWindow(LoggingTimeZone loggingTimeZone, DynamicTimeUnit dynamicTimeUnit, - Integer dynamicDuration, ChronoUnit resultUnit) { + Integer dynamicDuration, ChronoUnit resultUnit) { withMockedLocalDateTimeNow(loggingTimeZone.getTimeZone().toZoneId(), () -> { Snapshot.TimeDefinition timeDefinition = Snapshot.TimeDefinition.dynamic( dynamicDuration, loggingTimeZone, PriorTime.from("Start of day"), dynamicTimeUnit @@ -76,10 +83,14 @@ class SnapshotTest { Instant startOfDay = LocalDateTime.now(loggingTimeZone.getTimeZone().toZoneId()) .truncatedTo(ChronoUnit.DAYS).atZone(loggingTimeZone.getTimeZone().toZoneId()).toInstant(); - assertEquals(startOfDay, timeWindow.getEndTime()); - assertEquals(Duration.between(timeWindow.getStartTime(), timeWindow.getEndTime()), - Duration.of(dynamicDuration, resultUnit)); + + ZoneId zoneId = ZoneId.of("UTC"); + if (loggingTimeZone == LoggingTimeZone.LOCAL) { + zoneId = TimeZone.getDefault().toZoneId(); + } + assertEquals(timeWindow.getStartTime().atZone(zoneId), + timeWindow.getEndTime().atZone(zoneId).minus(dynamicDuration, resultUnit)); }); } @@ -92,7 +103,7 @@ class SnapshotTest { private void withMockedLocalDateTimeNow(ZoneId zoneId, Runnable runnable) { LocalDateTime now = LocalDateTime.now(); ZonedDateTime zoned = now.atZone(zoneId); - try (MockedStatic<LocalDateTime> localDateTimeMockedStatic = Mockito.mockStatic(LocalDateTime.class)) { + try (MockedStatic<LocalDateTime> localDateTimeMockedStatic = Mockito.mockStatic(LocalDateTime.class, Mockito.CALLS_REAL_METHODS)) { localDateTimeMockedStatic.when(LocalDateTime::now).thenReturn(now); localDateTimeMockedStatic.when(() -> LocalDateTime.now(zoneId)).thenReturn(zoned.toLocalDateTime()); runnable.run(); diff --git a/python/pytimber/pytimber/snapshots.py b/python/pytimber/pytimber/snapshots.py index d82404b3e025a59bcef32dd2eb74c5e39ee9d144..774808c2ef52e455e7a36edbbb8e16665a87ad77 100644 --- a/python/pytimber/pytimber/snapshots.py +++ b/python/pytimber/pytimber/snapshots.py @@ -2,7 +2,6 @@ import json from datetime import datetime, timezone from typing import Any, Dict, Iterable, List, NamedTuple, Optional, Tuple -from dateutil.relativedelta import relativedelta from pyspark.sql import SparkSession from pytimber import utils from pytimber.utils import ( @@ -26,35 +25,27 @@ class _SnapshotSearchCriteria(NamedTuple): class SnapshotManager: def __init__(self, spark: SparkSession): self.spark = spark - self._nxcals_metadata_api = self.spark._jvm.cern.nxcals.api.extraction.metadata - self._nxcals_custom_api = self.spark._jvm.cern.nxcals.api.custom - self._group_service = ( - self._nxcals_metadata_api.ServiceClientFactory.createGroupService() + self._nxcals_custom_metadata_api = ( + self.spark._jvm.cern.nxcals.api.custom.extraction.metadata ) + self._nxcals_custom_api = self.spark._jvm.cern.nxcals.api.custom + self._snapshot_service = self._nxcals_custom_metadata_api.SnapshotServiceFactory.createSnapshotService() + self._time_definition = self._nxcals_custom_api.domain.Snapshot.TimeDefinition + def get_snapshots( self, pattern_or_list: PatternOrList, owner_pattern: Optional[str] = None, description_pattern: Optional[str] = None, ) -> List[Any]: - groups = self._nxcals_metadata_api.queries.Groups - group_type = self._nxcals_custom_api.domain.GroupType + snapshots = self._nxcals_custom_metadata_api.queries.Snapshots if isinstance(pattern_or_list, str): - snap_condition = ( - getattr( - groups.suchThat().label().eq(str(group_type.SNAPSHOT)), - "and", - )() - .name() - .like(pattern_or_list) - ) + snap_condition = snapshots.suchThat().name().like(pattern_or_list) elif isinstance(pattern_or_list, Iterable): snap_condition = getattr( - getattr( - groups.suchThat().label().eq(str(group_type.SNAPSHOT)), "and" - )().name(), + snapshots.suchThat().name(), "in", )(pattern_or_list) else: @@ -80,12 +71,12 @@ class SnapshotManager: .like(description_pattern) ) - return list(self._group_service.findAll(snap_condition)) + return list(self._snapshot_service.findAll(snap_condition)) def get_variables(self, snapshot: Any) -> Any: group_property_name = self._nxcals_custom_api.domain.GroupPropertyName - return self._group_service.getVariables( + return self._snapshot_service.getVariables( snapshot.getId(), )[group_property_name.getSelectedVariables.toString()] @@ -117,34 +108,59 @@ class SnapshotManager: # Note: accessing kwargs from this function is not encouraged in order to keep the signature readable. if fillNumber is not None: - t1, t2 = SnapshotManager._get_range_based_on_fills( - beamModeStart, beamModeEnd + _beam_mode_data = self._nxcals_custom_api.domain.constants.BeamModeData + time_definition = self._time_definition.forFill( + int(fillNumber), + _beam_mode_data.fromJson(beamModeStart), + _beam_mode_data.fromJson(beamModeEnd), ) + elif isEndTimeDynamic is not None: if isEndTimeDynamic == "true": if getDynamicDuration is None: raise ValueError("getDynamicDuration cannot be None") - now = ( - datetime.now(timezone.utc) - if getTimeZone == "UTC_TIME" - else datetime.now() + if getPriorTime is None: + raise ValueError("getPriorTime cannot be None") + + if getTime is None: + raise ValueError("getTime cannot be None") + + time_zone = ( + self._nxcals_custom_api.domain.constants.LoggingTimeZone.valueOf( + getTimeZone + ) + ) + + prior_time = getattr( + self._nxcals_custom_api.domain.snapshot.PriorTime, "from" + )(getPriorTime) + + time_unit = ( + self._nxcals_custom_api.domain.constants.DynamicTimeUnit.valueOf( + getTime + ) ) - t1, t2 = SnapshotManager._get_dynamic_range( - now, - getPriorTime, - getTime, - int(getDynamicDuration), + time_definition = self._time_definition.dynamic( + int(getDynamicDuration), time_zone, prior_time, time_unit ) else: # Get fixed range - t1 = utils.get_nanos_from( - SnapshotManager._to_datetime(getTimeZone, getStartTime) + t1 = utils.get_instant_from( + self.spark, + utils.get_nanos_from( + SnapshotManager._to_datetime(getTimeZone, getStartTime) + ), ) - t2 = utils.get_nanos_from( - SnapshotManager._to_datetime(getTimeZone, getEndTime) + t2 = utils.get_instant_from( + self.spark, + utils.get_nanos_from( + SnapshotManager._to_datetime(getTimeZone, getEndTime) + ), ) + + time_definition = self._time_definition.fixedDates(t1, t2) else: raise ValueError( "Neither 'fillNumber' nor 'isEndTimeDynamic' is present in the snapshot attributes" @@ -160,8 +176,11 @@ class SnapshotManager: fund_filter["timingUser"], ], ) - - return _SnapshotSearchCriteria(fundamental_pattern, t1, t2) + return _SnapshotSearchCriteria( + fundamental_pattern, + time_definition.getTimeWindow().getStartTimeNanos(), + time_definition.getTimeWindow().getEndTimeNanos(), + ) def get_snapshot_names( self, @@ -199,61 +218,6 @@ class SnapshotManager: return result - @staticmethod - def _get_range_based_on_fills( - beam_mode_start: Optional[str], beam_mode_end: Optional[str] - ) -> Tuple[int, int]: - if beam_mode_start is None: - raise ValueError("beamModeStart cannot be None") - if beam_mode_end is None: - raise ValueError("beamModeEnd cannot be None") - - t1 = utils.get_nanos_from( - SnapshotManager._adjust_timestamp_format( - json.loads(beam_mode_start)["validity"]["startTime"] - ), - ) - t2 = utils.get_nanos_from( - SnapshotManager._adjust_timestamp_format( - json.loads(beam_mode_end)["validity"]["endTime"] - ), - ) - return t1, t2 - - @staticmethod - def _get_dynamic_range( - ref_time: datetime, - prior_time: Optional[str], - time_unit: Optional[str], - dynamic_duration: int, - ) -> Tuple[int, int]: - if prior_time is None: - raise ValueError("prior_time cannot be None") - - if time_unit is None: - raise ValueError("time_unit cannot be None") - - if prior_time == "Now": - t2 = ref_time - elif prior_time == "Start of hour": - t2 = ref_time.replace(microsecond=0, second=0, minute=0) - elif prior_time == "Start of day": - t2 = ref_time.replace(microsecond=0, second=0, minute=0, hour=0) - elif prior_time == "Start of month": - t2 = ref_time.replace(microsecond=0, second=0, minute=0, hour=0, day=1) - elif prior_time == "Start of year": - t2 = ref_time.replace( - microsecond=0, second=0, minute=0, hour=0, day=1, month=1 - ) - else: - raise ValueError(prior_time + " not supported as a prior time") - - # Dynamic timewindow - args: Any = {time_unit.lower(): dynamic_duration} - t1 = t2 - relativedelta(**args) - - return utils.get_nanos_from(t1), utils.get_nanos_from(t2) - @staticmethod def _to_datetime(time_zone: Optional[str], ts_str: Optional[str]) -> datetime: if time_zone is None: @@ -267,7 +231,3 @@ class SnapshotManager: return utc_time.replace(tzinfo=timezone.utc) return datetime.strptime(ts_str, TIME_FORMAT) - - @staticmethod - def _adjust_timestamp_format(ts: str) -> str: - return ts.replace("T", " ").replace("Z", "") diff --git a/python/pytimber/pytimber/utils.py b/python/pytimber/pytimber/utils.py index bc14187d200c3e27778907a27c38ed30b8019d7d..f85b01f5e17ddcca0ede8e79e58878f4f7c8211f 100644 --- a/python/pytimber/pytimber/utils.py +++ b/python/pytimber/pytimber/utils.py @@ -75,11 +75,11 @@ def verify_unix_time_is_set(unix_time: Optional[bool]) -> None: def to_time_window( spark: SparkSession, from_time: PyTimberTimeType, to_time: PyTimberTimeType ) -> Any: - t1, t2 = (_get_instant_from(spark, from_time), _get_instant_from(spark, to_time)) + t1, t2 = (get_instant_from(spark, from_time), get_instant_from(spark, to_time)) return spark._jvm.cern.nxcals.api.domain.TimeWindow.between(t1, t2) -def _get_instant_from(spark: SparkSession, t: PyTimberTimeType) -> Any: +def get_instant_from(spark: SparkSession, t: PyTimberTimeType) -> Any: ns = get_nanos_from(t) time_utils = spark._jvm.cern.nxcals.api.utils.TimeUtils return time_utils.getInstantFromNanos(ns)