Monday, November 30, 2020

AWS SDK 2 for Java and storing a Json in DynamoDB

 AWS DynamoDB is described as a NoSQL key-value and a document database. In my work I mostly use the key-value behavior of the database but rarely use the document database features, however  the document database part is growing on me and this post highlights some ways of using the document database feature of DynamoDB along with introducing a small utility library built on top of AWS SDK 2.X for Java that simplifies using document database features of AWS DynamoDB

The treatment of the document database features will be very high level in this post, I will plan a follow up which goes into more details later


DynamoDB as a document database

So what does it mean for AWS DynamoDB to be treated as a document database. Consider a json representation of an entity, say something representing a Hotel:

{
    "id": "1",
    "name": "test",
    "address": "test address",
    "state": "OR",
    "properties": {
        "amenities":{
            "rooms": 100,
            "gym": 2,
            "swimmingPool": true                    
        }                    
    },
    "zip": "zip"
}
This json has some top level attributes like "id", a name, an address etc. But it also has a free form "properties" holding some additional "nested" attributes of this hotel. 

A document database can store this document representing the hotel in its entirety OR can treat individual fields say the "properties" field of the hotel as a document. 
A naive way to do this will be to simply serialize the entire content into a json string and store it in, say for eg, for the properties field transform into a string representation of the json and store in the database, this works, but there are a few issues with it. 
  1. None of the attributes of the field like properties can be queried for, say if I wanted to know whether the hotel has a swimming pool, there is no way just to get this information of of the stored content. 
  2. The attributes cannot be filtered on - so say if wanted hotels with atleast 2 gyms, this is not something that can be filtered down to. 

A document database would allow for the the entire document to be saved, individual attributes, both top level and nested ones, to be queried/filtered on. 
So for eg, in the example of "hotel" document the top level attributes are "id", "name", "address", "state", "zip" and the nested attributes are "properties.amenities.rooms", "properties.amenities.gym", "properties.amenities.swimmingPool" and so on.

AWS SDK 2 for DynamoDB and Document database support

If you are writing a Java based application to interact with a AWS DynamoDB database, then you would have likely used the new AWS SDK 2 library to make the API calls. However one issue with the library is that it natively does not support a json based document model. Let me go into a little more detail here.  

From the AWS SDK 2 for AWS DynamoDB's perspective every attribute that is saved is an instance of something called an AttributeValue
A row of data, say for a hotel, is a simple map of "attribute" names to Attribute values, and a sample code looks something like this:
val putItemRequest = PutItemRequest.builder()
    .tableName(TABLE_NAME)
    .item(
        mapOf(
            ID to AttributeValue.builder().s(hotel.id).build(),
            NAME to AttributeValue.builder().s(hotel.name).build(),
            ZIP to AttributeValue.builder().s(hotel.zip).build(),
            STATE to AttributeValue.builder().s(hotel.state).build(),
            ADDRESS to AttributeValue.builder().s(hotel.address).build(),
            PROPERTIES to objectMapper.writeValueAsString(hotel.properties),
            VERSION to AttributeValue.builder().n(hotel.version.toString()).build()
        )
    )
    .build()
dynamoClient.putItem(putItemRequest)
Here a map of each attribute to an AttributeValue is being created with an appropriate "type" of content, "s" indicates a string, "n" a number in the above sample. 

There are other AttributeValue types like "m" representing a map and "l" representing a list. 

The neat thing is that "m" and "l" types can have nested AttributeValues, which maps to a structured json document, however there is no simple way to convert a json to this kind of an Attribute Value and back. 

So for eg. if I were to handle the raw "properties" of a hotel which understands the nested attributes, an approach could be this:
val putItemRequest = PutItemRequest.builder()
    .tableName(TABLE_NAME)
    .item(
        mapOf(
            ID to AttributeValue.builder().s(hotel.id).build(),
            NAME to AttributeValue.builder().s(hotel.name).build(),
            ZIP to AttributeValue.builder().s(hotel.zip).build(),
            STATE to AttributeValue.builder().s(hotel.state).build(),
            ADDRESS to AttributeValue.builder().s(hotel.address).build(),
            PROPERTIES to AttributeValue.builder()
                .m(
                    mapOf(
                        "amenities" to AttributeValue.builder()
                            .m(
                                mapOf(
                                    "rooms" to AttributeValue.builder().n("200").build(),
                                    "gym" to AttributeValue.builder().n("2").build(),
                                    "swimmingPool" to AttributeValue.builder().bool(true).build()
                                )
                            )
                            .build()
                    )
                )
                .build(),
            VERSION to AttributeValue.builder().n(hotel.version.toString()).build()
        )
    )
    .build()
See how the nested attributes are being expanded out recursively. 
 

Introducing the Json to AttributeValue utility library

This is exactly where the utility library that I have developed comes in. 

Given a json structure as a Jackson JsonNode it converts the Json into an appropriately nested AttributeValue type and when retrieving back from DynamoDB, can convert the resulting nested AttributeValue type back to a json. 

The structure would look exactly similar to the handcrafted sample shown before. So using the utility saving the "properties" would look like this:
val putItemRequest = PutItemRequest.builder()
    .tableName(TABLE_NAME)
    .item(
        mapOf(
            ID to AttributeValue.builder().s(hotel.id).build(),
            NAME to AttributeValue.builder().s(hotel.name).build(),
            ZIP to AttributeValue.builder().s(hotel.zip).build(),
            STATE to AttributeValue.builder().s(hotel.state).build(),
            ADDRESS to AttributeValue.builder().s(hotel.address).build(),
            PROPERTIES to JsonAttributeValueUtil.toAttributeValue(hotel.properties),
            VERSION to AttributeValue.builder().n(hotel.version.toString()).build()
        )
    )
    .build()
dynamoClient.putItem(putItemRequest)
and when querying back from DynamoDB, the resulting nested AttributeValue converted back to a json this way(Kotlin code in case you are baffled by the "?let"):
properties = map[PROPERTIES]?.let { attributeValue ->
    JsonAttributeValueUtil.fromAttributeValue(
        attributeValue
    )
} ?: JsonNodeFactory.instance.objectNode()
The neat thing is even the top level attributes can be generated given a json representing the entire Hotel type. So say a json representing a Hotel is provided:
val hotel = """
    {
        "id": "1",
        "name": "test",
        "address": "test address",
        "state": "OR",
        "properties": {
            "amenities":{
                "rooms": 100,
                "gym": 2,
                "swimmingPool": true                    
            }                    
        },
        "zip": "zip"
    }
""".trimIndent()
val attributeValue = JsonAttributeValueUtil.toAttributeValue(hotel, objectMapper)
dynamoDbClient.putItem(
    PutItemRequest.builder()
            .tableName(DynamoHotelRepo.TABLE_NAME)
            .item(attributeValue.m())
            .build()
    )


Using the Library

The utility library is available here - https://github.com/bijukunjummen/aws-sdk2-dynamo-json-helper and provides details of how to get the binaries in place and use it with code.

 

Conclusion

