Sunday, December 16, 2018

Reactive Spring Webflux with AWS DynamoDB

AWS has released AWS SDK for Java version 2, the SDK now supports non-blocking IO for the API calls of different AWS services. In this post I will be exploring using the DynamoDB API's of the AWS SDK 2.x and using Spring Webflux stack to expose a reactive endpoint - this way the application is reactive end to end and presumably should use resources very efficiently (I have plans to do some tests on this set-up as a follow up).


Details of the Application


It may be easier to simply look at the code and follow it there - it is available in my GitHub repo.

The application is a simple one - to perform CRUD operation on a Hotel entity represented using the following Kotlin code:

data class Hotel(
        val id: String = UUID.randomUUID().toString(),
        val name: String? = null,
        val address: String? = null,
        val state: String? = null,
        val zip: String? = null
)

I want to expose endpoints to save and retrieve a hotel entity and to get the list of hotels by state.


Details of the AWS SDK 2


The package names of the AWS SDK 2 api's all start with "software.amazon.awssdk" prefix now, the client to interact with DynamoDB is created using code along these lines:

import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient

val client: DynamoDbAsyncClient = DynamoDbAsyncClient.builder()
        .region(Region.of(dynamoProperties.region))
        .credentialsProvider(DefaultCredentialsProvider.builder().build())
        .build()


Once the DynamoDbAsyncClient instance is created, any operation using this client returns a Java 8 CompletableFuture type. For eg. in saving a Hotel entity:

val putItemRequest = PutItemRequest.builder()
        .tableName("hotels")
        .item(HotelMapper.toMap(hotel))
        .build()
        
val result: CompletableFuture<PutItemResponse> =
        dynamoClient.putItem(putItemRequest)

and in retrieving a record by id:

val getItemRequest: GetItemRequest = GetItemRequest.builder()
        .key(mapOf(Constants.ID to AttributeValue.builder().s(id).build()))
        .tableName(Constants.TABLE_NAME)
        .build()

val response: CompletableFuture<GetItemResponse> = dynamoClient.getItem(getItemRequest)

CompletableFuture provides a comprehensive set of functions to transform the results when available.

Integrating with Spring Webflux

Spring Webflux is a reactive web framework. The non-blocking IO support in AWS SDK 2 now makes it possible to write an end to end reactive and non-blocking applications with DynamoDB. Spring Webflux uses reactor-core to provide reactive streams support and the trick to integrating with AWS SDK 2 is to transform the Java 8 CompletableFuture to a reactor-core type, the following way when retrieving an item from DynamoDB by id:

val getItemRequest: GetItemRequest = GetItemRequest.builder()
        .key(mapOf(Constants.ID to AttributeValue.builder().s(id).build()))
        .tableName(Constants.TABLE_NAME)
        .build()

return Mono.fromCompletionStage(dynamoClient.getItem(getItemRequest))
        .map { resp ->
            HotelMapper.fromMap(id, resp.item())
        }

Spring Webflux expects the return types of the different web endpoint method signatures to be of reactive types, so a typical endpoint for getting say a list of hotels is the following:

@RequestMapping(value = ["/hotels"], method = [RequestMethod.GET])
fun getHotelsByState(@RequestParam("state") state: String): Flux<Hotel> {
    return hotelRepo.findHotelsByState(state)
}

Spring Webflux also supports a functional way to describe the API of the application, so an equivalent API to retrieve a hotel by its id, but expressed as a functional DSL is the following:

@Configuration
class HotelAdditionalRoutes {

    @Bean
    fun routes(hotelRepo: HotelRepo) = router {
        GET("/hotels/{id}") { req ->
            val id = req.pathVariable("id")
            val response: Mono<ServerResponse> = hotelRepo.getHotel(id)
                    .flatMap { hotel ->
                        ServerResponse.ok().body(BodyInserters.fromObject(hotel))
                    }
            response.switchIfEmpty(ServerResponse.notFound().build())
        }
    }
}


Conclusion

AWS SDK 2 makes it simple to write an end to end reactive and non-blocking applications. I have used Spring Webflux and AWS SDK 2 dynamo client to write such an application here. The entire working sample is available in my GitHub repo - https://github.com/bijukunjummen/boot-with-dynamodb, and has instructions on how to start up a local version of DynamoDB and use it for testing the application.