My objective here is to show how Spring Kafka provides an abstraction over the raw Kafka Producer and Consumer APIs that is easy to use and familiar to anyone with a Spring background.
Sample scenario
The sample scenario is a simple one: I have a system which produces a message and another which processes it.
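The message payload in both implementations is a WorkUnit domain object. The actual class is in the repo; a minimal sketch of what it might look like:

// A hypothetical, minimal WorkUnit; the actual domain class is in the repo.
public class WorkUnit {
    private String id;
    private String definition;

    public WorkUnit() {
    }

    public WorkUnit(String id, String definition) {
        this.id = id;
        this.definition = definition;
    }

    public String getId() {
        return id;
    }

    public String getDefinition() {
        return definition;
    }
    // setters omitted for brevity
}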
Implementation using the Raw Kafka Producer/Consumer APIs
To start with, I have used the raw Kafka Producer and Consumer APIs to implement this scenario. If you would rather look at the code, it is available in my github repo here.
Producer
The following sets up a KafkaProducer instance which is used for sending a message to a Kafka topic:
KafkaProducer<String, WorkUnit> producer
= new KafkaProducer<>(kafkaProps, stringKeySerializer(), workUnitJsonSerializer());
I have used a variation of the KafkaProducer constructor which takes in a custom Serializer to convert the domain object to a JSON representation.
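The serializer itself is in the repo; a minimal Jackson-based sketch of what such an implementation might look like (the JsonPojoSerializer name is my own):

import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

// Hypothetical Jackson-based JSON serializer for the value type.
public class JsonPojoSerializer<T> implements Serializer<T> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // no configuration required
    }

    @Override
    public byte[] serialize(String topic, T data) {
        try {
            // write the POJO out as JSON bytes
            return data == null ? null : objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new SerializationException("Error serializing value", e);
        }
    }

    @Override
    public void close() {
        // nothing to clean up
    }
}

With something like this in place, workUnitJsonSerializer() above would simply return a new instance of it.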
Once an instance of KafkaProducer is available, it can be used for sending a message to the Kafka cluster. Here I have used a synchronous version of the sender, which waits for a response to come back.
ProducerRecord<String, WorkUnit> record
= new ProducerRecord<>("workunits", workUnit.getId(), workUnit);
RecordMetadata recordMetadata = this.workUnitProducer.send(record).get();
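As an aside, send() also has an asynchronous variant which takes a Callback that is invoked once the broker acknowledges the record, or the send fails. A minimal sketch, assuming an slf4j logger named log:

// Async variant: the Callback fires when the send completes or fails.
this.workUnitProducer.send(record, (metadata, exception) -> {
    if (exception != null) {
        log.error("Send failed for workUnit {}", workUnit.getId(), exception);
    } else {
        log.info("Sent to partition = {}, offset = {}", metadata.partition(), metadata.offset());
    }
});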
Consumer
On the Consumer side we create a KafkaConsumer with a variation of the constructor taking in a Deserializer which knows how to read a JSON message and translate it to the domain instance:
KafkaConsumer<String, WorkUnit> consumer
= new KafkaConsumer<>(props, stringKeyDeserializer(), workUnitJsonValueDeserializer());
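Like the serializer, the deserializer implementation lives in the repo; a Jackson-based sketch (the JsonPojoDeserializer name is mine) might look like this:

import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

// Hypothetical Jackson-based JSON deserializer for the value type.
public class JsonPojoDeserializer<T> implements Deserializer<T> {
    private final ObjectMapper objectMapper = new ObjectMapper();
    private final Class<T> targetType;

    public JsonPojoDeserializer(Class<T> targetType) {
        this.targetType = targetType;
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // no configuration required
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        try {
            // read the JSON bytes back into the domain type
            return data == null ? null : objectMapper.readValue(data, targetType);
        } catch (Exception e) {
            throw new SerializationException("Error deserializing value", e);
        }
    }

    @Override
    public void close() {
        // nothing to clean up
    }
}

workUnitJsonValueDeserializer() would then return something like new JsonPojoDeserializer<>(WorkUnit.class).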
Once an instance of KafkaConsumer is available, a listener loop can be put in place which reads a batch of records, processes them, and waits for more records to come through:
consumer.subscribe(Collections.singletonList("workunits"));

try {
    while (true) {
        ConsumerRecords<String, WorkUnit> records = this.consumer.poll(100);
        for (ConsumerRecord<String, WorkUnit> record : records) {
            log.info("consuming from topic = {}, partition = {}, offset = {}, key = {}, value = {}",
                    record.topic(), record.partition(), record.offset(), record.key(), record.value());
        }
    }
} finally {
    this.consumer.close();
}
Implementation using Spring Kafka
I have the implementation using Spring Kafka available in my github repo.
Producer
Spring Kafka provides a KafkaTemplate class as a wrapper over the KafkaProducer to send messages to a Kafka topic:
@Bean
public ProducerFactory<String, WorkUnit> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs(), stringKeySerializer(), workUnitJsonSerializer());
}

@Bean
public KafkaTemplate<String, WorkUnit> workUnitsKafkaTemplate() {
    KafkaTemplate<String, WorkUnit> kafkaTemplate = new KafkaTemplate<>(producerFactory());
    kafkaTemplate.setDefaultTopic("workunits");
    return kafkaTemplate;
}
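The producerConfigs() method referenced by the factory is not shown above; a minimal sketch of what it might contain, assuming a local broker at localhost:9092:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    // the broker address is an assumption for a local setup
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    return props;
}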
One thing to note is that whereas earlier I had implemented a custom Serializer/Deserializer to send a domain type as JSON and then to convert it back, Spring Kafka provides a Serializer/Deserializer for JSON out of the box.
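For instance, the producer and consumer factories could be wired with the built-in JsonSerializer/JsonDeserializer instead of the custom ones; a sketch:

import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

// The factories wired with the out-of-the-box JSON (de)serializers instead:
ProducerFactory<String, WorkUnit> producerFactory =
        new DefaultKafkaProducerFactory<>(producerConfigs(),
                new StringSerializer(), new JsonSerializer<>());

ConsumerFactory<String, WorkUnit> consumerFactory =
        new DefaultKafkaConsumerFactory<>(consumerProps(),
                new StringDeserializer(), new JsonDeserializer<>(WorkUnit.class));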
And using KafkaTemplate to send a message:
SendResult<String, WorkUnit> sendResult =
        workUnitsKafkaTemplate.sendDefault(workUnit.getId(), workUnit).get();

RecordMetadata recordMetadata = sendResult.getRecordMetadata();

LOGGER.info("topic = {}, partition = {}, offset = {}, workUnit = {}",
        recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), workUnit);
Consumer
The consumer part is implemented using a listener pattern that should be familiar to anybody who has implemented listeners for RabbitMQ/ActiveMQ. First, here is the configuration to set up a listener container:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, WorkUnit> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, WorkUnit> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConcurrency(1);
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

@Bean
public ConsumerFactory<String, WorkUnit> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerProps(), stringKeyDeserializer(), workUnitJsonValueDeserializer());
}
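As on the producer side, consumerProps() is not shown; a minimal sketch, assuming a local broker and an arbitrary consumer group:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;

@Bean
public Map<String, Object> consumerProps() {
    Map<String, Object> props = new HashMap<>();
    // broker address and group id are assumptions for a local setup
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "workunits-group");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    return props;
}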
And here is the service which responds to messages read by the container:
@Service
public class WorkUnitsConsumer {
    private static final Logger log = LoggerFactory.getLogger(WorkUnitsConsumer.class);

    @KafkaListener(topics = "workunits")
    public void onReceiving(WorkUnit workUnit,
                            @Header(KafkaHeaders.OFFSET) Integer offset,
                            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        log.info("Processing topic = {}, partition = {}, offset = {}, workUnit = {}",
                topic, partition, offset, workUnit);
    }
}
Here all the complexity of setting up a listener loop, as with the raw consumer, is avoided and nicely hidden by the listener container.
Conclusion
I have glossed over a lot of the internals: setting up batch sizes, variations in acknowledgement, and different API signatures. My intention is just to demonstrate a common use case using the raw Kafka APIs and show how the Spring Kafka wrapper simplifies it.
If you are interested in exploring further, the raw producer/consumer sample is available here and the Spring Kafka one here.