AWS SDK 2 is an excellent and highly performant client, providing non-blocking support for client calls. I like how it provides a synchronous API and an asynchronous API and remains highly opionionated in consistenly providing a low level client API for calling the different AWS services. This utlility library provides a nice bridge for AWS SDK 2 to remain low level but be able to manage a json based document persistence and back. All the samples in this post are available in my github repository here - https://github.com/bijukunjummen/dynamodb-document-sample

Sunday, November 15, 2020

Permutation - Heap's Algorithm

 This is a little bit of an experimentation that I did recently to figure out a reasonable code to get all possible permutations of a set of characters. 


So say given a set of characters "ABC", my objective is to come up code which can spit out "ABC", "ACB", "BAC", "BCA", "CBA", "CAB". 


The approach I took is to go with the definition of permutation itself, so with "ABCD" as the set of characters a 4 slot that needs to be filled.



The first slot can be filled by any of A, B, C, D, in 4 ways:


The second slot by any of the remaining 3 characters, so with "A" in the first slot - 


The third slot by the remaining 2 characters, so with "A", "B" in the first two slots:
And finally, the fourth slot by the remaining 1 character, with say "A", "B", "C" in the first 3 slots:



In total, there would be 4 for the first slot * 3 for the 2nd slot * 2 for the 3rd slot * 1 for the 4th slot - 24 permutations altogether. 

I can do this in place, using an algorithm that looks like this:






A trace of flow and the swaps is here:
The only trick here is that the code does all the holding of characters and getting it into the right place in place by swapping the right characters to the right place and restoring it at the end of it. 

This works well for a reasonably sized set of characters - reasonable because for just 10 characters, there would be 3,628,800 permutations. 

An algorithm that works even better, though a complete mystery to me how it actually functions(well explained here if anybody is interested), is the Heap's Algorithm. Here is a java implementation of it: It very efficiently does one swap per permutation, which is still high but better than the approach that I have described before. 


In a sample perumutation of 8 characters, which generates 40320 permutations, the home cooked version swaps 80638 times, and the Heap's algorithm swaps 40319 times! thus proving its efficacy.

Tuesday, October 13, 2020

An experiment with Little's Law

My previous blog post about Project Reactor and Backpressure was about how Project Reactor provides sane defaults for scenarios where a lot more information is produced than a Consumer can consume. It does this by throttling the Producer such that in a steady-state the Producer's rate of production matches the Consumer's rate of consumption. This throttling is referred to as Backpressure.

For a stream of data, ultimately the Producer and the Consumer reach a steady state where the Producer is producing at a rate that the consumer is comfortable consuming.

So now in this stable system, how many items are in the system at any point in time. I decided to simply collect metrics for this information, by keeping a counter which is incremented and decremented as items are added into the system by the Producer and later consumed and removed by the Consumer.

Little's Law

This is a well-known problem however and I realized through more reading that this need not be measured but instead can be calculated using Little's Law defined the following way:



For my case, this maps to :

L - Number of Items in the system

λ - Consumption or Production Rate

W - Average Time spent in the System

Of course, it does require measuring the Consumption rate and the Average Time spent in the system though! The point is knowing any 2 of the values, the remaining value can be easily calculated.

Experiment

The exercise that I next performed was to compare the values that I measured in the system against the value calculated by Little's Law. No surprises here, the value measured by the system closely matches Little's law!

Let me consider a few of these scenarios here. To recap, my simple application consists of a Producer that can produce a sequence of numbers at a pre-defined rate. These sequences of numbers are consumed by a set of consumers.





Scenario 1: Unstable system with a high Producer rate

The first scenario that I considered is for a case where the Little's law actually is not supposed to work effectively, this is for an unstable system where the Producer and the Consumer produce and consume at a different rate. In my example the Producer produces a large amount of data (256 records at a time at the rate of 10 per second) and waits for the Consumer's to catch up at the rate of 4 per second) and then produces the next amount of data. You can imagine that a lot of data will be buffered in the system and the L value will be high.

A graph of the calculated(in Yellow) and measured L(in Green) value shows up the following way:



The L is around 150, so 150 records are in the system.  Although this is not a stable system,  the calculated L value matches the measured L value fairly well. 


Scenario 2: Stable System with similar Producer and Consumer rate

Little's law shines for a stable system. Consider the following scenario where the Producer and Consumer rate matches up. A graph now looks like this:


 

This time the measured L value lines up perfectly with the calculated value, thus proving the utility of Little's law.

Conclusion

There is nothing much to conclude here, Little's law is a proven law, the interest for me was in observing how well it pans out with an experiment. Personally, it has been satisfying to see a law line up against an experiment.

I have this entire set-up in a github repository here if you would like to replicate it.

Sunday, July 26, 2020

Backpressure in Project Reactor

Project Reactor implements the Reactive Streams specification, which is a standard for asynchronously processing a stream of data while respecting the processing capabilities of a consumer. 

At a very broad level, there are two entities involved, a Producer that produces the stream of data and a Consumer that consumes data. If the rate at which a Consumer consumes data is less than the rate at which a Producer produces data (referred to as a Fast Producer/Slow Consumer), then signals from the consumer can constrain the rate of production, this is referred to as Backpressure and in this post, I will be demonstrating a few backpressure examples using Project Reactor

Before I go ahead, I have to acknowledge that these examples are loosely based on what I learned from the "Reactive Programming with RxJava" book.

Producer

Flux in Project Reactor represents an asynchronous stream of 0..N data, where N can potentially be infinite. 

Consider a simple example, generating a sequence of numbers. There are built-in ways in Flux to do this, but for the example, I will be using an operator called Flux.generate. Sample code looks like this:
fun produce(targetRate: Int, upto: Long): Flux<Long> {
    val delayBetweenEmits: Long = 1000L / targetRate

    return Flux.generate(
        { 1L },
        { state: Long, sink: SynchronousSink<Long> ->
            sleep(delayBetweenEmits)
            val nextState: Long = state + 1
            if (state > upto) {
                sink.complete()
                nextState
            } else {
                LOGGER.info("Emitted {}", state)
                sink.next(state)
                nextState
            }
        }
    )
}

Here "targetRate" is the rate per second at which the Producer is expected to produce a sequence of numbers and "upto" represents the range for which the sequence is to be generated. "Thread.sleep" is used for introducing the delay between emissions.


Consumer

A consumer for this stream of data just consumes the sequence of numbers and to simulate processing while consuming the data, delays are again introduced just before reading the information, along these lines:
val delayBetweenConsumes: Long = 1000L / consumerRate
producer.produce(producerRate, count)
    .subscribe { value: Long ->
        sleep(delayBetweenConsumes)
        logger.info("Consumed {}", value)
    }

Just like with rate at the Producer side, there is a rate of consuming on the consumer side which drives the delay before consuming the data.


Scenario 1: Fast Producer, Slow Consumer without Threading

Now that I have a stream of data for which I can control the rate of production and rate of consumption, the first test that I ran was with the producer and the consumer chained together. 

