In this post, I will be exploring how Spring Cloud provides a newer functional approach to wrapping a remote call with Hystrix.
Consider a simple service that returns a list of entities, say a list of cities, modeled using the excellent Wiremock tool:
WIREMOCK_SERVER.stubFor(
    WireMock.get(WireMock.urlMatching("/cities"))
        .withHeader("Accept", WireMock.equalTo("application/json"))
        .willReturn(
            WireMock.aResponse()
                .withStatus(HttpStatus.OK.value())
                .withFixedDelay(5000)
                .withHeader("Content-Type", "application/json")
        )
)
When called with a URI of the form "/cities", this Wiremock endpoint responds with JSON of the following type:
[
  { "country": "USA", "id": 1, "name": "Portland", "pop": 1600000 },
  { "country": "USA", "id": 2, "name": "Seattle", "pop": 3200000 },
  { "country": "USA", "id": 3, "name": "SFO", "pop": 6400000 }
]
after a delay of 5 seconds.
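The clients in this post deserialize that JSON into a `City` type imported from `org.bk.samples.model`, which is not listed here. A minimal sketch of what such a model might look like, with the property names and types assumed from the JSON fields above rather than taken from the actual class:

```kotlin
// Assumed shape of the City model, inferred from the JSON fields above.
// The real org.bk.samples.model.City may differ.
data class City(
    val id: Long,
    val name: String,
    val country: String,
    val pop: Long
)

fun main() {
    val portland = City(id = 1, name = "Portland", country = "USA", pop = 1_600_000)
    println(portland)
}
```

A data class keeps the model immutable and gives it value-based `equals`, which is convenient when asserting on results in tests.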
Traditional approach
There are many approaches to using Hystrix. I have traditionally preferred one where an explicit Hystrix command protects the remote call, along these lines:
import com.netflix.hystrix.HystrixCommandGroupKey
import com.netflix.hystrix.HystrixCommandKey
import com.netflix.hystrix.HystrixCommandProperties
import com.netflix.hystrix.HystrixObservableCommand
import org.bk.samples.model.City
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.http.MediaType
import org.springframework.web.reactive.function.client.WebClient
import org.springframework.web.reactive.function.client.bodyToFlux
import org.springframework.web.util.UriComponentsBuilder
import reactor.core.publisher.Flux
import rx.Observable
import rx.RxReactiveStreams
import rx.schedulers.Schedulers
import java.net.URI

class CitiesHystrixCommand(
    private val webClientBuilder: WebClient.Builder,
    private val citiesBaseUrl: String
) : HystrixObservableCommand<City>(
    HystrixObservableCommand.Setter
        .withGroupKey(HystrixCommandGroupKey.Factory.asKey("cities-service"))
        .andCommandKey(HystrixCommandKey.Factory.asKey("cities-service"))
        .andCommandPropertiesDefaults(
            HystrixCommandProperties.Setter()
                .withExecutionTimeoutInMilliseconds(4000)
        )
) {

    // The remote call that Hystrix protects
    override fun construct(): Observable<City> {
        val buildUri: URI = UriComponentsBuilder
            .fromUriString(citiesBaseUrl)
            .path("/cities")
            .build()
            .encode()
            .toUri()

        val webClient: WebClient = this.webClientBuilder.build()

        val result: Flux<City> = webClient.get()
            .uri(buildUri)
            .accept(MediaType.APPLICATION_JSON)
            .exchange()
            .flatMapMany { clientResponse -> clientResponse.bodyToFlux<City>() }

        // Hystrix expects an RxJava 1 Observable, so adapt the Flux
        return RxReactiveStreams.toObservable(result)
    }

    // Invoked when the call fails or times out
    override fun resumeWithFallback(): Observable<City> {
        LOGGER.error("Falling back on cities call", executionException)
        return Observable.empty()
    }

    companion object {
        private val LOGGER: Logger = LoggerFactory.getLogger(CitiesHystrixCommand::class.java)
    }
}
This code can now be used to make a remote call the following way:
import org.bk.samples.model.City
import org.springframework.web.reactive.function.client.WebClient
import reactor.core.publisher.Flux
import rx.Observable
import rx.RxReactiveStreams
import rx.schedulers.Schedulers

class CitiesHystrixCommandBasedClient(
    private val webClientBuilder: WebClient.Builder,
    private val citiesBaseUrl: String
) {

    fun getCities(): Flux<City> {
        // Run the call through the Hystrix command
        val citiesObservable: Observable<City> = CitiesHystrixCommand(webClientBuilder, citiesBaseUrl)
            .observe()
            .subscribeOn(Schedulers.io())

        // Adapt the RxJava 1 Observable back to a Reactor Flux
        return Flux
            .from(RxReactiveStreams
                .toPublisher(citiesObservable))
    }
}
Two things to note here:
1. WebClient returns a Project Reactor "Flux" type representing a list of cities. Hystrix, however, is based on RxJava 1, so the Flux is transformed to an RxJava Observable using the "RxReactiveStreams.toObservable()" call, provided by the RxJavaReactiveStreams library.
2. I still want the Project Reactor "Flux" type to be used in the rest of the application, so another adapter converts the RxJava Observable back to a Flux, "Flux.from(RxReactiveStreams.toPublisher(citiesObservable))", once the call wrapped in Hystrix returns.
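The two conversions can be tried in isolation, without Hystrix or a remote call. The following is a small sketch, assuming reactor-core, RxJava 1 and the RxJavaReactiveStreams library are on the classpath; the `roundTrip` helper is a hypothetical name used only for illustration:

```kotlin
import reactor.core.publisher.Flux
import rx.Observable
import rx.RxReactiveStreams

// Hypothetical helper: converts a Reactor Flux to an RxJava 1 Observable
// and then back to a Flux, mirroring the two adapter calls described above.
fun roundTrip(source: Flux<String>): Flux<String> {
    val observable: Observable<String> = RxReactiveStreams.toObservable(source)
    return Flux.from(RxReactiveStreams.toPublisher(observable))
}

fun main() {
    val cities = roundTrip(Flux.just("Portland", "Seattle", "SFO"))
        .collectList()
        .block()
    println(cities)
}
```

Both adapters go through the Reactive Streams `Publisher` interface, which is why the elements and their order should survive the round trip unchanged.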
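If I try this client against the Wiremock stub with its 5 second delay, the Hystrix timeout fires before the response arrives: with the 4 second timeout configured in the command above, the call is cut short at that point and the fallback returns an empty result.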
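The interplay between a slow call and a shorter timeout can be sketched without Hystrix at all. The example below is only an analogy built on the JDK's `CompletableFuture` (Java 9 or later for `orTimeout`), not how Hystrix is implemented; `slowCities` and `citiesWithTimeout` are hypothetical names, and the delays are scaled down to milliseconds so it runs quickly:

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit

// Hypothetical stand-in for the slow remote call: it answers only after delayMillis.
fun slowCities(delayMillis: Long): CompletableFuture<List<String>> =
    CompletableFuture.supplyAsync {
        Thread.sleep(delayMillis)
        listOf("Portland", "Seattle", "SFO")
    }

// Gives up after timeoutMillis and substitutes an empty list, similar in spirit
// to a Hystrix execution timeout followed by a fallback.
fun citiesWithTimeout(delayMillis: Long, timeoutMillis: Long): List<String> =
    slowCities(delayMillis)
        .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
        .exceptionally { emptyList<String>() }
        .join()

fun main() {
    // The call is slower than the timeout, so the fallback is used
    println(citiesWithTimeout(delayMillis = 500, timeoutMillis = 100))
    // The call is faster than the timeout, so the real result comes back
    println(citiesWithTimeout(delayMillis = 10, timeoutMillis = 500))
}
```

Unlike this sketch, Hystrix also tracks failures per command and can open a circuit so that later calls skip the remote service entirely.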
Functional approach
The previous approach carries a lot of boilerplate. The newer functional approach avoids it with HystrixCommands, a utility class that ships with Spring Cloud and wraps a remote call with Hystrix in a fluent, functional style.
The entirety of the call using HystrixCommands looks like this:
import com.netflix.hystrix.HystrixCommandProperties
import org.bk.samples.model.City
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.cloud.netflix.hystrix.HystrixCommands
import org.springframework.http.MediaType
import org.springframework.web.reactive.function.client.WebClient
import org.springframework.web.reactive.function.client.bodyToFlux
import org.springframework.web.util.UriComponentsBuilder
import reactor.core.publisher.Flux
import rx.schedulers.Schedulers
import java.net.URI

class CitiesFunctionalHystrixClient(
    private val webClientBuilder: WebClient.Builder,
    private val citiesBaseUrl: String
) {

    fun getCities(): Flux<City> {
        return HystrixCommands
            .from(callCitiesService())
            .commandName("cities-service")
            .groupName("cities-service")
            .commandProperties(
                HystrixCommandProperties.Setter()
                    .withExecutionTimeoutInMilliseconds(1000)
            )
            .toObservable { obs ->
                obs.observe()
                    .subscribeOn(Schedulers.io())
            }
            // Invoked when the call fails or times out
            .fallback { t: Throwable ->
                LOGGER.error(t.message, t)
                Flux.empty()
            }
            .toFlux()
    }

    // The remote call that Hystrix protects
    fun callCitiesService(): Flux<City> {
        val buildUri: URI = UriComponentsBuilder
            .fromUriString(citiesBaseUrl)
            .path("/cities")
            .build()
            .encode()
            .toUri()

        val webClient: WebClient = this.webClientBuilder.build()

        return webClient.get()
            .uri(buildUri)
            .accept(MediaType.APPLICATION_JSON)
            .exchange()
            .flatMapMany { clientResponse -> clientResponse.bodyToFlux<City>() }
    }

    companion object {
        // Log under this class rather than the command class from the previous section
        private val LOGGER: Logger = LoggerFactory.getLogger(CitiesFunctionalHystrixClient::class.java)
    }
}
This approach avoids a lot of boilerplate:
1. An explicit command class is no longer required.
2. The call and the fallback are coded in a fluent manner.
3. Any overrides can be specified explicitly; in this instance, the timeout of 1 second.