All that a project needs to include this support is the "spring-kafka-test" module; for a Gradle build it can be added the following way:
```groovy
testCompile "org.springframework.kafka:spring-kafka-test:1.1.2.BUILD-SNAPSHOT"
```
Note that I am using a snapshot version of the project as this has support for Kafka 0.10+.
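Since this is a snapshot version, it is not available from Maven Central; the build needs a snapshot repository configured as well. A minimal sketch for Gradle, assuming the standard Spring snapshot repository URL:

```groovy
repositories {
    mavenCentral()
    // assumption: Spring's snapshot artifacts are served from this repository
    maven { url "https://repo.spring.io/libs-snapshot" }
}
```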
With this dependency in place, an embedded Kafka cluster can be spun up in a test using JUnit's @ClassRule:
```java
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(2, true, 2, "messages");
```
This starts up a Kafka cluster with 2 brokers and a topic called "messages" with 2 partitions; the class rule makes sure that the cluster is spun up before the tests run and shut down at the end of them.
Here is what a sample with a raw Kafka producer/consumer using this embedded Kafka cluster looks like; the embedded Kafka instance can be used to retrieve the properties required by the Kafka producer and consumer:
```java
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
producer.send(new ProducerRecord<>("messages", 0, 0, "message0")).get();
producer.send(new ProducerRecord<>("messages", 0, 1, "message1")).get();
producer.send(new ProducerRecord<>("messages", 1, 2, "message2")).get();
producer.send(new ProducerRecord<>("messages", 1, 3, "message3")).get();

Map<String, Object> consumerProps =
        KafkaTestUtils.consumerProps("sampleRawConsumer", "false", embeddedKafka);
consumerProps.put("auto.offset.reset", "earliest");

final CountDownLatch latch = new CountDownLatch(4);
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(() -> {
    KafkaConsumer<Integer, String> kafkaConsumer = new KafkaConsumer<>(consumerProps);
    kafkaConsumer.subscribe(Collections.singletonList("messages"));
    try {
        while (true) {
            ConsumerRecords<Integer, String> records = kafkaConsumer.poll(100);
            for (ConsumerRecord<Integer, String> record : records) {
                LOGGER.info("consuming from topic = {}, partition = {}, offset = {}, key = {}, value = {}",
                        record.topic(), record.partition(), record.offset(),
                        record.key(), record.value());
                latch.countDown();
            }
        }
    } finally {
        kafkaConsumer.close();
    }
});

assertThat(latch.await(90, TimeUnit.SECONDS)).isTrue();
```
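The test above relies on a CountDownLatch to signal the main thread once all four expected records have been consumed on the background executor. That concurrency pattern can be seen in isolation in the following sketch, where a plain BlockingQueue stands in for the Kafka topic (the class name and queue are illustrative, not part of the Kafka API):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class LatchPatternDemo {
    public static void main(String[] args) throws Exception {
        // stands in for the "messages" topic
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        // expect 4 records, just like the embedded Kafka test
        CountDownLatch latch = new CountDownLatch(4);

        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.execute(() -> {
            try {
                while (latch.getCount() > 0) {
                    // analogous to kafkaConsumer.poll(100)
                    String record = queue.poll(100, TimeUnit.MILLISECONDS);
                    if (record != null) {
                        System.out.println("consumed " + record);
                        latch.countDown();
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // "produce" the records
        for (int i = 0; i < 4; i++) {
            queue.put("message" + i);
        }

        // blocks until all 4 records are consumed, or the timeout elapses
        boolean allConsumed = latch.await(5, TimeUnit.SECONDS);
        System.out.println("all consumed = " + allConsumed);
        executorService.shutdown();
    }
}
```

The latch is what keeps the assertion at the end of the Kafka test honest: without it, the test could finish before the consumer thread has seen any records at all.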
A slightly more comprehensive test is available here.