Commit 26bb1a99 authored by Nikolay Tsvetkov's avatar Nikolay Tsvetkov
Browse files

create builder

parent 642664b6
......@@ -2,45 +2,59 @@ apply plugin: 'java'
apply plugin: 'eclipse'
//repositories {
// this should go to the top of the build.gradle otherwise release might fail
//if (project.ext.TRIGGERING_REMOTE_EXECUTION) {
// return
//}
//
//project.tasks.fatJar.zip64 = true
repositories {
// maven { url "http://repo.hortonworks.com/content/groups/public" }
// maven { url "https://repository.cloudera.com/content/repositories/releases/" }
// mavenCentral()
//}
mavenCentral()
}
//configurations.all { resolutionStrategy { force 'com.google.guava:guava:16.0'
//} }
configurations.all { resolutionStrategy { force 'com.google.guava:guava:16.0'
} }
//dependencies {
//
// compile('org.apache.spark:spark-core_2.11:2.0.1'){
// exclude group: 'org.slf4j', module: 'slf4j-log4j12'
// exclude group: 'com.google.guava'
// }
// compile('org.apache.spark:spark-sql_2.11:2.0.1'){
// exclude group: 'com.google.guava'
// }
//
// compile('com.hortonworks:shc:1.0.0-2.0-s_2.11'){
// exclude group: 'com.google.guava'
// exclude group: 'org.apache.hbase'
// exclude group: 'javax.validation'
//
// }
//
// compile('cern.accsoft.nxcals:accsoft-nxcals-common:+'){
// exclude group: 'com.google.guava'
// }
//
// compile('cern.accsoft.nxcals:accsoft-nxcals-service-client:+'){
// exclude group: 'com.google.guava'
// }
//
// compile('org.hibernate:hibernate-validator:5.3.2.Final')
//
// compile('org.apache.hbase:hbase-client:1.2.0'){
// exclude group: 'com.google.guava'
// }
//
// testCompile('org.mockito:mockito-all:1.10.19')
//}
dependencies {
compile('org.apache.spark:spark-core_2.11:2.0.1'){
exclude group: 'org.slf4j', module: 'slf4j-log4j12'
exclude group: 'com.google.guava'
}
compile('org.apache.spark:spark-sql_2.11:2.0.1'){
exclude group: 'com.google.guava'
}
compile('com.hortonworks:shc:1.0.0-2.0-s_2.11'){
exclude group: 'com.google.guava'
exclude group: 'org.apache.hbase'
exclude group: 'javax.validation'
}
compile('cern.accsoft.nxcals:accsoft-nxcals-common:+'){
exclude group: 'com.google.guava'
}
compile('cern.accsoft.nxcals:accsoft-nxcals-service-client:+'){
exclude group: 'com.google.guava'
}
compile('org.hibernate:hibernate-validator:5.3.2.Final')
compile('org.apache.hbase:hbase-client:1.2.0'){
exclude group: 'com.google.guava'
}
testCompile('org.mockito:mockito-all:1.10.19')
}
task uberJar(type: Jar) {
zip64 true
from { configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } }
with jar
exclude 'META-INF/*.SF', 'META-INF/*.DSA', 'META-INF/*.RSA', 'META-INF/*.MF'
}
\ No newline at end of file
......@@ -2,3 +2,4 @@ maxParallelForks=1
cacheMaxAgeInHours=0
eclipseLocalOverride=true
svntag=false
alljars.enabled=true
\ No newline at end of file
package cern.accsoft.nxcals.data.access.builders;
/**
* Created by ntsvetko on 1/23/17.
*/
public enum DataAccessCatalog {
SYSTEM("systemName"),
DEVICE("device"),
PROPERTY("property"),
ELEMENT("element"),
PARAMETER("parameter"),
KEY("key"),
START_TIME("startTime"),
END_TIME("endTime"),
FIELDS("fields");
private final String value;
DataAccessCatalog(String value) {
this.value = value;
}
public String getValue() {
return this.value;
}
}
package cern.accsoft.nxcals.data.access.builders;
import cern.accsoft.nxcals.common.utils.TimeUtils;
import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import static cern.accsoft.nxcals.data.access.builders.DataAccessCatalog.*;
/**
* Created by ntsvetko on 1/24/17.
*/
public class DataAccessCatalogBuilder {
private String system;
private String device;
private String property;
private String parameter;
private String element;
private Map<String,String> keyValue;
private String startTime;
private String endTime;
private String fields;
private DataAccessCatalog keyValueMethod;
private DataAccessCatalogBuilder() {
}
public static IKey system(String system) {
return DataAccessCatalogBuilder.Builder(Objects.requireNonNull(system));
}
public interface IKey {
IProperty device(String device);
IStartTime parameter(String parameter);
IKeyValue keyValue(String key, String value);
}
public interface IKeyValue {
IKeyValue keyValue(String key, String value);
IEndTime startTime(Instant startTime);
}
public interface IProperty {
IStartTime property(String property);
}
public interface IStartTime {
IEndTime startTime(Instant startTime);
}
public interface IEndTime {
IBuild endTime(Instant endTime);
}
public interface IBuild {
IBuild fields(String... fields);
Map<String, String> buildCatalog();
}
private static class Builder implements IKey, IProperty, IStartTime, IEndTime, IBuild, IKeyValue {
private DataAccessCatalogBuilder builder = new DataAccessCatalogBuilder();
Builder(String system) {
builder.system(system);
}
@Override
public IProperty device(String device) {
builder.device = Objects.requireNonNull(device);
builder.keyValueMethod = DEVICE;
return this;
}
@Override
public IStartTime parameter(String parameter) {
String[] split = Objects.requireNonNull(parameter).split("/");
if (split.length != 2) {
throw new IllegalArgumentException("Illegal parameter name, expected <device>/<property> got " + parameter);
}
builder.device = split[0];
builder.property = split[1];
builder.keyValueMethod = PARAMETER;
return this;
}
@Override
public IKeyValue keyValue(String key, String value) {
if(builder.keyValue == null){
builder.keyValue = new HashMap<>();
}
builder.keyValue.put(Objects.requireNonNull(key), Objects.requireNonNull(value));
builder.keyValueMethod = KEY;
return this;
}
@Override
public IStartTime property(String property) {
builder.property = Objects.requireNonNull(property);
return this;
}
@Override
public IEndTime startTime(Instant startTime) {
builder.startTime = String.valueOf(TimeUtils.getNanosFromInstant(Objects.requireNonNull(startTime)));
return this;
}
@Override
public IBuild endTime(Instant endTime) {
builder.endTime = String.valueOf(TimeUtils.getNanosFromInstant(Objects.requireNonNull(endTime)));
return this;
}
@Override
public IBuild fields(String... fields) {
builder.fields = Arrays.toString(Objects.requireNonNull(fields));
return this;
}
@Override
public Map<String, String> buildCatalog() {
Map<String, String> catalog = new HashMap<>();
catalog.put(SYSTEM.getValue(), builder.system);
switch (builder.keyValueMethod) {
case DEVICE:
case PARAMETER:
catalog.put(DEVICE.getValue(), builder.device);
catalog.put(PROPERTY.getValue(), builder.property);
break;
case KEY:
builder.keyValue.entrySet().forEach(e -> {
catalog.put(e.getKey(), e.getValue());
});
break;
}
catalog.put(START_TIME.getValue(), builder.startTime);
catalog.put(END_TIME.getValue(), builder.endTime);
if (builder.fields != null) {
catalog.put(FIELDS.getValue(), builder.fields);
}
return catalog;
}
}
}
/**
* Copyright (c) 2017 European Organisation for Nuclear Research (CERN), All Rights Reserved.
*/
package cern.accsoft.nxcals.data.access.builders;
import cern.accsoft.nxcals.common.utils.TimeUtils;
import cern.accsoft.nxcals.data.access.api.DataAccessService;
import cern.accsoft.nxcals.data.access.api.DataAccessServiceFactory;
import cern.accsoft.nxcals.data.access.api.QueryData;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.execution.datasources.FileFormat;
import org.apache.spark.sql.sources.BaseRelation;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.sources.InsertableRelation;
import org.apache.spark.sql.sources.PrunedFilteredScan;
import org.apache.spark.sql.types.StructType;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
/**
* Created by ntsvetko on 1/23/17.
*/
public class DataAccessRelation extends BaseRelation implements InsertableRelation, PrunedFilteredScan {
private final SQLContext sqlContext;
private final Dataset<Row> resultSet;
public DataAccessRelation(SQLContext sqlContext, Map<String, String> catalog) {
this.sqlContext = sqlContext;
DataAccessService dataAccessService = DataAccessServiceFactory.createDataAccessService(sqlContext.sparkSession());
this.resultSet = dataAccessService.create(this.loadQueryDataFrom(catalog), this.loadIntervalTimeFor(DataAccessCatalog.START_TIME,catalog),
this.loadIntervalTimeFor(DataAccessCatalog.END_TIME, catalog), this.loadRequestedFieldsFrom(catalog));
}
@Override
public SQLContext sqlContext() {
return this.sqlContext;
}
@Override
public StructType schema() {
return this.resultSet.schema();
}
@Override
public void insert(Dataset<Row> dataset, boolean b) {
//TODO: to be implemented for writing data back using the DataAccess format
}
@Override
public RDD<Row> buildScan(String[] strings, Filter[] filters) {
System.err.println("buildScan ----------------->");
Arrays.stream(filters).forEach(f -> {
System.err.println("filter -> " + f);
});
Arrays.stream(strings).forEach(s -> {
System.err.println("string -> " + s);
});
System.err.println(" <----------------------buildScan ");
Dataset<Row> filteredSet = this.resultSet.selectExpr(strings);
return this.resultSet.rdd();
}
private Instant loadIntervalTimeFor(DataAccessCatalog timeField, Map<String, String> catalog) {
String timeFieldValue = catalog.get(timeField.getValue());
if (timeFieldValue != null) {
try {
return TimeUtils.getInstantFromNanos(Long.valueOf(timeFieldValue));
} catch (NumberFormatException e) {
throw new NumberFormatException("Cannot cast field " + timeField + " to Long! value=" + timeFieldValue);
}
} else {
throw new RuntimeException("The NXCALS DataAccess format requeres time window (START_TIME, END_TIME keys) to be declared in the catalog!");
}
}
private QueryData loadQueryDataFrom(Map<String, String> catalog) {
QueryData queryData = null;
if (catalog.get(DataAccessCatalog.SYSTEM.getValue()) == null) {
throw new RuntimeException("The NXCALS DataAccess format requeres SYSTEM key to be declared in the catalog!");
}
DevicePropertyBuilder devicePropertyBuilder = new KeyValueBuilderFactory()
.createDevicePropertyBuilder(catalog.get(DataAccessCatalog.SYSTEM.getValue()));
if (catalog.get(DataAccessCatalog.DEVICE.getValue()) != null && catalog.get(DataAccessCatalog.PROPERTY.getValue()) != null) {
queryData = devicePropertyBuilder.device(catalog.get(DataAccessCatalog.DEVICE.getValue()))
.property(catalog.get(DataAccessCatalog.PROPERTY.getValue())).build();
}//TODO: check for some other provided keys like ELEMENT etc.
else {
throw new RuntimeException("The NXCALS DataAccess format requeres time window (START_TIME, END_TIME keys) to be declared in the catalog!");
}
return queryData;
}
private String[] loadRequestedFieldsFrom(Map<String, String> catalog) {
if (catalog.get(DataAccessCatalog.FIELDS.getValue()) != null) {
return Arrays.asList(catalog.get(DataAccessCatalog.FIELDS.getValue())).toArray(new String[0]);
}
return new String[0];
}
}
/**
* Copyright (c) 2017 European Organisation for Nuclear Research (CERN), All Rights Reserved.
*/
package cern.accsoft.nxcals.data.access.builders;
import org.apache.spark.sql.*;
import org.apache.spark.sql.sources.BaseRelation;
import org.apache.spark.sql.sources.CreatableRelationProvider;
import org.apache.spark.sql.sources.RelationProvider;
import scala.collection.immutable.Map;
import scala.collection.JavaConversions;
import java.util.HashMap;
/**
* Created by ntsvetko on 1/23/17.
*/
public class DefaultSource implements RelationProvider, CreatableRelationProvider {
@Override
public BaseRelation createRelation(SQLContext sqlContext, Map<String, String> map) {
java.util.Map<String,String> catalog = new HashMap<>();
catalog = JavaConversions.mapAsJavaMap(map);
return new DataAccessRelation(sqlContext, catalog);
}
@Override
public BaseRelation createRelation(SQLContext sqlContext, SaveMode saveMode, Map<String, String> map, Dataset<Row> dataset) {
return null;
}
}
package cern.accsoft.nxcals.data.access.builders;
import org.junit.Test;
import java.time.Instant;
import java.util.Map;
/**
* Created by ntsvetko on 1/24/17.
*/
public class DataAccessCatalogBuilderTest {
@Test
public void shouldCreateDevicePropertyCatalog() {
Map<String, String> catalog = new DataAccessCatalogBuilder().system("CMW").device("LARGER").property("timerMarkerBct4")
.startTime(Instant.now()).endTime(Instant.now()).buildCatalog();
catalog.entrySet().forEach(e -> {
System.err.println(e.getKey() + " -> " + e.getValue());
});
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment