Commit 0dcd2869 authored by Jakub Wozniak

Merge branch 'NXCALS-7753-Introduce-disaster-flow-control-in-ETLs' into 'develop'

Resolve NXCALS-7753 "Introduce disaster flow control in etls"

Closes NXCALS-7753

See merge request !2039

Changelog: added
parents 4f76d61b 3059001c
1 merge request: !2039 Resolve NXCALS-7753 "Introduce disaster flow control in etls"
Showing with 195 additions and 27 deletions
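For orientation before the individual hunks: the new behaviour boils down to a periodic flow-control check in the Kafka consumer runner. When the total lag across all assigned partitions exceeds the configured flow-control-threshold, the runner pauses every assigned partition and resumes only the one with the largest lag; once the total lag drops back under the threshold, all previously paused partitions are resumed. The sketch below is a condensed restatement of the flowControl() method added further down, using only the standard KafkaConsumer API; it is not additional project code.

import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

final class FlowControlSketch {
    static void flowControl(Consumer<?, ?> consumer, long threshold) {
        Set<TopicPartition> assigned = consumer.assignment();
        if (assigned.size() <= 1) {
            return; // nothing to throttle with a single assigned partition
        }
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assigned);
        long totalLag = 0;
        TopicPartition maxLagPartition = null;
        long maxLag = 0;
        for (TopicPartition tp : assigned) {
            long lag = endOffsets.get(tp) - consumer.position(tp);
            if (lag > maxLag) {
                maxLagPartition = tp;
                maxLag = lag;
            }
            totalLag += lag;
        }
        if (totalLag > threshold) {
            consumer.pause(assigned);                  // go "sticky": drain the busiest partition first
            consumer.resume(List.of(maxLagPartition));
        } else if (!consumer.paused().isEmpty()) {
            consumer.resume(consumer.paused());        // lag is back under the limit, resume everything
        }
    }
}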
......@@ -539,6 +539,7 @@ kafka_etl_daily_partitions: '{{service_compaction_staging_partition_splits}}'
kafka_etl_time_bucket_since: "2024-06-13 00:00:00.000"
#For alerting
kafka_etl_max_lag: 100000
kafka_etl_flow_control_threshold: 100000000
#NXCALS - Kafka-Etl Hdfs
#################################################################################
......
......@@ -14,6 +14,7 @@ nxcals.kafka.etl:
long-write-threshold: 30s
daily-partitions: {{kafka_etl_daily_partitions}}
time-bucket-since: {{kafka_etl_time_bucket_since}}
flow-control-threshold: {{kafka_etl_flow_control_threshold}}
consumer-properties:
bootstrap.servers: {{kafka_etl_bootstrap_servers}}
group.id: {{kafka_etl_consumer_group_id}}-{{store}}
......
......@@ -10,9 +10,14 @@ import org.springframework.context.ConfigurableApplicationContext;
@SpringBootApplication
@Slf4j
public class Application {
static {
Thread.setDefaultUncaughtExceptionHandler((thread, ex) ->
log.error("Uncaught exception in thread {}", thread, ex)
);
}
public static void main(String[] args) throws Exception {
try {
SpringApplication app = new SpringApplication(Application.class);
app.addListeners(new ApplicationPidFileWriter());
ConfigurableApplicationContext context = app.run();
......
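The static initializer above registers a JVM-wide default uncaught exception handler so that exceptions escaping any thread end up in the application log instead of only on stderr. A standalone illustration of the mechanism (not project code; the thread name and exception are made up for the demo):

public class UncaughtHandlerDemo {
    public static void main(String[] args) throws InterruptedException {
        // Any exception that escapes a thread's run() method is delivered here
        // by the dying thread itself, before it terminates.
        Thread.setDefaultUncaughtExceptionHandler((thread, ex) ->
                System.err.printf("Uncaught exception in thread %s: %s%n", thread.getName(), ex));

        Thread worker = new Thread(() -> {
            throw new IllegalStateException("simulated ETL worker failure");
        }, "etl-worker");
        worker.start();
        worker.join();
    }
}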
......@@ -51,6 +51,8 @@ public class KafkaEtlProperties {
//The new time bucketing will be since that date, to be set in the config.
private String timeBucketSince;
//A maximum total lag from which flow control will be enabled
private long flowControlThreshold;
@Data
@NoArgsConstructor
public static class RetryProperties {
......
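The new flow-control-threshold key from the YAML template above reaches this field through Spring Boot's relaxed binding of @ConfigurationProperties, which maps kebab-case keys onto camelCase fields. The annotation itself is outside the captured hunk, so the shape below is an assumption rather than a copy of the project class:

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

// Assumed shape only: the prefix matches the "nxcals.kafka.etl" section used in the YAML template.
@Data
@ConfigurationProperties(prefix = "nxcals.kafka.etl")
public class KafkaEtlPropertiesShape {
    // Bound from "nxcals.kafka.etl.flow-control-threshold" ({{kafka_etl_flow_control_threshold}} in the template).
    private long flowControlThreshold;
}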
......@@ -61,6 +61,7 @@ public class HdfsFileWriter implements Closeable {
}
private synchronized void open() {
long start = System.currentTimeMillis();
verifyNonTerminated();
fileName = generateFileName();
......@@ -68,7 +69,7 @@ public class HdfsFileWriter implements Closeable {
try {
fileSystem.mkdirs(new Path(basePath.toString()));
file = fileSystem.create(fullPath, true, this.fileBufferSize);
log.debug("Created new file {}", fullPath);
log.debug("Created new file {} in {} ms", fullPath, System.currentTimeMillis() - start);
streamWriter = newStreamWriter();
} catch (IOException e) {
throw new UncheckedIOException("Cannot open file " + fullPath, e);
......@@ -83,6 +84,7 @@ public class HdfsFileWriter implements Closeable {
@Override
public synchronized void close() {
if (!isClosed() && !isTerminated()) {
long start = System.currentTimeMillis();
log.debug("Closing file {}/{}", basePath, fileName);
try {
streamWriter.close();
......@@ -91,6 +93,7 @@ public class HdfsFileWriter implements Closeable {
log.error("Cannot close file {}/{}", basePath, fileName, e);
throw new IllegalStateException(e);
} finally {
log.trace("Closing file {}/{} took {} ms", basePath, fileName, (System.currentTimeMillis() - start));
streamWriter = null;
file = null;
}
......
......@@ -10,6 +10,7 @@ import cern.nxcals.kafka.etl.service.Store;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.RemovalCause;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
......@@ -55,7 +56,7 @@ public class HdfsStore extends AbstractStore implements Store<byte[], byte[]> {
}
@SuppressWarnings({"squid:S2445", "squid:S1172"}) //sync on writer and unused parameter
-    private void onRemoval(PartitionInfo key, HdfsFileWriter writer, RemovalCause removalCause) {
+    private void onRemoval(PartitionInfo key, @NonNull HdfsFileWriter writer, RemovalCause removalCause) {
synchronized (writer) {
log.debug("Cache eviction - removing writer ({}) with cause {}", writer.getFullPath(), removalCause);
writer.terminate();
......@@ -82,6 +83,7 @@ public class HdfsStore extends AbstractStore implements Store<byte[], byte[]> {
if (writer != null) {
try {
writer.close();
writers.invalidate(partitionInfo); //removal from the cache.
} catch (Exception e) {
log.error("Error during flushing writer for partition {}", partitionInfo, e);
return 1;
......
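The writers.invalidate(partitionInfo) call added after close() removes the flushed writer from the Caffeine cache right away instead of leaving it to size- or time-based eviction, so the next record for that partition gets a freshly loaded writer while onRemoval() still terminates the old one. A self-contained illustration of that invalidate-to-removal-listener path (plain strings stand in for the project's PartitionInfo/HdfsFileWriter types):

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.RemovalCause;

public class InvalidateDemo {
    public static void main(String[] args) {
        LoadingCache<String, String> cache = Caffeine.newBuilder()
                .executor(Runnable::run) // run the listener on the caller thread so the demo is deterministic
                .removalListener((String key, String value, RemovalCause cause) ->
                        System.out.printf("removed %s (%s)%n", key, cause))
                .build(key -> "writer-for-" + key);

        cache.get("partition-1");        // loads and caches an entry
        cache.invalidate("partition-1"); // fires the removal listener with RemovalCause.EXPLICIT
    }
}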
......@@ -21,6 +21,8 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
......@@ -46,8 +48,8 @@ public class DefaultProcessor<K, V> implements Processor<K, V> {
private final ExecutorService storeExecutor;
private final MetricsRegistry metricsRegistry;
private final RetryTemplate retryTemplate;
-    private Set<PartitionInfo> usedPartitions = new HashSet<>();
+    private final Set<PartitionInfo> usedPartitions = new HashSet<>();
+    private final ForkJoinPool forkJoinPool = new ForkJoinPool(20);
private void processTerminated(RetryContext retryContext, WriterTerminatedException ex) {
log.warn("Looks like we are stopping the app, not repeating", ex);
......@@ -59,7 +61,7 @@ public class DefaultProcessor<K, V> implements Processor<K, V> {
retryTemplate.execute(retryContext -> {
try {
flushImpl();
-    } catch(WriterTerminatedException ex) {
+    } catch (WriterTerminatedException ex) {
processTerminated(retryContext, ex);
throw ex;
} catch (Exception ex) {
......@@ -75,35 +77,40 @@ public class DefaultProcessor<K, V> implements Processor<K, V> {
retryTemplate.execute(retryContext -> {
try {
processImpl(data);
-    } catch(WriterTerminatedException ex) {
+    } catch (WriterTerminatedException ex) {
processTerminated(retryContext, ex);
throw ex;
} catch (Exception ex) {
processException(retryContext, ex, "Processing records failed with exception, will be retried, retry count {}");
processException(retryContext, ex,
"Processing records failed with exception, will be retried, retry count {}");
throw ex;
}
return null;
});
}
-    private void processException(RetryContext retryContext, Exception ex, String text) {
+    private void processException(RetryContext retryContext, Exception ex, String text) {
metricsRegistry.incrementCounterBy(PROCESS_ERRORS_COUNT, 1);
log.warn(text, retryContext.getRetryCount(), ex);
}
private void flushImpl() {
-        try {
-            long time = System.currentTimeMillis();
-            log.debug("Processor={}, flushing stores with {} used NXCALS partitions", processorId, usedPartitions.size());
-            for (Store<K, V> store : stores) {
+        long time = System.currentTimeMillis();
+        log.debug("Processor={}, flushing stores with {} used NXCALS partitions", processorId, usedPartitions.size());
+        forkJoinPool.submit(() -> stores.parallelStream().forEach(store -> {
+            try {
                store.flush(usedPartitions);
+            } catch (IOException e) {
+                throw new UncheckedIOException(e);
            }
-            usedPartitions.clear();
-            log.debug("Processor={}, flushing stores finished, took {} ms", processorId,
-                System.currentTimeMillis() - time);
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
-        }
+        })).join();
+        usedPartitions.clear();
+        log.debug("Processor={}, flushing stores finished, took {} ms", processorId,
+            System.currentTimeMillis() - time);
}
private void processImpl(ConsumerRecords<K, V> data) {
......@@ -145,12 +152,15 @@ public class DefaultProcessor<K, V> implements Processor<K, V> {
}
private Map<PartitionInfo, List<ConsumerRecord<K, V>>> groupByNxcalsPartition(List<ConsumerRecord<K, V>> data) {
-        Map<PartitionInfo, List<ConsumerRecord<K, V>>> partitionInfoListMap = data.parallelStream()
-            .collect(groupingBy(partitioner::findPartition));
-        if(partitionInfoListMap.containsKey(PartitionInfo.INVALID_PARTITION)) {
-            List<ConsumerRecord<K,V>> invalid = partitionInfoListMap.get(PartitionInfo.INVALID_PARTITION);
-            log.error("Invalid {} records in the data are removed, please check previous errors for more information", invalid.size());
+        ForkJoinTask<Map<PartitionInfo, List<ConsumerRecord<K, V>>>> task = forkJoinPool.submit(
+            () -> data.parallelStream()
+                .collect(groupingBy(partitioner::findPartition)));
+        Map<PartitionInfo, List<ConsumerRecord<K, V>>> partitionInfoListMap = task.join();
+        if (partitionInfoListMap.containsKey(PartitionInfo.INVALID_PARTITION)) {
+            List<ConsumerRecord<K, V>> invalid = partitionInfoListMap.get(PartitionInfo.INVALID_PARTITION);
+            log.error("Invalid {} records in the data are removed, please check previous errors for more information",
+                invalid.size());
partitionInfoListMap.remove(PartitionInfo.INVALID_PARTITION);
}
return partitionInfoListMap;
......@@ -177,7 +187,7 @@ public class DefaultProcessor<K, V> implements Processor<K, V> {
log.trace("Processor={}, processing store partition {} with {} records took {} ms for store {}",
processorId, partition, records.size(), (System.currentTimeMillis() - start), store);
}
-    } catch(WriterTerminatedException ex) {
+    } catch (WriterTerminatedException ex) {
throw ex;
} catch (Exception e) {
log.error("Cannot process records {} for partition {} and store {}", records.size(), partition, store);
......
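Both flushImpl() and groupByNxcalsPartition() now submit their parallel streams to the dedicated ForkJoinPool(20) and join() on the result instead of letting the streams run on ForkJoinPool.commonPool(). This caps the flush and grouping parallelism at 20 and keeps it from competing with other common-pool users. It relies on the common (though not formally specified) behaviour that a parallel stream started from inside a ForkJoinPool executes in that pool, illustrated below with made-up store names:

import java.util.List;
import java.util.concurrent.ForkJoinPool;

public class CustomPoolParallelStream {
    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(20); // same size as the pool added to DefaultProcessor

        // A parallel stream started from a task running inside this pool forks its
        // work into the same pool rather than into ForkJoinPool.commonPool().
        pool.submit(() ->
                List.of("store-1", "store-2", "store-3").parallelStream()
                        .forEach(store -> System.out.printf("flushing %s on %s%n",
                                store, Thread.currentThread().getName()))
        ).join();

        pool.shutdown();
    }
}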
package cern.nxcals.kafka.etl.service;
import cern.nxcals.kafka.etl.config.KafkaEtlProperties;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import lombok.EqualsAndHashCode;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
......@@ -14,6 +16,7 @@ import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.jetbrains.annotations.NotNull;
import java.io.Closeable;
import java.io.IOException;
......@@ -52,9 +55,9 @@ public class KafkaConsumerRunner<K, V> implements Runnable, ConsumerRebalanceLis
private long totalRecords;
private long totalTime;
@SuppressWarnings("squid:S1764") //1==1 fails in Sonar while "while true" fails in IntelliJ...
public void run() {
try {
log.debug("runner-{} start", runnerId);
......@@ -115,10 +118,70 @@ public class KafkaConsumerRunner<K, V> implements Runnable, ConsumerRebalanceLis
synchronized (offsetHolder) {
if (offsetHolder.shouldCommitOffsets(props.getFlushInterval(),props.getFlushIntervalUnit())) {
commitOffsets();
flowControl();
}
}
}
/**
* Introduces "sticky" partitions in case of too much lag. Should help with the Hadoop cluster load.
*/
@VisibleForTesting
void flowControl() {
Set<TopicPartition> assignments = consumer.assignment();
if (assignments.size() > 1) {
TotalLag result = getTotalLag(assignments);
if (result.total > props.getFlowControlThreshold()) {
log.warn(
"runner-{}, activating flow control as total lag {} greater than limit {}, active partition {} only with lag {}",
runnerId, result.total, props.getFlowControlThreshold(), result.maxLagPartition,
result.maxLagForPartition);
consumer.pause(assignments);
consumer.resume(Lists.newArrayList(result.maxLagPartition));
Set<TopicPartition> paused = consumer.paused();
log.warn("runner-{}, paused {} partitions", runnerId, paused);
} else {
Set<TopicPartition> paused = consumer.paused();
log.info(
"runner-{}, flow control not needed, total lag {} less than limit {}, previously paused {} partitions, resuming all if necessary",
runnerId, result.total, props.getFlowControlThreshold(), paused);
if (!paused.isEmpty()) {
consumer.resume(paused);
}
}
} else {
log.info("runner-{}, flow control not needed, only 1 partition assigned {}", runnerId, assignments);
}
}
@RequiredArgsConstructor
private static class TotalLag {
public final long total;
public final TopicPartition maxLagPartition;
public final long maxLagForPartition;
}
private @NotNull TotalLag getTotalLag(Set<TopicPartition> assignments) {
log.info("runner-{}, flow control check for partitions {}", runnerId, assignments);
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignments);
long totalLag = 0;
TopicPartition maxLagPartition = null;
long maxLagForPartition = 0;
for (TopicPartition topicPartition : assignments) {
long currentOffset = consumer.position(topicPartition);
long endOffset = endOffsets.get(topicPartition);
long lag = endOffset - currentOffset;
log.info("runner-{}, lag: {} for partition {}, current offset: {} end offset: {}", runnerId, lag,
topicPartition, currentOffset, endOffset);
if (lag > maxLagForPartition) {
maxLagPartition = topicPartition;
maxLagForPartition = lag;
}
totalLag += lag;
}
return new TotalLag(totalLag, maxLagPartition, maxLagForPartition);
}
private void commitOffsets() {
log.debug("runner-{}, committing offsets", runnerId);
messageProcessor.flush();
......@@ -133,11 +196,13 @@ public class KafkaConsumerRunner<K, V> implements Runnable, ConsumerRebalanceLis
//etl cluster from rebalancing as Kafka will wait for this method to finish before assigning
//the partitions again.
offsetHolder.clearOffsets();
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
log.debug("Partitions {} assigned to runner-{}", partitions, runnerId);
flowControl();
}
// Shutdown hook which can be called from a separate thread
......
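Two Kafka details worth keeping in mind when reading flowControl() and onPartitionsAssigned(): pausing a partition only stops poll() from returning records for it, while the assignment and committed offsets are untouched, so no rebalance is triggered; and the pause/resume state is not preserved across a rebalance, which is why onPartitionsAssigned() re-runs flowControl() right after every reassignment.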
......@@ -12,12 +12,17 @@ import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
......@@ -31,6 +36,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyCollection;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.atLeastOnce;
......@@ -40,6 +47,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
public class KafkaConsumerRunnerTest {
private static final Long OFFSET1 = 1L;
......@@ -180,6 +188,75 @@ public class KafkaConsumerRunnerTest {
verify(kafkaConsumer, times(1)).close();
}
@Test
public void shouldStartControlFlow() {
//given
KafkaConsumerRunner<byte[], byte[]> runner = createRunner(Integer.MAX_VALUE, TimeUnit.MINUTES);
Set<TopicPartition> tps = new HashSet<>();
tps.add(tp1);
tps.add(tp2);
when(kafkaConsumer.assignment()).thenReturn(tps);
Map<TopicPartition, Long> offsets = new HashMap<>();
offsets.put(tp1, 10000L);
offsets.put(tp2, 20000L);
when(kafkaConsumer.endOffsets(anyCollection())).thenReturn(offsets);
when(kafkaConsumer.position(tp1)).thenReturn(1000L);
when(kafkaConsumer.position(tp2)).thenReturn(1000L);
//when
runner.flowControl();
//then
ArgumentCaptor<Collection<TopicPartition>> pause = ArgumentCaptor.forClass(Collection.class);
verify(kafkaConsumer, times(1)).pause(pause.capture());
assertEquals(2, pause.getValue().size());
assertTrue(pause.getValue().contains(tp1));
assertTrue(pause.getValue().contains(tp2));
ArgumentCaptor<Collection<TopicPartition>> resume = ArgumentCaptor.forClass(Collection.class);
verify(kafkaConsumer, times(1)).resume(resume.capture());
assertEquals(1, resume.getValue().size());
assertTrue(resume.getValue().contains(tp2));
}
@Test
public void shouldStopControlFlow() {
//given
KafkaConsumerRunner<byte[], byte[]> runner = createRunner(Integer.MAX_VALUE, TimeUnit.MINUTES);
Set<TopicPartition> tps = new HashSet<>();
tps.add(tp1);
tps.add(tp2);
when(kafkaConsumer.assignment()).thenReturn(tps);
Map<TopicPartition, Long> offsets = new HashMap<>();
offsets.put(tp1, 10000L);
offsets.put(tp2, 20000L);
when(kafkaConsumer.endOffsets(anyCollection())).thenReturn(offsets);
when(kafkaConsumer.position(tp1)).thenReturn(9999L);
when(kafkaConsumer.position(tp2)).thenReturn(19999L);
when(kafkaConsumer.paused()).thenReturn(tps);
//when
runner.flowControl();
//then
ArgumentCaptor<Collection<TopicPartition>> resume = ArgumentCaptor.forClass(Collection.class);
verify(kafkaConsumer, times(1)).resume(resume.capture());
assertEquals(2, resume.getValue().size());
assertTrue(resume.getValue().contains(tp1));
assertTrue(resume.getValue().contains(tp2));
}
}
@Test
......@@ -195,6 +272,7 @@ public class KafkaConsumerRunnerTest {
}
private void run(KafkaConsumerRunner<byte[], byte[]> runner) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(runner);
......@@ -205,6 +283,7 @@ public class KafkaConsumerRunnerTest {
props.setTopicPattern(".*");
props.setFlushInterval(flushInterval);
props.setFlushIntervalUnit(unit);
props.setFlowControlThreshold(1000L);
return new KafkaConsumerRunner<>(this.kafkaConsumer, this.messageProcessor, this.executor, props);
}
......
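For the test expectations above: createRunner sets the flow-control threshold to 1000. In shouldStartControlFlow the lags are 10000 - 1000 = 9000 for tp1 and 20000 - 1000 = 19000 for tp2, so the total lag of 28000 exceeds the threshold and the runner pauses both partitions and resumes only tp2, the partition with the larger lag. In shouldStopControlFlow the lags are 10000 - 9999 = 1 and 20000 - 19999 = 1, a total of 2, which is below the threshold, so the two previously paused partitions are resumed.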