This tutorial shows you how to create a Kafka consumer and producer using the kafka-clients Java library.

What you'll learn

Prerequisites

Docker on Windows VDI requires virtualization

Java and project setup

As an alternative, use git to clone the project:

git clone https://github.com/navikt/codelab-kafka-clients.git

(On the NAV developer image you will first need to add a proxy by running git config --global http.proxy http://webproxy-utvikler.nav.no:8088)

Gradle

In the project root, run Gradle:

$ cd codelab-kafka-clients
$ ./gradlew --offline build

or on Windows:

$ cd codelab-kafka-clients
$ gradlew.bat --offline build

If everything is set up correctly, you should see:

BUILD SUCCESSFUL in Xs

Gradle "connection refused"

On the NAV developer image: if you get a "Connection refused" error when running Gradle, add a file named gradle.properties to a folder named .gradle in your home directory, with the following content:

systemProp.proxySet=true
systemProp.http.proxyHost=webproxy-utvikler.nav.no
systemProp.http.proxyPort=8088
systemProp.https.proxyHost=webproxy-utvikler.nav.no
systemProp.https.proxyPort=8088

Windows: C:\Users\<username>\.gradle\gradle.properties
Linux and Mac: ~/.gradle/gradle.properties

Docker

The project directory contains a docker-compose.yml file. Navigate to the directory containing that file and run docker-compose:

$ docker-compose up

This should start Zookeeper and a Kafka instance locally.
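
To confirm that the broker came up, it can help to check that something is listening on the Kafka port. The sketch below is not part of the codelab project: BrokerPortCheck and isReachable are hypothetical names, and it assumes the broker is exposed on localhost:9092, the address the consumer code later in this tutorial connects to.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of the codelab project.
final class BrokerPortCheck {

    // Returns true if a TCP connection to host:port succeeds within the timeout.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, or unknown host
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: the broker from docker-compose.yml listens on localhost:9092.
        boolean up = isReachable("localhost", 9092, 2000);
        System.out.println(up ? "Kafka port is open" : "Nothing is listening on localhost:9092");
    }
}
```

An open port only shows that a process accepted the TCP connection; it does not prove that the broker is ready to serve requests.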

Docker "connection refused"

On the NAV developer image you will need to configure a proxy for Docker. Open Docker and go to Settings:

Proxy settings for NAV Windows developer image

IntelliJ

  1. Create a consumer that consumes the FizzBuzzNumberEntered topic (see FizzBuzzCandidateProducer.java in the project).
  2. Based on the number in FizzBuzzNumberEntered, calculate whether that number is a "Fizz-Buzz" candidate.
  3. Produce a message with the answer to the Kafka topic FizzBuzzAnswered.

The answer must conform to the following JSON format:

{
  "answer": "FizzBuzz", // "FizzBuzz", "Fizz" or "Buzz"
  "candidate": 15, // Number entered
  "groupId": "A-team" // Identifier of the team
}
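
Note that the // comments in the sample above are annotations for the reader. JSON itself has no comment syntax, so the message sent to Kafka should contain only the three fields. As a rough, dependency-free illustration of that payload (the project itself uses FizzBuzzAnswerMessage and Gson; AnswerJsonSketch and its toJson method are hypothetical names invented for this sketch):

```java
// Hypothetical value class; the project ships its own FizzBuzzAnswerMessage.
final class AnswerJsonSketch {
    final String answer;    // "FizzBuzz", "Fizz" or "Buzz"
    final int candidate;    // Number entered
    final String groupId;   // Identifier of the team

    AnswerJsonSketch(String answer, int candidate, String groupId) {
        this.answer = answer;
        this.candidate = candidate;
        this.groupId = groupId;
    }

    // Hand-rolled JSON for illustration only: it does not escape quotes or backslashes.
    String toJson() {
        return "{\"answer\":\"" + answer + "\","
                + "\"candidate\":" + candidate + ","
                + "\"groupId\":\"" + groupId + "\"}";
    }

    public static void main(String[] args) {
        // prints {"answer":"FizzBuzz","candidate":15,"groupId":"A-team"}
        System.out.println(new AnswerJsonSketch("FizzBuzz", 15, "A-team").toJson());
    }
}
```

In real code a JSON library is the safer choice, since it handles escaping of special characters.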

• Fizz-Buzz test: write a program that prints the numbers from 1 to 100, but for multiples of three print "Fizz" instead of the number, and for multiples of five print "Buzz". For numbers that are multiples of both three and five, print "FizzBuzz".
Ref https://en.wikipedia.org/wiki/Fizz_buzz
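
The rule above can be sketched in a few lines of Java. This is an illustration only: FizzBuzzSketch is a hypothetical class, not the FizzBuzz calculator shipped with the project, and returning the number itself as text for non-multiples is an assumption based on the classic rule.

```java
// Hypothetical stand-in for the project's FizzBuzz calculator; the real class may differ.
final class FizzBuzzSketch {

    static String calculate(int number) {
        boolean byThree = number % 3 == 0;
        boolean byFive = number % 5 == 0;
        // Check the combined case first, otherwise 15 would be reported as "Fizz".
        if (byThree && byFive) return "FizzBuzz";
        if (byThree) return "Fizz";
        if (byFive) return "Buzz";
        // Assumption: other numbers are echoed back as text, as in the classic rule.
        return Integer.toString(number);
    }

    public static void main(String[] args) {
        for (int n : new int[] {3, 5, 15, 7}) {
            System.out.println(n + " -> " + calculate(n));
        }
    }
}
```

Running main prints "3 -> Fizz", "5 -> Buzz", "15 -> FizzBuzz" and "7 -> 7", one per line.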

The project provides a Java class named FizzBuzzCandidateProducer, which produces random numbers to a topic named FizzBuzzNumberEntered.

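import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class FizzBuzzNumberEnteredConsumer {

    private static final String TOPIC = "FizzBuzzNumberEntered";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) {
        // Broker address, consumer group, and deserializers for the record key and value
        Properties consumerProperties = new Properties();
        consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "fizzbuzz-consumer");
        consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);

        // Create the consumer and subscribe to the topic
        KafkaConsumer<String, Integer> consumer = new KafkaConsumer<>(consumerProperties);
        consumer.subscribe(Collections.singleton(TOPIC));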

    }
}

Next, add a poll loop inside main, after the call to subscribe. It also needs imports for java.time.Duration and for ConsumerRecord and ConsumerRecords from org.apache.kafka.clients.consumer:

      while (true) {
          ConsumerRecords<String, Integer> records = consumer.poll(Duration.ofMillis(100)); // Fetch records, give up after timeout
          for (ConsumerRecord<String, Integer> record : records) {
              System.out.println("Consumed " + record);
          }
      }

The project also provides a FizzBuzz "calculator" that we can use to calculate the answer.

      while (true) {
          ConsumerRecords<String, Integer> records = consumer.poll(Duration.ofMillis(100)); // Fetch records, give up after timeout
          for (ConsumerRecord<String, Integer> record : records) {
              System.out.println("Consumed " + record);
              // First extract the number from the record
              Integer number = record.value();
              // Calculate the FizzBuzz answer
              String fizzBuzzCandidate = FizzBuzz.calculate(number);
              FizzBuzzAnswerMessage answer = new FizzBuzzAnswerMessage(fizzBuzzCandidate, number, "A-team");

              // Serialize the answer to JSON
              final String answerAsJson = new Gson().toJson(answer);

              // @todo: Produce the answer to the `FizzBuzzAnswered` topic
              // Tip: see FizzBuzzCandidateProducer.java
          }
      }
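
For the @todo above, the producer needs its own configuration. As a hedged sketch (AnswerProducerConfigSketch and producerProperties are hypothetical names, and the project's FizzBuzzCandidateProducer.java may be set up differently), the settings for a producer that sends the JSON answer as a String could look like this, using the plain string keys that the ProducerConfig constants stand for. The commented lines show roughly how the properties would be handed to a KafkaProducer once kafka-clients is on the classpath.

```java
import java.util.Properties;

// Hypothetical helper, not part of the codelab project.
final class AnswerProducerConfigSketch {

    // Topic name taken from the task description.
    static final String TOPIC = "FizzBuzzAnswered";

    // Builds producer settings for String keys and String (JSON) values.
    static Properties producerProperties(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProperties("localhost:9092");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
        // With kafka-clients available, sending the answer would look roughly like:
        //   KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        //   producer.send(new ProducerRecord<>(TOPIC, answerAsJson));
        //   producer.flush();
    }
}
```

Creating the producer once, before the while loop, and reusing it for every record avoids opening a new connection per message.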

Tip: Kafka also has a command-line consumer that dumps messages to standard output. Run it with:

docker run --net host confluentinc/cp-kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic FizzBuzzAnswered