This tutorial shows you how to create a Kafka consumer and producer using the kafka-clients Java library.

What you'll learn

Prerequisites

Docker on Windows VDI requires virtualization

Java and project setup

As an alternative, use git to clone the project:

git clone https://github.com/navikt/kafka-clients-guarantees-codelab.git

(On the NAV developer image you will need to add a proxy first by running git config --global http.proxy http://webproxy-utvikler.nav.no:8088)

Gradle

In the project root, run Gradle:

$ cd codelab-kafka-clients  
$ ./gradlew --offline build

or on Windows:

$ cd codelab-kafka-clients
$ gradlew.bat --offline build

If everything is set up correctly, you should see:

BUILD SUCCESSFUL in Xs

Gradle "connection refused"

On the NAV developer image: if you get a "Connection refused" error when executing Gradle, add a file named gradle.properties in a folder named .gradle in your home directory with the following content:

systemProp.proxySet=true
systemProp.http.proxyHost=webproxy-utvikler.nav.no
systemProp.http.proxyPort=8088
systemProp.https.proxyHost=webproxy-utvikler.nav.no
systemProp.https.proxyPort=8088

Windows: C:\Users\<username>\.gradle\gradle.properties
Linux and Mac: ~/.gradle/gradle.properties

Docker

The project directory contains a docker-compose.yml file. Navigate to the directory that holds the file and run docker-compose from the command line:

$ docker-compose up

This should start Zookeeper and Kafka locally.

IntelliJ

NavVisitorLocation is a topic that receives a message each time a visitor visits the nav.no webpage. Each message is structured as JSON with postal code and municipality information, for example:

{"postnummer":"2005","stedsnavn":"RÆLINGEN","kommune":"0228"}

  1. Create a consumer that consumes the NavVisitorLocation topic (see NavBesøkProducer.java in the project).
  2. Track visits per area (stedsnavn).
  3. Based on the area visits, print information about the three areas with the highest counts.

  1. Create the topic by running the following from the command line:

$ docker run --net host confluentinc/cp-kafka kafka-topics --create --zookeeper localhost:2181 --topic NavVisitorLocation --replication-factor 2 --partitions 4

  2. To verify that the topic has been created, run:

$ docker run --net host confluentinc/cp-kafka kafka-topics --describe --zookeeper localhost:2181 --topic NavVisitorLocation

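import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class NavVisitorConsumer {

    private final static String TOPIC = "NavVisitorLocation";
    private final static String BOOTSTRAP_SERVERS = "localhost:9092";
    private final static Logger LOG = LogManager.getLogger();

    public static void main(String[] args) {
        // Basic consumer configuration: broker address, consumer group, and String keys/values
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "visitor-consumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> navBesøkConsumer = new KafkaConsumer<>(props);
        navBesøkConsumer.subscribe(Collections.singleton(TOPIC));

        // Poll forever and log every record that arrives
        while (true) {
            ConsumerRecords<String, String> records = navBesøkConsumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                LOG.info("Consumed {}", record);
            }
        }
    }
}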

For each record, we increase a counter for that area. PoststedCounter.java has a method count, which takes a Poststed as an argument and increases the counter for that Poststed by 1 each time it is added.

  1. First we need to take the record value (JSON) and convert it to a Poststed object: Poststed poststed = new Gson().newBuilder().create().fromJson(record.value(), Poststed.class);
  2. Create an instance of the PoststedCounter outside the while loop: PoststedCounter counter = new PoststedCounter();
  3. Add the parsed Poststed object to the counter: counter.count(poststed);
  4. Print the result.

Something like:

        ..... 
        
        PoststedCounter counter = new PoststedCounter();
        while (true) {
            ConsumerRecords<String, String> records = navBesøkConsumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                LOG.info("Consumed " + record);
                Poststed poststed = new Gson().newBuilder().create().fromJson(record.value(), Poststed.class);
                counter.count(poststed);
            }
            if(!records.isEmpty()){
                LOG.info("Top three -> {}", counter.getTopThreeCount());
            }
        }
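
The counting and "top three" steps above can be illustrated without Kafka at all. The following is a minimal sketch, not the project's PoststedCounter: the class name AreaCounter, its methods, and the "name=count" output format are assumptions made for illustration. It keeps one counter per area name and returns the three areas with the highest counts, breaking ties alphabetically.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-in for the project's PoststedCounter (names are illustrative only).
class AreaCounter {
    private final Map<String, Integer> visitsPerArea = new HashMap<>();

    // Increase the counter for the given area (stedsnavn) by 1.
    void count(String stedsnavn) {
        visitsPerArea.merge(stedsnavn, 1, Integer::sum);
    }

    // Return up to three "area=count" strings, highest count first.
    // Areas with equal counts are ordered alphabetically.
    List<String> topThree() {
        return visitsPerArea.entrySet().stream()
                .sorted(Map.Entry.<String, Integer>comparingByValue(Comparator.<Integer>reverseOrder())
                        .thenComparing(Map.Entry.<String, Integer>comparingByKey()))
                .limit(3)
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.toList());
    }
}
```

In the consumer loop, the area name parsed from each record would be passed to count, and topThree would be logged after each non-empty batch.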

PoststedCounter is not safe and will throw an exception in certain circumstances. Our consumer is currently configured to auto-commit offsets as it reads from the topic. We need to take control over when offsets are committed so that we do not "lose" messages.

  1. Disable auto-commit: props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // disable auto commit of offsets
  2. Read from the earliest messages: props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // start from the earliest offset when the group has no committed offset
  3. Create a "records consumed" counter outside the while loop: int consumedCounter = 0;
  4. Add navBesøkConsumer.commitSync() after the for(Consu... loop, inside an if statement that checks consumedCounter against records.count()

Something like:

        int consumedCounter = 0;
        while (true) {
            ConsumerRecords<String, String> records = navBesøkConsumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                Poststed poststed = new Gson().newBuilder().create().fromJson(record.value(), Poststed.class);
                counter.count(poststed);
                consumedCounter++;
            }
            // Commit only when every record in this batch has been counted
            if (consumedCounter == records.count()) {
                if (!records.isEmpty()) {
                    navBesøkConsumer.commitSync(); // commits the offsets returned by the last poll()
                    LOG.info("Top three -> {}", counter.getTopThreeCount());
                    consumedCounter = 0;
                }
            }
        }
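
To see why the position of the commit matters, the following in-memory simulation may help. It is only a sketch: it uses no Kafka classes, the class and parameter names are hypothetical, and "commit before processing" is a simplified stand-in for offsets being committed ahead of the work, not a model of Kafka's actual auto-commit timing. A consumer reads a log in batches, crashes once at a chosen offset, and restarts from the last committed offset.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory simulation only; no Kafka classes are used and all names are hypothetical.
class CommitSimulation {

    // Reads offsets 0..logSize-1 in batches of batchSize, crashing once at offset crashAt
    // and then restarting from the last committed offset.
    // Returns the offsets that were fully processed, in processing order.
    static List<Integer> run(int logSize, int batchSize, int crashAt, boolean commitBeforeProcessing) {
        List<Integer> processed = new ArrayList<>();
        int committed = 0;        // next offset to read after a (re)start
        boolean crashed = false;  // the simulated crash happens only once

        while (committed < logSize) {
            int start = committed;
            int end = Math.min(start + batchSize, logSize);
            if (commitBeforeProcessing) {
                committed = end;  // offsets are committed before the batch is handled
            }
            boolean restart = false;
            for (int offset = start; offset < end; offset++) {
                if (!crashed && offset == crashAt) {
                    crashed = true;
                    restart = true;  // consumer dies; the next iteration resumes at `committed`
                    break;
                }
                processed.add(offset);
            }
            if (!restart && !commitBeforeProcessing) {
                committed = end;  // commit only after the whole batch succeeded
            }
        }
        return processed;
    }
}
```

With a log of 6 records, batches of 3, and a crash at offset 4, committing before processing yields offsets 0, 1, 2, 3, so records 4 and 5 are never processed. Committing after processing yields 0, 1, 2, 3, 3, 4, 5: nothing is lost, but record 3 is processed twice, so the counting logic has to tolerate duplicates.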

Optional: Produce top three areas to a new topic

Create a producer that produces the "top three areas" to a topic.
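
As a possible starting point, the sketch below builds the configuration such a producer might use. The class name, the topic name NavTopThreeAreas, and the choice of acks=all are assumptions made for illustration; the property keys and the StringSerializer class name are standard kafka-clients settings.

```java
import java.util.Properties;

// Hypothetical helper; class name, topic name and the acks setting are illustrative choices.
class TopThreeProducerSettings {

    // Assumed name for the new topic (not defined by the project).
    static final String TOPIC = "NavTopThreeAreas";

    // Build producer properties for String keys and String values.
    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Wait for all in-sync replicas to acknowledge each record.
        props.setProperty("acks", "all");
        return props;
    }
}
```

These properties could be passed to new KafkaProducer<String, String>(props), and each top-three result sent with producer.send(new ProducerRecord<>(TopThreeProducerSettings.TOPIC, value)), closing the producer when the application shuts down.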