This tutorial shows you how to create a Kafka consumer and producer using the Kafka Streams Java library.

What you'll learn

Prerequisites

Docker on Windows VDI requires virtualization

Java and project setup

Gradle

In the project root, run Gradle:

$ cd kafka-streams-avro-codelab
$ ./gradlew --offline build

or on Windows:

$ cd kafka-streams-avro-codelab
$ gradlew.bat --offline build

If everything is set up correctly, you should see:

BUILD SUCCESSFUL in Xs

Gradle "connection refused"

On the NAV developer image: if you get a Connection refused error when running Gradle, add a file named gradle.properties in a folder named .gradle in your home directory, with the following content:

systemProp.proxySet=true
systemProp.http.proxyHost=webproxy-utvikler.nav.no
systemProp.http.proxyPort=8088
systemProp.https.proxyHost=webproxy-utvikler.nav.no
systemProp.https.proxyPort=8088

Windows: C:\Users\<username>\.gradle\gradle.properties
Linux and Mac: ~/.gradle/gradle.properties

Docker

The project directory contains a docker-compose.yml file. Navigate to the directory and run docker-compose from the command line:

$ docker-compose up

This should start ZooKeeper and a Kafka instance locally.
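The compose file in the project is authoritative, but a single-node setup of this kind is typically declared along these lines (images guessed from the commands used later in this tutorial; listener and connection environment settings are abbreviated):

```yaml
# Hypothetical sketch only -- see the project's docker-compose.yml for the real configuration
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    # KAFKA_ZOOKEEPER_CONNECT, advertised listeners, etc. omitted here
  schema-registry:
    image: confluentinc/cp-schema-registry
    ports:
      - "8081:8081"
    # the registry's Kafka/ZooKeeper connection settings omitted here
```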

IntelliJ

NavVisitorLocation is a topic that receives a message each time a visitor visits the nav.no front page. Each message is an Avro object with postal and municipality information, for example:

{"postnummer":"2005","stedsnavn":"RÆLINGEN","kommune":"0228"}
  1. Create a Kafka stream that consumes the NavVisitorLocation topic (see NavVisitorProducer.java in the project).
  2. Count visits per area (stedsnavn).
  3. Produce the result to a topic named NavVisitorAreaCounter.
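For reference, the Avro record shown above could be described by a schema along these lines (field names are taken from the example message and the record name from the code later in this tutorial; the project's own schema file is authoritative):

```json
{
  "type": "record",
  "name": "Poststed",
  "fields": [
    {"name": "postnummer", "type": "string"},
    {"name": "stedsnavn", "type": "string"},
    {"name": "kommune", "type": "string"}
  ]
}
```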

Create topics

  1. Create the NavVisitorLocation topic: docker run --net host confluentinc/cp-kafka kafka-topics --create --zookeeper localhost:2181 --topic NavVisitorLocation --replication-factor 1 --partitions 1
  2. To verify that the topic has been created, run: docker run --net host confluentinc/cp-kafka kafka-topics --describe --zookeeper localhost:2181 --topic NavVisitorLocation
  3. Create the NavVisitorAreaCounter topic: docker run --net host confluentinc/cp-kafka kafka-topics --create --zookeeper localhost:2181 --topic NavVisitorAreaCounter --replication-factor 1 --partitions 1
  4. To verify that the topic has been created, run: docker run --net host confluentinc/cp-kafka kafka-topics --describe --zookeeper localhost:2181 --topic NavVisitorAreaCounter
  1. In IntelliJ, create a new Java file and give it a name (e.g. NavVisitorAreaCounterStream.java).
  2. Next, create the Kafka Streams configuration by copying the code below:
import java.util.Properties;

import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

public class NavVisitorAreaCounterStream {

    private static final String SCHEMA_REGISTRY_URL = "http://localhost:8081/";
    private static final String BOOTSTRAP_SERVERS = "localhost:29092";

    public static void main(String[] args) {
        final Properties streamsConfiguration = new Properties();

        // Give the Streams application a unique name. The name must be unique in the Kafka cluster
        // against which the application is run (it is used as the consumer group id).
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "nav-visitor-area-consumer");

        // Where to find the Kafka brokers
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);

        // Where to find the schema registry
        streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL);

        // Default (de)serializers for record keys and record values
        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
    }
}
  1. Define a StreamsBuilder.
  2. Then create a KStream instance that consumes the NavVisitorLocation topic:
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, Poststed> poststedKStream = builder.stream("NavVisitorLocation");

We should now be able to consume records from Kafka.

Try it out by first starting the consumer we just created (run the main method in IntelliJ) and then starting the producer, NavVisitorProducer (run its main method in IntelliJ).
If all looks good, go ahead to the next section.

Back to the task: for each record, we want to increment a counter for that record's area.

Kafka Streams DSL provides a high-level API for common data transformation operations such as map, filter, join, and aggregations out of the box.

For this tutorial we are going to use a KTable. A KTable is an abstraction of a changelog stream from a primary-keyed table.

    final KTable<String, Long> areaCounts = poststedKStream
            .groupBy(new KeyValueMapper<String, Poststed, String>() {
                @Override
                public String apply(String key, Poststed poststed) {
                    return poststed.getStedsnavn(); // Group by stedsnavn
                }
            })
            .count(); // Update the table with a count (stedsnavn is the primary key)
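Conceptually, groupBy followed by count keeps one running counter per key, and the table is updated as records arrive. A plain-Java sketch of the same counting semantics (the sample stedsnavn values here are made up):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class AreaCountSemantics {
    public static void main(String[] args) {
        // Hypothetical stedsnavn values extracted from incoming records
        List<String> stedsnavn = List.of("RÆLINGEN", "OSLO", "RÆLINGEN");

        // Equivalent of groupBy(stedsnavn).count(): one count per distinct key
        Map<String, Long> counts = stedsnavn.stream()
                .collect(Collectors.groupingBy(s -> s, Collectors.counting()));

        System.out.println(counts.get("RÆLINGEN")); // prints 2
        System.out.println(counts.get("OSLO"));     // prints 1
    }
}
```

Unlike this one-shot computation, the KTable keeps updating: every new record for a stedsnavn bumps that key's count and emits a new table value downstream.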

To have a peek inside the KTable result, we can call peek on the stream view of areaCounts:

 areaCounts.toStream().peek(
            (navn, teller) -> System.out.println("stedsnavn = [" + navn + "], teller = [" + teller + "]")
        );

Try it out by starting the consumer (run the main method in IntelliJ) and then starting the producer. We should now see the area counts in the log messages.

        areaCounts.toStream().peek(
            (navn, teller) -> System.out.println("stedsnavn = [" + navn + "], teller = [" + teller + "]")
        ).mapValues(new ValueMapperWithKey<String, Long, Resultat>() {
            @Override
            public Resultat apply(String poststed, Long teller) {
                return new Resultat(poststed, teller); // map to a result object
            }
        }).to("NavVisitorAreaCounter"); // produce the result back to Kafka as a stream

        // and finally configure and start the stream
        final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);

        streams.cleanUp();
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
To inspect the produced result, consume the NavVisitorAreaCounter topic with kafka-avro-console-consumer:

docker run --net=host -it confluentinc/cp-schema-registry:latest kafka-avro-console-consumer --bootstrap-server localhost:29092 --property schema.registry.url=http://localhost:8081 --topic NavVisitorAreaCounter

This concludes this tutorial - but feel free to play around with the Kafka Streams API.