The Producer produces at the rate of 100 requests a second and the consumer consuming it at 3 per second. 

If there were no backpressure mechanisms in place you would expect that Producer would merrily go along and produce all the records at its own pace of 100 per second and Consumer would slowly catch up at the rate of 3 per second.  This is NOT what happens though. 

The reason is not that intuitive I feel, it is not really backpressure coming into play either. The Producer is constrained to 3 requests per second merely because the entire flow from the Producer to the Consumer is synchronous by default and since the production and the consumption are happening on the same thread, the behavior is automatically constrained to what the Consumer is comfortable in consuming. 

Here is a graph which simply plots the rate of production and consumption over time and captures clearly the exact same rate of Production and Consumption throughout:


This behavior is borne out from the logs also, which show that the consumer and producer remain in sync:
2020-07-26 17:51:58.712  INFO 1 --- [pool-1-thread-1] sample.meter.Producer                    : Emitted 84
2020-07-26 17:51:59.048  INFO 1 --- [pool-1-thread-1] sample.meter.Consumer                    : Consumed 84
2020-07-26 17:51:59.059  INFO 1 --- [pool-1-thread-1] sample.meter.Producer                    : Emitted 85
2020-07-26 17:51:59.393  INFO 1 --- [pool-1-thread-1] sample.meter.Consumer                    : Consumed 85
2020-07-26 17:51:59.404  INFO 1 --- [pool-1-thread-1] sample.meter.Producer                    : Emitted 86
2020-07-26 17:51:59.740  INFO 1 --- [pool-1-thread-1] sample.meter.Consumer                    : Consumed 86
2020-07-26 17:51:59.751  INFO 1 --- [pool-1-thread-1] sample.meter.Producer                    : Emitted 87
2020-07-26 17:52:00.084  INFO 1 --- [pool-1-thread-1] sample.meter.Consumer                    : Consumed 87
2020-07-26 17:52:00.095  INFO 1 --- [pool-1-thread-1] sample.meter.Producer                    : Emitted 88
2020-07-26 17:52:00.430  INFO 1 --- [pool-1-thread-1] sample.meter.Consumer                    : Consumed 88
2020-07-26 17:52:00.441  INFO 1 --- [pool-1-thread-1] sample.meter.Producer                    : Emitted 89
2020-07-26 17:52:00.777  INFO 1 --- [pool-1-thread-1] sample.meter.Consumer                    : Consumed 89
2020-07-26 17:52:00.788  INFO 1 --- [pool-1-thread-1] sample.meter.Producer                    : Emitted 90
2020-07-26 17:52:01.087  INFO 1 --- [pool-1-thread-1] sample.meter.Consumer                    : Consumed 90
2020-07-26 17:52:01.097  INFO 1 --- [pool-1-thread-1] sample.meter.Producer                    : Emitted 91
2020-07-26 17:52:01.432  INFO 1 --- [pool-1-thread-1] sample.meter.Consumer                    : Consumed 91
2020-07-26 17:52:01.442  INFO 1 --- [pool-1-thread-1] sample.meter.Producer                    : Emitted 92
2020-07-26 17:52:01.777  INFO 1 --- [pool-1-thread-1] sample.meter.Consumer                    : Consumed 92
2020-07-26 17:52:01.788  INFO 1 --- [pool-1-thread-1] sample.meter.Producer                    : Emitted 93
2020-07-26 17:52:02.123  INFO 1 --- [pool-1-thread-1] sample.meter.Consumer                    : Consumed 93
2020-07-26 17:52:02.133  INFO 1 --- [pool-1-thread-1] sample.meter.Producer                    : Emitted 94
2020-07-26 17:52:02.467  INFO 1 --- [pool-1-thread-1] sample.meter.Consumer                    : Consumed 94
2020-07-26 17:52:02.478  INFO 1 --- [pool-1-thread-1] sample.meter.Producer                    : Emitted 95
2020-07-26 17:52:02.813  INFO 1 --- [pool-1-thread-1] sample.meter.Consumer                    : Consumed 95
2020-07-26 17:52:02.824  INFO 1 --- [pool-1-thread-1] sample.meter.Producer                    : Emitted 96
2020-07-26 17:52:03.157  INFO 1 --- [pool-1-thread-1] sample.meter.Consumer                    : Consumed 96
2020-07-26 17:52:03.168  INFO 1 --- [pool-1-thread-1] sample.meter.Producer                    : Emitted 97


Scenario 2: Fast Producer, Slow Consumer with Threading

The second scenario that I considered was with the Producer and the Consumer being produced independently in different threads. 

Project reactor makes this possible through two operators subscribeOn() which changes the thread where in my case the Producer produces the sequence and a publishOn() which shifts the consumption to a different thread. 

With these in place, the code looks like this:
producer.produce(producerRate, count)
    .subscribeOn(subscribeOnScheduler)
    .publishOn(publishOnScheduler)
    .subscribe { value: Long ->
        sleep(delayBetweenConsumes)
        logger.info("Consumed {}", value)
    }
The results were a little surprising, this is what I saw in the logs:
...
2020-07-26 18:42:41.774  INFO 1 --- [    subscribe-3] sample.meter.Producer                    : Emitted 252
2020-07-26 18:42:41.786  INFO 1 --- [    subscribe-3] sample.meter.Producer                    : Emitted 253
2020-07-26 18:42:41.797  INFO 1 --- [    subscribe-3] sample.meter.Producer                    : Emitted 254
2020-07-26 18:42:41.809  INFO 1 --- [    subscribe-3] sample.meter.Producer                    : Emitted 255
2020-07-26 18:42:41.819  INFO 1 --- [    subscribe-3] sample.meter.Producer                    : Emitted 256
2020-07-26 18:42:42.019  INFO 1 --- [      publish-2] sample.meter.Consumer                    : Consumed 9
2020-07-26 18:42:42.354  INFO 1 --- [      publish-2] sample.meter.Consumer                    : Consumed 10
2020-07-26 18:42:42.689  INFO 1 --- [      publish-2] sample.meter.Consumer                    : Consumed 11
2020-07-26 18:42:43.024  INFO 1 --- [      publish-2] sample.meter.Consumer                    : Consumed 12
2020-07-26 18:42:43.358  INFO 1 --- [      publish-2] sample.meter.Consumer                    : Consumed 13
2020-07-26 18:42:43.691  INFO 1 --- [      publish-2] sample.meter.Consumer                    : Consumed 14
2020-07-26 18:42:44.027  INFO 1 --- [      publish-2] sample.meter.Consumer                    : Consumed 15
2020-07-26 18:42:44.363  INFO 1 --- [      publish-2] sample.meter.Consumer                    : Consumed 16
.....
2020-07-26 18:43:43.724  INFO 1 --- [    subscribe-3] sample.meter.Producer                    : Emitted 299
2020-07-26 18:43:43.735  INFO 1 --- [    subscribe-3] sample.meter.Producer                    : Emitted 300
2020-07-26 18:43:43.913  INFO 1 --- [      publish-2] sample.meter.Consumer                    : Consumed 194
2020-07-26 18:43:44.248  INFO 1 --- [      publish-2] sample.meter.Consumer                    : Consumed 195
2020-07-26 18:43:44.581  INFO 1 --- [      publish-2] sample.meter.Consumer                    : Consumed 196
...
A sequence of numbers upto 256 was produced immediately and then the Producer waited for the Consumer to catch up, once the consumer caught up, the remaining emissions happened. This is how the graph for this looks: 



