diff --git a/cern-extraction-api/src/main/java/cern/nxcals/api/custom/extraction/data/builders/expanded/DataQuery.java b/cern-extraction-api/src/main/java/cern/nxcals/api/custom/extraction/data/builders/expanded/DataQuery.java new file mode 100644 index 0000000000000000000000000000000000000000..9ecd8617aa54dfb2c7a1f8e98f1e993e6b64880a --- /dev/null +++ b/cern-extraction-api/src/main/java/cern/nxcals/api/custom/extraction/data/builders/expanded/DataQuery.java @@ -0,0 +1,105 @@ +/* + * Copyright (c) 2016 European Organisation for Nuclear Research (CERN), All Rights Reserved. + */ +package cern.nxcals.api.custom.extraction.data.builders.expanded; + +import cern.nxcals.api.domain.Entity; +import cern.nxcals.api.domain.EntityQuery; +import cern.nxcals.api.domain.TimeWindow; +import cern.nxcals.api.domain.Variable; +import cern.nxcals.api.extraction.data.builders.fluent.QueryData; +import cern.nxcals.api.extraction.data.builders.fluent.v2.AbstractDataQuery; +import cern.nxcals.api.extraction.data.builders.fluent.v2.KeyValueStage; +import cern.nxcals.api.extraction.data.builders.fluent.v2.KeyValueStageLoop; +import cern.nxcals.api.extraction.data.builders.fluent.v2.VariableStageLoop; +import cern.nxcals.common.annotation.Experimental; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static java.util.Arrays.asList; + +@Experimental +@RequiredArgsConstructor +public class DataQuery extends AbstractDataQuery<List<Dataset<Row>>> { + private final SparkSession session; + + public static DataQuery builder(@NonNull SparkSession session) { + return new DataQuery(session); + } + + protected QueryData<List<Dataset<Row>>> queryData() { + return new QueryData<>(new SparkDatasetProducer(session)); + } + + public static List<Dataset<Row>> getFor(@NonNull SparkSession sparkSession, @NonNull TimeWindow timeWindow, + @NonNull Variable variable) { + return DataQuery + .getFor(sparkSession, timeWindow, variable.getSystemSpec().getName(), + variable.getVariableName()); + } + + public static List<Dataset<Row>> getFor(@NonNull SparkSession sparkSession, @NonNull TimeWindow timeWindow, + @NonNull Entity entity) { + return DataQuery + .getFor(sparkSession, timeWindow, entity.getSystemSpec().getName(), + new EntityQuery(entity.getEntityKeyValues())); + } + + @SafeVarargs + public static List<Dataset<Row>> getFor(@NonNull SparkSession spark, @NonNull TimeWindow timeWindow, + @NonNull String system, @NonNull Map<String, Object>... keyValuesArr) { + return DataQuery.getFor(spark, timeWindow, system, + Arrays.stream(keyValuesArr).map(EntityQuery::new).toArray(EntityQuery[]::new)); + } + + public static List<Dataset<Row>> getFor(@NonNull SparkSession spark, @NonNull TimeWindow timeWindow, + @NonNull String system, @NonNull EntityQuery... entitiesQueries) { + KeyValueStage<List<Dataset<Row>>> dataQuery = DataQuery.builder(spark) + .entities().system(system); + KeyValueStageLoop<List<Dataset<Row>>> entities = null; + for (EntityQuery query : entitiesQueries) { + entities = query.hasPatterns() ? 
+ dataQuery.keyValuesLike(query.toMap()) : + dataQuery.keyValuesEq(query.getKeyValues()); + } + if (entities == null) { + throw new IllegalArgumentException("No entity query passed"); + } + return entities.timeWindow(timeWindow).build(); + } + + public static List<Dataset<Row>> getFor(@NonNull SparkSession spark, @NonNull TimeWindow timeWindow, + @NonNull String system, @NonNull String... variables) { + return DataQuery.getFor(spark, timeWindow, system, asList(variables)); + } + + public static List<Dataset<Row>> getFor(@NonNull SparkSession spark, @NonNull TimeWindow timeWindow, + @NonNull String system, @NonNull List<String> variables) { + return DataQuery + .getFor(spark, timeWindow, system, variables, Collections.emptyList()); + } + + public static List<Dataset<Row>> getFor(@NonNull SparkSession spark, @NonNull TimeWindow timeWindow, + @NonNull String system, @NonNull List<String> variables, + @NonNull List<String> variablesLike) { + if (variables.isEmpty() && variablesLike.isEmpty()) { + throw new IllegalArgumentException("No variable names nor variable name patterns given"); + } + VariableStageLoop<List<Dataset<Row>>> builder = DataQuery.builder(spark).variables() + .system(system) + .nameIn(variables); + + variablesLike.forEach(builder::nameLike); + + return builder.timeWindow(timeWindow).build(); + } + +} diff --git a/cern-extraction-api/src/main/java/cern/nxcals/api/custom/extraction/data/builders/expanded/ParameterDataQuery.java b/cern-extraction-api/src/main/java/cern/nxcals/api/custom/extraction/data/builders/expanded/ParameterDataQuery.java new file mode 100644 index 0000000000000000000000000000000000000000..2dd2700a804568ffd1190f5af7165bf73979d0d3 --- /dev/null +++ b/cern-extraction-api/src/main/java/cern/nxcals/api/custom/extraction/data/builders/expanded/ParameterDataQuery.java @@ -0,0 +1,62 @@ +package cern.nxcals.api.custom.extraction.data.builders.expanded; + +import cern.nxcals.api.domain.TimeWindow; +import cern.nxcals.api.extraction.data.builders.fluent.QueryData; +import cern.nxcals.api.extraction.data.builders.fluent.v2.DeviceStage; +import cern.nxcals.api.extraction.data.builders.fluent.v2.StageSequenceDeviceProperty; +import cern.nxcals.api.extraction.data.builders.fluent.v2.SystemStage; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import lombok.NonNull; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; + +import java.util.List; + +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class ParameterDataQuery { + public static SystemStage<DeviceStage<List<Dataset<Row>>>, List<Dataset<Row>>> builder( + SparkSession session) { + return StageSequenceDeviceProperty.<List<Dataset<Row>>>sequence() + .apply(new QueryData<>(new SparkDatasetProducer(session))); + } + + /** + * Search for given device/property in given system within time window. + * + * @param sparkSession spark session + * @param system system name (CMW, PM ...) 
+ * @param device device name + * @param property property name + * @param timeWindow time window to query for + * @return Dataset with result in {@link Row} + */ + public static List<Dataset<Row>> getFor(@NonNull SparkSession sparkSession, @NonNull String system, + @NonNull String device, @NonNull String property, @NonNull TimeWindow timeWindow) { + return ParameterDataQuery.builder(sparkSession) + .system(system) + .deviceEq(device) + .propertyEq(property) + .timeWindow(timeWindow) + .build(); + } + + /** + * Query for given parameter and return result dataset + * + * @param sparkSession spark session + * @param system system name + * @param parameter in form device/property + * @param timeWindow time window for query + * @return result dataset + */ + public static List<Dataset<Row>> getFor(@NonNull SparkSession sparkSession, @NonNull String system, + @NonNull String parameter, @NonNull TimeWindow timeWindow) { + return ParameterDataQuery.builder(sparkSession) + .system(system) + .parameterEq(parameter) + .timeWindow(timeWindow) + .build(); + } +} diff --git a/cern-extraction-api/src/main/java/cern/nxcals/api/custom/extraction/data/builders/expanded/SparkDatasetProducer.java b/cern-extraction-api/src/main/java/cern/nxcals/api/custom/extraction/data/builders/expanded/SparkDatasetProducer.java new file mode 100644 index 0000000000000000000000000000000000000000..ce8d0fd4948cd67e9f1b6264be41238c0132c84f --- /dev/null +++ b/cern-extraction-api/src/main/java/cern/nxcals/api/custom/extraction/data/builders/expanded/SparkDatasetProducer.java @@ -0,0 +1,37 @@ +package cern.nxcals.api.custom.extraction.data.builders.expanded; + +import cern.nxcals.api.authorization.RbacHadoopDelegationTokenProvider; +import cern.nxcals.api.extraction.data.builders.CallDetailsProvider; +import cern.nxcals.api.extraction.data.builders.fluent.QueryData; +import cern.nxcals.api.extraction.data.spark.SparkExtractionTaskExecutor; +import cern.nxcals.api.extraction.metadata.InternalServiceClientFactory; +import cern.nxcals.common.domain.ExtractionCriteria; +import cern.nxcals.common.domain.ExtractionUnit; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; + +import java.util.List; +import java.util.function.Function; + +@Slf4j +@RequiredArgsConstructor +public class SparkDatasetProducer implements Function<QueryData<List<Dataset<Row>>>, List<Dataset<Row>>> { + private final CallDetailsProvider detailsProvider = new CallDetailsProvider(); + private final SparkSession session; + + @Override + public List<Dataset<Row>> apply(QueryData<List<Dataset<Row>>> queryData) { + // We need to obtain initial delegation tokens before Spark tries to communicate with Hadoop + RbacHadoopDelegationTokenProvider.obtainAndSetDelegationTokensViaRbacIfRequired(); + + detailsProvider.setCallDetails(session); + + SparkExtractionTaskExecutor executor = new SparkExtractionTaskExecutor(session); + ExtractionCriteria criteria = queryData.toExtractionCriteria(); + ExtractionUnit unit = InternalServiceClientFactory.createEntityResourceService().findBy(criteria); + return executor.executeExpanded(unit); + } +} diff --git a/cern-extraction-api/src/test/java/cern/nxcals/api/custom/extraction/data/builders/expanded/DataQueryTest.java b/cern-extraction-api/src/test/java/cern/nxcals/api/custom/extraction/data/builders/expanded/DataQueryTest.java new file mode 100644 index 
0000000000000000000000000000000000000000..c7cbc5de30d7ad68ce9d9fc41993814504e1e1ed --- /dev/null +++ b/cern-extraction-api/src/test/java/cern/nxcals/api/custom/extraction/data/builders/expanded/DataQueryTest.java @@ -0,0 +1,170 @@ +package cern.nxcals.api.custom.extraction.data.builders.expanded; + +import cern.nxcals.api.domain.Entity; +import cern.nxcals.api.domain.EntityQuery; +import cern.nxcals.api.domain.TimeWindow; +import cern.nxcals.api.domain.Variable; +import cern.nxcals.common.testutils.CartesianProduct; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.Mockito; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertThrows; + +class DataQueryTest { + + private static final SparkSession[] sparkSessions = { Mockito.mock(SparkSession.class), null }; + private static final TimeWindow[] timeWindows = { + TimeWindow.between(Instant.ofEpochSecond(10), Instant.ofEpochSecond(50)), null }; + private static final String[] systems = { "SYSTEM", null }; + private static final Variable[] variables = { Mockito.mock(Variable.class), null }; + private static final Entity[] entities = { Mockito.mock(Entity.class), null }; + private static final String[] variablesNames = { "var", null }; + private static final ArrayList<Map<String, Object>> keyValues = new ArrayList<>(0); + + static { + keyValues.add(Map.of("var", "var")); + keyValues.add(null); + } + + private static Stream<Arguments> getForVariableNPETestArgs() { + return CartesianProduct.compose(sparkSessions, timeWindows, variables); + } + + private static Stream<Arguments> getForEntityNPETestArgs() { + return CartesianProduct.compose(sparkSessions, timeWindows, entities); + } + + private static Stream<Arguments> getForVariableNameNPETestArgs() { + return CartesianProduct.compose(sparkSessions, timeWindows, systems, variablesNames); + } + + private static Stream<Arguments> getEntityWithKeyValueNPETestArgs() { + return CartesianProduct.compose(sparkSessions, timeWindows, systems, keyValues.toArray()); + } + + private static Stream<Arguments> getEntityWithKeysInMapNPETestArgs() { + return CartesianProduct.compose(sparkSessions, timeWindows, systems, keyValues.toArray(), + keyValues.toArray()); + } + + @ParameterizedTest + @MethodSource("getForVariableNPETestArgs") + void shouldThrowNPEIfOneOfParamsIsNullOnGetForVariable(SparkSession session, TimeWindow timeWindow, + Variable variable) { + if (Stream.of(session, timeWindow, variable).anyMatch(Objects::isNull)) { + assertThrows(NullPointerException.class, + () -> DataQuery.getFor(session, timeWindow, variable)); + } + } + + @ParameterizedTest + @MethodSource("getForEntityNPETestArgs") + void shouldThrowNPEIfOneOfParamsIsNullOnGetForEntity(SparkSession session, TimeWindow timeWindow, + Entity entity) { + if (Stream.of(session, timeWindow, entity).anyMatch(Objects::isNull)) { + assertThrows(NullPointerException.class, + () -> DataQuery.getFor(session, timeWindow, entity)); + } + } + + @ParameterizedTest + @MethodSource("getForVariableNameNPETestArgs") + void shouldThrowNPEIfOneOfParametersIsNullOnGetForSingleVariableName(SparkSession sparkSession, + TimeWindow timeWindow, String system, String variable) 
{ + if (Stream.of(sparkSession, timeWindow, system, variable).anyMatch(Objects::isNull)) { + assertThrows(NullPointerException.class, + () -> DataQuery + .getFor(sparkSession, timeWindow, system, variable)); + } + } + + @ParameterizedTest + @MethodSource("getForVariableNameNPETestArgs") + void shouldThrowNPEIfOneOfParametersIsNullOnGetForMultipleVariableNames(SparkSession sparkSession, + TimeWindow timeWindow, String system, String variable) { + if (Stream.of(sparkSession, timeWindow, system, variable).anyMatch(Objects::isNull)) { + assertThrows(NullPointerException.class, + () -> DataQuery + .getFor(sparkSession, timeWindow, system, "name", variable)); + } + } + + @ParameterizedTest + @MethodSource("getForVariableNameNPETestArgs") + void shouldThrowNPEIfOneOfParametersIsNullOnGetForListNames(SparkSession sparkSession, TimeWindow timeWindow, + String system, String variable) { + if (Stream.of(sparkSession, timeWindow, system, variable).anyMatch(Objects::isNull)) { + assertThrows(NullPointerException.class, + () -> DataQuery + .getFor(sparkSession, timeWindow, system, toList(variable))); + } + } + + @ParameterizedTest + @MethodSource("getForVariableNameNPETestArgs") + void shouldThrowNPEIfOneOfParametersIsNull(SparkSession sparkSession, + TimeWindow timeWindow, String system, String variable) { + if (Stream.of(sparkSession, timeWindow, system, variable).anyMatch(Objects::isNull)) { + assertThrows(NullPointerException.class, + () -> DataQuery + .getFor(sparkSession, timeWindow, system, List.of("VAR_NAME"), toList(variable))); + } + } + + @Test + void shouldIllegalArgumentIfVariablesListIsEmpty() { + TimeWindow timeWindow = TimeWindow.between(0, 1); + SparkSession sessionMock = Mockito.mock(SparkSession.class); + assertThrows(IllegalArgumentException.class, + () -> DataQuery.getFor(sessionMock, timeWindow, "SYSTEM", List.of())); + } + + @ParameterizedTest + @MethodSource("getEntityWithKeysInMapNPETestArgs") + void getForWithKeyValuesAsMapShouldAndRestOfParamsThrowNPEIfOneOfParametersIsNull(SparkSession sparkSession, + TimeWindow timeWindow, String system, Map<String, Object> keyValue, Map<String, Object> keyValueLike) { + if (Stream.of(sparkSession, timeWindow, system, keyValue, keyValueLike).anyMatch(Objects::isNull)) { + assertThrows(NullPointerException.class, + () -> DataQuery + .getFor(sparkSession, timeWindow, system, new EntityQuery(keyValue, keyValueLike))); + } + } + + @ParameterizedTest + @MethodSource("getEntityWithKeyValueNPETestArgs") + void shouldThrowNPEIfOneOfParametersIsNullWithKeyValuesAsMap(SparkSession sparkSession, + TimeWindow timeWindow, String system, Map<String, Object> keyValue) { + if (Stream.of(sparkSession, timeWindow, system, keyValue).anyMatch(Objects::isNull)) { + assertThrows(NullPointerException.class, + () -> DataQuery + .getFor(sparkSession, timeWindow, system, keyValue)); + } + } + + @Test + void shouldThrowIllegalArgumentIfKeyValueMapIsEmpty() { + TimeWindow timeWindow = TimeWindow.between(0, 1); + SparkSession sessionMock = Mockito.mock(SparkSession.class); + assertThrows(IllegalArgumentException.class, + () -> DataQuery.getFor(sessionMock, timeWindow, "SYSTEM", new HashMap<>())); + } + + private static <E> List<E> toList(E element) { + if (element == null) { + return null; + } + return List.of(element); + } +} diff --git a/cern-extraction-api/src/test/java/cern/nxcals/api/custom/extraction/data/builders/expanded/ParameterDataQueryTest.java 
b/cern-extraction-api/src/test/java/cern/nxcals/api/custom/extraction/data/builders/expanded/ParameterDataQueryTest.java new file mode 100644 index 0000000000000000000000000000000000000000..dc159b48402182b06fb9f10841b5f588ce06c0ec --- /dev/null +++ b/cern-extraction-api/src/test/java/cern/nxcals/api/custom/extraction/data/builders/expanded/ParameterDataQueryTest.java @@ -0,0 +1,111 @@ +package cern.nxcals.api.custom.extraction.data.builders.expanded; + +import cern.nxcals.api.domain.TimeWindow; +import cern.nxcals.api.extraction.data.builders.fluent.v2.DeviceStage; +import cern.nxcals.api.extraction.data.builders.fluent.v2.SystemStage; +import cern.nxcals.api.extraction.metadata.InternalServiceClientFactory; +import cern.nxcals.common.domain.ExtractionUnit; +import cern.nxcals.internal.extraction.metadata.InternalEntityResourceService; +import org.apache.spark.SparkContext; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.time.Instant; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * This class is just a set of not so useful tests, to make Sonar happy. + * The real tests are in the integration module. + */ +@ExtendWith(MockitoExtension.class) +class ParameterDataQueryTest { + + @Mock + private SparkSession spark; + + @Mock + private SparkContext sparkContext; + + @Mock + private InternalEntityResourceService resourceService; + + @Test + void builder() { + SystemStage<DeviceStage<List<Dataset<Row>>>, List<Dataset<Row>>> builder = ParameterDataQuery.builder(spark); + assertNotNull(builder); + } + + @Test + void shouldThrowIfSparkSessionIsNullOnGetFor() { + TimeWindow timeWindow = TimeWindow.between(Instant.ofEpochSecond(10), Instant.ofEpochSecond(50)); + assertThrows(NullPointerException.class, () -> ParameterDataQuery.getFor(null, "cmw", + "testDevice", "testProperty", TimeWindow.empty())); + } + + @Test + void shouldThrowIfSystemIsNullOnGetFor() { + + TimeWindow timeWindow = TimeWindow.between(Instant.ofEpochSecond(10), Instant.ofEpochSecond(50)); + assertThrows(NullPointerException.class, () -> ParameterDataQuery.getFor(spark, + null, "testDevice", "testProperty", TimeWindow.empty())); + } + + @Test + void shouldThrowIfSystemIsNullOnGetForParameter() { + TimeWindow timeWindow = TimeWindow.between(Instant.ofEpochSecond(10), Instant.ofEpochSecond(50)); + assertThrows(NullPointerException.class, () -> ParameterDataQuery.getFor(null, + "cmw", "testDevice/testProperty", TimeWindow.empty())); + } + + @Test + void shouldNotConnect() { + assertThrows(RuntimeException.class, + () -> ParameterDataQuery.getFor(spark, "CMW", "DEV/PROP", TimeWindow.empty())); + } + + @Test + void shouldGetAListOfDatasetsForParameter() { + try (MockedStatic<InternalServiceClientFactory> factory = Mockito + .mockStatic(InternalServiceClientFactory.class)) { + factory.when(InternalServiceClientFactory::createEntityResourceService).thenReturn(resourceService); + + when(resourceService.findBy(argThat(x -> true))).thenReturn(mock(ExtractionUnit.class)); + 
when(spark.sparkContext()).thenReturn(sparkContext); + when(sparkContext.appName()).thenReturn("app"); + + assertNotNull(ParameterDataQuery.getFor(spark, "CMW", "DEV/PROP", TimeWindow.empty())); + + } + + } + + @Test + void shouldGetAListOfDatasetsForDevProp() { + try (MockedStatic<InternalServiceClientFactory> factory = Mockito + .mockStatic(InternalServiceClientFactory.class)) { + factory.when(InternalServiceClientFactory::createEntityResourceService).thenReturn(resourceService); + + when(resourceService.findBy(argThat(x -> true))).thenReturn(mock(ExtractionUnit.class)); + when(spark.sparkContext()).thenReturn(sparkContext); + when(sparkContext.appName()).thenReturn("app"); + + assertNotNull(ParameterDataQuery.getFor(spark, "CMW", "DEV", "PROP", TimeWindow.empty())); + + } + + } + +} \ No newline at end of file diff --git a/common/src/main/java/cern/nxcals/api/extraction/data/builders/fluent/v2/AbstractDataQuery.java b/common/src/main/java/cern/nxcals/api/extraction/data/builders/fluent/v2/AbstractDataQuery.java new file mode 100644 index 0000000000000000000000000000000000000000..f677a2b54471534cd41cdbd67bc0e27a372a3f65 --- /dev/null +++ b/common/src/main/java/cern/nxcals/api/extraction/data/builders/fluent/v2/AbstractDataQuery.java @@ -0,0 +1,16 @@ +package cern.nxcals.api.extraction.data.builders.fluent.v2; + +import cern.nxcals.api.extraction.data.builders.fluent.QueryData; + +public abstract class AbstractDataQuery<Z> { + + public SystemOrIdStage<VariableStage<Z>, VariableStageLoop<Z>, Z> variables() { + return StageSequenceVariables.<Z>sequence().apply(queryData()); + } + + public SystemOrIdStage<KeyValueStage<Z>, KeyValueStageLoop<Z>, Z> entities() { + return StageSequenceEntities.<Z>sequence().apply(queryData()); + } + + protected abstract QueryData<Z> queryData(); +} diff --git a/common/src/main/java/cern/nxcals/api/extraction/data/builders/fluent/v2/BuildStage.java b/common/src/main/java/cern/nxcals/api/extraction/data/builders/fluent/v2/BuildStage.java index d7f1bfdfb0a525ccb2ab3ee963ca3de5e9c11d64..91b77a9df0a768e1f1814c62c9e3cb630bd0e825 100644 --- a/common/src/main/java/cern/nxcals/api/extraction/data/builders/fluent/v2/BuildStage.java +++ b/common/src/main/java/cern/nxcals/api/extraction/data/builders/fluent/v2/BuildStage.java @@ -8,6 +8,7 @@ import java.util.Map; import java.util.Set; public class BuildStage<T> extends Stage<BuildStage<T>, T> implements AliasStage<BuildStage<T>> { + public BuildStage<T> fieldAliases(@NotNull Map<String, Set<String>> aliasFieldsMap) { aliasFieldsMap.forEach((alias, fields) -> data().addAlias(alias, fields)); return this; @@ -20,4 +21,5 @@ public class BuildStage<T> extends Stage<BuildStage<T>, T> implements AliasStage public T build() { return data().build(); } + } diff --git a/extraction-api-thin/src/main/java/cern/nxcals/api/extraction/thin/data/builders/DataQuery.java b/extraction-api-thin/src/main/java/cern/nxcals/api/extraction/thin/data/builders/DataQuery.java index c9dd1e02558943e62fe9c5a42551230e0b0a8668..671de57fd79f059b7f3443d3732105b292e13b0f 100644 --- a/extraction-api-thin/src/main/java/cern/nxcals/api/extraction/thin/data/builders/DataQuery.java +++ b/extraction-api-thin/src/main/java/cern/nxcals/api/extraction/thin/data/builders/DataQuery.java @@ -4,11 +4,6 @@ package cern.nxcals.api.extraction.thin.data.builders; import cern.nxcals.api.extraction.data.builders.fluent.QueryData; -import cern.nxcals.api.extraction.data.builders.fluent.v2.KeyValueStage; -import 
cern.nxcals.api.extraction.data.builders.fluent.v2.StageSequenceEntities; -import cern.nxcals.api.extraction.data.builders.fluent.v2.StageSequenceVariables; -import cern.nxcals.api.extraction.data.builders.fluent.v2.SystemStage; -import cern.nxcals.api.extraction.data.builders.fluent.v2.VariableStage; import lombok.AccessLevel; import lombok.AllArgsConstructor; @@ -18,17 +13,10 @@ public class DataQuery extends LegacyBuildersDataQuery { return new DataQuery(); } - private QueryData<String> queryData() { + protected QueryData<String> queryData() { return new QueryData<>(new ScriptProducer()); } - public SystemStage<VariableStage<String>, String> variables() { - return StageSequenceVariables.<String>sequence().apply(queryData()); - } - - public SystemStage<KeyValueStage<String>, String> entities() { - return StageSequenceEntities.<String>sequence().apply(queryData()); - } } diff --git a/extraction-api-thin/src/main/java/cern/nxcals/api/extraction/thin/data/builders/LegacyBuildersDataQuery.java b/extraction-api-thin/src/main/java/cern/nxcals/api/extraction/thin/data/builders/LegacyBuildersDataQuery.java index 06ef729dbf4168d6bc838f0637bf44bc668f610d..d48a03f336f2a789ff8508befbdec004f8d992e2 100644 --- a/extraction-api-thin/src/main/java/cern/nxcals/api/extraction/thin/data/builders/LegacyBuildersDataQuery.java +++ b/extraction-api-thin/src/main/java/cern/nxcals/api/extraction/thin/data/builders/LegacyBuildersDataQuery.java @@ -8,9 +8,10 @@ import cern.nxcals.api.extraction.data.builders.fluent.StageSequenceVariables; import cern.nxcals.api.extraction.data.builders.fluent.SystemStage; import cern.nxcals.api.extraction.data.builders.fluent.TimeStartStage; import cern.nxcals.api.extraction.data.builders.fluent.VariableAliasStage; +import cern.nxcals.api.extraction.data.builders.fluent.v2.AbstractDataQuery; @SuppressWarnings("java:S1610") -abstract class LegacyBuildersDataQuery { +abstract class LegacyBuildersDataQuery extends AbstractDataQuery<String> { /** * @return query builder */ diff --git a/extraction-api/src/main/java/cern/nxcals/api/extraction/data/builders/CallDetailsProvider.java b/extraction-api/src/main/java/cern/nxcals/api/extraction/data/builders/CallDetailsProvider.java index 084f614cb445ded09384a95235fbf96792ed4e24..f79093bf4a5a360dc62c34c8b5058da6b6fac98d 100644 --- a/extraction-api/src/main/java/cern/nxcals/api/extraction/data/builders/CallDetailsProvider.java +++ b/extraction-api/src/main/java/cern/nxcals/api/extraction/data/builders/CallDetailsProvider.java @@ -8,7 +8,7 @@ import org.apache.spark.sql.SparkSession; import java.util.function.Supplier; @RequiredArgsConstructor -class CallDetailsProvider { +public class CallDetailsProvider { private final Supplier<StackTraceElement[]> stackTraceSupplier; @@ -16,7 +16,7 @@ class CallDetailsProvider { this(() -> Thread.currentThread().getStackTrace()); } - void setCallDetails(SparkSession session) { + public void setCallDetails(SparkSession session) { StackTraceElement[] stackTrace = stackTraceSupplier.get(); StackTraceElement element; @@ -24,7 +24,7 @@ class CallDetailsProvider { int stackLength = stackTrace.length; if (stackLength > 0) { - element = stackTrace[nthElementFromEnd > stackLength ? 0 : stackTrace.length - nthElementFromEnd]; + element = stackTrace[nthElementFromEnd > stackLength ? 
0 : stackTrace.length - nthElementFromEnd]; } else { element = new StackTraceElement("", "", null, 0); } diff --git a/extraction-api/src/main/java/cern/nxcals/api/extraction/data/builders/DataQuery.java b/extraction-api/src/main/java/cern/nxcals/api/extraction/data/builders/DataQuery.java index 6d8cbdc2a17052bfa20f1f03878e8e6d7add8dc3..9b708631d1f66879e2ba05318d6a078aa76a6294 100644 --- a/extraction-api/src/main/java/cern/nxcals/api/extraction/data/builders/DataQuery.java +++ b/extraction-api/src/main/java/cern/nxcals/api/extraction/data/builders/DataQuery.java @@ -10,10 +10,6 @@ import cern.nxcals.api.domain.Variable; import cern.nxcals.api.extraction.data.builders.fluent.QueryData; import cern.nxcals.api.extraction.data.builders.fluent.v2.KeyValueStage; import cern.nxcals.api.extraction.data.builders.fluent.v2.KeyValueStageLoop; -import cern.nxcals.api.extraction.data.builders.fluent.v2.StageSequenceEntities; -import cern.nxcals.api.extraction.data.builders.fluent.v2.StageSequenceVariables; -import cern.nxcals.api.extraction.data.builders.fluent.v2.SystemOrIdStage; -import cern.nxcals.api.extraction.data.builders.fluent.v2.VariableStage; import cern.nxcals.api.extraction.data.builders.fluent.v2.VariableStageLoop; import lombok.NonNull; import org.apache.spark.sql.Dataset; @@ -96,15 +92,8 @@ public class DataQuery extends LegacyBuildersDataQuery { return builder.timeWindow(timeWindow).build(); } - private QueryData<Dataset<Row>> queryData() { + protected QueryData<Dataset<Row>> queryData() { return new QueryData<>(new SparkDatasetProducer(session)); } - public SystemOrIdStage<VariableStage<Dataset<Row>>, VariableStageLoop<Dataset<Row>>, Dataset<Row>> variables() { - return StageSequenceVariables.<Dataset<Row>>sequence().apply(queryData()); - } - - public SystemOrIdStage<KeyValueStage<Dataset<Row>>, KeyValueStageLoop<Dataset<Row>>, Dataset<Row>> entities() { - return StageSequenceEntities.<Dataset<Row>>sequence().apply(queryData()); - } } diff --git a/extraction-api/src/main/java/cern/nxcals/api/extraction/data/builders/LegacyBuildersDataQuery.java b/extraction-api/src/main/java/cern/nxcals/api/extraction/data/builders/LegacyBuildersDataQuery.java index e4a11c47ba8b39f7c3812189c692e4eccb097c17..64aa6944228d91bf94b6169b3435e211c46c2991 100644 --- a/extraction-api/src/main/java/cern/nxcals/api/extraction/data/builders/LegacyBuildersDataQuery.java +++ b/extraction-api/src/main/java/cern/nxcals/api/extraction/data/builders/LegacyBuildersDataQuery.java @@ -8,13 +8,18 @@ import cern.nxcals.api.extraction.data.builders.fluent.StageSequenceVariables; import cern.nxcals.api.extraction.data.builders.fluent.SystemStage; import cern.nxcals.api.extraction.data.builders.fluent.TimeStartStage; import cern.nxcals.api.extraction.data.builders.fluent.VariableAliasStage; +import cern.nxcals.api.extraction.data.builders.fluent.v2.AbstractDataQuery; import lombok.RequiredArgsConstructor; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; @RequiredArgsConstructor -abstract class LegacyBuildersDataQuery { + +/** + * This class has to be public for python stub generation. 
+ */ +public abstract class LegacyBuildersDataQuery extends AbstractDataQuery<Dataset<Row>> { protected final SparkSession session; /** diff --git a/extraction-api/src/main/java/cern/nxcals/api/extraction/data/spark/SparkExtractionTaskExecutor.java b/extraction-api/src/main/java/cern/nxcals/api/extraction/data/spark/SparkExtractionTaskExecutor.java index 6b1cbb76f2d784c1790b42f6c037bfbd1739af29..12efad004d9dd67c9cb69170ee87c68a3c2e6830 100644 --- a/extraction-api/src/main/java/cern/nxcals/api/extraction/data/spark/SparkExtractionTaskExecutor.java +++ b/extraction-api/src/main/java/cern/nxcals/api/extraction/data/spark/SparkExtractionTaskExecutor.java @@ -1,5 +1,6 @@ package cern.nxcals.api.extraction.data.spark; +import cern.nxcals.common.annotation.Experimental; import cern.nxcals.common.domain.ExtractionTask; import cern.nxcals.common.domain.ExtractionTaskProcessor; import cern.nxcals.common.domain.ExtractionUnit; @@ -16,13 +17,13 @@ import java.util.List; import java.util.Optional; import java.util.function.Function; import java.util.function.UnaryOperator; +import java.util.stream.Stream; import static cern.nxcals.api.extraction.data.ExtractionUtils.createEmptyDataFrame; import static cern.nxcals.common.Constants.DATASET_BUILD_PARALLELIZE_ABOVE_SIZE; import static cern.nxcals.common.Constants.DEFAULT_PARALLELIZE_ABOVE_SIZE; import static java.util.stream.Collectors.toList; import static lombok.AccessLevel.PACKAGE; -import static org.apache.commons.collections.CollectionUtils.isEmpty; @Slf4j @RequiredArgsConstructor(access = PACKAGE) @@ -31,50 +32,52 @@ public class SparkExtractionTaskExecutor { private final SparkSession session; @NonNull private final SparkExtractionTaskProcessor processor; - @NonNull - private final Function<ExtractionUnit, UnaryOperator<Dataset<Row>>> postProcessorMaker; + @NonNull private final Function<ExtractionUnit, ExtractionTaskProcessor<ExtractionTask>> preProcessorMaker; + @NonNull + private final Function<ExtractionUnit, UnaryOperator<Dataset<Row>>> postProcessorMaker; + public SparkExtractionTaskExecutor(SparkSession session) { //@formatter:off this(session, new SparkExtractionTaskProcessor( new HdfsDatasetCreator(session), new HBaseDatasetCreator(session)), - DatasetPostProcessor::new, - ExtractionTaskPreProcessor::new + ExtractionTaskPreProcessor::new, + DatasetPostProcessor::new ); //@formatter:on } public Dataset<Row> execute(ExtractionUnit unit) { - List<ExtractionTask> preProcessedTasks = preProcessUnit(unit); - if (isEmpty(preProcessedTasks)) { - log.info("Received null or empty list of extraction tasks, will create dataset with default columns!"); - return emptyFrameFrom(unit); - } - - Optional<Dataset<Row>> dataset = execute(preProcessedTasks); //@formatter:off - return dataset + return createDatasetStream(preProcessUnit(unit)) + .reduce(Dataset::union) .map(d -> postProcessorMaker.apply(unit).apply(d)) .orElseGet(() -> emptyFrameFrom(unit)); //@formatter:on } + @Experimental + public List<Dataset<Row>> executeExpanded(ExtractionUnit unit) { + return createDatasetStream(preProcessUnit(unit)) + .map(d -> postProcessorMaker.apply(unit).apply(d)).collect(toList()); + } + private List<ExtractionTask> preProcessUnit(ExtractionUnit unit) { ExtractionTaskProcessor<ExtractionTask> preProcessor = preProcessorMaker.apply(unit); return unit.getTasks().stream().map(t -> t.processWith(preProcessor)).collect(toList()); } - private Optional<Dataset<Row>> execute(List<ExtractionTask> tasks) { + private Stream<Dataset<Row>> createDatasetStream(List<ExtractionTask> 
tasks) { log.debug("Getting data set for {} extraction tasks", tasks.size()); //@formatter:off - return StreamUtils.of(tasks, ConfigHolder.getInt(DATASET_BUILD_PARALLELIZE_ABOVE_SIZE, DEFAULT_PARALLELIZE_ABOVE_SIZE)) + return StreamUtils + .of(tasks, ConfigHolder.getInt(DATASET_BUILD_PARALLELIZE_ABOVE_SIZE, DEFAULT_PARALLELIZE_ABOVE_SIZE)) .map(this::toOptionalDataset) .filter(Optional::isPresent) - .map(Optional::get) - .reduce(Dataset::union); + .map(Optional::get); //@formatter:on } @@ -83,6 +86,7 @@ public class SparkExtractionTaskExecutor { } private Dataset<Row> emptyFrameFrom(ExtractionUnit unit) { + log.info("Received null or empty list of extraction tasks, will create dataset with default columns!"); return createEmptyDataFrame(session, unit.getEmptyDatasetMapping()); } diff --git a/extraction-api/src/test/java/cern/nxcals/api/extraction/data/spark/SparkExtractionTaskExecutorTest.java b/extraction-api/src/test/java/cern/nxcals/api/extraction/data/spark/SparkExtractionTaskExecutorTest.java index 855974567037f6ee459acee7895805997a625242..a58f9a7edcef585470069e67806902fb4e2e74bd 100644 --- a/extraction-api/src/test/java/cern/nxcals/api/extraction/data/spark/SparkExtractionTaskExecutorTest.java +++ b/extraction-api/src/test/java/cern/nxcals/api/extraction/data/spark/SparkExtractionTaskExecutorTest.java @@ -38,7 +38,7 @@ class SparkExtractionTaskExecutorTest { this.processor = mock(SparkExtractionTaskProcessor.class); this.postMaker = mock(PostMaker.class); this.preMaker = mock(PreMaker.class); - this.instance = new SparkExtractionTaskExecutor(TestUtils.SESSION, processor, postMaker, preMaker); + this.instance = new SparkExtractionTaskExecutor(TestUtils.SESSION, processor, preMaker, postMaker); } @Test diff --git a/integration-tests/src/integration-test/java/cern/nxcals/integrationtests/extraction/custom/data/builders/expanded/DataQueryExpandedTest.java b/integration-tests/src/integration-test/java/cern/nxcals/integrationtests/extraction/custom/data/builders/expanded/DataQueryExpandedTest.java new file mode 100644 index 0000000000000000000000000000000000000000..69bf494321d23f169b01ef6450a96b3ad4fbc059 --- /dev/null +++ b/integration-tests/src/integration-test/java/cern/nxcals/integrationtests/extraction/custom/data/builders/expanded/DataQueryExpandedTest.java @@ -0,0 +1,49 @@ +package cern.nxcals.integrationtests.extraction.custom.data.builders.expanded; + +import cern.nxcals.api.config.SparkContext; +import cern.nxcals.api.custom.extraction.data.builders.expanded.DataQuery; +import lombok.extern.slf4j.Slf4j; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.DependsOn; +import org.springframework.test.context.junit.jupiter.SpringExtension; + +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +@ExtendWith(SpringExtension.class) +@SpringBootTest(classes = SparkContext.class) +@SpringBootApplication +@Slf4j +@DependsOn("kerberos") +public class DataQueryExpandedTest { + + @Autowired + private SparkSession sparkSession; + + @Test + public void 
shouldGetListOfDatasets() { + + Map<String, Object> keyValuesLike = new HashMap<>(); + keyValuesLike.put("device", "NXCALS_MONITORING_DEV%"); + + List<Dataset<Row>> datasets = DataQuery.builder(sparkSession).entities().system("MOCK-SYSTEM") + .keyValuesLike(keyValuesLike).timeWindow( + Instant.now().minus(1, ChronoUnit.DAYS), Instant.now()).build(); + + assertTrue(datasets.size() > 1); + + } + +}
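
Usage sketch (illustrative, not part of the patch): the snippet below shows how the new expanded builders added above could be driven from client code, following the same chains exercised in DataQueryExpandedTest and ParameterDataQueryTest. The system name "CMW", the device key/value pattern, the parameter "NXCALS_MONITORING_DEV1/Acquisition" and the way the SparkSession is created are placeholders, not values prescribed by this change.

import cern.nxcals.api.custom.extraction.data.builders.expanded.DataQuery;
import cern.nxcals.api.custom.extraction.data.builders.expanded.ParameterDataQuery;
import cern.nxcals.api.domain.TimeWindow;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ExpandedQueryExample {
    public static void main(String[] args) {
        // Placeholder session setup; real NXCALS clients typically obtain the session from their Spark configuration.
        SparkSession spark = SparkSession.builder().appName("expanded-query-example").getOrCreate();
        TimeWindow lastDay = TimeWindow.between(Instant.now().minus(1, ChronoUnit.DAYS), Instant.now());

        // Entity query returning a list of datasets instead of a single unioned dataset.
        Map<String, Object> keyValuesLike = new HashMap<>();
        keyValuesLike.put("device", "NXCALS_MONITORING_DEV%"); // illustrative key/value pattern
        List<Dataset<Row>> perEntity = DataQuery.builder(spark)
                .entities()
                .system("CMW")
                .keyValuesLike(keyValuesLike)
                .timeWindow(lastDay)
                .build();

        // Parameter shortcut for a device/property pair; names are illustrative.
        List<Dataset<Row>> perParameter = ParameterDataQuery
                .getFor(spark, "CMW", "NXCALS_MONITORING_DEV1/Acquisition", lastDay);

        perEntity.forEach(Dataset::printSchema);
        perParameter.forEach(Dataset::show);
    }
}

Both paths delegate to SparkDatasetProducer and SparkExtractionTaskExecutor.executeExpanded, so each element of the returned list corresponds to a single extraction task rather than the union built by the existing DataQuery.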