This tutorial shows you how to create a Kafka consumer and producer using the kafka-clients Java library.
What you'll learn
As an alternative, use git to clone the project:
git clone https://github.com/navikt/kafka-clients-guarantees-codelab.git
(On the NAV developer image you first need to add a proxy by running git config --global http.proxy http://webproxy-utvikler.nav.no:8088)
In the project root, run Gradle:
$ cd codelab-kafka-clients
$ ./gradlew --offline build
or on Windows:
$ cd codelab-kafka-clients
$ gradlew.bat --offline build
If everything is set up correctly, you should see:
BUILD SUCCESSFUL in Xs
On the NAV developer image: if you get a Connection refused error when executing Gradle, add a file named gradle.properties in a folder named .gradle in your home directory with the following content:
systemProp.proxySet=true
systemProp.http.proxyHost=webproxy-utvikler.nav.no
systemProp.http.proxyPort=8088
systemProp.https.proxyHost=webproxy-utvikler.nav.no
systemProp.https.proxyPort=8088
Windows: C:\Users\<username>\.gradle\gradle.properties
Linux and Mac: ~/.gradle/gradle.properties
The project directory contains a docker-compose.yml file. Navigate to that directory and run docker-compose from the command line:
$ docker-compose up
This should start Zookeeper and Kafka instances locally.
Import the project in IntelliJ:
File -> New -> Project from Existing Sources...
Navigate to the project folder and press OK
Select Gradle in the next window and press Next
Under Global Gradle settings, choose Offline work
Hit Finish
NavVisitorLocation is a topic that receives a message each time someone visits the nav.no web page. Each message is JSON containing postal and municipality information, for example:
{"postnummer":"2005","stedsnavn":"RÆLINGEN","kommune":"0228"}
In this codelab, the NavVisitorLocation topic is fed by a producer in the project (see NavBesøkProducer.java). Take a look at NavVisitorProducer.java. This class should produce random postal locations to a topic named NavVisitorLocation, but first we have to create the topic. This command creates it:
docker run --net host confluentinc/cp-kafka kafka-topics --create --zookeeper localhost:2181 --topic NavVisitorLocation --replication-factor 2 --partitions 4
and this command describes it:
docker run --net host confluentinc/cp-kafka kafka-topics --describe --zookeeper localhost:2181 --topic NavVisitorLocation
(kafka-topics in Kafka 3.0 and later no longer accepts --zookeeper; with a newer image, use --bootstrap-server localhost:9092 instead.)
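In NavVisitorConsumer.java, create a KafkaConsumer and instantiate it:
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class NavVisitorConsumer {
    private final static String TOPIC = "NavVisitorLocation";
    private final static String BOOTSTRAP_SERVERS = "localhost:9092";
    private final static Logger LOG = LogManager.getLogger();

    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        // Consumers sharing this group id split the topic's partitions between them
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "visitor-consumer");
        // Keys and values are read as plain strings (the value is the JSON message)
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> navBesøkConsumer = new KafkaConsumer<>(props);
        navBesøkConsumer.subscribe(Collections.singleton(TOPIC));
        while (true) {
            // Wait up to 100 ms for new records
            ConsumerRecords<String, String> records = navBesøkConsumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                LOG.info("Consumed " + record);
            }
        }
    }
}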
Run NavVisitorProducer (run the main method in IntelliJ). If all looks good, go ahead to the next section.
For each record, we increase a counter for that area. PoststedCounter.java has a method count, which takes a Poststed as an argument. It keeps one counter per unique Poststed and increases that counter by 1 each time the Poststed is added.
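The project's PoststedCounter is not shown in this section. To give a rough idea of what such a counter can look like, here is a minimal JDK-only sketch (Java 16+ for records). The class name PoststedCounterSketch, the Poststed record with fields copied from the JSON example above, and the topThree() method are assumptions for illustration, not the project's actual API.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-in for the project's PoststedCounter; names and return types are assumptions.
public class PoststedCounterSketch {

    // Assumed shape of Poststed, mirroring the fields in the example JSON message.
    // A record gets value-based equals/hashCode, so equal messages share one counter.
    public record Poststed(String postnummer, String stedsnavn, String kommune) {}

    // One counter per unique Poststed
    private final Map<Poststed, Integer> counts = new HashMap<>();

    // Increase the counter for this Poststed by 1 (it starts at 1 the first time it is seen)
    public void count(Poststed poststed) {
        counts.merge(poststed, 1, Integer::sum);
    }

    // The three most counted places as "STEDSNAVN=count", highest first (ties in no particular order)
    public List<String> topThree() {
        return counts.entrySet().stream()
                .sorted(Map.Entry.<Poststed, Integer>comparingByValue().reversed())
                .limit(3)
                .map(e -> e.getKey().stedsnavn() + "=" + e.getValue())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        PoststedCounterSketch counter = new PoststedCounterSketch();
        // Illustrative values; the first one is the example message from the text
        counter.count(new Poststed("2005", "RÆLINGEN", "0228"));
        counter.count(new Poststed("2005", "RÆLINGEN", "0228")); // equal value, same counter
        counter.count(new Poststed("0150", "OSLO", "0301"));
        System.out.println(counter.topThree()); // prints [RÆLINGEN=2, OSLO=1]
    }
}
```

Keying the map on a value type is what makes "unique Poststed" work: if Poststed were a plain class without equals/hashCode, every parsed message would get its own counter.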
Convert each record value into a Poststed object: Poststed poststed = new Gson().fromJson(record.value(), Poststed.class);
Create a PoststedCounter (outside the while loop): PoststedCounter counter = new PoststedCounter();
Add each Poststed object to the counter: counter.count(poststed);
Something like:
// .....
PoststedCounter counter = new PoststedCounter(); // created once, outside the poll loop
while (true) {
    ConsumerRecords<String, String> records = navBesøkConsumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        LOG.info("Consumed " + record);
        // Map the JSON value onto a Poststed object and count it
        Poststed poststed = new Gson().fromJson(record.value(), Poststed.class);
        counter.count(poststed);
    }
    // Only log the top three when this poll returned records
    if (!records.isEmpty()) {
        LOG.info("Top three -> {}", counter.getTopThreeCount());
    }
}
PoststedCounter is not safe and will throw an exception in certain circumstances. Our consumer is currently configured to auto-commit offsets, which means offsets are committed periodically (every auto.commit.interval.ms) rather than when we have finished processing the records. We need to take control over when we commit, so that we do not "lose" messages.
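To see why the moment of committing matters, here is a toy, in-memory model in plain Java; it does not use the Kafka API. A "partition" is a list, offsets are list indexes, and the committed offset is where a restarted consumer resumes. The names (CommitOrderSketch, runBatch, simulate) and the simulated crash are assumptions for illustration. It compares committing a batch's offsets before the records are processed with committing only after the whole batch has been processed, which is what the commitSync() step in this section aims for.

```java
import java.util.ArrayList;
import java.util.List;

// Toy in-memory model (not the Kafka API) of committing before vs. after processing.
public class CommitOrderSketch {

    // Simulated partition: the record at index i has offset i
    static final List<String> PARTITION = List.of("a", "b", "c", "d");

    // Processes one batch from `committed` to the end of the partition and returns the
    // offset that is committed afterwards. If `crash` is true, processing fails on "c".
    static int runBatch(int committed, boolean commitFirst, boolean crash, List<String> processed) {
        int start = committed;
        int end = PARTITION.size();
        if (commitFirst) {
            committed = end; // whole batch marked as consumed before any work is done
        }
        for (int offset = start; offset < end; offset++) {
            String value = PARTITION.get(offset);
            if (crash && value.equals("c")) {
                return committed; // simulated crash: only what was already committed survives
            }
            processed.add(value);
        }
        return end; // every record was processed, so commit the end of the batch
    }

    // The first run crashes on "c"; the restarted run resumes from the committed offset.
    static List<String> simulate(boolean commitFirst) {
        List<String> processed = new ArrayList<>();
        int committed = runBatch(0, commitFirst, true, processed);
        runBatch(committed, commitFirst, false, processed);
        return processed;
    }

    public static void main(String[] args) {
        System.out.println("commit before processing: " + simulate(true));  // [a, b]: c and d are lost
        System.out.println("commit after processing:  " + simulate(false)); // [a, b, a, b, c, d]: nothing lost
    }
}
```

Committing after processing trades lost records for possible duplicates (a and b are processed twice above), so this gives at-least-once delivery and the processing should tolerate seeing a record more than once.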
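Disable autocommit, and make the consumer start from the earliest offset when its group has no committed offset:
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // disable auto commit of offsets
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // no committed offset for the group: start from the beginning of the topic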
Declare int consumedCounter = 0; outside the while loop and increment it for each processed record. Then call navBesøkConsumer.commitSync() after the for(Consu... loop, inside an if statement that checks consumedCounter against records.count().
Something like:
int consumedCounter = 0; // records processed in the current batch
while (true) {
    ConsumerRecords<String, String> records = navBesøkConsumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        Poststed poststed = new Gson().fromJson(record.value(), Poststed.class);
        counter.count(poststed); // if this throws, we never reach commitSync() for this batch
        consumedCounter++;
    }
    // Commit only when every record in a non-empty batch was processed
    if (!records.isEmpty() && consumedCounter == records.count()) {
        navBesøkConsumer.commitSync(); // commits the offsets returned by the last poll()
        LOG.info("Top three -> {}", counter.getTopThreeCount());
        consumedCounter = 0;
    }
}
Create a producer that produces the "top three areas" to a topic.