KafkaErrorHandler.java

  1. package no.nav.data.common.kafka;

  2. import jakarta.annotation.Nonnull;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.apache.kafka.clients.consumer.Consumer;
  5. import org.apache.kafka.clients.consumer.ConsumerRecord;
  6. import org.springframework.core.task.SimpleAsyncTaskExecutor;
  7. import org.springframework.kafka.listener.CommonContainerStoppingErrorHandler;
  8. import org.springframework.kafka.listener.MessageListenerContainer;
  9. import org.springframework.stereotype.Component;

  10. import java.time.Duration;
  11. import java.util.List;
  12. import java.util.Optional;
  13. import java.util.concurrent.Executor;
  14. import java.util.concurrent.atomic.AtomicInteger;
  15. import java.util.concurrent.atomic.AtomicLong;

  16. @Slf4j
  17. @Component
  18. public class KafkaErrorHandler extends CommonContainerStoppingErrorHandler {

  19.     private final Executor executor;

  20.     private final AtomicInteger counter = new AtomicInteger(0);
  21.     private final AtomicLong lastError = new AtomicLong(0);

  22.     private static final long LONG = Duration.ofHours(3).toMillis();
  23.     private static final long SHORT = Duration.ofSeconds(20).toMillis();

  24.     private static final int SLOW_ERROR_COUNT = 10;
  25.     private static final long COUNTER_RESET_TIME = SHORT * SLOW_ERROR_COUNT * 2;

  26.     public KafkaErrorHandler() {
  27.         this.executor = new SimpleAsyncTaskExecutor();
  28.     }

  29.     @Override
  30.     public void handleRemaining(@Nonnull Exception thrownException, List<ConsumerRecord<?, ?>> records, @Nonnull Consumer<?, ?> consumer, @Nonnull MessageListenerContainer container) {
  31.         var record = records.iterator().hasNext() ? records.iterator().next() : null;
  32.         Optional.ofNullable(record)
  33.                 .map(ConsumerRecord::topic)
  34.                 .ifPresent(topic -> scheduleRestart(thrownException, records, consumer, container, topic));
  35.     }

  36.     @SuppressWarnings({"pmd:DoNotUseThreads", "fb-contrib:SEC_SIDE_EFFECT_CONSTRUCTOR"})
  37.     private void scheduleRestart(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container, String topic) {
  38.         long now = System.currentTimeMillis();
  39.         if (now - lastError.getAndSet(now) > COUNTER_RESET_TIME) {
  40.             counter.set(0);
  41.         }
  42.         int numErrors = counter.incrementAndGet();
  43.         long stopTime = numErrors > SLOW_ERROR_COUNT ? LONG : SHORT * numErrors;

  44.         executor.execute(() -> {
  45.             try {
  46.                 Thread.sleep(stopTime);
  47.                 log.warn("Starting kafka container topic={}", topic);
  48.                 container.start();
  49.             } catch (Exception exception) {
  50.                 log.error("Error starting kafka container", exception);
  51.             }
  52.         });

  53.         log.warn("Stopping kafka container topic={} for {}", topic, Duration.ofMillis(stopTime).toString());
  54.         super.handleRemaining(thrownException, records, consumer, container);
  55.     }

  56. }