Tuesday, March 23, 2021

Project reactor and Caching with Caffeine

So you have a function which takes a key and returns a Project Reactor Mono type.

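Mono<String> get(String key) {
    // Look up the thread-local generator inside the supplier, so it is
    // obtained on whichever thread actually runs the supplier.
    return Mono.fromSupplier(() -> key + ThreadLocalRandom.current().nextInt());
}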
And you want to cache the retrieval of this Mono type by key. A good way to do that is to use the excellent Caffeine library. Caffeine does not natively support Reactor types, but it is fairly easy to use Caffeine with Reactor the following way:
public static <T> Function<String, Mono<T>> ofMono(@NotNull Duration duration,
                                                    @NotNull Function<String, Mono<T>> fn) {
    final Cache<String, T> cache = Caffeine.newBuilder()
            .expireAfterWrite(duration)
            .recordStats()
            .build();

    return key -> {
        T result = cache.getIfPresent(key);
        if (result != null) {
            return Mono.just(result);
        } else {
            return fn.apply(key).doOnNext(n -> cache.put(key, n));
        }
    };
}
It essentially wraps a function returning the Mono, and uses Caffeine to get the value from a cache held in a closure. If the value is present in the cache it is returned directly; otherwise the wrapped function is called and, when its Mono emits a value, that value is stored in the cache. So how can this be used? Here is a test with this utility:
Function<String, Mono<String>> fn = (k) -> get(k);
Function<String, Mono<String>> wrappedFn = CachingUtils.ofMono(Duration.ofSeconds(10), fn);
StepVerifier.create(wrappedFn.apply("key1"))
        .assertNext(result1 -> {
            StepVerifier.create(wrappedFn.apply("key1"))
                    .assertNext(result2 -> {
                        assertThat(result2).isEqualTo(result1);
                    })
                    .verifyComplete();
            StepVerifier.create(wrappedFn.apply("key1"))
                    .assertNext(result2 -> {
                        assertThat(result2).isEqualTo(result1);
                    })
                    .verifyComplete();

            StepVerifier.create(wrappedFn.apply("key2"))
                    .assertNext(result2 -> {
                        assertThat(result2).isNotEqualTo(result1);
                    })
                    .verifyComplete();
        })
        .verifyComplete();
Here I am using Project Reactor's StepVerifier utility to run a test on this wrapped function and ensure that the cached value is indeed returned for repeated keys. The full sample is available in this gist.
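One caveat of the wrapper above is that two callers missing on the same key at the same time will both invoke the wrapped function. A minimal sketch of one way around that is to cache the in-flight computation rather than the finished value. To keep the sketch free of dependencies it swaps in CompletableFuture for Mono and a plain ConcurrentHashMap for Caffeine, so there is no expiry here; "ofFuture" is a hypothetical name and not part of the CachingUtils class from the post.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: memoizes the future per key, so concurrent misses
// for the same key share a single computation.
fun <T> ofFuture(fn: (String) -> CompletableFuture<T>): (String) -> CompletableFuture<T> {
    val cache = ConcurrentHashMap<String, CompletableFuture<T>>()
    return { key ->
        val future = cache.computeIfAbsent(key) { k -> fn(k) }
        // Do not keep failures around: evict them so that the next call retries.
        future.whenComplete { _, error ->
            if (error != null) {
                cache.remove(key, future)
            }
        }
        future
    }
}

fun main() {
    val calls = AtomicInteger()
    val wrapped = ofFuture { key ->
        CompletableFuture.supplyAsync { key + "-" + calls.incrementAndGet() }
    }
    val first = wrapped("key1").join()
    val second = wrapped("key1").join()
    println(first == second) // prints true: the second call reused the cached future
    println(calls.get())     // prints 1: the underlying function ran once
}
```

The eviction is registered outside of computeIfAbsent on purpose, since removing an entry from inside the mapping function for the same key is not allowed by ConcurrentHashMap.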

Coroutine based Spring boot webflux application

I have worked with Spring Framework for ages and it still manages to surprise me with how cutting edge it continues to be, while at the same time enabling a developer to put together a fairly sane app.

The most recent surprise was how it enables programming a web application with Kotlin coroutines. Coroutines are a fairly complicated concept to get my head around, but it is starting to click now, and while trying out some samples I thought it may be a good idea to put an end-to-end web application in place.

Thanks to the excellent Spring Boot starters it was not difficult at all. Along the way I also decided to experiment with R2DBC, another involved technology, which interacts with a database using reactive streams. Combining reactive streams for the database interaction with coroutines in the rest of the layers was not difficult either. In this post I will not be covering the nuances of what I had to do to get the sample to work, but will cover one thin slice of what it looks like. The sample is here in my GitHub repo and should be fairly self-explanatory.


I have to acknowledge that Nicolas Frankel's blog post provided me a lot of pointers in getting the working code just right.


A Slice of functionality

The slice of functionality that I will consider in this post is to return a list of entities and a single entity from the embedded database that I have used for the application.

Let's start from the bottom up. At the lowest level I have to query the database and return a list of entities; this is made dirt simple by the Spring Data based repositories. This is the entirety of the repository code that returns coroutine types.


import org.springframework.data.repository.kotlin.CoroutineCrudRepository
import samples.geo.domain.City

interface CityRepo : CoroutineCrudRepository<City, Long>
Just by doing this the CRUD operations become suspendable functions. So to return a list of entities or a specific entity, the signature looks something like this:


fun getCities(): Flow<City> {
    return cityRepo.findAll()
}

suspend fun getCity(id: Long): City? {
    return cityRepo.findById(id)
}
Any list operations now return the Coroutine Flow type and getting an entity is a suspendable function.


Moving to the web layer (I have a service layer, but it is just a passthrough to the repo in this instance), I like to have a handler for handling the WebFlux ServerRequest and ServerResponse types the following way:
suspend fun getCities(request: ServerRequest): ServerResponse {
    val cities = cityService.getCities()
        .toList()
    return ServerResponse.ok().bodyValueAndAwait(cities)
}

suspend fun getCity(request: ServerRequest): ServerResponse {
    val id = request.pathVariable("id").toLong()
    val city = cityService.getCity(id)

    return city
        ?.let { ServerResponse.ok().bodyValueAndAwait(it) }
        ?: ServerResponse.notFound().buildAndAwait()
}
which is then composed at the web layer the following way:
object AppRoutes {
    fun routes(cityHandler: CityHandler): RouterFunction<*> = coRouter {
        accept(MediaType.APPLICATION_JSON).nest {
            GET("/cities", cityHandler::getCities)
            GET("/cities/{id}", cityHandler::getCity)
            ...
        }
    }
}
The "coRouter" DSL above provides the functionality to convert the Kotlin coroutine types to the Spring WebFlux RouterFunction type. This is essentially it. The code and tests for this fairly sophisticated set of technologies (R2DBC, coroutines, WebFlux, reactive streams etc.) are fairly small, as can be seen from the GitHub repository.

Conclusion

Getting a working end to end web application with Coroutines and Spring Webflux is just a "Spring" board to further exploration of Coroutines for me and I hope to gain deeper insights into this fascinating technology over time. Having been involved in the Spring community for so long, it is fascinating to note that it continues to remain one of the best frameworks to develop applications in, mainly because of the constant innovation and its focus on developer happiness.

Sunday, March 14, 2021

Deriving a Kotlin "Try" type

Functional programming languages like Scala often have a type called "Try" to hold the result of a computation if successful or to capture an exception on failure. 

This is an incredibly useful type, allowing a caller to pointedly control how to handle an exceptional scenario. In this post I will try and create such a type from scratch.

As an example, I will be using the scenario from Daniel Westheide's excellent introduction to the Try type in Scala.

So my objective is to call a remote URL and return the content as a string. A few things can go wrong - 

  • The url can be badly formed
  • The url may be wrong and may not have anything to retrieve content from

Let's start with the first one, the URL being badly formed. An API call using the "Try" type would look something like this:

fun parseUrl(url: String): Try<URL> {
    return Try.of {
        URL(url)
    }
}
Here a URL is being parsed and the result is either a valid URL or an exception. A Try type that implements this much would look something like this:
sealed class Try<out T> {

    class Success<T>(private val result: T) : Try<T>()

    class Failure<T>(private val throwable: Throwable) : Try<T>()


    companion object {
        fun <T> of(block: () -> T): Try<T> = try {
            Success(block())
        } catch (e: Throwable) {
            Failure(e)
        }
    }
}

The "Try" type has two subtypes: a "Success" wrapping a successful result and a "Failure" capturing an exception from the call.

With the two subtypes in place, let's extend the use of the Try type:

val urlResult: Try<URL> = parseUrl("htt://somewrongurl")
assertThat(urlResult.isFailure()).isTrue()
assertThat(urlResult.isSuccess()).isFalse()

Calling it with a badly formed URL like the one above, with the scheme "htt" instead of "http", should result in a failure. So let's implement the "isFailure" and "isSuccess" behavior:

sealed class Try<out T> {

    abstract fun isSuccess(): Boolean
    fun isFailure(): Boolean = !isSuccess()

    class Success<T>(private val result: T) : Try<T>() {
        override fun isSuccess(): Boolean = true
    }

    class Failure<T>(private val throwable: Throwable) : Try<T>() {
        override fun isSuccess(): Boolean = false
    }

    ...
}
That works nicely. Now that a URL is available, hopefully a valid one, let's get some content from it:


val uriResult: Try<URL> = parseUrl("http://someurl")
val getResult: Try<String> = getFromARemoteUrl(uriResult.get())
assertThat(getResult.get()).isEqualTo("a result")
which means that our "Try" type should have a "get()" method to retrieve the result if successful and can be implemented like this:


sealed class Try<out T> {
    ...
    abstract fun get(): T

    class Success<T>(private val result: T) : Try<T>() {
        ...
        override fun get(): T = result
    }

    class Failure<T>(private val throwable: Throwable) : Try<T>() {
        ...
        override fun get(): T = throw throwable
    }
}
The Success path simply returns the result and the Failure path propagates the wrapped exception.

map Operation

Let's take it a small step forward. Given a URL, say you want to return its host:
val uriResult: Try<URL> = parseUrl("http://myhost")
assertThat(uriResult.get().host).isEqualTo("myhost")
While this works, the problem with this approach is that the "get()" call throws an exception if the URL was not valid to start with. A better approach is to retrieve the host name only if the URL is valid. Traditionally this is done using a "map" operator, and a usage looks like this:
val urlResult: Try<URL> = parseUrl("http://myhost")
val hostResult: Try<String> = urlResult.map { url -> url.host }
assertThat(hostResult).isEqualTo(Try.success("myhost"))
So let's add in a "map" operator to the "Try" type:
sealed class Try<out T> {
    ...
    abstract fun <R> map(block: (T) -> R): Try<R>

    abstract fun get(): T

    data class Success<T>(private val result: T) : Try<T>() {
        ...
        override fun <R> map(block: (T) -> R): Try<R> {
            return of {
                block(result)
            }
        }
    }

    data class Failure<T>(private val throwable: Throwable) : Try<T>() {
        ...
        override fun <R> map(block: (T) -> R): Try<R> {
            return this as Failure<R>
        }
    }
}
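and it behaves as expected. Note that "Success" and "Failure" are now data classes, which gives them the value-based equality that the "isEqualTo" assertion above relies on.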
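The test above calls "Try.success(...)", which the post does not show. A minimal self-contained sketch of the "map" step follows, assuming "success" and "failure" are simple factory functions on the companion object (these two helpers are my assumption, not taken from the original code). The Failure branch rebuilds the Failure instead of casting, which avoids the unchecked cast warning.

```kotlin
import java.net.URL

sealed class Try<out T> {
    abstract fun <R> map(block: (T) -> R): Try<R>

    data class Success<T>(val result: T) : Try<T>() {
        // A transformation that throws turns this Success into a Failure.
        override fun <R> map(block: (T) -> R): Try<R> = of { block(result) }
    }

    data class Failure<T>(val throwable: Throwable) : Try<T>() {
        // Nothing to transform: the original exception is carried forward.
        override fun <R> map(block: (T) -> R): Try<R> = Failure(throwable)
    }

    companion object {
        fun <T> of(block: () -> T): Try<T> = try {
            Success(block())
        } catch (e: Throwable) {
            Failure(e)
        }

        // Assumed factory helpers, named after their use in the test above.
        fun <T> success(value: T): Try<T> = Success(value)
        fun <T> failure(throwable: Throwable): Try<T> = Failure(throwable)
    }
}

fun parseUrl(url: String): Try<URL> = Try.of { URL(url) }

fun main() {
    val host = parseUrl("http://myhost").map { it.host }
    println(host == Try.success("myhost")) // prints true: data classes compare by content

    val bad = parseUrl("htt://somewrongurl").map { it.host }
    println(bad is Try.Failure)            // prints true: the lambda was never invoked
}
```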

flatMap Operation

Along the lines of the "map" operation, let's now get back to the original scenario of validating the URL and then attempting to get the content. The call to get the content can also fail, so you would want that to be wrapped in a Try type as well.

val urlResult: Try<URL> = parseUrl("http://someurl")
val getResult: Try<String> = getFromARemoteUrl(urlResult.get())

The two calls need to be chained together, and the "map" operation may appear to be the right operator to use:
 
val urlResult: Try<URL> = parseUrl("http://someurl")
val getResult: Try<Try<String>> = urlResult.map { url -> getFromARemoteUrl(url) }

If you look at the response type now, it does not really line up: it is a "Try<Try<String>>" and not a "Try<String>". Collapsing that nesting is exactly what a flatMap operation does. It applies a function that itself returns a Try to the valid URL and returns just the inner wrapped result.
A test using it would look like this:
val urlResult: Try<URL> = parseUrl("http://someurl")
val getResult: Try<String> = urlResult.flatMap { url -> getFromARemoteUrl(url) }
assertThat(getResult).isEqualTo(Try.success("a result"))
So how can "flatMap" be implemented? With fairly simple code that looks like this:
sealed class Try<out T> {
    ...
    abstract fun <R> flatMap(tryBlock: (T) -> Try<R>): Try<R>

    data class Success<T>(private val result: T) : Try<T>() {
        ...
        override fun <R> flatMap(tryBlock: (T) -> Try<R>): Try<R> {
            return try {
                tryBlock(result)
            } catch (e: Throwable) {
                failure(e)
            }
        }
    }

    data class Failure<T>(private val throwable: Throwable) : Try<T>() {
        ...
        override fun <R> flatMap(tryBlock: (T) -> Try<R>): Try<R> {
            return this as Failure<R>
        }
    }
}
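To see the chaining end to end, here is a self-contained sketch of the "flatMap" step. The post never shows "getFromARemoteUrl", so the version below is a hypothetical stand-in that fakes the remote call (it succeeds only for the host "someurl") so that the sketch runs without a network.

```kotlin
import java.io.IOException
import java.net.URL

sealed class Try<out T> {
    abstract fun <R> flatMap(tryBlock: (T) -> Try<R>): Try<R>

    data class Success<T>(val result: T) : Try<T>() {
        // The block already returns a Try, so its result is passed through as-is;
        // only an exception escaping the block itself needs to be wrapped.
        override fun <R> flatMap(tryBlock: (T) -> Try<R>): Try<R> = try {
            tryBlock(result)
        } catch (e: Throwable) {
            Failure(e)
        }
    }

    data class Failure<T>(val throwable: Throwable) : Try<T>() {
        override fun <R> flatMap(tryBlock: (T) -> Try<R>): Try<R> = Failure(throwable)
    }

    companion object {
        fun <T> of(block: () -> T): Try<T> = try {
            Success(block())
        } catch (e: Throwable) {
            Failure(e)
        }
    }
}

fun parseUrl(url: String): Try<URL> = Try.of { URL(url) }

// Hypothetical stand-in for the remote call: no network access is made.
fun getFromARemoteUrl(url: URL): Try<String> = Try.of {
    if (url.host == "someurl") "a result" else throw IOException("cannot reach ${url.host}")
}

fun main() {
    val ok = parseUrl("http://someurl").flatMap { getFromARemoteUrl(it) }
    println(ok) // prints Success(result=a result)

    val badUrl = parseUrl("htt://someurl").flatMap { getFromARemoteUrl(it) }
    println(badUrl is Try.Failure)  // prints true: parsing failed, the fetch was skipped

    val badHost = parseUrl("http://otherhost").flatMap { getFromARemoteUrl(it) }
    println(badHost is Try.Failure) // prints true: parsing worked, the fetch failed
}
```

Either failure ends up in the same flat "Try<String>", which is what lets a caller handle both error cases in one place.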
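One more small feature, given that the Try type has two subtypes, is to destructure the contents when required. This relies on the component functions that data classes generate; since those take the visibility of the underlying property, "result" and "throwable" have to be declared without "private" for the snippet below to compile outside the class: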
val urlResult: Try<URL> = parseUrl("http://someurl")
val getResult: Try<String> = urlResult.flatMap { url -> getFromARemoteUrl(url) }
when (getResult) {
    is Try.Success -> {
        val (s) = getResult
        println("Got a clean result: $s")
    }
    is Try.Failure -> {
        val (e) = getResult
        println("An exception: $e")
    }
}

This assumes that the user knows the subtypes which may be an okay assumption to make for this type. 
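A self-contained sketch of the destructuring, assuming the properties are declared as plain "val" so that their component functions are visible to callers. Because Try is sealed, the "when" can be used as an expression without an "else" branch. The "describe" function is a hypothetical name used only for this illustration.

```kotlin
sealed class Try<out T> {
    // Public properties, so that component1() is callable from outside the class.
    data class Success<T>(val result: T) : Try<T>()
    data class Failure<T>(val throwable: Throwable) : Try<T>()

    companion object {
        fun <T> of(block: () -> T): Try<T> = try {
            Success(block())
        } catch (e: Throwable) {
            Failure(e)
        }
    }
}

// Hypothetical helper: both subtypes are covered, so no else branch is needed.
fun describe(result: Try<String>): String = when (result) {
    is Try.Success -> {
        val (s) = result
        "Got a clean result: $s"
    }
    is Try.Failure -> {
        val (e) = result
        "An exception: ${e.message}"
    }
}

fun main() {
    println(describe(Try.of { "a result" }))             // prints Got a clean result: a result
    println(describe(Try.of<String> { error("boom") }))  // prints An exception: boom
}
```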


Conclusion

A type like "Try" is incredibly useful for capturing either a clean result or an exception, and provides a neat alternative to a normal try..catch block. Here I showed a way to write such a type from scratch. This may be overkill, however, and a better way to get such a type is to simply use an excellent library like Vavr, which has the Try type already built in. I feel it is instructive to create such a type from scratch though.
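For comparison, and not something the post itself covers, the Kotlin standard library ships a similar type: "runCatching" returns a "Result" with "map", "mapCatching" and "fold". A minimal sketch of the URL example with it, assuming Kotlin 1.5 or later so that "Result" can be used as a return type:

```kotlin
import java.net.MalformedURLException
import java.net.URL

fun parseUrl(url: String): Result<URL> = runCatching { URL(url) }

fun main() {
    val host: Result<String> = parseUrl("http://myhost").map { it.host }
    println(host.getOrNull()) // prints myhost

    val bad: Result<String> = parseUrl("htt://somewrongurl").map { it.host }
    println(bad.isFailure)                                  // prints true
    println(bad.exceptionOrNull() is MalformedURLException) // prints true

    // map lets an exception from the transformation escape;
    // mapCatching captures it as a failure instead.
    val number: Result<Int> = parseUrl("http://myhost").mapCatching { it.host.toInt() }
    println(number.isFailure) // prints true: "myhost" is not a number

    val text = bad.fold(
        onSuccess = { "host: $it" },
        onFailure = { "failed: ${it.javaClass.simpleName}" }
    )
    println(text) // prints failed: MalformedURLException
}
```

Unlike the hand-written type above, "Result" has no flatMap; nesting has to be flattened by hand, for example with "mapCatching { ... getOrThrow() }".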

Tuesday, March 9, 2021

Jackson Kotlin extension and reified types

The Jackson Kotlin module library is a pleasure to use. It greatly simplifies gnarly code, specifically the kind involving TypeReference.

Consider a sample json which looks like this:
{
    "a" : ["b", "c"],
    "b" : ["a", "c"],
    "c" : ["a", "b"]
}
This content can be represented as a "Map<String, List<String>>" type in Java.

So now, if I were to use straight Java to convert the string to the appropriate type, the code would look like this:
Map<String, List<String>> result = objectMapper.readValue(json, new TypeReference<>() {
});
What exactly is that "TypeReference" doing there? Think of it as a way of making the type parameters of the generic type "Map", here "String" and "List<String>", available at runtime. Without it the type parameters are erased at runtime and Jackson would not know that it has to create a "Map<String, List<String>>".

Kotlin can hide this detail behind an extension function which, if defined from scratch, would look like this:
inline fun <reified T> ObjectMapper.readValue(src: String): T = readValue(src, object : TypeReference<T>() {})
and code using such an extension function:
val result: Map<String, List<String>> = objectMapper.readValue(json)
See how all the TypeReference related code is well hidden in the extension function. 
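To make the mechanism less magical, here is a dependency-free sketch of the trick that a TypeReference-style class relies on. "TypeToken" and "captureType" are hypothetical names for this illustration and are not Jackson APIs. An anonymous subclass records its generic superclass, type arguments included, in its class metadata, and a reified type parameter lets an inline function create that subclass on the caller's behalf.

```kotlin
import java.lang.reflect.ParameterizedType
import java.lang.reflect.Type

// Hypothetical, stripped-down stand-in for a TypeReference-style class.
abstract class TypeToken<T> {
    // javaClass is the anonymous subclass; its generic superclass is
    // TypeToken<...> with the actual type argument still attached.
    val type: Type = (javaClass.genericSuperclass as ParameterizedType).actualTypeArguments[0]
}

// Hypothetical helper: because T is reified, the anonymous object created at
// each call site is a subclass of TypeToken with the concrete type filled in.
inline fun <reified T> captureType(): Type = object : TypeToken<T>() {}.type

fun main() {
    val type = captureType<Map<String, List<String>>>()
    println(type.typeName) // the full generic type, not just a raw Map

    val parameterized = type as ParameterizedType
    println(parameterized.rawType == Map::class.java)                   // prints true
    println(parameterized.actualTypeArguments[0] == String::class.java) // prints true
}
```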

This is the kind of capability that is provided by the Jackson Kotlin Module. With the right packages imported, sample code with this module looks like this:
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
val objectMapper = jacksonObjectMapper()
val result: Map<String, List<String>> = objectMapper.readValue(json)
This is just one of the simplifications offered by the module. It also supports Kotlin data classes and Kotlin built-in types like Pair and Triple out of the box.

Highly recommended if you are using Jackson with Kotlin.