KafkaConfig.java

  1. package no.nav.data.common.kafka;

  2. import io.micrometer.core.instrument.MeterRegistry;
  3. import no.nav.data.team.resource.NomClient;
  4. import no.nav.data.team.resource.NomListener;
  5. import org.springframework.beans.factory.annotation.Value;
  6. import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
  7. import org.springframework.context.annotation.Bean;
  8. import org.springframework.context.annotation.Configuration;
  9. import org.springframework.kafka.core.ConsumerFactory;
  10. import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
  11. import org.springframework.kafka.core.MicrometerConsumerListener;
  12. import org.springframework.kafka.listener.ContainerProperties;
  13. import org.springframework.kafka.listener.ContainerProperties.AckMode;
  14. import org.springframework.kafka.listener.KafkaMessageListenerContainer;

  15. import java.time.Duration;

  16. @Configuration
  17. public class KafkaConfig {

  18.     @Value("${kafka.topics.nom-ressurs}")
  19.     private String topic;

  20.     @Bean
  21.     public ConsumerFactory<String, String> nomRessursConsumer(KafkaProperties kafkaProperties, MeterRegistry meterRegistry) {
  22.         var consumerFactory = new DefaultKafkaConsumerFactory<String, String>(kafkaProperties.buildConsumerProperties());
  23.         consumerFactory.addListener(new MicrometerConsumerListener<>(meterRegistry));

  24.         return consumerFactory;
  25.     }

  26.     @Bean
  27.     public KafkaMessageListenerContainer<String, String> nomRessursContainer(
  28.             ConsumerFactory<String, String> consumerFactory, NomClient nomClient) {
  29.         var containerProps = new ContainerProperties(topic);
  30.         containerProps.setMessageListener(new NomListener(nomClient));
  31.         containerProps.setAckMode(AckMode.MANUAL);
  32.         containerProps.setPollTimeout(500);

  33.         var container = new KafkaMessageListenerContainer<>(consumerFactory, containerProps);
  34.         container.setCommonErrorHandler(new KafkaErrorHandler());
  35.         container.getContainerProperties().setAuthExceptionRetryInterval(Duration.ofMinutes(5));

  36.         return container;
  37.     }
  38. }