Clearly, backpressure is acting on this stream of data. The surprising aspect for me was the backpressure appeared to be triggering at a large value of 256 records from upstream.

Analyzing this is a little, the reason I realized is that an intermediate operation is buffering the requests. The intermediate operation in this instance happens to be the "publishOn()" operator that I am using, a variant of "publishOn()" which additionally takes in a prefetch parameter fixes the size of the buffer. 

In my case setting it to 10 felt reasonable, the code looks like this now:
producer.produce(producerRate, count)
    .subscribeOn(subscribeOnScheduler)
    .publishOn(publishOnScheduler, 10)
    .subscribe { value: Long ->
        sleep(delayBetweenConsumes)
        logger.info("Consumed {}", value)
    }
and the graph with the Producer and Consumer remains closely in sync:

Producer In Green, Consumer in Red


Scenario 3: Fast Producer, Multi-threaded Consumer

If you look closely at the name of the threads in logs from the first two scenarios then you would notice that the names of the thread at the point of production and at the point of consumption are always the same. The operators "publishOn()" and "subscribeOn()" don't parallelize the operation, they only switch the execution context of the operations. To really parallelize the operations, two approaches can be taken: 

  1. Using the parallel operator 
  2. Using flatMap flavors with their own "subscribeOn" operators 

For the 3rd scenario, I went for the second option of using flatMap and it looks something like this:
producer.produce(producerRate, count)
    .subscribeOn(subscribeOnScheduler)
    .publishOn(publishOnScheduler, 10)
    .flatMap({ value: Long ->
        Mono.fromSupplier {
            sleep(delayBetweenConsumes)
            logger.info("Consumed {}", value)
            null
        }.subscribeOn(flatMapScheduler)
    }, concurrency)
    .subscribe()
The work of consuming the produced sequence of numbers is being done inside the flatMap operation, the number of concurrent consumption is set to 5 by default. Running this scenario produces the following logs, the consumers are now running 5 at a time on multiple threads:
2020-07-26 23:26:27.212  INFO 1 --- [    subscribe-3] sample.meter.Producer                    : Emitted 1
2020-07-26 23:26:27.321  INFO 1 --- [    subscribe-3] sample.meter.Producer                    : Emitted 2
2020-07-26 23:26:27.423  INFO 1 --- [    subscribe-3] sample.meter.Producer                    : Emitted 3
...
2020-07-26 23:26:28.040  INFO 1 --- [    subscribe-3] sample.meter.Producer                    : Emitted 9
2020-07-26 23:26:28.143  INFO 1 --- [    subscribe-3] sample.meter.Producer                    : Emitted 10
2020-07-26 23:26:28.222  INFO 1 --- [      flatMap-4] sample.meter.Consumer                    : Consumed 1
2020-07-26 23:26:28.328  INFO 1 --- [      flatMap-5] sample.meter.Consumer                    : Consumed 2
2020-07-26 23:26:28.428  INFO 1 --- [      flatMap-6] sample.meter.Consumer                    : Consumed 3
2020-07-26 23:26:28.527  INFO 1 --- [      flatMap-7] sample.meter.Consumer                    : Consumed 4
...
The rate of production lines up with the rate of consumption

Producer In Green/Consumer in Red


Conclusion

These are different experiments that I was able to run to simulate backpressure scenarios with Project Reactor and the behavior should be true for most Reactive Streams based libraries. 

Project Reactor has sane defaults in managing the backpressure needs of a Consumer and provides ways to override the defaults. 

In all scenarios that I have run in this post, the Producer throttled the production to a rate that the Consumer was comfortable in consuming. 

 If you are interested in exploring the scenarios further, my codebase along with the grafana/prometheus set up for graphing the output is available in my github repository here - https://github.com/bijukunjummen/backpressure-demo

Saturday, July 4, 2020

Expressing a conditional expression using Json - Java Implementation

I had a need recently to express a conditional expression in a form that a front end Javascript application and a backend Java application could both create and read. Expressing the conditional expression as a Json felt logical and after a quick search, JsonLogic library appeared to fit exactly what I was looking for.

JsonLogic follows a prefix notation for its expressions, along these lines:

{"operator" : ["values" ... ]}
So for eg, given a JSON input data that looks like this:
{
  "a": "hello",
  "b": 1,
  "c": [
    "elem1",
    "elem2"
  ]
}
For equality, an expression using JsonLogic is the following:
{"==" : [ { "var" : "a" }, "hello" ] }
Here the data is being looked up using "var" expression and the equality is checked using the "==" operator. 

Though it is a good fit, I decided to go with an alternate way of expressing the conditional expression but heavily inspired by JsonLogic. So, in my implementation, equality with the sample JSON looks like this:
{
    "equals": [
        "/a", "hello"
    ]
}
Fairly similar, the location to the data is expressed as a Json Pointer and the operators are textual ("equals" vs "==") The full set of supported features are also much smaller than JsonLogic as that sufficed my needs for the project. So now I have a small Java-based library that supports these simplified conditional expressions and this post will go into the details of the operators and the usage of the library.



Sample Expressions

Once more to touch on the sample conditional expressions, everything takes the form of:
{"operator" : ["values" ... ]}
A check for equality looks like this:
{
    "equals": [
        "/a", "hello"
    ]
}
Not operator:
{
    "not": [
        {
            "equals": [
                "/a",
                "hello"
            ]
        }
    ]
}
And/Or operator:
{
    "and": [
        {
            "equal": [
                "/a", "hello"
            ]
        },
        {
            "equal": [
                "/b", 1
            ]
        }
    ]
}
There are a few operators that work on collections, for eg, to check if "c" in the sample JSON has elements "elem1", "elem2":
{
    "contains": [
        "/c", ["elem1", "elem2"]
    ]
}
or to check if the collection has any of the elements "elem1", "elem2":
{
    "containsAnyOf": [
        "/c", ["elem1", "elem2"]
    ]
}

Details of the Library

The Java-based library is built on top of the excellent Jackson JSON parser library and uses it to parse the expression which once parsed is interpreted by the library. A Gradle based project can pull in the dependency the following way(published currently to JCenter):
implementation 'com.github.bijukunjummen:json-conditional-expression:0.4.0'
and use the library along these lines, using a sample Kotlin code:
val jsonExpressionEvaluator: JsonExpressionEvaluator = JsonExpressionEvaluator(ObjectMapper())
jsonExpressionEvaluator.matches(expression, json) //returns true

Conclusion

The conditional expression and the corresponding Java-based interpreter are fairly simple and have sufficed my needs with the kind of operators that I needed for my project, however, I will be more than happy to extend and support more extensive operators if there is enough interest in using the library.


