Bilde av PUS-logo

Kom i gang med Kafka

Av Stephen Ramthun 16 Oct 2018

Kafka er et navn du sikkert har hørt hvisket rundt omkring i gangene hos NAV. Det er ikke Franz det snakkes om, men Apache Kafka, en open-source strømmeplatform som for tiden er i vinden hos organisasjoner og bedrifter med store applikasjonsporteføljer hvor behovet for en smidig løsning for sanntidsutveksling av meldinger har vokst seg stort. Som hos f.eks. NAV.

Hva er Kafka?

Kort fortalt er Kafka en distribuert strømmeplatform som muliggjør effektiv meldingsutveksling mellom forskjellige applikasjoner og endepunkt ved en Producer/Consumer-arkitektur. Jeg skal ikke gå i dybden på hva Kafka er og hva du kan gjøre med det, så om du ikke har vært borti Kafka før anbefales det at du tar en titt på de offisielle sidene for prosjektet og Confluents dokumentasjon. Der finner du utfyllende informasjon om Kafkas konsepter og arkitektur samt detaljer for implementasjon av klienter. Last også ned Kafka og gå gjennom quickstart-seksjonen for å få en rask innføring i hvordan det hele fungerer i praksis.

Kafka på NAV

NAV vedlikeholder tre miljøer for Kafka: test, preprod og produksjon. Disse samsvarer med henholdsvis test.local (T), preprod.local (Q) og adeo.no (prod). For å produsere/konsumere topics må man ha en gyldig systembruker i det aktuelle miljøet og konfigurere topicen slik at systembrukeren har skrive og/eller leserettigheter på denne, avhengig av om man skal produsere eller konsumere. Det er opprettet et selvhjelps-API for administrering av topics og tilgangskontroll pr. miljø. Denne lever på følgende steder:

Oppretter du en topic i test vil denne kun eksistere i dette clusteret og du blir nødt til å opprette den igjen i preprod og prod etter behov. Se AURA og ATOM sine Kafka-sider for mer detaljer om hvordan Kafka er satt opp hos NAV.

Oppsett av lokalt testmiljø

Når du utvikler en applikasjon som skal kommunisere med Kafka-brokerne til NAV er det fint å kunne sette opp et lokalt testmiljø slik at du slipper å deploye hver gang du vil teste en endring. Det finnes en embedded Kafka-server man kan kjøre direkte i Java om man ønsker å teste Kafka kjapt, men for å teste applikasjonen din opp mot NAVs Kafka-brokere blir det straks litt mer komplisert. Det finnes heldigvis en enkel løsning på dette problemet. Dersom du kloner dette repoet og følger instruksene får du et lokalt Kafka-oppsett som er svært likt det som lever i de forskjellige miljøene på NAV og du administrerer topics og sikkerhetskonfigurasjon via REST-APIet på samme måte som du ville gjort det i et faktisk miljø. Du når denne på http://localhost:8840/api/v1/ når du har opprettet testmiljøet.

Oppsett av Producer/Consumer

Her finner du en eksempelapplikasjon for å se hvordan du kan sette opp både Producer og Consumer på en enkel måte i Java med Spring. Spring Kafka bruker annotasjonsmagi for å gjøre det lekende lett å sette opp Kafka-klienter. Nedenfor ser du kode fra eksempelapplikasjonen som viser implementasjon av både Producer og Consumer samt konfigurasjonen av disse. Merk at Confluence har klientbibliotek for flere språk om du skriver appen din i f.eks. Python eller Go.

// KafkaProducer.java

@Slf4j
@Service
public class KafkaProducer {

    private final KafkaTemplate<String, String> template;

    @Value("${app.topic.test}")
    private String topic;

    @Autowired
    public KafkaProducer(KafkaTemplate<String, String> template) {
        this.template = template;
    }

    public void send(String message) {
        log.info("Sending message: '{}' to topic: '{}'", message, topic);
        template.send(topic, message);
    }

}
// KafkaConsumer.java

@Slf4j
@Service
public class KafkaConsumer {

    @KafkaListener(topics = "${app.topic.test}")
    public void listen(@Payload String message) {
        log.info("Received message: {}", message);
    }

}
// KafkaConfig.java

@EnableKafka
@Configuration
public class KafkaConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory(KafkaProperties properties) {
        return new DefaultKafkaConsumerFactory<>(properties.buildConsumerProperties());
    }

    @Bean
    public ProducerFactory<String, String> producerFactory(KafkaProperties properties) {
        return new DefaultKafkaProducerFactory<>(properties.buildProducerProperties());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory;
        factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);

        return factory;
    }

}

Konfigurasjonsparametrene settes her i src/main/resources/application.yml:

spring:
  kafka:
    bootstrap-servers: localhost:9092,localhost:9093
    properties:
      client:
        id: kafka-demo
      security.protocol: SASL_PLAINTEXT
      sasl:
        mechanism: PLAIN
        jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule \
          required username="igroup" \
          password="itest";
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: srvc01
      auto-offset-reset: earliest

app:
  topic:
    test: test-topic

Felter verdt å merke seg:

Dette er et forenklet eksempel på hvordan man kan konfigurere Kafka-klienten opp mot det lokale testmiljøet, men det er mange andre måter man også kan gjøre det på. Et raskt søk på github, noe à la. org:navikt kafka, viser hvordan andre har valgt å gjøre det. Det er helt opp til deg som utvikler å finne ut av hvordan du ønsker å løse dette. Det viktige er å passe på at du setter riktige parametre for de forskjellige miljøene applikasjonen din skal leve i, og at du husker å opprette topics via REST-APIet.

Hvordan konfigurere topics

Administrering og oppretting av topics i Kafka-clusteret kan man gjøre via REST-APIet for det aktuelle clusteret/miljøet. Se Swagger-dokumentasjonen til APIet for oversikt over tilgjengelige endepunkter. Den letteste måten å opprette topics på er å bruke oneshot-endepunktet ‘/api/v1/oneshot’ med en JSON-payload med ønsket konfigurasjon. Et eksempel på payload kan du se under. I members-arrayet legger man til systembrukere som skal ha tilgang til å produsere og konsumere meldinger innen topics som samsvarer med verdien til feltet topicName.

// Eksempel-payload for oneshot-oppretting av topic og acl
// role-feltet settes til enten "CONSUMER" eller "PRODUCER".

{
  "topics": [
    {
      "configEntries": {},
      "members": [
        {
          "member": <SYSTEMBRUKER>,
          "role": <ROLLE>
        }
      ],
      "numPartitions": <ANTALL_PARTISJONER>,
      "topicName": <TOPIC_NAVN>
    }
  ]
}

Nyttige lenker