Commit 0ed5fe9e authored by wjurasz

Started with some basic Flume HBase integration

parent 88d8028a
......@@ -14,10 +14,14 @@ dependencies {
compile project(':accsoft-nxcals-common')
compile project(':accsoft-nxcals-service-client')
// compile group: 'org.apache.flume', name: 'flume-ng-core', version: flumeVersion
compile group: 'org.apache.flume.flume-ng-sinks', name: 'flume-ng-hbase-sink', version: '1.7.0'
// compile group: 'org.apache.flume', name: 'flume-ng-node', version: flumeVersion
compile group: 'org.apache.flume', name: 'flume-ng-dist', version: flumeVersion
// compile group: 'org.apache.hbase', name: 'hbase-client', version: '1.2.1'
compile group: 'org.apache.hbase', name: 'hbase-client', version: '1.2.1'
compile group: 'org.apache.hbase', name: 'hbase', version: '1.2.1'
compile group: 'org.apache.hadoop', name: 'hadoop-client', version: hadoopVersion
compile group: 'com.google.guava', name: 'guava', version: guavaVersion
......
package cern.accsoft.nxcals.hbase;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.sink.hbase.AsyncHbaseEventSerializer;
import org.hbase.async.AtomicIncrementRequest;
import org.hbase.async.PutRequest;

import java.util.Collections;
import java.util.List;

/**
 * Skeleton event serializer for the Flume AsyncHBaseSink.
 * Created by wjurasz on 28.03.17.
 */
public class HbaseAsyncSerializer implements AsyncHbaseEventSerializer {

    private byte[] table;
    private byte[] columnFamily;
    private Event currentEvent;

    @Override
    public void initialize(byte[] table, byte[] cf) {
        // Called once by the sink with the configured table and column family.
        this.table = table;
        this.columnFamily = cf;
    }

    @Override
    public void setEvent(Event event) {
        // The sink hands over each event before asking for actions/increments.
        this.currentEvent = event;
    }

    @Override
    public List<PutRequest> getActions() {
        // No puts generated yet; return an empty list rather than null, since the sink iterates the result.
        return Collections.emptyList();
    }

    @Override
    public List<AtomicIncrementRequest> getIncrements() {
        // No counter increments generated yet.
        return Collections.emptyList();
    }

    @Override
    public void cleanUp() {
        // Nothing to release yet.
    }

    @Override
    public void configure(Context context) {
        // Serializer-specific properties from the agent config would be read here.
    }

    @Override
    public void configure(ComponentConfiguration conf) {
        // Not used.
    }
}
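For reference, a hypothetical serializer (not part of this commit) showing one way the skeleton above could eventually produce HBase writes, modeled loosely on Flume's bundled SimpleAsyncHbaseEventSerializer. The row-key choice (the event's "timestamp" header, falling back to the current time) and the "payload" qualifier are illustrative assumptions only:

package cern.accsoft.nxcals.hbase;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.sink.hbase.AsyncHbaseEventSerializer;
import org.hbase.async.AtomicIncrementRequest;
import org.hbase.async.PutRequest;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;

// Hypothetical example class, for illustration only.
public class ExampleBodyAsyncSerializer implements AsyncHbaseEventSerializer {

    private byte[] table;
    private byte[] columnFamily;
    private Event currentEvent;

    @Override
    public void initialize(byte[] table, byte[] cf) {
        this.table = table;
        this.columnFamily = cf;
    }

    @Override
    public void setEvent(Event event) {
        this.currentEvent = event;
    }

    @Override
    public List<PutRequest> getActions() {
        if (currentEvent == null || currentEvent.getBody() == null) {
            return Collections.emptyList();
        }
        // Row key: the "timestamp" header if an interceptor set one, otherwise the current time.
        // Note: timestamp-only keys can collide; a real key would need more entropy.
        String ts = currentEvent.getHeaders().get("timestamp");
        byte[] rowKey = (ts != null ? ts : Long.toString(System.currentTimeMillis()))
                .getBytes(StandardCharsets.UTF_8);
        byte[] qualifier = "payload".getBytes(StandardCharsets.UTF_8);
        // One put per event: raw event body stored under columnFamily:payload.
        return Collections.singletonList(
                new PutRequest(table, rowKey, columnFamily, qualifier, currentEvent.getBody()));
    }

    @Override
    public List<AtomicIncrementRequest> getIncrements() {
        return Collections.emptyList();
    }

    @Override
    public void cleanUp() { }

    @Override
    public void configure(Context context) { }

    @Override
    public void configure(ComponentConfiguration conf) { }
}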
test_agent.sources = kafka-source
test_agent.channels = mem-channel1
test_agent.sinks = hbase-sink
test_agent.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
test_agent.sources.kafka-source.zookeeperConnect = nxcals-wjurasz3:2181
test_agent.sources.kafka-source.topic = CMW
test_agent.sources.kafka-source.groupId = flume
test_agent.sources.kafka-source.channels = mem-channel1
test_agent.sources.kafka-source.interceptors = i1
test_agent.sources.kafka-source.interceptors.i1.type = cern.accsoft.nxcals.flume.RecordInterceptor
test_agent.sources.kafka-source.kafka.consumer.timeout.ms = 100
test_agent.channels.mem-channel1.type = memory
test_agent.channels.mem-channel1.capacity = 10000
test_agent.channels.mem-channel1.transactionCapacity = 1000
test_agent.sinks.hbase-sink.type = org.apache.flume.sink.hbase.AsyncHBaseSink
test_agent.sinks.hbase-sink.channel = mem-channel1
test_agent.sinks.hbase-sink.table = transactions
test_agent.sinks.hbase-sink.columnFamily = data
test_agent.sinks.hbase-sink.column = charges
test_agent.sinks.hbase-sink.batchSize = 5000
test_agent.sinks.hbase-sink.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
test_agent.sinks.hbase-sink.serializer.incrementColumn = icol
test_agent.channels.mem-channel1.type=memory
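Once the custom serializer actually emits puts, the sink in this test config could be pointed at it instead of the bundled simple serializer. This is only a hypothetical follow-up, not part of this commit:

# hypothetical follow-up: swap in the NXCALS serializer
# test_agent.sinks.hbase-sink.serializer = cern.accsoft.nxcals.hbase.HbaseAsyncSerializer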
......@@ -13,7 +13,7 @@ public class ApplicationDev {
}
public static void main(String[] args) {
try {
String[] devArgs = new String[] {"-n", "nxcals-flume-dev-agent", /*"--conf", "./src/test/resources/config",*/ "--conf-file", "out/test/accsoft-nxcals-flume/flume-conf.properties"};
String[] devArgs = new String[] {"-n", "nxcals-flume-dev-agent", /*"--conf", "./src/test/resources/config",*/ "--conf-file", "out/test/accsoft-nxcals-flume_test/flume-conf.properties"};
org.apache.flume.node.Application.main(devArgs);
} catch (Exception e) {
e.printStackTrace();
......
# Name the components on this agent
nxcals-flume-dev-agent.sources = r1
nxcals-flume-dev-agent.sinks = k1
nxcals-flume-dev-agent.channels = c1
nxcals-flume-dev-agent.sinks = k1 hbase-sink
nxcals-flume-dev-agent.channels = c1 hbase-channel
# Kafka source
nxcals-flume-dev-agent.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
nxcals-flume-dev-agent.sources.r1.kafka.bootstrap.servers = nxcals-jwozniak3:9092,nxcals-jwozniak4:9092,nxcals-jwozniak5:9092
nxcals-flume-dev-agent.sources.r1.kafka.bootstrap.servers = nxcals-wjurasz3:9092,nxcals-wjurasz4:9092,nxcals-wjurasz5:9092
nxcals-flume-dev-agent.sources.r1.kafka.topics.regex = ^CMW$
nxcals-flume-dev-agent.sources.r1.batchSize = 1000
nxcals-flume-dev-agent.sources.r1.batchDurationMillis = 1000
......@@ -15,7 +15,7 @@ nxcals-flume-dev-agent.sources.r1.interceptors.i1.type =cern.accsoft.nxcals.flum
# HDFS sink
nxcals-flume-dev-agent.sinks.k1.type = hdfs
nxcals-flume-dev-agent.sinks.k1.hdfs.path = hdfs://lxbrf39c04.cern.ch/project/nxcals/nxcals_dev_jwozniak/flume-dev/%{nxcals_system_id}/%{nxcals_partition_id}/%{nxcals_schema_id}/%{nxcals_date}
nxcals-flume-dev-agent.sinks.k1.hdfs.path = hdfs://lxbrf39c04.cern.ch/project/nxcals/nxcals_dev_wjurasz/flume-dev/%{nxcals_system_id}/%{nxcals_partition_id}/%{nxcals_schema_id}/%{nxcals_date}
nxcals-flume-dev-agent.sinks.k1.hdfs.filePrefix=data
#hdfs.rollInterval 30 Number of seconds to wait before rolling current file (0 = never roll based on time interval)
......@@ -40,13 +40,29 @@ nxcals-flume-dev-agent.sinks.k1.serializer=cern.accsoft.nxcals.flume.hdfs.AvroEv
nxcals-flume-dev-agent.sinks.k1.serializer.compressionCodec=snappy
#nxcals-flume-dev-agent.sinks.k1.serializer=avro_event
nxcals-flume-dev-agent.sinks.k1.hdfs.kerberosPrincipal=acclog@CERN.CH
nxcals-flume-dev-agent.sinks.k1.hdfs.kerberosKeytab=/opt/jwozniak/.keytab-acclog
nxcals-flume-dev-agent.sinks.k1.hdfs.kerberosKeytab=/opt/wjurasz/acclog.keytab
# HBase sink
nxcals-flume-dev-agent.sinks.hbase-sink.type = org.apache.flume.sink.hbase.AsyncHBaseSink
nxcals-flume-dev-agent.sinks.hbase-sink.table = transactions
nxcals-flume-dev-agent.sinks.hbase-sink.columnFamily = data
nxcals-flume-dev-agent.sinks.hbase-sink.column = charges
nxcals-flume-dev-agent.sinks.hbase-sink.batchSize = 5000
nxcals-flume-dev-agent.sinks.hbase-sink.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
nxcals-flume-dev-agent.sinks.hbase-sink.serializer.incrementColumn = icol
# Use a channel which buffers events in memory
nxcals-flume-dev-agent.channels.c1.type = memory
nxcals-flume-dev-agent.channels.c1.capacity = 400000
nxcals-flume-dev-agent.channels.c1.transactionCapacity = 400000
nxcals-flume-dev-agent.channels.hbase-channel.type = memory
nxcals-flume-dev-agent.channels.hbase-channel.capacity = 400000
nxcals-flume-dev-agent.channels.hbase-channel.transactionCapacity = 400000
# Bind the source and sink to the channel
nxcals-flume-dev-agent.sources.r1.channels = c1
nxcals-flume-dev-agent.sources.r1.channels = c1 hbase-channel
nxcals-flume-dev-agent.sinks.k1.channel = c1
nxcals-flume-dev-agent.sinks.hbase-sink.channel = hbase-channel
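The AsyncHBaseSink writes into an existing table; it expects the transactions table and the data column family to already exist before the agent starts. Below is a minimal sketch using the hbase-client 1.2 API that this commit adds to the build, assuming an hbase-site.xml with the cluster settings is on the classpath; the class name is illustrative:

package cern.accsoft.nxcals.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

// Hypothetical helper, for illustration only.
public class CreateTransactionsTable {

    public static void main(String[] args) throws Exception {
        // Picks up hbase-site.xml from the classpath.
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            TableName name = TableName.valueOf("transactions");
            if (!admin.tableExists(name)) {
                HTableDescriptor table = new HTableDescriptor(name);
                table.addFamily(new HColumnDescriptor("data"));
                admin.createTable(table);
            }
        }
    }
}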