Reference

1. JsonLogic which provided the inspiration for using a prefix notation to represent a conditional expression

Monday, May 25, 2020

AWS DynamoDB version field using AWS SDK for Java 2

It is useful to have a version attribute on any entity saved to an AWS DynamoDB database which is simply a numeric indication of the number of times the entity has been modified. When the entity is first created it can be set to 1 and then incremented on every update. 

The benefit is immediate - an indicator of the number of times an entity has been modified which can be used for auditing the entity. Also, an additional use is for optimistic locking where an entity is allowed to be updated only if the holder updating it has the right version of the entity.


This post will go into details of how to introduce such a field with the DynamoDB related libraries of AWS SDK 2

Model

Consider a model called Hotel which is being persisted into a dynamo database. In Kotlin, it can be represented using the following data class:

data class Hotel(
    val id: String = UUID.randomUUID().toString(),
    val name: String,
    val address: String? = null,
    val state: String? = null,
    val zip: String? = null,
    val version: Long = 1L
)
A version field has been introduced in this model with an initial value of 1. The aim will be to save this field as-is and then let dynamo atomically manage the increment of this field at the point of saving this entity.

As the fields in this model gets changed, I would like the version to be updated along these lines:


 


Local version of DynamoDB

It is useful to have DynamoDB running on the local machine, this way not having to create real DynamoDB tables in AWS.

There are multiple ways of doing this. One is to use a docker version of DynamoDB Local, which can be started up the following way to listen on port 4569:


docker run -p 4569:8000 amazon/dynamodb-local:1.13
My personal preference is to use localstack and the instructions at the site have different ways to start it up. I normally use docker-compose to bring it up. One of the reasons to use localstack over DynamoDB Local is that localstack provides a comprehensive set of AWS services for local testing and not just DynamoDB.


Quick Demo

I have the entire code available in my github repo herehttps://github.com/bijukunjummen/boot-with-dynamodb

Once the application is brought up using the local version of dynamoDB, an entity can be created using the following httpie request:



http :9080/hotels id=4 name=name address=address zip=zip state=OR
With a response, where the version field is set to 1:
{
    "address": "address",
    "id": "4",
    "name": "name",
    "state": "OR",
    "version": 1,
    "zip": "zip"
}
Then if the name is updated for the entity:
http PUT :9080/hotels/4 name=name1 address=address zip=zip state=OR version=1
the version field gets updated to 2 and so on:
{
    "address": "address",
    "id": "4",
    "name": "name1",
    "state": "OR",
    "version": 2,
    "zip": "zip"
}
Also note that if during an update a wrong version number is provided, the call would fail as there is an optimistic locking in place using this version field.

Implementing the version field

Implementing the version field depends on the powerful UpdateItem API provided by DynamoDB. One of the features of UpdateItem API is that it takes in a "UpdateExpression" which is a dsl which shows how different Dynamo attributes should be updated.

The raw request to AWS DynamoDB looks like this:

 
{
  "TableName": "hotels",
  "Key": {
    "id": {
      "S": "1"
    }
  },
  "UpdateExpression": "\nSET #name=:name,\n #state=:state,\naddress=:address,\nzip=:zip\nADD version :inc\n ",
  "ExpressionAttributeNames": {
    "#state": "state",
    "#name": "name"
  },
  "ExpressionAttributeValues": {
    ":name": {
      "S": "testhotel"
    },
    ":address": {
      "S": "testaddress"
    },
    ":state": {
      "S": "OR"
    },
    ":zip": {
      "S": "zip"
    },
    ":inc": {
      "N": "1"
    }
  }
}               
From the articles perspective, specifically focus on "ADD version :inc", which is an expression that tells AWS DynamoDB to increment the value of version by ":inc" value, which is provided separately using "ExpressionAttributeValues" with "1". Dealing with raw API in its json form is daunting, that is where the Software Development Kit(SDK) that AWS provides comes in, AWS SDK for Java 2 is a rewrite of AWS SDK's with a focus on using the latest Java features and Non-Blocking IO over the wire. Using AWS SDK for Java 2, an "UpdateItem" looks like this(using Kotlin code):
val updateItemRequest = UpdateItemRequest.builder()
    .tableName(TABLE_NAME)
    .key(
        mapOf(
            ID to AttributeValue.builder().s(hotel.id).build()
        )
    )
    .updateExpression(
    """
        SET #name=:name,
        #state=:state,
        address=:address,
        zip=:zip 
        ADD version :inc
    """
    )
    .conditionExpression("version = :version")
    .expressionAttributeValues(
        mapOf(
            ":${NAME}" to AttributeValue.builder().s(hotel.name).build(),
            ":${ZIP}" to AttributeValue.builder().s(hotel.zip).build(),
            ":${STATE}" to AttributeValue.builder().s(hotel.state).build(),
            ":${ADDRESS}" to AttributeValue.builder().s(hotel.address).build(),
            ":${VERSION}" to AttributeValue.builder().n(hotel.version.toString()).build(),
            ":inc" to AttributeValue.builder().n("1").build()
        )
    )
    .expressionAttributeNames(
        mapOf(
            "#name" to "name",
            "#state" to "state"
        )
    )
    .build()

val updateItem: CompletableFuture<UpdateItemResponse> = dynamoClient.updateItem(updateItemRequest)

return Mono.fromCompletionStage(updateItem)
    .flatMap {
        getHotel(hotel.id)
    }
The highlighted line has the "Update Expression" with all the existing fields set to a new value and the version attribute incremented by 1. Another thing to note about this call is the "conditionExpression", which is essentially a way to tell DynamoDB to update the attributes if a condition matches up, in this specific instance if the existing value of version matches up. This provides a neat way to support optimistic locking on the record.

Conclusion

A lot of details here - the easiest way to get a feel for it is by trying out the code which is available in my github repository here - https://github.com/bijukunjummen/boot-with-dynamodb. The readme has good details on how to run it in a local environment. 

AWS DynamoDB provides a neat way to manage a version field on entities, ensuring that they are atomically updated and provides a way for them to used for optimistic locking

Thursday, April 9, 2020

Processing SQS Messages using Spring Boot and Project Reactor - Part 2

This is a follow up to my blog post about processing SQS messages efficiently using Spring Boot and Project Reactor

There are a few gaps in the approach that I have listed in the first part.

1. Handling failures in SQS Client calls
2. The approach would process only 1 message from SQS at a time, how can it be parallelized
3. It does not handle errors, any error in the pipeline would break the entire process and stop reading newer messages from the queue.


Recap

Just to recap, the previous post demonstrates creating a pipeline to process messages from an AWS SQS Queue using the excellent Project Reactor


The end result of that exercise was a pipeline which looks like this:




Given this pipeline, let me now go over how to bridge the gaps:

Handling SQS Client Failures


This is the function that generates the stream of messages read from SQS.

