Skip to content
Snippets Groups Projects

NXCALS-7818 Kafka reschedules debug info

Merged Jakub Wozniak requested to merge NXCALS-7818-KafkaEtlReschedules into develop
2 files
+ 13
9
Compare changes
  • Side-by-side
  • Inline
Files
2
@@ -59,7 +59,7 @@ public class KafkaConsumerRunner<K, V> implements Runnable, ConsumerRebalanceLis
private String getMemberId() {
try {
if (memberId == null) {
if (memberId == null || memberId.isEmpty()) {
memberId = consumer.groupMetadata().memberId();
}
}catch(InvalidGroupIdException ex) {
@@ -69,7 +69,7 @@ public class KafkaConsumerRunner<K, V> implements Runnable, ConsumerRebalanceLis
}
private String getId() {
return "runner-" + runnerId + "(" + getMemberId() + ")";
return "runner-" + runnerId + " (" + getMemberId() + ")";
}
@SuppressWarnings("squid:S1764") //1==1 fails in Sonar while "while true" fails in IntelliJ...
@@ -151,8 +151,8 @@ public class KafkaConsumerRunner<K, V> implements Runnable, ConsumerRebalanceLis
TotalLag result = getTotalLag(assignments);
if (result.total > props.getFlowControlThreshold()) {
log.warn(
"runner-{}, activating flow control as total lag {} greater than limit {}, active partition {} only with lag {}",
runnerId, result.total, props.getFlowControlThreshold(), result.maxLagPartition,
"{} Activating flow control as total lag {} greater than limit {}, active partition {} only with lag {}",
getId(), result.total, props.getFlowControlThreshold(), result.maxLagPartition,
result.maxLagForPartition);
consumer.pause(assignments);
consumer.resume(Lists.newArrayList(result.maxLagPartition));
@@ -161,14 +161,14 @@ public class KafkaConsumerRunner<K, V> implements Runnable, ConsumerRebalanceLis
} else {
Set<TopicPartition> paused = consumer.paused();
log.info(
"runner-{}, flow control not needed, total lag {} less than limit {}, previously paused {} partitions, resuming all if necessary",
runnerId, result.total, props.getFlowControlThreshold(), paused);
"{} Flow control not needed, total lag {} less than limit {}, previously paused {} partitions, resuming all if necessary",
getId(), result.total, props.getFlowControlThreshold(), paused);
if (!paused.isEmpty()) {
consumer.resume(paused);
}
}
} else {
log.info("runner-{}, flow control not needed, only 1 partition assigned {}", runnerId, assignments);
log.info("{} Flow control not needed, only 1 partition assigned {}", getId(), assignments);
}
}
@@ -180,7 +180,7 @@ public class KafkaConsumerRunner<K, V> implements Runnable, ConsumerRebalanceLis
}
private @NotNull TotalLag getTotalLag(Set<TopicPartition> assignments) {
log.info("runner-{}, flow control check for partitions {}", runnerId, assignments);
log.info("{} Flow control check for partitions {}", getId(), assignments);
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignments);
long totalLag = 0;
TopicPartition maxLagPartition = null;
@@ -189,7 +189,7 @@ public class KafkaConsumerRunner<K, V> implements Runnable, ConsumerRebalanceLis
long currentOffset = consumer.position(topicPartition);
long endOffset = endOffsets.get(topicPartition);
long lag = endOffset - currentOffset;
log.info("runner-{}, lag: {} for partition {}, current offset: {} end offset: {}", runnerId, lag,
log.info("{} Lag: {} for partition {}, current offset: {} end offset: {}", getId(), lag,
topicPartition, currentOffset, endOffset);
if (lag > maxLagForPartition) {
maxLagPartition = topicPartition;
Loading