Skip to content
Snippets Groups Projects

Resolve NXCALS-7753 "Introduce disaster flow control in etls"

Merged Resolve NXCALS-7753 "Introduce disaster flow control in etls"
All threads resolved!
Merged Jakub Wozniak requested to merge NXCALS-7753-Introduce-disaster-flow-control-in-ETLs into develop
All threads resolved!
Files
2
@@ -131,10 +131,10 @@ public class KafkaConsumerRunner<K, V> implements Runnable, ConsumerRebalanceLis
Set<TopicPartition> assignments = consumer.assignment();
if (assignments.size() > 1) {
TotalLag result = getTotalLag(assignments);
if (result.totalLag > props.getFlowControlThreshold()) {
if (result.total > props.getFlowControlThreshold()) {
log.warn(
"runner-{}, activating flow control as total lag {} greater than limit {}, active partition {} only with lag {}",
runnerId, result.totalLag, props.getFlowControlThreshold(), result.maxLagPartition,
runnerId, result.total, props.getFlowControlThreshold(), result.maxLagPartition,
result.maxLagForPartition);
consumer.pause(assignments);
consumer.resume(Lists.newArrayList(result.maxLagPartition));
@@ -144,7 +144,7 @@ public class KafkaConsumerRunner<K, V> implements Runnable, ConsumerRebalanceLis
Set<TopicPartition> paused = consumer.paused();
log.info(
"runner-{}, flow control not needed, total lag {} less than limit {}, previously paused {} partitions, resuming all if necessary",
runnerId, result.totalLag, props.getFlowControlThreshold(), paused);
runnerId, result.total, props.getFlowControlThreshold(), paused);
if (!paused.isEmpty()) {
consumer.resume(paused);
}
@@ -156,7 +156,7 @@ public class KafkaConsumerRunner<K, V> implements Runnable, ConsumerRebalanceLis
@RequiredArgsConstructor
private static class TotalLag {
public final long totalLag;
public final long total;
public final TopicPartition maxLagPartition;
public final long maxLagForPartition;
}
Loading