Tuesday, September 19, 2017

Testing time based reactor core streams with Virtual time

Reactor Core implements the Reactive Streams specification and deals with handling a (potentially unlimited) stream of data. If it interests you, do check out the excellent documentation it offers. Here I am assuming some basic familiarity with the Reactor Core libraries Flux and Mono types and will cover Reactor Core provides an abstraction to time itself to enable testing of functions which depend on passage of time.

For certain operators of Reactor-core, time is an important consideration - for eg, a variation of "interval" function which emits an increasing number every 5 seconds after an initial "delay" of 10 seconds:

val flux = Flux
        .interval(Duration.ofSeconds(10), Duration.ofSeconds(5))
        .take(3)

Testing such a stream of data depending on normal passage of time would be terrible, such a test would take about 20 seconds to finish.

Reactor-Core provides a solution, an abstraction to time itself - Virtual time based Scheduler, that provides a neat way to test these kinds of operations in a deterministic way.

Let me show it in two ways, an explicit way which should make the actions of Virtual time based scheduler very clear followed by the recommended approach of testing with Reactor Core.

import org.assertj.core.api.Assertions.assertThat
import org.junit.Test
import reactor.core.publisher.Flux
import reactor.test.scheduler.VirtualTimeScheduler
import java.time.Duration
import java.util.concurrent.CountDownLatch


class VirtualTimeTest {
    
    @Test
    fun testExplicit() {
        val mutableList = mutableListOf<Long>()

        val scheduler = VirtualTimeScheduler.getOrSet()
        val flux = Flux
                .interval(Duration.ofSeconds(10), Duration.ofSeconds(5), scheduler)
                .take(3)

        val latch = CountDownLatch(1)
        
        flux.subscribe({ l -> mutableList.add(l) }, { _ -> }, { latch.countDown() })
        
        scheduler.advanceTimeBy(Duration.ofSeconds(10))
        assertThat(mutableList).containsExactly(0L)
        
        scheduler.advanceTimeBy(Duration.ofSeconds(5))
        assertThat(mutableList).containsExactly(0L, 1L)
        
        scheduler.advanceTimeBy(Duration.ofSeconds(5))
        assertThat(mutableList).containsExactly(0L, 1L, 2L)

        latch.await()
    }
    
}

1. First the scheduler for "Flux.interval" function is being set to be the Virtual Time based Scheduler.

2. The stream of data is expected to be emitted every 5 seconds after a 10 second delay

3. VirtualTimeScheduler provides an "advanceTimeBy" method to advance the Virtual time by a Duration, so the time is being first advanced by the delay time of 10 seconds at which point the first element(0) is expected to be emitted

4. Then it is subsequently advanced by 5 seconds twice to get 1 and 2 respectively.

This is deterministic and the test completes quickly. This version of the test is ugly though, it uses a list to collect and assert the results on and a CountDownLatch to control when the test terminates. A far cleaner approach for testing Reactor-Core types is using the excellent StepVerifier class and a test which makes use of this class looks like this:

import org.junit.Test
import reactor.core.publisher.Flux
import reactor.test.StepVerifier
import reactor.test.scheduler.VirtualTimeScheduler
import java.time.Duration

class VirtualTimeTest {

    @Test
    fun testWithStepVerifier() {

        VirtualTimeScheduler.getOrSet()
        val flux = Flux
                .interval(Duration.ofSeconds(10), Duration.ofSeconds(5))
                .take(3)

        StepVerifier.withVirtualTime({ flux })
                .expectSubscription()
                .thenAwait(Duration.ofSeconds(10))
                .expectNext(0)
                .thenAwait(Duration.ofSeconds(5))
                .expectNext(1)
                .thenAwait(Duration.ofSeconds(5))
                .expectNext(2)
                .verifyComplete()
    }
 }

This new test with StepVerifier reads well with each step advancing time and asserting on what is expected at that point.



Friday, September 1, 2017

Spring Webflux - Kotlin DSL - a walkthrough of the implementation

In a previous blog post I had described how Spring Webflux, the reactive programming support in Spring Web Framework, uses a Kotlin based DSL to enable users to describe routes in a very intuitive way. Here I wanted to explore a little of the underlying implementation.


A sample DSL describing a set of endpoints looks like this:

package sample.routes

import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.http.MediaType.APPLICATION_JSON
import org.springframework.web.reactive.function.server.router
import sample.handler.MessageHandler

@Configuration
class AppRoutes(private val messageHandler: MessageHandler) {

    @Bean
    fun apis() = router {
        (accept(APPLICATION_JSON) and "/messages").nest {
            GET("/", messageHandler::getMessages)
            POST("/", messageHandler::addMessage)
            GET("/{id}", messageHandler::getMessage)
            PUT("/{id}", messageHandler::updateMessage)
            DELETE("/{id}", messageHandler::deleteMessage)
        }
    }

}


To analyze the sample let me start with a smaller working example:

import org.junit.Test
import org.springframework.test.web.reactive.server.WebTestClient
import org.springframework.web.reactive.function.server.ServerResponse.ok
import org.springframework.web.reactive.function.server.router

class AppRoutesTest {

    @Test
    fun testSimpleGet() {
        val routerFunction = router {
            GET("/isokay", { _ -> ok().build() })
        }

        val client = WebTestClient.bindToRouterFunction(routerFunction).build()

        client.get()
                .uri("/isokay")
                .exchange()
                .expectStatus().isOk
    }
}

The heart of the route definition is the "router" function:

import org.springframework.web.reactive.function.server.router
...
val routerFunction = router {
    GET("/isokay", { _ -> ok().build() })
}

which is defined the following way:

fun router(routes: RouterFunctionDsl.() -> Unit) = RouterFunctionDsl().apply(routes).router()

The parameter "routes" is a special type of lambda expression, called a Lambda expression with a receiver. This means that in the context of the router function, this lambda expression can only be invoked by instances of "RouterFunctionDsl" which is what is done in the body of the function using apply method, this also means in the body of the lambda expression "this" refers to an instance of "RouterFunctionDsl". Knowing this opens up access to the methods of "RouterFunctionDsl" one of which is GET that is used in the example, GET is defined as follows:

fun GET(pattern: String, f: (ServerRequest) -> Mono<ServerResponse>) {
  ...
}

There are other ways express the same endpoint:

GET("/isokay2")({ _ -> ok().build() })

implemented in Kotlin very cleverly as:

fun GET(pattern: String): RequestPredicate = RequestPredicates.GET(pattern)

operator fun RequestPredicate.invoke(f: (ServerRequest) -> Mono<ServerResponse>) {
 ...
}

Here GET with the pattern returns a "RequestPredicate" for which an extension function has been defined (in the context of the DSL) called invoke, which is in turn a specially named operator.

Or a third way:

"/isokay" { _ -> ok().build() }

which is implemented by adding an extension function on String type and defined the following way:

operator fun String.invoke(f: (ServerRequest) -> Mono<ServerResponse>) {
  ...
}


I feel that the Spring Webflux makes an excellent use of the Kotlin DSL in making some of these route definitions easy to read while remaining concise.

This should provide enough primer to explore the source code of Routing DSL in Spring Webflux .

My samples are available in a github repository here - https://github.com/bijukunjummen/webflux-route-with-kotlin