Flux.generate { sink: SynchronousSink<List<Message>> ->
    val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder()
        .queueUrl(queueUrl)
        .maxNumberOfMessages(5)
        .waitTimeSeconds(10)
        .build()

    val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages()
    sink.next(messages)
}
    .flatMapIterable(Function.identity())

Now consider a case where the "sqsClient" above has a connectivity issue, the behavior with Flux is that in case of an error the stream is terminated. This, of course, will not do for a service whose job is to process messages as long the service is running.

The fix is to simply retry the processing flow in case of errors.

Flux.generate { sink: SynchronousSink<List<Message>> ->
    val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder()
        .queueUrl(queueUrl)
        .maxNumberOfMessages(5)
        .waitTimeSeconds(10)
        .build()

    val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages()
    sink.next(messages)
}
    .flatMapIterable(Function.identity())
    .retry()

This would result in Flux re-establishing the stream of messages in case of any errors up to this point.

Processing Messages in Parallel

Project Reactor provides a few ways of parallelizing a processing pipeline. My first attempt at processing in parallel was to add a "subscribeOn" method to the processing chain.

Flux.generate { sink: SynchronousSink<List<Message>> ->
    val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder()
        .queueUrl(queueUrl)
        .maxNumberOfMessages(5)
        .waitTimeSeconds(10)
        .build()

    val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages()
    sink.next(messages)
}
    .flatMapIterable(Function.identity())
    .retry()
    .subscribeOn(Schedulers.newElastic("sub"))

However, this is not quite how "subscribeOn" works. An output when I send a few messages to this pipeline is the following:

2020-04-07 20:52:53.241  INFO 1137 --- [          sub-3] sample.msg.MessageListenerRunner         : Processed Message hello
2020-04-07 20:52:53.434  INFO 1137 --- [          sub-3] sample.msg.MessageListenerRunner         : Processed Message hello
2020-04-07 20:52:53.493  INFO 1137 --- [          sub-3] sample.msg.MessageListenerRunner         : Processed Message hello
2020-04-07 20:52:53.538  INFO 1137 --- [          sub-3] sample.msg.MessageListenerRunner         : Processed Message hello
2020-04-07 20:52:53.609  INFO 1137 --- [          sub-3] sample.msg.MessageListenerRunner         : Processed Message hello
2020-04-07 20:52:53.700  INFO 1137 --- [          sub-3] sample.msg.MessageListenerRunner         : Processed Message hello

The "sub-3" above is the name of the thread processing the message, and it looks like all the messages are getting processed on the "sub-3" thread and on no other threads!

subscribeOn simply changes the execution context by borrowing "a thread" from this scheduler pool and does not use all the threads in the pool itself.

So how can the processing be parallelized? This StackOverflow answer provides a very good approach that I am using here, essentially to use a flatMap operator and adding the "subscribeOn" operator inside the "flatMap" operator.

This operator eagerly subscribes to its inner publishers and then flattens the result, the trick is that the inner subscribers can be provided their own schedulers and for each subscription will end up using a thread from the scheduler pool. The number of these concurrent subscribers can be controlled using a "concurrency" parameter passed to the flatMap operator.


Flux.generate { sink: SynchronousSink<List<Message>> ->
    val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder()
        .queueUrl(queueUrl)
        .maxNumberOfMessages(5)
        .waitTimeSeconds(10)
        .build()

    val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages()
    sink.next(messages)
}
    .flatMapIterable(Function.identity())
    .retry()
    .flatMap({ (message: String, deleteHandle: () -> Unit) ->
        task(message)
            .then(Mono.fromSupplier { Try.of { deleteHandle() } })
            .then()
            .subscribeOn(taskScheduler)
    }, concurrency)

and an output when processing multiple messages looks like this -

2020-04-08 21:03:24.582  INFO 17541 --- [  taskHandler-4] sample.msg.MessageListenerRunner         : Processed Message hello
2020-04-08 21:03:24.815  INFO 17541 --- [  taskHandler-4] sample.msg.MessageListenerRunner         : Processed Message hello
2020-04-08 21:03:24.816  INFO 17541 --- [  taskHandler-5] sample.msg.MessageListenerRunner         : Processed Message hello
2020-04-08 21:03:24.816  INFO 17541 --- [  taskHandler-6] sample.msg.MessageListenerRunner         : Processed Message hello
2020-04-08 21:03:24.816  INFO 17541 --- [  taskHandler-7] sample.msg.MessageListenerRunner         : Processed Message hello
2020-04-08 21:03:24.817  INFO 17541 --- [  taskHandler-8] sample.msg.MessageListenerRunner         : Processed Message hello
see how there are more than thread name (taskHandler-*) in the logs now!


Handling downstream errors

One of my previous fixes with "retry" operator was about handling upstream errors with sqsClient connectivity. However, it is possible that as messages are being processed in the pipeline and any of the steps throw an error then the entire pipeline would fail. So it is important to guard EVERY step against failure. A neat way that I have been ensuring that errors don't propagate out is to use the excellent vavr library and its "Try" type. Try type holds two outcomes - a successful one(Success) or an Exception(Failure). This allows the rest of the pipeline to act on the outcome of the previous step in a measured way:

.flatMap({ (message: String, deleteHandle: () -> Unit) ->
    task(message)
        .then(Mono.fromSupplier { Try.of { deleteHandle() } })
        .doOnNext { t ->
            t.onFailure { e -> LOGGER.error(e.message, e) }
        }
        .then()
        .subscribeOn(taskScheduler)
}, concurrency)

The above snippet demonstrates an approach where I know that "deleteHandle" which is responsible for deleting a message can throw an exception, Try captures this and if there is an error logs it and this way the exception does not short circuit the flow of messages.



Conclusion

My initial thinking was that just because I have taken a reactive approach to process messages I would get huge boost in my sqs message processing pipeline, however, my learning has been that just like everything else it requires careful understanding and tuning for a Project reactor based stream to process messages efficiently. I am sure there a few more lessons for me to learn and I will be documenting those as I do.

This entire sample is available in my github repository here - https://github.com/bijukunjummen/boot-with-sns-sqs

Sunday, March 22, 2020

Processing SQS Messages using Spring Boot and Project Reactor

I recently worked on a project where I had to efficiently process a large number of messages streaming in through an AWS SQS Queue. In this post (and potentially one more), I will go over the approach that I took to process the messages using the excellent Project Reactor



The following is the kind of set-up that I am aiming for:



Setting up a local AWS Environment

Before I jump into the code, let me get some preliminaries out of the way. First, how do you get a local version of SNS and SQS. One of the easiest ways is to use localstack. I use a docker-compose version of it described here

The second utility that I will be using is the AWS CLI. This website has details on how to install it locally.

Once both of these utilities are in place, a quick test should validate the setup:

# Create a queue
aws --endpoint http://localhost:4576 sqs create-queue --queue-name test-queue

# Send a sample message
aws --endpoint http://localhost:4576 sqs send-message --queue-url http://localhost:4576/queue/test-queue --message-body "Hello world"

# Receive the message
aws --endpoint http://localhost:4576 sqs receive-message --queue-url http://localhost:4576/queue/test-queue


