NomListener.java

  1. package no.nav.data.team.resource;

  2. import lombok.extern.slf4j.Slf4j;
  3. import no.nav.data.common.utils.JsonUtils;
  4. import no.nav.data.common.utils.StreamUtils;
  5. import no.nav.data.team.resource.dto.NomRessurs;
  6. import org.apache.kafka.clients.consumer.ConsumerRecord;
  7. import org.apache.kafka.common.TopicPartition;
  8. import org.springframework.kafka.listener.BatchAcknowledgingMessageListener;
  9. import org.springframework.kafka.listener.ConsumerSeekAware;
  10. import org.springframework.kafka.support.Acknowledgment;

  11. import java.util.ArrayList;
  12. import java.util.HashMap;
  13. import java.util.List;
  14. import java.util.Map;

  15. @Slf4j
  16. public class NomListener implements ConsumerSeekAware, BatchAcknowledgingMessageListener<String, String> {

  17.     private final NomClient nomClient;

  18.     public NomListener(NomClient nomClient) {
  19.         this.nomClient = nomClient;
  20.     }

  21.     @Override
  22.     public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
  23.         assignments.keySet().forEach(p -> callback.seekToBeginning(p.topic(), p.partition()));
  24.     }

  25.     @Override
  26.     public void onMessage(List<ConsumerRecord<String, String>> data, Acknowledgment acknowledgment) {
  27.         try {
  28.             var resources = new ArrayList<NomRessurs>();
  29.             for (ConsumerRecord<String, String> record : data) {
  30.                 NomRessurs nomRessurs = JsonUtils.toObject(record.value(), NomRessurs.class);
  31.                 if (nomRessurs.getNavident() == null) {
  32.                     log.warn("ressurs missing ident {}", nomRessurs);
  33.                 } else {
  34.                     resources.add(nomRessurs.addKafkaData(record.partition(), record.offset()));
  35.                 }
  36.             }
  37.             {
  38.                 // temporary diagnostics logging
  39.                 var offsets = resources.stream().map(NomRessurs::getOffset).toList();
  40.                 var entriesPerNavident = new HashMap<String,Integer>(resources.size()/2);
  41.                 for(var r : resources){
  42.                     var prev = entriesPerNavident.get(r.getNavident());
  43.                     entriesPerNavident.put(r.getNavident(), prev == null ? 1 : prev + 1);
  44.                 }
  45.                 var counts = StreamUtils.distinctByKey(entriesPerNavident.values(),it -> it);

  46.                 var inOrder = true;
  47.                 for(var i = 1; i < offsets.size(); i += 1){
  48.                     inOrder &= offsets.get(i-1) < offsets.get(i);
  49.                 }
  50.                 log.info("Kafka messages count: {}", data.size());
  51.                 log.info("Resources are ordered by offset? -> {}", inOrder );
  52.                 log.info("Distinct duplicate amounts in resources from kafka -> {}", counts);
  53.             }
  54.             nomClient.add(resources);
  55.         } catch (Exception e) {
  56.             log.error("Failed to write nom ressurs", e);
  57.             throw e;
  58.         }
  59.         acknowledgment.acknowledge();
  60.     }

  61. }