
NXCALS-5377 Avoid between expression for dataset range-join actions

Given the following align-to-dataset query (run against the TESTBED cluster):

Dataset<Row> drivingDataset = DataQuery.builder(sparkSession)
        .byVariables()
        .system("CMW")
        .startTime("2021-07-08 09:52:16.3").endTime("2021-07-09 09:52:16.3")
        .variable("TestTimberDevice01:ExponentialProperty:value")
        .build();
DatasetAggregationProperties aggregationProperties = DatasetAggregationProperties.builder()
        .drivingDataset(drivingDataset).build();

Variable myVariable = variableService.findOne(Variables.suchThat().variableName()
                .eq("TestTimberDevice01:ScalarProperty:value"))
        .orElseThrow(() -> new IllegalArgumentException("Could not obtain variable from service"));
Dataset<Row> aggregatedDataset = aggregationService.getData(myVariable, aggregationProperties);

Instant t1 = Instant.now();
System.out.println("Number of records: " + aggregatedDataset.count());
Instant t2 = Instant.now();

Duration duration = Duration.between(t1, t2);
System.out.println("Total execution time (sec): " + duration.getSeconds());
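
The timing above relies on `Duration.getSeconds()`, which truncates to whole seconds, so a 6.9 s run is reported as 6. A minimal sketch of a reusable timing helper with sub-second resolution follows. The class `TimedAction` and its `time` method are hypothetical names introduced here for illustration and are not part of NXCALS; the `LongStream` count is only a stand-in for `aggregatedDataset.count()`.

```java
import java.time.Duration;
import java.util.function.Supplier;
import java.util.stream.LongStream;

public class TimedAction {

    /** Value returned by a timed call together with its elapsed wall-clock time. */
    public static final class Timed<T> {
        public final T value;
        public final Duration elapsed;

        Timed(T value, Duration elapsed) {
            this.value = value;
            this.elapsed = elapsed;
        }
    }

    /** Runs the action once and measures it with the monotonic clock. */
    public static <T> Timed<T> time(Supplier<T> action) {
        long start = System.nanoTime();
        T value = action.get();
        return new Timed<>(value, Duration.ofNanos(System.nanoTime() - start));
    }

    public static void main(String[] args) {
        // Stand-in for aggregatedDataset.count(); any action that returns a value works.
        Timed<Long> result = time(() -> LongStream.range(0, 86_400).count());

        System.out.println("Number of records: " + result.value);
        // Report fractional seconds instead of truncating to whole seconds.
        System.out.printf("Total execution time (sec): %.3f%n", result.elapsed.toMillis() / 1000.0);
    }
}
```

In the snippet above, the same helper could wrap the count as `time(() -> aggregatedDataset.count())`, assuming the Spark session and dataset are set up as shown.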

Conditions


spark.executor.instances: 8
spark.executor.cores: 4
spark.executor.memory: 8g

spark.masterType: local[*]

Before

Over 10 minutes (!!); the process had to be stopped. The functionality is effectively unusable for the local use case.

After

Number of records: 86400
Total execution time (sec): 6 (!)


spark.masterType: yarn

Before

Number of records: 86400
Total execution time (sec): 261 (4 min 21 sec)

After

Number of records: 86400
Total execution time (sec): 15
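
To put the measurements in proportion, the speedup factors can be derived directly from the figures reported above. This is a small sketch; the class `Speedup` is a hypothetical helper, and the 600 s used for the local "before" case is only a lower bound, since that run was stopped after more than 10 minutes.

```java
public class Speedup {

    /** Ratio of the elapsed time before the change to the elapsed time after it. */
    static double speedup(double beforeSec, double afterSec) {
        if (afterSec <= 0) {
            throw new IllegalArgumentException("afterSec must be positive");
        }
        return beforeSec / afterSec;
    }

    public static void main(String[] args) {
        // yarn: 261 s before, 15 s after (figures from this description).
        System.out.printf("yarn speedup: %.1fx%n", speedup(261, 15));

        // local[*]: stopped after more than 600 s, 6 s after, so this is a lower bound.
        System.out.printf("local[*] speedup: at least %.0fx%n", speedup(600, 6));
    }
}
```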


@acc-logging-team - please have a look for approval.


Closes NXCALS-5377

Edited by Grigorios Avgitidis
