This tutorial shows you how to create a Kafka consumer and producer using the Kafka Streams Java library.
What you'll learn
- How to consume Avro records from a Kafka topic using the Kafka Streams library
- How to count records with a KTable
- How to produce the aggregated result back to a Kafka topic
In the project root, run Gradle:
$ cd kafka-streams-avro-codelab
$ ./gradlew --offline build
or on Windows:
$ cd kafka-streams-avro-codelab
$ gradlew.bat --offline build
If everything is set up correctly, you should see:
BUILD SUCCESSFUL in Xs
On the NAV developer image: If you get a Connection refused error when executing Gradle, add a file named gradle.properties in a folder named .gradle in your home directory, with the following content:
systemProp.proxySet=true
systemProp.http.proxyHost=webproxy-utvikler.nav.no
systemProp.http.proxyPort=8088
systemProp.https.proxyHost=webproxy-utvikler.nav.no
systemProp.https.proxyPort=8088
Windows: C:\Users\<username>\.gradle\gradle.properties
Linux and Mac: ~/.gradle/gradle.properties
In the project directory there is a docker-compose.yml file. Navigate to it and run docker-compose from the command line:
$ docker-compose up
This should start ZooKeeper and Kafka instances locally.
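You can verify that the containers came up by listing the running containers:
$ docker ps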
File -> New -> Project from Existing Sources..., navigate to the project folder and press OK. Choose Gradle in the next window and press Next. Under Global Gradle settings, choose Offline work and hit Finish.
NavVisitorLocation is a topic that gets a message each time a visitor hits the nav.no front page. Each message is an Avro object with postal code and municipality information, for example:
{"postnummer":"2005","stedsnavn":"RÆLINGEN","kommune":"0228"}
In this codelab we will consume messages from the NavVisitorLocation topic and produce area counts to a topic named NavVisitorAreaCounter. The project already contains NavVisitorProducer.java, a class that produces random postal locations on the NavVisitorLocation topic. To get started we need to create the topics:
$ docker run --net host confluentinc/cp-kafka kafka-topics --create --zookeeper localhost:2181 --topic NavVisitorLocation --replication-factor 1 --partitions 1
will create the topic, and
$ docker run --net host confluentinc/cp-kafka kafka-topics --describe --zookeeper localhost:2181 --topic NavVisitorLocation
will describe it. Create and verify the NavVisitorAreaCounter topic the same way:
$ docker run --net host confluentinc/cp-kafka kafka-topics --create --zookeeper localhost:2181 --topic NavVisitorAreaCounter --replication-factor 1 --partitions 1
$ docker run --net host confluentinc/cp-kafka kafka-topics --describe --zookeeper localhost:2181 --topic NavVisitorAreaCounter
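For reference, the core of a producer like NavVisitorProducer.java could look roughly like the sketch below. This is an assumption for illustration only: the class name, the Poststed constructor argument order, and the hard-coded sample values are not taken from the project.
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class VisitorProducerSketch {
    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        // KafkaAvroSerializer registers/looks up the schema in the schema registry
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081/");

        try (KafkaProducer<String, Poststed> producer = new KafkaProducer<>(props)) {
            // Poststed is the Avro-generated class; field order is assumed here
            final Poststed poststed = new Poststed("2005", "RÆLINGEN", "0228");
            producer.send(new ProducerRecord<>("NavVisitorLocation", poststed));
            producer.flush();
        }
    }
}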
Next, create a class named NavVisitorAreaCounterStream.java:
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

import java.util.Properties;

public class NavVisitorAreaCounterStream {

    private static final String SCHEMA_REGISTRY_URL = "http://localhost:8081/";
    private static final String BOOTSTRAP_SERVERS = "localhost:29092";

    public static void main(String[] args) {
        final Properties streamsConfiguration = new Properties();
        // Give the Streams application a unique name. The name must be unique in the Kafka cluster
        // against which the application is run (consumer group id).
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "nav-visitor-area-consumer");
        // Where to find the Kafka brokers
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        // Where to find the schema registry
        streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL);
        // Default (de)serializers for record keys and for record values.
        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
    }
}
Next, build a stream that consumes from the NavVisitorLocation topic; later we will produce the counts to the NavVisitorAreaCounter topic. Add the following to the main method (you will also need imports from org.apache.kafka.streams and org.apache.kafka.streams.kstream as you go):
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, Poststed> poststedKStream = builder.stream("NavVisitorLocation");
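If you want to check that records are actually flowing in at this point, you can temporarily attach a peek to the stream. A minimal sketch using the KStream peek method:
poststedKStream.peek((key, poststed) ->
        System.out.println("poststed = [" + poststed + "]"));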
We should now be able to consume records from Kafka. Note that the topology will not run until it is built and started, so first add the KafkaStreams build/start lines shown at the end of this tutorial to the bottom of the main method. Then try it out by starting the consumer we just created (run the main method in IntelliJ) and then starting the producer, NavVisitorProducer (run its main method in IntelliJ).
If all looks good, go ahead to the next section.
Back to the task: for each record, we want to increase a counter for that area.
Kafka Streams DSL provides a high-level API for common data transformation operations such as map, filter, join, and aggregations out of the box.
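For example, filtering the stream down to a single municipality is a one-liner. This is just an illustration and is not used in this codelab; getKommune() is assumed to be the generated getter for the kommune field:
final KStream<String, Poststed> fromRaelingen =
        poststedKStream.filter((key, poststed) -> "0228".equals(poststed.getKommune()));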
For this tutorial we are going to use a KTable. A KTable is an abstraction of a changelog stream from a primary-keyed table.
final KTable<String, Long> areaCounts = poststedKStream
.groupBy(new KeyValueMapper<String, Poststed, String>() {
@Override
public String apply(String key, Poststed poststed) {
return poststed.getStedsnavn(); // Group by stedsnavn
}
})
.count(); // Update the table with a count (stedsnavn is primary keyed)
To have a peek inside the KTable result, we can add a peek method to areaCounts:
areaCounts.toStream().peek(
(navn, teller) -> System.out.println("stedsnavn = [" + navn + "], teller = [" + teller + "]")
);
Try it out by starting the consumer (run the main method in IntelliJ) and then starting the producer. We should now see area counts in the log messages.
areaCounts.toStream().peek(
(navn, teller) -> System.out.println("stedsnavn = [" + navn + "], teller = [" + teller + "]")
).mapValues(new ValueMapperWithKey<String, Long, Resultat>() {
@Override
public Resultat apply(String poststed, Long teller) {
return new Resultat(poststed, teller); // map to result object
}
}).to("NavVisitorAreaCounter"); // produce the result back to Kafka as a stream
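Resultat is, like Poststed, an Avro-generated class from the project. Its schema is not shown here, but based on how it is constructed above it might look something like this sketch (the field names and types are assumptions):
{
  "type": "record",
  "name": "Resultat",
  "fields": [
    {"name": "poststed", "type": "string"},
    {"name": "teller", "type": "long"}
  ]
}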
// and finally, configure and start the stream
final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
streams.cleanUp(); // wipes local state; convenient in development, not for production
streams.start();
// close the stream cleanly on JVM shutdown
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
You can inspect the resulting topic with the Avro console consumer:
$ docker run --net=host -it confluentinc/cp-schema-registry:latest kafka-avro-console-consumer --bootstrap-server localhost:29092 --property schema.registry.url=http://localhost:8081 --topic NavVisitorAreaCounter
That concludes this tutorial, but feel free to keep playing around with the Kafka Streams API.