NXCALS-5377 Avoid between expression for dataset range-join actions
Given the following align-to-dataset query (run against the TESTBED cluster):
Dataset<Row> drivingDataset = DataQuery.builder(sparkSession)
        .byVariables()
        .system("CMW")
        .startTime("2021-07-08 09:52:16.3").endTime("2021-07-09 09:52:16.3")
        .variable("TestTimberDevice01:ExponentialProperty:value")
        .build();

DatasetAggregationProperties aggregationProperties = DatasetAggregationProperties.builder()
        .drivingDataset(drivingDataset).build();

Variable myVariable = variableService.findOne(Variables.suchThat().variableName()
        .eq("TestTimberDevice01:ScalarProperty:value"))
        .orElseThrow(() -> new IllegalArgumentException("Could not obtain variable from service"));

Dataset<Row> aggregatedDataset = aggregationService.getData(myVariable, aggregationProperties);

Instant t1 = Instant.now();
System.out.println("Num of records: " + aggregatedDataset.count());
Instant t2 = Instant.now();
Duration duration = Duration.between(t1, t2);
System.out.println("Total execution time: " + duration.getSeconds());
Conditions
spark.executor.instances: 8
spark.executor.cores: 4
spark.executor.memory: 8g
spark.masterType: local[*]
Before
Over 10 min (!!); the process had to be stopped. The functionality can be considered unusable for the local use case.
After
Number of records: 86400 Total execution time (sec): 6 (!)
spark.masterType: yarn
Before
Number of records: 86400 Total execution time (sec): 261 (4 min 21 s)
After
Number of records: 86400 Total execution time (sec): 15
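For reviewers unfamiliar with why a range predicate in a join condition can be this expensive, here is a minimal plain-Java sketch. It is not the code changed in this merge request and does not use Spark or the NXCALS API. The class `RangeJoinSketch`, its methods, and the half-open window layout are hypothetical and exist only for illustration. The first method tests every (window, timestamp) pair, which is roughly what an engine has to do when the only join condition is a between-style comparison. The second method relies on sorted window starts and a binary search instead.

```java
import java.util.Arrays;

// Hypothetical illustration only; not part of NXCALS or of this merge request.
class RangeJoinSketch {

    // Naive range join: for each window [start, nextStart) test every timestamp.
    // Cost grows with windows * timestamps.
    static int[] nestedLoopCounts(long[] windowStarts, long end, long[] timestamps) {
        int[] counts = new int[windowStarts.length];
        for (int i = 0; i < windowStarts.length; i++) {
            long lo = windowStarts[i];
            long hi = (i + 1 < windowStarts.length) ? windowStarts[i + 1] : end;
            for (long t : timestamps) {
                if (t >= lo && t < hi) {
                    counts[i]++;
                }
            }
        }
        return counts;
    }

    // Alternative: windowStarts must be sorted ascending with no duplicates.
    // Each timestamp is placed into its window with one binary search.
    static int[] sortedCounts(long[] windowStarts, long end, long[] timestamps) {
        int[] counts = new int[windowStarts.length];
        if (windowStarts.length == 0) {
            return counts;
        }
        for (long t : timestamps) {
            if (t < windowStarts[0] || t >= end) {
                continue; // outside all windows
            }
            int pos = Arrays.binarySearch(windowStarts, t);
            // Not found: pos == -(insertionPoint) - 1, and the window is insertionPoint - 1.
            int idx = (pos >= 0) ? pos : -pos - 2;
            counts[idx]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] starts = {0L, 10L, 20L};
        long[] ts = {1L, 5L, 10L, 19L, 25L};
        System.out.println(Arrays.toString(nestedLoopCounts(starts, 30L, ts)));
        System.out.println(Arrays.toString(sortedCounts(starts, 30L, ts)));
    }
}
```

Both methods return the same counts per window; only the amount of work differs. Whether this matches the exact plan Spark chose before and after the change is an assumption and would need to be confirmed from the query plans.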
@acc-logging-team - please have a look for approval.
Closes NXCALS-5377
Edited by Grigorios Avgitidis