All a project needs in order to use this support is the "spring-kafka-test" module. For a Gradle build, add it as follows:
testCompile "org.springframework.kafka:spring-kafka-test:1.1.2.BUILD-SNAPSHOT"
Note that I am using a snapshot version of the project as this has support for Kafka 0.10+.
With this dependency in place, an embedded Kafka cluster can be spun up in a test using JUnit's @ClassRule:
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(2, true, 2, "messages");
This starts a Kafka cluster with 2 brokers and a topic called "messages" with 2 partitions. The class rule makes sure that the cluster is spun up before the tests are run and shut down once they have finished.
Here is what a sample using the raw Kafka Producer/Consumer against this embedded cluster looks like. The embedded Kafka instance can be used to retrieve the properties required by the Kafka Producer/Consumer:
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
producer.send(new ProducerRecord<>("messages", 0, 0, "message0")).get();
producer.send(new ProducerRecord<>("messages", 0, 1, "message1")).get();
producer.send(new ProducerRecord<>("messages", 1, 2, "message2")).get();
producer.send(new ProducerRecord<>("messages", 1, 3, "message3")).get();

Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("sampleRawConsumer", "false", embeddedKafka);
consumerProps.put("auto.offset.reset", "earliest");

final CountDownLatch latch = new CountDownLatch(4);
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(() -> {
    KafkaConsumer<Integer, String> kafkaConsumer = new KafkaConsumer<>(consumerProps);
    kafkaConsumer.subscribe(Collections.singletonList("messages"));
    try {
        while (true) {
            ConsumerRecords<Integer, String> records = kafkaConsumer.poll(100);
            for (ConsumerRecord<Integer, String> record : records) {
                LOGGER.info("consuming from topic = {}, partition = {}, offset = {}, key = {}, value = {}",
                        record.topic(), record.partition(), record.offset(), record.key(), record.value());
                latch.countDown();
            }
        }
    } finally {
        kafkaConsumer.close();
    }
});

assertThat(latch.await(90, TimeUnit.SECONDS)).isTrue();
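The test above hinges on a CountDownLatch: the background consumer counts down once per record, and the test thread waits on the latch with a timeout. Below is a minimal sketch of just that pattern in plain Java. A BlockingQueue stands in for the "messages" topic, purely so the example runs without a broker, and the class and method names are hypothetical. Unlike the loop above, the sketch also stops the worker thread once the latch has been released.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the embedded Kafka test: a queue plays the role of the topic.
class LatchConsumerSketch {

    // Consumes the given messages on a background thread and returns what was
    // consumed. Throws IllegalStateException if the latch is not released in time.
    static List<String> sendAndConsume(List<String> messages, long timeoutSeconds) {
        BlockingQueue<String> topic = new LinkedBlockingQueue<>(messages);
        List<String> consumed = new CopyOnWriteArrayList<>();
        CountDownLatch latch = new CountDownLatch(messages.size());

        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    // Plays the role of kafkaConsumer.poll(100): wait briefly for a record.
                    String record = topic.poll(100, TimeUnit.MILLISECONDS);
                    if (record != null) {
                        consumed.add(record);
                        latch.countDown();
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // shutdownNow() asked the loop to stop
            }
        });

        try {
            if (!latch.await(timeoutSeconds, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for messages");
            }
            return consumed;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for messages", e);
        } finally {
            executor.shutdownNow(); // interrupts the polling loop so the thread exits
        }
    }
}
```

With a real KafkaConsumer, an interrupt is not the usual way to stop the loop. Calling kafkaConsumer.wakeup() from the test thread, which makes a blocked poll throw a WakeupException, is probably the closer equivalent.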
A little more comprehensive test is available here
Can you provide a link to a complete GitHub project, please?
Hi Biju :-), I am writing unit tests for a consumer of type BatchAcknowledgingMessageListener, so I cannot use a CountDownLatch like in your example. For me, it would be enough to verify that the onMessage block is executed at least once. But I had to add sleep(10s) after sendTemplate.flush(), which makes the test flaky.
Is there any way to write the test without putting the thread to sleep?
@Test
public void testConsumer() throws Exception {
    // Set up a container
    Map consumerProps = consumerProps(embeddedKafka.getBrokersAsString(), "audit-consumer", "false");
    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    DefaultKafkaConsumerFactory consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProps);
    ContainerProperties containerProperties = new ContainerProperties("messages");
    containerProperties.setMessageListener(kafkaMessageConsumerService);
    KafkaMessageListenerContainer container =
            new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
    container.setBeanName("audit-consumer");
    container.start();

    // Wait until the container has the required number of assigned partitions
    ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());

    // Send 5 messages
    Map senderProps = senderProps(embeddedKafka.getBrokersAsString());
    ProducerFactory producerFactory = new DefaultKafkaProducerFactory<>(senderProps);
    KafkaTemplate template = new KafkaTemplate<>(producerFactory);
    template.setDefaultTopic("messages");
    template.sendDefault(0, "key1", "json1");
    template.sendDefault(0, "key2", "json2");
    template.sendDefault(0, "key3", "json3");
    template.sendDefault(0, "key4", "json4");
    template.sendDefault(0, "key5", "json5");
    template.flush();

    TimeUnit.SECONDS.sleep(10);

    ArgumentCaptor listArgumentCaptor = ArgumentCaptor.forClass(List.class);
    verify(auditMessageProcessor, atLeast(1)).process(listArgumentCaptor.capture());
    container.stop();
}
By the way, I am now aware of TestUtils.waitForCondition(condition, timeout, ..) from Spring Kafka. But in this case, I'd rather use Mockito with a timeout:
verify(auditMessageProcessor, timeout(10000).atLeast(1)).process(listArgumentCaptor.capture());
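Both options mentioned in this thread replace the fixed sleep with the same idea: check repeatedly and return as soon as the expected thing has happened, failing only once a deadline passes. For readers who want to see that idea on its own, here is a minimal JDK-only sketch. ConditionWaiter and waitFor are hypothetical names and are not part of Spring Kafka or Mockito.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical helper illustrating "poll until true or timeout" instead of a fixed sleep.
class ConditionWaiter {

    // Returns true as soon as the condition holds, or false once timeoutMillis has elapsed.
    // The condition is checked at least once, then roughly every pollMillis.
    static boolean waitFor(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false; // deadline reached without the condition becoming true
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
    }
}
```

In a test, the condition might be something like "the listener has recorded at least one batch". The test then takes only as long as the message actually needs to arrive, and the timeout becomes an upper bound rather than a fixed cost.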