This tutorial shows you how to create a Kafka consumer and producer using the kafka-clients Java library.

What you'll learn

Prerequisites

Docker on Windows VDI requires virtualization

Clone the https://github.com/navikt/kafka-codelab.git repository

$ git clone https://github.com/navikt/kafka-codelab.git

Go to the kafkacodelabschema project in the kafka-codelab repository and run a clean install from a terminal

$ mvn clean install

Open a terminal window and run the landoop/fast-data-dev Docker image

$ docker run --rm -p 2181:2181 -p 3030:3030 -p 8081-8083:8081-8083 \
       -p 9581-9585:9581-9585 -p 9092:9092 -e ADV_HOST=localhost \
       landoop/fast-data-dev:latest

Optional: Create a 'dice-rolls' topic

$ docker run --rm -it --net=host landoop/fast-data-dev kafka-topics --zookeeper localhost:2181 \
       --create --topic dice-rolls --replication-factor 1 --partitions 1

Optional: Visit http://localhost:3030/kafka-topics-ui/#/ to verify that the 'dice-rolls' topic was created

Create a Kafka producer that produces messages to the 'dice-rolls' topic

public class DiceRollProducer {
    private static final Logger LOGGER = LogManager.getLogger(DiceRollProducer.class);
    public static void main(String[] args) {
        DiceRollProducer producer = new DiceRollProducer();

        int numOfRolls = 100;
        if (args.length > 0) {
            numOfRolls = Integer.parseInt(args[0]);
        }
        producer.startRolling(numOfRolls);
    }

    private void startRolling(int numberOfRolls) {
        // @todo: Create a Kafka producer
        // @todo: Use the rollDices() method and produce a Kafka message for every dice roll to the 'dice-rolls' topic
    }

    private AbstractMap.SimpleEntry<DiceCount, DiceRoll> rollDices() {
        Random r = new Random();
        int count = r.nextInt(5) + 1; // Roll anywhere between 1 and 5 dice
        List<Integer> dice = getRollResult(r, count);
        DiceRoll diceRoll = DiceRoll.newBuilder().setCount(count).setDice(dice).build();
        DiceCount diceCount = DiceCount.newBuilder().setCount(count).build();
        LOGGER.info("Rolled {}", diceRoll);
        return new AbstractMap.SimpleEntry<>(diceCount, diceRoll);
    }

    private List<Integer> getRollResult(Random r, int count) {
        List<Integer> dice = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            dice.add(r.nextInt(6) + 1);
        }
        return dice;
    }

    private Properties getConfig() {
        Properties p = new Properties();
        p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        p.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
        p.put(ProducerConfig.CLIENT_ID_CONFIG, "diceroller-mine");
        p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        return p;
    }
}
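
A minimal sketch of how startRolling() could be filled in, reusing the getConfig() and rollDices() helpers from the skeleton above. It assumes the KafkaProducer and ProducerRecord classes from kafka-clients; the codelab's own solution may differ.

    // Additional imports needed: org.apache.kafka.clients.producer.KafkaProducer,
    // org.apache.kafka.clients.producer.ProducerRecord
    private void startRolling(int numberOfRolls) {
        // KafkaProducer is Closeable, so try-with-resources closes it when we are done
        try (KafkaProducer<DiceCount, DiceRoll> producer = new KafkaProducer<>(getConfig())) {
            for (int i = 0; i < numberOfRolls; i++) {
                AbstractMap.SimpleEntry<DiceCount, DiceRoll> roll = rollDices();
                // One message per roll, keyed by the number of dice rolled
                producer.send(new ProducerRecord<>("dice-rolls", roll.getKey(), roll.getValue()));
            }
            // Make sure all buffered messages are actually sent before exiting
            producer.flush();
        }
    }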

Create a Kafka consumer that reads the 'dice-rolls' topic and prints each received message (key and value) to the console.

public class DiceRollConsumer {
    private static LongAdder rollsSeen = new LongAdder();
    private static final Logger LOGGER = LogManager.getLogger(DiceRollConsumer.class);
    public static void main(String[] args) {
        DiceRollConsumer consumer = new DiceRollConsumer();
        consumer.start(consumer.logAndCount());
    }

    private Consumer<ConsumerRecord<DiceCount, DiceRoll>> logAndCount() {
        return r ->  {
            LOGGER.info("key: {} , value: {}", r.key(), r.value());
            rollsSeen.increment();
        };
    }

    private void start(Consumer<ConsumerRecord<DiceCount, DiceRoll>> onMessage) {
        // @todo: Create a Kafka consumer
        // @todo: Consume the 'dice-rolls' topic and print each message received to the console
    }

    private Properties getConfig() {
        Properties p = new Properties();
        p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        p.put(ConsumerConfig.GROUP_ID_CONFIG, "my-rolls3");
        p.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
        p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        p.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
        p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return p;
    }
}
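
A possible sketch of the start() method, reusing getConfig() above. It assumes kafka-clients 2.0 or newer (for poll(Duration)); the codelab's own solution may look different.

    // Additional imports needed: org.apache.kafka.clients.consumer.KafkaConsumer,
    // org.apache.kafka.clients.consumer.ConsumerRecords, java.time.Duration, java.util.Collections
    private void start(Consumer<ConsumerRecord<DiceCount, DiceRoll>> onMessage) {
        try (KafkaConsumer<DiceCount, DiceRoll> consumer = new KafkaConsumer<>(getConfig())) {
            // Subscribe to the same topic the producer writes to
            consumer.subscribe(Collections.singletonList("dice-rolls"));
            while (true) {
                // Poll for new records and hand each one to the provided callback
                ConsumerRecords<DiceCount, DiceRoll> records = consumer.poll(Duration.ofMillis(500));
                records.forEach(onMessage);
            }
        }
    }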

Create a Kafka Streams application

public class DiceRollStreamer {
    private static final Logger LOGGER = LogManager.getLogger(DiceRollStreamer.class);

    public static void main(String[] args) {
        // @todo: Create a Kafka stream consuming the 'dice-rolls' topic
        // @todo: Create five new topics containing dice rolls for the different numbers of dice rolled (1 to 5)
        // @todo: Create a new topic containing all the true yatzy rolls (5 dice, all showing the same number)
    }

    private static Properties getConfig() {
        Properties p = new Properties();
        p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        p.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
        p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
        p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
        p.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-application-id");
        p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return p;
    }
}
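
One way main() could be sketched with the Kafka Streams DSL, using getConfig() above. The output topic names ('dice-rolls-1' through 'dice-rolls-5' and 'yatzy-rolls') are made up for this example since the tutorial does not name them, and the code assumes the generated DiceRoll class exposes getCount() and getDice() getters matching the builder calls used in the producer.

    // Additional imports needed: org.apache.kafka.streams.StreamsBuilder,
    // org.apache.kafka.streams.KafkaStreams, org.apache.kafka.streams.kstream.KStream, java.util.HashSet
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        // Read the rolls produced by DiceRollProducer, using the default Avro serdes from getConfig()
        KStream<DiceCount, DiceRoll> rolls = builder.stream("dice-rolls");

        // One output topic per number of dice rolled (1 through 5)
        for (int i = 1; i <= 5; i++) {
            final int count = i;
            rolls.filter((key, roll) -> roll.getCount() == count)
                 .to("dice-rolls-" + count);
        }

        // A true yatzy roll: five dice, all showing the same number
        rolls.filter((key, roll) -> roll.getCount() == 5
                        && new HashSet<>(roll.getDice()).size() == 1)
             .to("yatzy-rolls");

        KafkaStreams streams = new KafkaStreams(builder.build(), getConfig());
        streams.start();
        // Close the streams application cleanly on shutdown
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }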