This tutorial shows you how to create a Kafka-consumer and -producer using kafka-clients java library.
What you'll learn
As an alternative, use git to clone the project:
git clone https://github.com/navikt/codelab-kafka-clients.git
(On NAV developer image you will need add proxy by running git config --global http.proxy http://webproxy-utvikler.nav.no:8088
first)
In project root, run gradle:
$ cd codelab-kafka-clients
$ ./gradlew --offline build
or in Windows:
$ cd codelab-kafka-clients
$ gradlew.bat --offline build
If everything is set up you should see
BUILD SUCCESSFUL in Xs
On NAV developer image: If you get an Connection refused
error when executing gradle, add a file named gradle.properties
in an folder named .gradle
in your home directory with the following content:
systemProp.proxySet=true
systemProp.http.proxyHost=webproxy-utvikler.nav.no
systemProp.http.proxyPort=8088
systemProp.https.proxyHost=webproxy-utvikler.nav.no
systemProp.https.proxyPort=8088
Windows: C:\Users\<username>\.gradle\gradle.properties
Linux and Mac: ~/.gradle/gradle.properties
In the project directory, there is a docker-compose.yml
file, navigate to the file and run docker-compose:
$ docker-compose up
This should start Zookeeper and a Kafka instance locally.
On NAV developer image you will need to configure proxy for docker. Open Docker and Settings:
http://webproxy-utvikler.nav.no:8088
on both HTTP and HTTPS fieldFile -> New -> Project from Existing sources...
navigate to project folder and press OKGradle
in next window - NextGlobal Gradle settings
- choose Offline work
and hit Finish
FizzBuzzNumberEntered
topic (See FizzBuzzCandidateProducer.java in the project).FizzBuzzNumberEntered
, calculate whether or not that number is a "Fizz-Buzz" number candidate andFizzBuzzAnswered
with the answer.The answer must be compliant to JSON file:
{
"answer": "FizzBuzz", // - "FizzBuzz", "Fizz" or "Buzz"
"candidate" : 15, // Number entered
"groupId" : "A-team" // Identicator of the team
}
• Fizz-Buzz test - Write a program that prints the numbers from 1 to 100. But for multiples of three print "Fizz" instead of the number and for the multiples of five print "Buzz". For numbers which are multiples of both three and five print "FizzBuzz"
Ref https://en.wikipedia.org/wiki/Fizz_buzz
Provided in the project there is a Java class named FizzBuzzCandidateProducer
. This class produce random numbers on a topic named FizzBuzzNumberEntered
.
FizzBuzzNumberEnteredConsumer.java
)KafkaConsumer
and instantiate it;public class FizzBuzzNumberEnteredConsumer {
private final static String TOPIC = "FizzBuzzNumberEntered";
private final static String BOOTSTRAP_SERVERS = "localhost:9092";
public static void main(String[] args) {
Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "fizzbuzz-consumer");
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
KafkaConsumer<String, Integer> consumer = new KafkaConsumer<>(consumerProperties);
consumer.subscribe(Collections.singleton(TOPIC));
}
}
while (true) {
ConsumerRecords<String, Integer> records = consumer.poll(Duration.ofMillis(100)); // Fetch records, give up after timeout
for (ConsumerRecord<String, Integer> record : records) {
System.out.println("Consumed " + record);
}
}
FizzBuzzCandidateProducer
(Run the main method in IntelliJ). If all looks good, go ahead to the next section.Provided in the project there is an FizzBuzz "calculator" we can use to calculate the answer.
while (true) {
ConsumerRecords<String, Integer> records = consumer.poll(Duration.ofMillis(100)); // Fetch records, give up after timeout
for (ConsumerRecord<String, Integer> record : records) {
System.out.println("Consumed " + record);
// first extract the number from the record
Integer number = record.value();
// Calculate FizzBuzz
String fizzBuzzCandidate = FizzBuzz.calculate(number);
FizzBuzzAnswerMessage answer = new FizzBuzzAnswerMessage(fizzBuzzCandidate, number, "A-Team");
// create json
final String answerAsJson = new Gson().newBuilder().create().toJson(answer);
// @todo: Produce answer to `FizzBuzzAnswered` topic
// tip: See FizzBuzzCandidateProducer.java
}
}
Tip, Kafka also has a command line consumer that will dump out messages to standard output, by running:
docker run --net host confluentinc/cp-kafka kafka-console-consumer --bootstrap-server localhost:9092 -topic FizzBuzzAnswered