This tutorial shows you how to create a Kafka-consumer and -producer using kafka-clients java library.
What you'll learn
Clone the https://github.com/navikt/kafka-codelab.git repository
$ git clone https://github.com/navikt/kafka-codelab.git
Go to the kafkacodelabschema project in the kafka-codelab repository and run a clean install from a terminal
$ mvn clean install
Open a terminal window and run the landoop/fast-data-dev docker image
$ docker run --rm -p 2181:2181 -p 3030:3030 -p 8081-8083:8081-8083 \
-p 9581-9585:9581-9585 -p 9092:9092 -e ADV_HOST=localhost \
landoop/fast-data-dev:latest
Optional: Create a ‘dice-rolls' topic
$ docker run --rm -it --net=host landoop/fast-data-dev kafka-topics --zookeeper localhost:2181 \
--create --topic dice-rolls --replication-factor 1 --partitions 1
Optional: Visit http://localhost:3030/kafka-topics-ui/#/ to verify that the ‘dice-rolls' topic was created
Create a kafka producer that produces messages to the ‘dice-rolls' topic
public class DiceRollProducer {
private static final Logger LOGGER = LogManager.getLogger(DiceRollProducer.class);
public static void main(String[] args) {
DiceRollProducer producer = new DiceRollProducer();
int numOfRolls = 100;
if (args.length > 0) {
numOfRolls = Integer.parseInt(args[0]);
}
producer.startRolling(numOfRolls);
}
private void startRolling(int numberOfRolls) {
// @todo: Create a kafka producer
// @todo: Use the rollDices() method and produce a kafka message for every dice roll to the 'dice-rolls' topic
}
private AbstractMap.SimpleEntry<DiceCount, DiceRoll> rollDices() {
Random r = new Random();
int count = r.nextInt(5) + 1; // Roll anywhere between 1 and 5 dice
List<Integer> dice = getRollResult(r, count);
DiceRoll diceRoll = DiceRoll.newBuilder().setCount(count).setDice(dice).build();
DiceCount diceCount = DiceCount.newBuilder().setCount(count).build();
LOGGER.info("Rolled {}", diceRoll);
return new AbstractMap.SimpleEntry<>(diceCount, diceRoll);
}
private List<Integer> getRollResult(Random r, int count) {
List<Integer> dice = new ArrayList<>();
for (int i = 0; i < count; i++) {
dice.add(r.nextInt(6) + 1);
}
return dice;
}
private Properties getConfig() {
Properties p = new Properties();
p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
p.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
p.put(ProducerConfig.CLIENT_ID_CONFIG, "diceroller-mine");
p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
return p;
}
}
Create a kafka consumer that reads the ‘dice-rolls' topic and prints each received message (key and value) to console.
public class DiceRollConsumer {
private static LongAdder rollsSeen = new LongAdder();
private static final Logger LOGGER = LogManager.getLogger(DiceRollConsumer.class);
public static void main(String[] args) {
DiceRollConsumer consumer = new DiceRollConsumer();
consumer.start(consumer.logAndCount());
}
private Consumer<ConsumerRecord<DiceCount, DiceRoll>> logAndCount() {
return r -> {
LOGGER.info("key: {} , value: {}", r.key(), r.value());
rollsSeen.increment();
};
}
private void start(Consumer<ConsumerRecord<DiceCount, DiceRoll>> onMessage) {
// @todo: Create a kafka consumer
// @todo: Consume the 'roll-dices' topic and print each message received to console
}
private Properties getConfig() {
Properties p = new Properties();
p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
p.put(ConsumerConfig.GROUP_ID_CONFIG, "my-rolls3");
p.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
p.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return p;
}
}
Create a kafka streams application
public class DiceRollStreamer {
private static final Logger LOGGER = LogManager.getLogger(DiceRollStreamer.class);
public static void main(String[] args) {
// @todo: Create a kafka stream consuming the 'roll-dices' topic
// @todo: Create five new topics containing dice rolls for the different number of dices rolled (1->5)
// @todo: Create a new topic containing all the true yatzy rolls (5 dices, with all 5 dices with same number)
}
private static Properties getConfig() {
Properties p = new Properties();
p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
p.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
p.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-application-id");
p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return p;
}
}