Basics of Project Reactor


Project Reactor implements the Reactive Streams specification and provides a way of handling streams of data across asynchronous boundaries that respects backpressure. A lot of words here but in essence think of it this way:
1. SQS Produces data
2. The application is going to consume and process it as a stream of data
3. The application should consume data at a pace that is sustainable - too much data should not be pumped in. This is formally referred to as "Backpressure"



AWS SDK 2

The library that I will be using to consume AWS SQS data is the AWS SDK 2. The library uses non-blocking IO under the covers.

The library offers both a sync version of making calls as well as an async version. Consider the synchronous way to fetch records from an SQS queue:

import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest
import software.amazon.awssdk.services.sqs.SqsClient

val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder()
    .queueUrl(queueUrl)
    .maxNumberOfMessages(5)
    .waitTimeSeconds(10)
    .build()

val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages()

Here "software.amazon.awssdk.services.sqs.SqsClient" is being used for querying sqs and retrieving a batch of results synchronously. An async result, on the other hand, looks like this:



val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder()
    .queueUrl(queueUrl)
    .maxNumberOfMessages(5)
    .waitTimeSeconds(10)
    .build()

val messages: CompletableFuture<List<Message>> = sqsAsyncClient
    .receiveMessage(receiveMessageRequest)
    .thenApply { result -> result.messages() }


The output now is now a "CompletableFuture"

Infinite loop and no backpressure

My first attempt at creating a stream(Flux) of message is fairly simple - an infinite loop that polls AWS sqs and creates a Flux from it using the "Flux.create" operator, this way:

fun listen(): Flux<Pair<String, () -> Unit>> {
    return Flux.create { sink: FluxSink<List<Message>> ->
            while (running) {
                try {
                    val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder()
                        .queueUrl(queueUrl)
                        .maxNumberOfMessages(5)
                        .waitTimeSeconds(10)
                        .build()

                    val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages()
                    LOGGER.info("Received: $messages")
                    sink.next(messages)
                } catch (e: InterruptedException) {
                    LOGGER.error(e.message, e)
                } catch (e: Exception) {
                    LOGGER.error(e.message, e)
                }
            }
        }
        .flatMapIterable(Function.identity())
        .doOnError { t: Throwable -> LOGGER.error(t.message, t) }
        .retry()
        .map { snsMessage: Message ->
            val snsMessageBody: String = snsMessage.body()
            val snsNotification: SnsNotification = readSnsNotification(snsMessageBody)
            snsNotification.message to { deleteQueueMessage(snsMessage.receiptHandle(), queueUrl) }
        }
}

The way this works is that there is an infinite loop that checks for new messages using long-polling. Messages may not be available at every poll, in which case an empty list is added to the stream.

This list of atmost 5 messages is then mapped to a stream of individual messages using the "flatMapIterable" operator, which is further mapped by extracting the message from the SNS wrapper (as message gets forwarded from SNS to SQS, SNS adds a wrapper to the message) and a way to delete the message(deleteHandle) once the message is successfully processed is returned as Pair.


This approach works perfectly fine... but imagine a case where a huge number of messages have come in, since the loop is not really aware of the throughput downstream it will keep pumping data to the stream. The default behavior is for the intermediate operators to buffer this data flowing in based on how the final consumer is consuming the data. Since this buffer is unbounded it is possible that the system may reach an unsustainable state.


Backpressure aware stream

The fix is to use a different operator to generate the stream of data - Flux.generate.
Using this operator the code looks like this:

fun listen(): Flux<Pair<String, () -> Unit>> {
    return Flux.generate { sink: SynchronousSink<List<Message>> ->
            val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder()
                .queueUrl(queueUrl)
                .maxNumberOfMessages(5)
                .waitTimeSeconds(10)
                .build()

            val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages()
            LOGGER.info("Received: $messages")
            sink.next(messages)
        }
        .flatMapIterable(Function.identity())
        .doOnError { t: Throwable -> LOGGER.error(t.message, t) }
        .retry()
        .map { snsMessage: Message ->
            val snsMessageBody: String = snsMessage.body()
            val snsNotification: SnsNotification = readSnsNotification(snsMessageBody)
            snsNotification.message to { deleteQueueMessage(snsMessage.receiptHandle(), queueUrl) }
        }
}

The way this works is that the block passed to the "Flux.generate" operator is repeatedly called - similar to the while loop, in each loop one item is expected to be added to the stream. In this instance, the item added to the stream happens to be a list which like before is broken down into individual messages.

How does backpressure work in this scenario -
So again consider the case where the downstream consumer is processing at a slower rate than the generating end. In this case, Flux itself would slow down at the rate at which the generate operator is called, thus being considerate of the throughput of the downstream system.

Conclusion

This should set up a good pipeline for processing messages from SQS, there are a few more nuances to this to process messages in parallel later in the stream which I will cover in a future post.

The codebase of this example is available in my github repository here - https://github.com/bijukunjummen/boot-with-sns-sqs. The code has a complete pipeline which includes processing the message and deleting it once processed.

Tuesday, February 25, 2020

Project Reactor expand method

One of my colleagues at work recently introduced me to the expand operator of the Project Reactor types and in this post I want to cover a few ways in which I have used it.


Unrolling a Paginated Result

Consider a Spring Data based repository on a model called City:

import org.springframework.data.jpa.repository.JpaRepository;
import samples.geo.domain.City;

public interface CityRepo extends JpaRepository<City, Long> {
}
This repository provides a way to retrieve the paginated result, along the following lines:

cityRepo.findAll(PageRequest.of(0, 5))

Now, if I were to unroll multiple pages into a result, the way to do it would be the following kind of a loop:

var pageable: Pageable = PageRequest.of(0, 5)
do {
    var page: Page<City> = cityRepo.findAll(pageable)
    page.content.forEach { city -> LOGGER.info("City $city") }
    pageable = page.nextPageable()
} while (page.hasNext())


An equivalent unroll of a paginated result can be done using the Reactor expand operator the following way:

val result: Flux<City> =
    Mono
        .fromSupplier { cityRepo.findAll(PageRequest.of(0, 5)) }
        .expand { page ->
            if (page.hasNext())
                Mono.fromSupplier { cityRepo.findAll(page.nextPageable()) }
            else
                Mono.empty()
        }
        .flatMap { page -> Flux.fromIterable(page.content) }

result.subscribe(
    { page -> LOGGER.info("City ${page}") },
    { t -> t.printStackTrace() }
)


Here the first page of results expands to the second page, the second page to the third page and so on until there are no pages to retrieve.

Traversing a Tree


Consider a node in a tree structure represented by the following model:

data class Node(
    val id: String,
    val nodeRefs: List<String>,
)

A sample data which looks like this:


can be traversed using a call which looks like this:

val rootMono: Mono<Node> = nodeService.getNode("1")
val expanded: Flux<Node> = rootMono.expand { node ->
    Flux.fromIterable(node.childRefs)
        .flatMap { nodeRef -> nodeService.getNode(nodeRef) }
}
expanded.subscribe { node -> println(node) }

