This post will be a quick walkthrough of the kind of code involved in making a synchronous remote call, then show how layering in Non-blocking IO though highly efficient in the use of resources(especially threads) introduces complications referred to as a callback hell and how a reactive streams based approach simplifies the programming model.
Target Service
Since I will be writing a client call, my target service representing the details of a City has two endpoints. One returning a list of city id's when called with a uri of type - "/cityids" and a sample result looks like this:
[ 1, 2, 3, 4, 5, 6, 7 ]
and an endpoint returning the details of a city given its id, for example when called using an id of 1 - "/cities/1":
{ "country": "USA", "id": 1, "name": "Portland", "pop": 1600000 }
The client's responsibility is to get the list of city id's and then for each city id get the detail of the city and put it together into a list of cities.
Synchronous call
I am using Spring Framework's RestTemplate to make the remote call. A Kotlin function to get the list of cityids looks like this:
private fun getCityIds(): List<String> { val cityIdsEntity: ResponseEntity<List<String>> = restTemplate .exchange("http://localhost:$localServerPort/cityids", HttpMethod.GET, null, object : ParameterizedTypeReference<List<String>>() {}) return cityIdsEntity.body!! }
and to get the details of a city:
private fun getCityForId(id: String): City { return restTemplate.getForObject("http://localhost:$localServerPort/cities/$id", City::class.java)!! }
Given these two functions it is easy to compose them such that a list of cities is returned:
val cityIds: List<String> = getCityIds() val cities: List<City> = cityIds .stream() .map<City> { cityId -> getCityForId(cityId) } .collect(Collectors.toList()) cities.forEach { city -> LOGGER.info(city.toString()) }
The code is very easy to understand, however, there are 8 blocking calls involved -
1. to get the list of 7 city ids and then to get the details for each
2. To get the details of each of the 7 cities
Each of these calls would have been on a different thread.
Using Non-Blocking IO with callback
I will be using a library called AsyncHttpClient to make a non-blocking IO call.AyncHttpClient returns a ListenableFuture type when a remote call is made.
val responseListenableFuture: ListenableFuture<Response> = asyncHttpClient .prepareGet("http://localhost:$localServerPort/cityids") .execute()
A callback can be attached to a Listenable future to act on the response when available.
responseListenableFuture.addListener(Runnable { val response: Response = responseListenableFuture.get() val responseBody: String = response.responseBody val cityIds: List<Long> = objectMapper.readValue<List<Long>>(responseBody, object : TypeReference<List<Long>>() {}) .... }
Given the list of cityids I want to get the details of the city, so from the response I need to make more remote calls and attach a callback for each of the calls to get the details of the city along these lines:
val responseListenableFuture: ListenableFuture<Response> = asyncHttpClient .prepareGet("http://localhost:$localServerPort/cityids") .execute() responseListenableFuture.addListener(Runnable { val response: Response = responseListenableFuture.get() val responseBody: String = response.responseBody val cityIds: List<Long> = objectMapper.readValue<List<Long>>(responseBody, object : TypeReference<List<Long>>() {}) cityIds.stream().map { cityId -> val cityListenableFuture = asyncHttpClient .prepareGet("http://localhost:$localServerPort/cities/$cityId") .execute() cityListenableFuture.addListener(Runnable { val cityDescResp = cityListenableFuture.get() val cityDesc = cityDescResp.responseBody val city = objectMapper.readValue(cityDesc, City::class.java) LOGGER.info("Got city: $city") }, executor) }.collect(Collectors.toList()) }, executor)
This is a gnarly piece of code, there is set of callbacks within a callback which is very difficult to reason about and make sense of and hence referred to as the callback hell.
Using Non-Blocking IO with Java CompletableFuture
This code can be improved a little by returning a Java's CompletableFuture as the return type instead of the ListenableFuture. CompletableFuture provides operators that allow the return type to modified and returned.
As an example, consider the function to get the list of city ids:
private fun getCityIds(): CompletableFuture<List<Long>> { return asyncHttpClient .prepareGet("http://localhost:$localServerPort/cityids") .execute() .toCompletableFuture() .thenApply { response -> val s = response.responseBody val l: List<Long> = objectMapper.readValue(s, object : TypeReference<List<Long>>() {}) l } }
Here I am using the "thenApply" operator to transform "CompletableFuture<Response>" to "CompletableFuture<List<Long>>
And similarly to get the detail a city:
private fun getCityDetail(cityId: Long): CompletableFuture<City> { return asyncHttpClient.prepareGet("http://localhost:$localServerPort/cities/$cityId") .execute() .toCompletableFuture() .thenApply { response -> val s = response.responseBody LOGGER.info("Got {}", s) val city = objectMapper.readValue(s, City::class.java) city } }
This is an improvement from the Callback based approach, however, CompletableFuture lacks sufficient operators, say in this specific instance where all the city details need to be put together:
val cityIdsFuture: CompletableFuture<List<Long>> = getCityIds() val citiesCompletableFuture: CompletableFuture<List<City>> = cityIdsFuture .thenCompose { l -> val citiesCompletable: List<CompletableFuture<City>> = l.stream() .map { cityId -> getCityDetail(cityId) }.collect(toList()) val citiesCompletableFutureOfList: CompletableFuture<List<City>> = CompletableFuture.allOf(*citiesCompletable.toTypedArray()) .thenApply { _: Void? -> citiesCompletable .stream() .map { it.join() } .collect(toList()) } citiesCompletableFutureOfList }
I have used an operator called CompletableFuture.allOf which returns a "Void" type and has to be coerced to return the desired type of ""CompletableFuture<List<City>>.
Using Project Reactor
Project Reactor is an implementation of the Reactive Streams specification. It has two specialized types to return a stream of 0/1 item and a stream of 0/n items - the former is a Mono, the latter a Flux.Project Reactor provides a very rich set of operators that allow the stream of data to be transformed in a variety of ways. Consider first the function to return a list of City ids:
private fun getCityIds(): Flux<Long> { return webClient.get() .uri("/cityids") .exchange() .flatMapMany { response -> LOGGER.info("Received cities..") response.bodyToFlux<Long>() } }
I am using Spring's excellent WebClient library to make the remote call and get a Project reactor "Mono<ClientResponse>" type of response, which can be modified to a "Flux<Long>" type using the "flatMapMany" operator.
Along the same lines to get the detail of the city, given a city id:
private fun getCityDetail(cityId: Long?): Mono<City> { return webClient.get() .uri("/cities/{id}", cityId!!) .exchange() .flatMap { response -> val city: Mono<City> = response.bodyToMono() LOGGER.info("Received city..") city } }
Here a Project reactor "Mono<ClientResponse>" type is being transformed to "Mono<City>" type using the "flatMap" operator.
and the code to get the cityids and then the City's from it:
val cityIdsFlux: Flux<Long> = getCityIds() val citiesFlux: Flux<City> = cityIdsFlux .flatMap { this.getCityDetail(it) } return citiesFlux
This is very expressive - contrast the mess of a callback based approach and the simplicity of the reactive streams based approach.
Conclusion
In my mind, this is one of the biggest reasons to use a Reactive Streams based approach and in particular Project Reactor for scenarios that involve crossing asynchronous boundaries like in this instance to make remote calls. It cleans up the mess of callbacks and callback hells and provides a natural approach of modifying/transforming types using a rich set of operators.My repository with a working version of all the samples that I have used here is available at https://github.com/bijukunjummen/reactive-cities-demo/tree/master/src/test/kotlin/samples/geo/kotlin