Tuesday, November 29, 2016

Using Kafka with Junit

One of the neat features that the excellent Spring Kafka project provides, apart from a easier to use abstraction over raw Kafka Producer and Consumer, is a way to use Kafka in tests. It does this by providing an embedded version of Kafka that can be set-up and torn down very easily.

All that a project needs to include this support is the "spring-kafka-test" module, for a gradle build the following way:

testCompile "org.springframework.kafka:spring-kafka-test:1.1.2.BUILD-SNAPSHOT"

Note that I am using a snapshot version of the project as this has support for Kafka 0.10+.

With this dependency in place, an Embedded Kafka can be spun up in a test using the @ClassRule of JUnit:

@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(2, true, 2, "messages");

This would start up a Kafka Cluster with 2 brokers, with a topic called "messages" using 2 partitions and the class rule would make sure that a Kafka cluster is spun up before the tests are run and then shutdown at the end of it.

Here is how a sample with Raw Kafka Producer/Consumer using this embedded Kafka cluster looks like, the embedded Kafka can be used for retrieving the properties required by the Kafka Producer/Consumer:

Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
producer.send(new ProducerRecord<>("messages", 0, 0, "message0")).get();
producer.send(new ProducerRecord<>("messages", 0, 1, "message1")).get();
producer.send(new ProducerRecord<>("messages", 1, 2, "message2")).get();
producer.send(new ProducerRecord<>("messages", 1, 3, "message3")).get();


Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("sampleRawConsumer", "false", embeddedKafka);
consumerProps.put("auto.offset.reset", "earliest");

final CountDownLatch latch = new CountDownLatch(4);
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(() -> {
    KafkaConsumer<Integer, String> kafkaConsumer = new KafkaConsumer<>(consumerProps);
    kafkaConsumer.subscribe(Collections.singletonList("messages"));
    try {
        while (true) {
            ConsumerRecords<Integer, String> records = kafkaConsumer.poll(100);
            for (ConsumerRecord<Integer, String> record : records) {
                LOGGER.info("consuming from topic = {}, partition = {}, offset = {}, key = {}, value = {}",
                        record.topic(), record.partition(), record.offset(), record.key(), record.value());
                latch.countDown();
            }
        }
    } finally {
        kafkaConsumer.close();
    }
});

assertThat(latch.await(90, TimeUnit.SECONDS)).isTrue();

A little more comprehensive test is available here

3 comments:

  1. can you provide a link to a complete github project please?

    ReplyDelete
  2. Hi Biju :-), I am writing unit tests a consumer of type BatchAcknowledgingMessageListener. So I cannot use countdownlatch like in your example. For me, it would be enough to verify that onMessage block is executed atleast once. But I had to add sleep(10s) after sendTemplate.flush()...which makes test flaky.

    Any way to write test without putting thread to sleep?


    @Test
    public void testConsumer() throws Exception {

    //Set up a container
    Map consumerProps = consumerProps(embeddedKafka.getBrokersAsString(), "audit-consumer", "false");
    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    DefaultKafkaConsumerFactory consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProps);

    ContainerProperties containerProperties = new ContainerProperties("messages");
    containerProperties.setMessageListener(kafkaMessageConsumerService);

    KafkaMessageListenerContainer container =
    new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
    container.setBeanName("audit-consumer");
    container.start();

    // wait until the container has the required number of assigned partitions
    ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());

    //send 5 messages
    Map senderProps = senderProps(embeddedKafka.getBrokersAsString());
    ProducerFactory producerFactory = new DefaultKafkaProducerFactory<>(senderProps);
    KafkaTemplate template = new KafkaTemplate<>(producerFactory);
    template.setDefaultTopic("messages");
    template.sendDefault(0, "key1", "json1");
    template.sendDefault(0, "key2", "json2");
    template.sendDefault(0, "key3", "json3");
    template.sendDefault(0, "key4", "json4");
    template.sendDefault(0, "key5", "json5");
    template.flush();

    TimeUnit.SECONDS.sleep(10);

    ArgumentCaptor listArgumentCaptor = ArgumentCaptor.forClass(List.class);
    verify(auditMessageProcessor, atLeast(1)).process(listArgumentCaptor.capture());

    container.stop();
    }

    ReplyDelete
    Replies
    1. By the way, I am now aware of TestUtils.waitForCondition(condition, timeout, ..) from Spring Kafka..But in this case, I'd rather use mockito with a timeout

      verify(auditMessageProcessor, timeout(10000).atLeast(1)).process(listArgumentCaptor.capture());

      Delete