This tutorial shows you how to create a Kafka consumer and producer using the kafka-clients Java library.
What you'll learn
As an alternative, use git to clone the project:
git clone https://github.com/navikt/codelab-kafka-clients.git
(On the NAV developer image, first configure the proxy by running: git config --global http.proxy http://webproxy-utvikler.nav.no:8088)
In the project root, run Gradle:
$ cd codelab-kafka-clients
$ ./gradlew --offline build
or on Windows:
$ cd codelab-kafka-clients
$ gradlew.bat --offline build
If everything is set up correctly, you should see:
BUILD SUCCESSFUL in Xs
On the NAV developer image: if you get a "Connection refused" error when running Gradle, add a file named gradle.properties in a folder named .gradle in your home directory with the following content:
systemProp.proxySet=true
systemProp.http.proxyHost=webproxy-utvikler.nav.no
systemProp.http.proxyPort=8088
systemProp.https.proxyHost=webproxy-utvikler.nav.no
systemProp.https.proxyPort=8088
Windows: C:\Users\<username>\.gradle\gradle.properties
Linux and Mac: ~/.gradle/gradle.properties
The project directory contains a docker-compose.yml file. Navigate to the directory containing that file and run docker-compose:
$ docker-compose up
This should start Zookeeper and a Kafka instance locally.
On the NAV developer image, you need to configure a proxy for Docker. Open Docker and go to Settings:
Enter http://webproxy-utvikler.nav.no:8088 in both the HTTP and HTTPS fields
• File -> New -> Project from Existing sources... navigate to the project folder and press OK
• Choose Gradle in the next window, then Next
• Under Global Gradle settings, choose Offline work and hit Finish

• A producer publishes numbers to the FizzBuzzNumberEntered topic (see FizzBuzzCandidateProducer.java in the project).
• Consume the numbers from FizzBuzzNumberEntered, calculate whether or not each number is a "Fizz-Buzz" number candidate, and
• produce to FizzBuzzAnswered with the answer.
The answer must conform to the following JSON format:
{
"answer": "FizzBuzz", // - "FizzBuzz", "Fizz" or "Buzz"
"candidate" : 15, // Number entered
"groupId" : "A-team" // Identifier of the team
}
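The // comments in the example above are annotations for the reader. Strict JSON has no comment syntax, so the message on the topic carries only the three fields. As a minimal sketch of that shape, assuming a hypothetical helper class AnswerJsonSketch (not part of the project) and using only the standard library:

```java
import java.util.Locale;

// Hypothetical illustration of the answer format; not part of the project.
final class AnswerJsonSketch {

    /** Builds the answer message as a compact JSON string. */
    static String toJson(String answer, int candidate, String groupId) {
        return String.format(Locale.ROOT,
                "{\"answer\":\"%s\",\"candidate\":%d,\"groupId\":\"%s\"}",
                escape(answer), candidate, escape(groupId));
    }

    // Minimal escaping: backslashes and double quotes only.
    // A JSON library covers the remaining cases (control characters etc.).
    private static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    public static void main(String[] args) {
        System.out.println(toJson("FizzBuzz", 15, "A-team"));
    }
}
```

The tutorial code itself uses Gson for serialization, which is the safer choice for real messages; the hand-written formatting here only makes the expected fields and types explicit.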
• Fizz-Buzz test - Write a program that prints the numbers from 1 to 100. But for multiples of three print "Fizz" instead of the number, and for multiples of five print "Buzz". For numbers which are multiples of both three and five print "FizzBuzz".
Ref https://en.wikipedia.org/wiki/Fizz_buzz
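The rule quoted above can be sketched in a few lines of Java. This is an illustration only: FizzBuzzSketch is a hypothetical name, and the FizzBuzz calculator shipped with the project may behave differently, for example for numbers that are not multiples of three or five.

```java
// Hypothetical sketch of the Fizz-Buzz rule; the project's own FizzBuzz class may differ.
final class FizzBuzzSketch {

    /** Returns "FizzBuzz", "Fizz", "Buzz", or the number itself as a string. */
    static String calculate(int number) {
        if (number % 15 == 0) {
            return "FizzBuzz"; // multiple of both three and five
        }
        if (number % 3 == 0) {
            return "Fizz";     // multiple of three only
        }
        if (number % 5 == 0) {
            return "Buzz";     // multiple of five only
        }
        return Integer.toString(number);
    }

    public static void main(String[] args) {
        // Classic form of the test: print the results for 1 to 100.
        for (int i = 1; i <= 100; i++) {
            System.out.println(calculate(i));
        }
    }
}
```

Checking the multiple of 15 first matters: otherwise a number such as 30 would be reported as "Fizz" before the combined case is reached.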
Provided in the project there is a Java class named FizzBuzzCandidateProducer. This class produces random numbers on a topic named FizzBuzzNumberEntered.
• Create a new Java class (FizzBuzzNumberEnteredConsumer.java)
• Configure a KafkaConsumer and instantiate it:
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class FizzBuzzNumberEnteredConsumer {
    private static final String TOPIC = "FizzBuzzNumberEntered";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) {
        // Connection, consumer group, and key/value deserializers
        Properties consumerProperties = new Properties();
        consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "fizzbuzz-consumer");
        consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        KafkaConsumer<String, Integer> consumer = new KafkaConsumer<>(consumerProperties);
        consumer.subscribe(Collections.singleton(TOPIC));
    }
}
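// Place this loop in main(), after consumer.subscribe(...).
// Also needs imports: java.time.Duration and
// org.apache.kafka.clients.consumer.ConsumerRecord / ConsumerRecords.
while (true) {
    ConsumerRecords<String, Integer> records = consumer.poll(Duration.ofMillis(100)); // Fetch records, give up after timeout
    for (ConsumerRecord<String, Integer> record : records) {
        System.out.println("Consumed " + record);
    }
}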
• Run your consumer together with FizzBuzzCandidateProducer (run the main method in IntelliJ). If all looks good, go ahead to the next section.

Provided in the project there is a FizzBuzz "calculator" we can use to calculate the answer.
while (true) {
    ConsumerRecords<String, Integer> records = consumer.poll(Duration.ofMillis(100)); // Fetch records, give up after timeout
    for (ConsumerRecord<String, Integer> record : records) {
        System.out.println("Consumed " + record);
        // First extract the number from the record
        Integer number = record.value();
        // Calculate FizzBuzz
        String fizzBuzzCandidate = FizzBuzz.calculate(number);
        FizzBuzzAnswerMessage answer = new FizzBuzzAnswerMessage(fizzBuzzCandidate, number, "A-Team");
        // Create JSON
        final String answerAsJson = new Gson().toJson(answer);
        // @todo: Produce answer to `FizzBuzzAnswered` topic
        // tip: See FizzBuzzCandidateProducer.java
    }
}
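One possible starting point for the TODO above is the producer configuration. The sketch below is an illustration under stated assumptions, not the project's solution: AnswerProducerConfigSketch is a hypothetical name, and it uses the plain string keys that the ProducerConfig constants stand for, so that it compiles without kafka-clients on the classpath. The commented lines indicate how a KafkaProducer might then be used; FizzBuzzCandidateProducer.java remains the reference.

```java
import java.util.Properties;

// Hypothetical sketch: producer settings for sending JSON strings to FizzBuzzAnswered.
final class AnswerProducerConfigSketch {

    static final String TOPIC = "FizzBuzzAnswered";

    /** Builds producer properties with String serializers for key and value. */
    static Properties producerProperties(String bootstrapServers) {
        Properties props = new Properties();
        // Same keys as ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
        // KEY_SERIALIZER_CLASS_CONFIG and VALUE_SERIALIZER_CLASS_CONFIG.
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProperties("localhost:9092");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
        // With kafka-clients available, the next steps would be roughly:
        //   KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        //   producer.send(new ProducerRecord<>(TOPIC, answerAsJson));
        //   producer.flush();
    }
}
```

The value serializer is a String serializer here because the answer is already a JSON string at this point. Creating the producer once, outside the poll loop, avoids opening a new connection for every record.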
Tip: Kafka also has a command-line consumer that dumps messages to standard output. Run it with:
docker run --net host confluentinc/cp-kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic FizzBuzzAnswered