This is a breadth-first expansion, the output looks like this:

Node-1
Node-1-1
Node-1-2
Node-1-1-1
Node-1-1-2
Node-1-2-1
Node-1-2-2

An expandDeep variation would traverse it depth-first

Monday, January 20, 2020

Spring WebClient and Java date-time fields

WebClient is Spring Framework's reactive client for making service to service calls.

WebClient has become a go to utility for me, however I unexpectedly encountered an issue recently in the way it handles Java 8 time fields that tripped me up and this post goes into the details.


Happy Path

First the happy path. When using a WebClient, Spring Boot advices a "WebClient.Builder" to be injected into a class instead of the "WebClient" itself and a WebClient.Builder is already auto-configured and available for injection.


Consider a fictitious "City" domain and a client to create a "City". "City" has a simple structure, note that the creationDate is a Java8 "Instant" type:

import java.time.Instant

data class City(
    val id: Long,
    val name: String,
    val country: String,
    val pop: Long,
    val creationDate: Instant = Instant.now()
)

The client to create an instance of this type looks like this:

class CitiesClient(
    private val webClientBuilder: WebClient.Builder,
    private val citiesBaseUrl: String
) {
    fun createCity(city: City): Mono<City> {
        val uri: URI = UriComponentsBuilder
            .fromUriString(citiesBaseUrl)
            .path("/cities")
            .build()
            .encode()
            .toUri()

        val webClient: WebClient = this.webClientBuilder.build()

        return webClient.post()
            .uri(uri)
            .contentType(MediaType.APPLICATION_JSON)
            .accept(MediaType.APPLICATION_JSON)
            .bodyValue(city)
            .exchange()
            .flatMap { clientResponse ->
                clientResponse.bodyToMono(City::class.java)
            }
    }
}

See how the intent is expressed in a fluent way. The uri and the headers are first being set, the request body is then put in place and the response is unmarshalled back to a "City" response type.


All well and good. Now how does a test look like.

I am using the excellent Wiremock to bring up a dummy remote service and using this CitiesClient to send the request, along these lines:

@SpringBootTest 
@AutoConfigureJson
class WebClientConfigurationTest {

    @Autowired
    private lateinit var webClientBuilder: WebClient.Builder

    @Autowired
    private lateinit var objectMapper: ObjectMapper

    @Test
    fun testAPost() {
        val dateAsString = "1985-02-01T10:10:10Z"

        val city = City(
            id = 1L, name = "some city",
            country = "some country",
            pop = 1000L,
            creationDate = Instant.parse(dateAsString)
        )
        WIREMOCK_SERVER.stubFor(
            post(urlMatching("/cities"))
                .withHeader("Accept", equalTo("application/json"))
                .withHeader("Content-Type", equalTo("application/json"))
                .willReturn(
                    aResponse()
                        .withHeader("Content-Type", "application/json")
                        .withStatus(HttpStatus.CREATED.value())
                        .withBody(objectMapper.writeValueAsString(city))
                )
        )

        val citiesClient = CitiesClient(webClientBuilder, "http://localhost:${WIREMOCK_SERVER.port()}")

        val citiesMono: Mono<City> = citiesClient.createCity(city)

        StepVerifier
            .create(citiesMono)
            .expectNext(city)
            .expectComplete()
            .verify()


        //Ensure that date field is in ISO-8601 format..
        WIREMOCK_SERVER.verify(
            postRequestedFor(urlPathMatching("/cities"))
                .withRequestBody(matchingJsonPath("$.creationDate", equalTo(dateAsString)))
        )
    }

    companion object {
        private val WIREMOCK_SERVER =
            WireMockServer(WireMockConfiguration.wireMockConfig().dynamicPort().notifier(ConsoleNotifier(true)))

        @BeforeAll
        @JvmStatic
        fun beforeAll() {
            WIREMOCK_SERVER.start()
        }

        @AfterAll
        @JvmStatic
        fun afterAll() {
            WIREMOCK_SERVER.stop()
        }
    }
}

In the highlighted lines, I want to make sure that the remote service receives the date in ISO-8601 format as "1985-02-01T10:10:10Z". In this instance everything works cleanly and the test passes.

Not so happy path

Consider now a case where I have customized the WebClient.Builder in some form. An example is say I am using a registry service and I want to look up a remote service via this registry and then make a call then the WebClient has to be customized to add a "@LoadBalanced" annotation on it - some details here

So say, I have customized WebClient.Builder this way:

@Configuration
class WebClientConfiguration {

    @Bean
    fun webClientBuilder(): WebClient.Builder {
        return WebClient.builder().filter { req, next ->
            LOGGER.error("Custom filter invoked..")
            next.exchange(req)
        }
    }

    companion object {
        val LOGGER = loggerFor<WebClientConfiguration>()
    }
}

It looks straightforward, however now the previous test fails. Specifically the date format of the creationDate over the wire is not ISO-8601 anymore, the raw request looks like this:

{
    "id": 1,
    "name": "some city",
    "country": "some country",
    "pop": 1000,
    "creationDate": 476100610.000000000
}

vs for a working request:

{
    "id": 1,
    "name": "some city",
    "country": "some country",
    "pop": 1000,
    "creationDate": "1985-02-01T10:10:10Z"
}

See how the date format is different.

Problem

The underlying reason for this issue is simple, Spring Boot adds a bunch of configuration on WebClient.Builder that is lost when I have explicitly created the bean myself. Specifically in this instance there is a Jackson ObjectMapper created under the covers which by default writes dates as timestamps - some details here.

Solution

Okay, so how do we get back the customizations that Spring Boot makes. I have essentially replicated the behavior of a auto-configuration in Spring called "WebClientAutoConfiguration" and it looks like this:

@Configuration
class WebClientConfiguration {

    @Bean
    fun webClientBuilder(customizerProvider: ObjectProvider<WebClientCustomizer>): WebClient.Builder {
        val webClientBuilder: WebClient.Builder = WebClient
            .builder()
            .filter { req, next ->
                LOGGER.error("Custom filter invoked..")
                next.exchange(req)
            }

        customizerProvider.orderedStream()
            .forEach { customizer -> customizer.customize(webClientBuilder) }

        return webClientBuilder;
    }

    companion object {
        val LOGGER = loggerFor<WebClientConfiguration>()
    }
}

There is a likely a better approach than just replicating this behavior, but this approach works for me.

The posted content now looks like this:

{
    "id": 1,
    "name": "some city",
    "country": "some country",
    "pop": 1000,
    "creationDate": "1985-02-01T10:10:10Z"
}

with the date back in the right format.

Conclusion

Spring Boot's auto-configurations for WebClient provides a opinionated set of defaults. If for any reason the WebClient and it's builder need to be configured explicitly then be wary of some of the customizations that Spring Boot adds and replicate it for the customized bean. In my case, the Jackson customization for Java 8 dates was missing in my custom "WebClient.Builder" and had to be explicitly accounted for.

A sample test and a customization is available here