// KafkaConfig.java

package no.nav.data.common.kafka;

import io.micrometer.core.instrument.MeterRegistry;
import no.nav.data.team.resource.NomClient;
import no.nav.data.team.resource.NomGraphClient;
import no.nav.data.team.resource.NomListener;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.kafka.autoconfigure.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.MicrometerConsumerListener;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;

import java.time.Duration;

/**
 * Spring configuration for the Kafka consumer that reads the NOM resource topic.
 *
 * <p>Declares two beans: a {@link ConsumerFactory} built from the application's
 * {@link KafkaProperties}, and a {@link KafkaMessageListenerContainer} that
 * delivers records from the configured topic to a {@link NomListener}.
 */
@Configuration
public class KafkaConfig {

    /** Name of the topic to consume, read from the {@code kafka.topics.nom-ressurs} property. */
    @Value("${kafka.topics.nom-ressurs}")
    private String topic;

    /**
     * Creates the consumer factory used by the NOM resource listener container.
     *
     * @param kafkaProperties source of the consumer configuration map
     * @param meterRegistry   registry handed to the {@link MicrometerConsumerListener}
     *                        that is attached to the factory
     * @return a {@code String}/{@code String} consumer factory with a Micrometer listener registered
     */
    @Bean
    public ConsumerFactory<String, String> nomRessursConsumer(KafkaProperties kafkaProperties, MeterRegistry meterRegistry) {
        var consumerSettings = kafkaProperties.buildConsumerProperties();
        DefaultKafkaConsumerFactory<String, String> factory = new DefaultKafkaConsumerFactory<>(consumerSettings);

        // Register the Micrometer listener before handing the factory out.
        factory.addListener(new MicrometerConsumerListener<>(meterRegistry));
        return factory;
    }

    /**
     * Creates the listener container that consumes the configured topic.
     *
     * <p>The container properties are set up with a new {@link NomListener},
     * {@link AckMode#MANUAL} acknowledgement and a poll timeout of {@code 500}.
     * The resulting container gets a {@code KafkaErrorHandler} as its common error
     * handler and an auth-exception retry interval of five minutes.
     *
     * @param consumerFactory factory used by the container to create its consumer
     * @param nomClient       passed to the {@link NomListener} constructor
     * @param nomGraphClient  passed to the {@link NomListener} constructor
     * @return the configured listener container
     */
    @Bean
    public KafkaMessageListenerContainer<String, String> nomRessursContainer(
            ConsumerFactory<String, String> consumerFactory, NomClient nomClient, NomGraphClient nomGraphClient) {
        ContainerProperties listenerSettings = new ContainerProperties(topic);
        var nomListener = new NomListener(nomClient, nomGraphClient);
        listenerSettings.setMessageListener(nomListener);
        listenerSettings.setAckMode(AckMode.MANUAL);
        listenerSettings.setPollTimeout(500);

        KafkaMessageListenerContainer<String, String> listenerContainer =
                new KafkaMessageListenerContainer<>(consumerFactory, listenerSettings);
        listenerContainer.setCommonErrorHandler(new KafkaErrorHandler());

        // Set on the container's own properties object, i.e. after construction.
        var authRetryInterval = Duration.ofMinutes(5);
        listenerContainer.getContainerProperties().setAuthExceptionRetryInterval(authRetryInterval);

        return listenerContainer;
    }
}