KafkaErrorHandler.java
package no.nav.data.common.kafka;
import jakarta.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.kafka.listener.CommonContainerStoppingErrorHandler;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Error handler that stops the failing Kafka listener container (by delegating to
 * {@link CommonContainerStoppingErrorHandler}) and schedules a delayed restart of it.
 *
 * <p>The delay grows with the number of errors seen in a row: the n-th error pauses the
 * container for {@code n * 20s} while {@code n <= 10}; after that the pause is 3 hours.
 * The error count starts over when more than {@link #COUNTER_RESET_TIME} milliseconds have
 * passed since the previous error.
 *
 * <p>The error count and timestamp are shared by every container that uses this handler
 * instance.
 */
@Slf4j
@Component
public class KafkaErrorHandler extends CommonContainerStoppingErrorHandler {

    /** Pause (ms) used once more than {@link #SLOW_ERROR_COUNT} errors have occurred in a row. */
    private static final long LONG_PAUSE_MILLIS = Duration.ofHours(3).toMillis();
    /** Base pause (ms); multiplied by the current error count while the count is small. */
    private static final long SHORT_PAUSE_MILLIS = Duration.ofSeconds(20).toMillis();
    /** Number of errors in a row that still use the short, linearly growing pause. */
    private static final int SLOW_ERROR_COUNT = 10;
    /** Gap (ms) between two errors after which the error count is reset to zero. */
    private static final long COUNTER_RESET_TIME = SHORT_PAUSE_MILLIS * SLOW_ERROR_COUNT * 2;
    /** Label used in log messages when no topic can be read from the remaining records. */
    private static final String UNKNOWN_TOPIC = "unknown";

    /** Runs the delayed restart so the thread reporting the error is not blocked by the sleep. */
    private final Executor executor;
    /** Number of errors seen since the last reset. */
    private final AtomicInteger counter = new AtomicInteger(0);
    /** Epoch millis of the most recent error; 0 until the first error. */
    private final AtomicLong lastError = new AtomicLong(0);

    public KafkaErrorHandler() {
        this.executor = new SimpleAsyncTaskExecutor();
    }

    /**
     * Stops the container and schedules its restart.
     *
     * <p>The topic of the first remaining record is used for logging only. When the list is
     * {@code null}, empty, or the record carries no topic, a placeholder label is used so the
     * error is never dropped silently and the container is still stopped.
     *
     * @param thrownException the exception raised by the listener
     * @param records         the records not yet processed; may be empty
     * @param consumer        the Kafka consumer, passed through to the parent handler
     * @param container       the container to stop now and start again later
     */
    @Override
    public void handleRemaining(@Nonnull Exception thrownException, List<ConsumerRecord<?, ?>> records, @Nonnull Consumer<?, ?> consumer, @Nonnull MessageListenerContainer container) {
        ConsumerRecord<?, ?> first = (records == null || records.isEmpty()) ? null : records.get(0);
        String topic = Optional.ofNullable(first)
                .map(ConsumerRecord::topic)
                .orElse(UNKNOWN_TOPIC);
        scheduleRestart(thrownException, records, consumer, container, topic);
    }

    /**
     * Computes the pause for the current error, submits a task that sleeps for that long and
     * then starts the container, and finally delegates to the parent handler.
     */
    @SuppressWarnings({"pmd:DoNotUseThreads", "fb-contrib:SEC_SIDE_EFFECT_CONSTRUCTOR"})
    private void scheduleRestart(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container, String topic) {
        long now = System.currentTimeMillis();
        // A long quiet period since the previous error means we start counting from scratch.
        if (now - lastError.getAndSet(now) > COUNTER_RESET_TIME) {
            counter.set(0);
        }
        int numErrors = counter.incrementAndGet();
        long stopTime = numErrors > SLOW_ERROR_COUNT ? LONG_PAUSE_MILLIS : SHORT_PAUSE_MILLIS * numErrors;

        executor.execute(() -> {
            try {
                Thread.sleep(stopTime);
            } catch (InterruptedException interrupted) {
                // Restore the interrupt flag for whoever owns this thread and give up on the
                // restart instead of reporting the interruption as a start failure.
                Thread.currentThread().interrupt();
                log.warn("Interrupted while waiting to restart kafka container topic={}, skipping restart", topic);
                return;
            }
            try {
                log.warn("Starting kafka container topic={}", topic);
                container.start();
            } catch (Exception exception) {
                log.error("Error starting kafka container", exception);
            }
        });

        // Log before delegating.
        // NOTE(review): the parent handler appears to throw after stopping the container, so
        // statements placed after the super call would not run - confirm against the
        // spring-kafka version in use.
        log.warn("Stopping kafka container topic={} for {}", topic, Duration.ofMillis(stopTime));
        super.handleRemaining(thrownException, records, consumer, container);
    }
}