My objective here is to show how 
Spring Kafka provides an abstraction to raw Kafka Producer and Consumer API's that is easy to use and is familiar to someone with a Spring background.
Sample scenario
The sample scenario is a simple one, I have a system which produces a message and another which processes it
Implementation using Raw Kafka Producer/Consumer API's
To start with I have used raw Kafka Producer and Consumer API's to implement this scenario. If you would rather look at the code, I have it available in my 
github repo here.
Producer
The following sets up a KafkaProducer instance which is used for sending a message to a Kafka topic:
KafkaProducer<String, WorkUnit> producer 
    = new KafkaProducer<>(kafkaProps, stringKeySerializer(), workUnitJsonSerializer());
I have used a variation of the KafkaProducer constructor which takes in a custom 
Serializer to convert the domain object to a json representation.
Once an instance of KafkaProducer is available, it can be used for sending a message to the Kafka cluster, here I have used a synchronous version of the sender which waits for a response to be back.
ProducerRecord<String, WorkUnit> record 
                = new ProducerRecord<>("workunits", workUnit.getId(), workUnit);
RecordMetadata recordMetadata = this.workUnitProducer.send(record).get();
Consumer
On the Consumer side we create a KafkaConsumer with a variation of the constructor taking in a 
Deserializer which knows how to read a json message and translate that to the domain instance:
KafkaConsumer<String, WorkUnit> consumer  
    = new KafkaConsumer<>(props, stringKeyDeserializer(), workUnitJsonValueDeserializer());
Once an instance of KafkaConsumer is available a listener loop can be put in place which reads a batch of records, processes them and waits for more records to come through:
consumer.subscribe("workunits);
try {
    while (true) {
        ConsumerRecords<String, WorkUnit> records = this.consumer.poll(100);
        for (ConsumerRecord<String, WorkUnit> record : records) {
            log.info("consuming from topic = {}, partition = {}, offset = {}, key = {}, value = {}",
                    record.topic(), record.partition(), record.offset(), record.key(), record.value());
        }
    }
} finally {
    this.consumer.close();
}
Implementation using Spring Kafka 
I have the implementation using 
Spring-kafka available in 
my github repo.
Producer
Spring-Kafka provides a 
KafkaTemplate class as a wrapper over the KafkaProducer to send messages to a Kafka topic:
@Bean
public ProducerFactory<String, WorkUnit> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs(), stringKeySerializer(), workUnitJsonSerializer());
}
@Bean
public KafkaTemplate<String, WorkUnit> workUnitsKafkaTemplate() {
    KafkaTemplate<String, WorkUnit> kafkaTemplate =  new KafkaTemplate<>(producerFactory());
    kafkaTemplate.setDefaultTopic("workunits");
    return kafkaTemplate;
}
One thing to note is that whereas earlier I had implemented a custom Serializer/Deserializer to send a domain type as json and then to convert it back, Spring-Kafka provides Seralizer/Deserializer for json out of the box.
And using KafkaTemplate to send a message:
SendResult<String, WorkUnit> sendResult = 
    workUnitsKafkaTemplate.sendDefault(workUnit.getId(), workUnit).get();
RecordMetadata recordMetadata = sendResult.getRecordMetadata();
LOGGER.info("topic = {}, partition = {}, offset = {}, workUnit = {}",
        recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), workUnit);
Consumer
The consumer part is implemented using a Listener pattern that should be familiar to anybody who has implemented listeners for RabbitMQ/ActiveMQ. Here is first the configuration to set-up a listener container:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, WorkUnit> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, WorkUnit> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConcurrency(1);
    factory.setConsumerFactory(consumerFactory());
    return factory;
}
@Bean
public ConsumerFactory<String, WorkUnit> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerProps(), stringKeyDeserializer(), workUnitJsonValueDeserializer());
}
and the service which responds to messages read by the container:
@Service
public class WorkUnitsConsumer {
    private static final Logger log = LoggerFactory.getLogger(WorkUnitsConsumer.class);
    @KafkaListener(topics = "workunits")
    public void onReceiving(WorkUnit workUnit, @Header(KafkaHeaders.OFFSET) Integer offset,
                            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        log.info("Processing topic = {}, partition = {}, offset = {}, workUnit = {}",
                topic, partition, offset, workUnit);
    }
}
Here all the complexities of setting up a listener loop like with the raw consumer is avoided and is nicely hidden by the listener container.
Conclusion
I have brushed over a lot of the internals of setting up batch sizes, variations in acknowledgement, different API signatures. My intention is just to demonstrate a common use case using the raw Kafka API's and show how Spring-Kafka wrapper simplifies it. 
If you are interested in exploring further, the raw producer consumer sample 
is available here and the Spring Kafka one 
here