Thursday, December 30, 2021

Cloud Bigtable - Write and Retrieval

This is a quick write-up based on a few days of experimentation with Cloud Bigtable, with the following objectives:

1. Using an emulator for local development

2. A high level schema design with retrieval patterns in mind

3. Finding records

Emulator

The Cloud Bigtable emulator provides a way to test Bigtable functionality locally. Setting up the emulator is easy and is described in this document. Assuming that the gcloud utility, the CLI for working with Google Cloud resources, is available on the machine, the following command should get the emulator in place:
gcloud components install bigtable
Once installed, the emulator can be started up using the following command:
gcloud beta emulators bigtable start --host-port=localhost:8086
This brings up the emulator at port 8086.


Working with the Emulator

Now that a local instance of Bigtable is up, working with it requires another utility called "cbt", which can be installed, again using gcloud, the following way:
gcloud components install cbt
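A table called "hotels" to hold an entity modeled after a "Hotel", along with a column family called "hotel_details" to hold the details, can be created like this. The BIGTABLE_EMULATOR_HOST variable is exported so that cbt talks to the emulator instead of a real instance:
export BIGTABLE_EMULATOR_HOST=localhost:8086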
cbt -project "project-id" createtable hotels
cbt -project "project-id" createfamily hotels hotel_details
Now that the emulator and the cbt utility are available, let's start with a modeling exercise. Take this modeling exercise with a pinch of salt: my knowledge of Bigtable is evolving and the approach here will likely need heavy polishing.


Schema Design for an Entity

So my objective is to provide basic write and read functionality on a "Hotel" entity, described using a golang struct the following way:
type Hotel struct {
	Id      string
	Name    string
	Address string
	Zip     string
	State   string
}
To store such an entity in Bigtable, attention should be paid to how the data will ultimately be read. In my case, there are going to be two read patterns:
  1. Retrieval by the Hotel's id field
  2. Retrieval of a list of hotels by zip code
Now, Bigtable supports only one index, called the "Row key". A single record can be retrieved using its row key, and a set of records can be retrieved using a row key prefix.

In my case it will be difficult to support retrieval by id AND retrieval by zip code using one Row key, so my schema design is to have multiple records with different row keys for a single Hotel entity, along these lines, say for a Hotel which looks like this:



To support retrieval by id my row key looks something like this:
H/Id#id1, with the data for the hotel set under different column names.

To support retrieval by zip code my row key looks like this:
H/Zip#OR-1/Id#id1. The data this time points to the row key of the actual data, which is H/Id#id1, so the entire data for the hotel does not have to be duplicated. Given this row key, if all hotels with a Zip code of OR-1 have to be retrieved, I can do it using a row key prefix of "H/Zip#OR-1" and then hydrate the information using the row key stored in the data.
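The two row key shapes described above can be sketched in Go using only the standard library. The function names here (hotelRowKey, zipIndexRowKey, zipPrefix) are hypothetical helpers made up for illustration, not part of any Bigtable client:

```go
package main

import (
	"fmt"
	"strings"
)

// hotelRowKey builds the primary row key, the row that holds the full hotel data.
func hotelRowKey(id string) string {
	return "H/Id#" + id
}

// zipIndexRowKey builds the index row key. The row it names only stores a
// pointer (the primary row key) rather than a copy of the hotel data.
func zipIndexRowKey(zip, id string) string {
	return fmt.Sprintf("H/Zip#%s/Id#%s", zip, id)
}

// zipPrefix is the row key prefix used to find the index rows for a zip code.
func zipPrefix(zip string) string {
	return "H/Zip#" + zip
}

func main() {
	fmt.Println(hotelRowKey("id1"))            // H/Id#id1
	fmt.Println(zipIndexRowKey("OR-1", "id1")) // H/Zip#OR-1/Id#id1

	// A bare prefix also matches longer zip values that start the same way.
	other := zipIndexRowKey("OR-10", "id2")
	fmt.Println(strings.HasPrefix(other, zipPrefix("OR-1"))) // true
}
```

The last line shows one thing to watch for: a prefix without the trailing "/" also matches a zip such as OR-10. With fixed-length zip codes this does not matter, otherwise ending the prefix with "/" avoids it.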

So with this storing the information of a real hotel into Bigtable and querying it back looks like this in raw form:

----------------------------------------
H/Id#d7d63398-3442-413b-8859-3e73016fc5cc
  hotel_details:address                    @ 2021/12/29-20:53:30.816000
    "525 SW Morrison St, Portland"
  hotel_details:id                         @ 2021/12/29-20:53:30.816000
    "d7d63398-3442-413b-8859-3e73016fc5cc"
  hotel_details:name                       @ 2021/12/29-20:53:30.816000
    "The Nines"
  hotel_details:state                      @ 2021/12/29-20:53:30.816000
    "OR"
  hotel_details:zip                        @ 2021/12/29-20:53:30.816000
    "97204"
----------------------------------------
H/Zip#97204/Id#d7d63398-3442-413b-8859-3e73016fc5cc
  hotel_details:key                        @ 2021/12/29-20:53:30.816000
    "H/Id#d7d63398-3442-413b-8859-3e73016fc5cc"
This works quite well. I am not entirely sure if this is optimal though, and I will revisit the approach once I have gained a little more experience with Bigtable.

Retrieving by Zip Code

Assuming that a bunch of Hotels are present in the database with this schema design, a retrieval by zip code looks like this in golang:

func findHotels(table *bigtable.Table, ctx context.Context, zip string) ([]types.Hotel, error) {
	// All index rows for a zip code share this row key prefix.
	searchPrefix := fmt.Sprintf("H/Zip#%s", zip)
	var keys []string
	var hotels []types.Hotel
	// Step 1: scan the index rows and collect the row keys of the actual hotel rows.
	err := table.ReadRows(ctx, bigtable.PrefixRange(searchPrefix),
		func(row bigtable.Row) bool {
			keys = append(keys, keyFromRow(row))
			return true // keep reading
		})

	if err != nil {
		return nil, fmt.Errorf("error in searching by zip code: %v", err)
	}

	// Step 2: fetch all the hotel rows in one batched read.
	err = table.ReadRows(ctx, bigtable.RowList(keys), func(row bigtable.Row) bool {
		hotels = append(hotels, hotelFromRow(row))
		return true
	})
	if err != nil {
		return nil, fmt.Errorf("error in retrieving by keys: %v", err)
	}
	return hotels, nil
}
The code starts by generating the search prefix, which has a pattern of "H/Zip#zipcode", collects the row keys stored in the matching index records, and then makes a single batched call to the table with those keys to get the details.

Conclusion

It may be easier to follow this along with real code, which is available in my github repository here - https://github.com/bijukunjummen/golang-bigtable-sample. It has a sample that writes to Bigtable and then retrieves from it.

Monday, December 20, 2021

Service to Service call patterns - Multi-cluster Service

This is the third blog post in a series exploring service to service call patterns in different application runtimes in Google Cloud.

The first post explored the Service to Service call pattern in a GKE runtime using a Kubernetes Service abstraction.

The second post explored the Service to Service call pattern in a GKE runtime with Anthos Service Mesh.

This post will explore the call pattern across multiple GKE runtimes with Multi-Cluster Service providing a way for calls to be made across clusters.

Mind you, the preferred way to make service to service calls ACROSS clusters is using Anthos Service Mesh, which will be covered in the next blog post. However, Multi-Cluster Services is also a perfectly valid approach in the absence of Anthos Service Mesh.

Target Architecture

A target architecture that I am aiming for is the following:



Here two different applications are hosted on two separate Kubernetes clusters in different availability zones, and the Service (called "Caller") in one cluster invokes the Service (called "Producer") in the other cluster.

Creating the Cluster with Multi-Cluster Services

Bringing up 2 clusters and enabling Multi-cluster Services is detailed in this document.

Services Installation

Assuming that the 2 GKE clusters are now available, the first cluster holds the Caller and an Ingress Gateway to enable the UI of the caller to be accessible to the user. This is through a deployment descriptor which looks something like this for the caller:


apiVersion: apps/v1
kind: Deployment
metadata:
  name: sample-caller-v1
  labels:
    app: sample-caller
    version: v1
spec:
  replicas: 1
  selector:
    matchLabels:
      app: sample-caller
      version: v1
  template:
    metadata:
      labels:
        app: sample-caller
        version: v1
    spec:
      serviceAccountName: sample-caller-sa
      containers:
        - name: sample-caller
          image: us-docker.pkg.dev/sample/docker-repo/sample-caller:latest
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 8080
          securityContext:
            runAsUser: 1000
          resources:
            requests:
              memory: "256Mi"
          livenessProbe:
            httpGet:
              path: /actuator/health/liveness
              port: 8080
            initialDelaySeconds: 3
            periodSeconds: 3
          readinessProbe:
            httpGet:
              path: /actuator/health/readiness
              port: 8080
I have reproduced the entire yaml just for demonstration; there is nothing that should stand out in the file.

Along the same lines the Producer application is deployed to the second cluster.

Caller to Producer call - Using Multi-Cluster Services

The right approach to getting service to service calls working across clusters is to use a feature of Anthos called Multi-cluster Services, which is described in detail in this blog post and this how-to post.

The short of it is that if a "ServiceExport" resource is defined in cluster 2 and the same namespace exists in cluster 1, then the Service is resolved using a host name of the form "service-name.namespace.svc.clusterset.local", which in my case maps to "sample-producer.istio-apps.svc.clusterset.local". The ServiceExport resource looks something like this:


kind: ServiceExport
apiVersion: net.gke.io/v1
metadata:
  namespace: istio-apps
  name: sample-producer
This is the only change that I have to make to the caller: instead of calling the Producer using "sample-producer", it now uses the host name "sample-producer.istio-apps.svc.clusterset.local". Everything resolves cleanly and the call continues to work across clusters.
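As a small illustration of the host name change on the caller side, here is a Go sketch that builds the clusterset host name from a service name and namespace. The helper names are hypothetical, and the scheme, port (8080) and path ("/produce") used for the URL are assumptions for the example, not values taken from the sample application:

```go
package main

import "fmt"

// clusterSetHost builds the Multi-cluster Services DNS name for an exported
// service, in the form "service-name.namespace.svc.clusterset.local".
func clusterSetHost(service, namespace string) string {
	return fmt.Sprintf("%s.%s.svc.clusterset.local", service, namespace)
}

// producerURL is a hypothetical helper: scheme, port and path are assumptions.
func producerURL(service, namespace string, port int, path string) string {
	return fmt.Sprintf("http://%s:%d%s", clusterSetHost(service, namespace), port, path)
}

func main() {
	fmt.Println(clusterSetHost("sample-producer", "istio-apps"))
	// sample-producer.istio-apps.svc.clusterset.local
	fmt.Println(producerURL("sample-producer", "istio-apps", 8080, "/produce"))
	// http://sample-producer.istio-apps.svc.clusterset.local:8080/produce
}
```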

View from the caller:

View from the Producer:


Conclusion

I hope this clarifies to some extent how service to service calls can be enabled across multiple clusters, even across regions.

There are a few small catches, for example getting mutual TLS to work across clusters is not easy. This is cleanly solved when using Anthos Service Mesh and will be detailed in the next blog post.


Thursday, November 18, 2021

Service to Service call patterns - GKE with Anthos Service Mesh on a single cluster

This is the second in a series of posts exploring service to service call patterns in some of the application runtimes on Google Cloud. The first in the series explored service to service call patterns in GKE.

This post will expand on it by adding in a Service Mesh, specifically Anthos Service Mesh, and explore how the service to service patterns change in the presence of a mesh. The service to service calls will be across services in a single cluster. The next post will explore services deployed to multiple GKE clusters.


Set-Up

The steps to set up a GKE cluster and install Anthos Service Mesh on top of it are described in this document - https://cloud.google.com/service-mesh/docs/unified-install/install. In brief, these are the commands that I had to run in my GCP Project to get a cluster running:


If the installation of the cluster and the mesh has run through cleanly, a good way to verify the installation is to see if the cluster gets registered as an Anthos managed cluster in the Google Cloud Console.



The services that I will be installing are fairly simple and look like this:


Using a UI, the caller can make the producer behave in certain ways:
  • Introduce response time delays
  • Respond with certain status codes
This will help check how the mesh environment will behave in the face of these behaviors.


The codebase for the "caller" and "producer" is in this repository - https://github.com/bijukunjummen/sample-service-to-service, and there are kubernetes manifests available in the repository to bring up these services.

Behavior 1 - Mutual TLS

The first behavior that I want to see is for the caller and the producer to verify each other's identities by presenting and validating their certificates.

This can be done by adding in an Istio DestinationRule for the producer, along these lines:






This also adds in the DestinationRule for the caller, because the caller gets the call from the browser via an Ingress Gateway and even this call needs to be authenticated using mTLS.

Alright, now that the set-up is in place, the following is what gets captured as the request flows from the Browser to the Ingress Gateway to the Caller to the Producer.


The sign that mTLS works is seeing the "x-forwarded-client-cert" header, both in the Caller's headers coming in from the Ingress Gateway and in the Producer's headers coming in from the Caller.
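If an application wants to surface this check itself, a handler can simply look for the header. The following is a minimal Go sketch using only the standard library. The handler, the "probe" helper and the sample header value are all made up for illustration, and the sample services in the repository may capture headers differently:

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// hasClientCert reports whether the request carries the
// x-forwarded-client-cert header (header lookups are case-insensitive).
func hasClientCert(r *http.Request) bool {
	return r.Header.Get("x-forwarded-client-cert") != ""
}

// handler writes a one-line report about the header.
func handler(w http.ResponseWriter, r *http.Request) {
	fmt.Fprintf(w, "mtls=%t", hasClientCert(r))
}

// probe sends an in-memory request to the handler, optionally with the
// header set, and returns the response body.
func probe(certHeader string) string {
	req := httptest.NewRequest(http.MethodGet, "/", nil)
	if certHeader != "" {
		req.Header.Set("x-forwarded-client-cert", certHeader)
	}
	rec := httptest.NewRecorder()
	handler(rec, req)
	return rec.Body.String()
}

func main() {
	// The header value below is a made-up placeholder, not real mesh output.
	fmt.Println(probe("By=spiffe://example;Hash=abc")) // mtls=true
	fmt.Println(probe(""))                             // mtls=false
}
```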

Behavior 2 - Timeout

The second behavior that I want to explore is timeouts. A request timeout can be set for the call from the Caller to the Producer by creating a VirtualService for the Producer with the value set, along these lines:



With this configuration in place, a request from the caller with a delay of 6 seconds causes the mesh to time out and present an error that looks like this:


The mesh responds with an HTTP status code of 504 with a message of "Upstream timed out". 

Behavior 3 - Circuit Breaker

A circuit breaker is implemented using a DestinationRule resource.
Here I have a configuration which breaks the circuit if 3 consecutive 5XX responses are received from the Producer in a 15 second interval, and then does not make a request for another 15 seconds.

With this configuration in place, a request with a broken circuit looks like this:

The mesh responds with an HTTP status code of 503 and a message of "no healthy upstream".
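To make the behavior concrete, here is a minimal in-process Go sketch of a consecutive-failure circuit breaker. It is only an illustration of the idea, not how the mesh implements it: the "breaker" type and the "simulate" helper are made up, the sketch ignores the 15 second analysis interval and only models "3 consecutive failures, then reject calls for 15 seconds", and it is not safe for concurrent use.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errOpen = errors.New("circuit open")

// breaker trips after `threshold` consecutive failures and rejects calls
// until `cooldown` has passed.
type breaker struct {
	threshold int
	cooldown  time.Duration
	failures  int
	openUntil time.Time
	now       func() time.Time // injectable clock, so the demo needs no sleeping
}

// call runs fn unless the circuit is open, and tracks consecutive failures.
func (b *breaker) call(fn func() error) error {
	if b.now().Before(b.openUntil) {
		return errOpen
	}
	if err := fn(); err != nil {
		b.failures++
		if b.failures >= b.threshold {
			b.openUntil = b.now().Add(b.cooldown)
			b.failures = 0
		}
		return err
	}
	b.failures = 0 // a success resets the streak
	return nil
}

// simulate makes four failing calls, advances a fake clock past the cooldown,
// then makes one successful call. It returns one label per call.
func simulate() []string {
	clock := time.Unix(0, 0)
	b := &breaker{threshold: 3, cooldown: 15 * time.Second, now: func() time.Time { return clock }}
	label := func(err error) string {
		switch {
		case err == nil:
			return "ok"
		case errors.Is(err, errOpen):
			return "rejected"
		default:
			return "failed"
		}
	}
	var out []string
	for i := 0; i < 4; i++ {
		out = append(out, label(b.call(func() error { return errors.New("HTTP 500") })))
	}
	clock = clock.Add(16 * time.Second)
	out = append(out, label(b.call(func() error { return nil })))
	return out
}

func main() {
	fmt.Println(simulate()) // [failed failed failed rejected ok]
}
```

The fourth call never reaches the producer, which is the in-process equivalent of the 503 that the mesh returns while the circuit is broken.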


Conclusion

The neat thing is that in all scenarios so far, the way the Caller calls the Producer remains exactly the same. It is the mesh which injects the appropriate security controls through mTLS and the resilience of the calling service through timeouts and circuit breakers. 

Wednesday, October 27, 2021

Service to Service call patterns in Google Cloud - GKE

This is a series of posts that will explore service to service call patterns in some of the application runtimes in Google Cloud. This specific post will explore GKE without using a service mesh and the next post will explore GKE with Anthos Service Mesh.


Set Up

The set-up is simple: two applications, a caller and a producer, are hosted on the application runtime, with the caller making an HTTP request to the producer. An additional UI is packaged with the caller that should make it easy to test the different scenarios.


The producer is special: a few faults can be injected into the producer's response based on the post body from the caller:

  1. An arbitrary delay
  2. A specific response http status code

These will be used for checking how the runtimes behave in faulty situations.

GKE Autopilot Runtime

The fastest way to get a fully managed Kubernetes cluster in Google Cloud is to spin up a GKE Autopilot cluster. Assuming such a cluster is available, the service to service call pattern is through the abstraction of a Kubernetes service and looks something like this:

A manifest file which enables this is the following:


Once a service resource is created, here called "sample-producer" for instance, a client can call it using the service's FQDN - sample-producer.default.svc.cluster.local. In my sample, the caller and the producer are in the same namespace; in such cases calling by just the service name is sufficient.

A sample service to service call and its output in a simple UI looks like this:



A few things to see here:
  1. As the request flows from the browser to the caller to the producer, the headers are captured at each stage and presented. There is nothing special about the headers so far; once service meshes come into play they start to get far more interesting.
  2. The delay does not trigger any timeout; the browser and the caller end up waiting no matter how high the delay.
  3. Along the same lines, if the producer starts failing, the caller continues to send requests down to the service instead of short-circuiting it.
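Without a mesh, a caller that wants to stop waiting has to handle the timeout in its own code. As a rough Go sketch of what that looks like, the following uses the standard library HTTP client with a timeout against a local stand-in producer. The helper names and the delay and timeout values are assumptions for the demo; the sample caller in this series is a different application and does not do this.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"net/http"
	"net/http/httptest"
	"time"
)

// callWithTimeout calls url and gives up once timeout has elapsed.
func callWithTimeout(url string, timeout time.Duration) (int, error) {
	client := &http.Client{Timeout: timeout}
	resp, err := client.Get(url)
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()
	return resp.StatusCode, nil
}

// isTimeout reports whether err was caused by a timeout.
func isTimeout(err error) bool {
	var netErr net.Error
	return errors.As(err, &netErr) && netErr.Timeout()
}

// timedOut starts a local stand-in producer that waits delayMillis before
// responding and reports whether a call with the given timeout timed out.
func timedOut(delayMillis, timeoutMillis int) bool {
	producer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		select {
		case <-time.After(time.Duration(delayMillis) * time.Millisecond):
			w.WriteHeader(http.StatusOK)
		case <-r.Context().Done(): // the caller gave up
		}
	}))
	defer producer.Close()
	_, err := callWithTimeout(producer.URL, time.Duration(timeoutMillis)*time.Millisecond)
	return isTimeout(err)
}

func main() {
	fmt.Println(timedOut(300, 50)) // true: the producer is slower than the timeout
	fmt.Println(timedOut(0, 2000)) // false: the producer answers in time
}
```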

Conclusion

Service to service call in a Kubernetes environment is straightforward with the abstraction of a Kubernetes service resource providing a simple way for clients to reach the instances hosting an application. Layering in a service mesh provides a great way for the service to service calls to be much more resilient without the application explicitly needing to add in libraries to handle request timeouts or faulty upstream services. This will be the topic of the next blog post. 

Thursday, September 30, 2021

Google Cloud Deploy - CD for a Java based project

This is a short write-up on using Google Cloud Deploy for Continuous Deployment of a Java-based project. 

Google Cloud Deploy is a new entrant to the CD space. It currently facilitates continuous deployment to GKE based targets, and in future to other Google Cloud application runtime targets.

Let's start with why such a tool is required. Why not an automation tool like Cloud Build or Jenkins? In my mind it comes down to these things:

  1. State - a dedicated CD tool can keep state of the artifact and of the environments where the artifact is deployed. This way promotion of deployments, rollback to an older version, and roll forward are easily done. Such an integration can be built into a CI tool but it will involve a lot of coding effort.
  2. Integration with the Deployment environment - a CD tool integrates well with the target deployment platform without too much custom code needed.

Target Flow

I am targeting a flow which looks like this, any merge to a "main" branch of a repository should:
1. Test and build an image
2. Deploy the image to a "dev" GKE cluster
3. The deployment can be promoted from the "dev" to the "prod" GKE cluster


Building an Image

Running the test and building the image is handled with a combination of Cloud Build providing the build automation environment and skaffold providing tooling through Cloud Native Buildpacks. It may be easier to look at the code repository to see how both are wired up - https://github.com/bijukunjummen/hello-skaffold-gke


Deploying the image to GKE

Now that an image has been baked, the next step is to deploy this into a GKE Kubernetes environment.  Cloud Deploy has a declarative way of specifying the environments(referred to as Targets) and how to promote the deployment through the environments. A Google Cloud Deploy pipeline looks like this:


The pipeline is fairly easy to read. Target(s) describe the environments to deploy the image to and the pipeline shows how progression of the deployment across the environments is handled. 

One thing to notice is that the "prod" target has been marked with a "requires approval" flag, which is a way to ensure that the promotion to the prod environment happens only with an approval. The Cloud Deploy documentation has good coverage of all these concepts. Also, there is a strong dependence on skaffold for generating the kubernetes manifests and deploying them to the relevant targets.

Given such a deployment pipeline, it can be put in place using:

gcloud beta deploy apply --file=clouddeploy.yaml --region=us-west1

Alright, now that the CD pipeline is in place, a "Release" can be triggered once the testing is completed in the "main" branch. A command which looks like this is integrated with the Cloud Build pipeline to do this, with a file pointing to the build artifacts:

gcloud beta deploy releases create release-01df029 --delivery-pipeline hello-skaffold-gke --region us-west1 --build-artifacts artifacts.json
This deploys the generated kubernetes manifests pointing to the right build artifacts to the "dev" environment


and can then be promoted to additional environments, prod in this instance.

Conclusion

This is a whirlwind tour of Google Cloud Deploy and the features that it offers. It is still early days and I am excited to see where the Product goes. The learning curve is fairly steep; it is expected that a developer understands:
  1. Kubernetes, which is the only application runtime currently supported. Expect other runtimes to be supported as the Product evolves.
  2. skaffold, which is used for building, tagging, and generating kubernetes artifacts
  3. Cloud Build and its yaml configuration
  4. Google Cloud Deploy's yaml configuration

It will get simpler as the Product matures.

Saturday, September 25, 2021

Cloud Build and Gradle/Maven Caching

One of the pain points in all the development projects that I have worked on has been setting up/getting an infrastructure for automation. This has typically meant getting access to an instance of Jenkins. I have great respect for Jenkins as a tool, but each deployment of Jenkins tends to become a Snowflake over time with the different set of underlying plugins, version of software, variation of pipeline script etc.

This is exactly the niche that a tool like Cloud Build fills: the deployment is managed by Google Cloud Platform, and the build steps are entirely user driven, based on the image used for each step of the pipeline.

In the first post I went over the basics of creating a Cloud Build configuration and in the second post went over a fairly comprehensive pipeline for a Java based project.

This post will conclude the series by showing an approach to caching in the pipeline - this is far from original, I am borrowing generously from a few sample configurations that I have found. So let me start by describing the issue being solved for.


Problem

Java has two popular build tools - Gradle and Maven. Each of these tools downloads a bunch of dependencies and caches them at startup -
  1. The tool itself is not a binary, but a wrapper which knows to download the right version of the tool's binary.
  2. The project's dependencies, specified in tool specific DSLs, are then downloaded from repositories.
The issue is that, across multiple builds, the dependencies tend to get downloaded again on every run.

Caching across Runs of a Build

The solution is to cache the downloaded artifacts across the different runs of a build. There is unfortunately no built in way (yet) in Cloud Build to do this, however a mechanism can be built along these lines:
  1. Cache the downloaded dependencies into Cloud Storage at the end of the build 
  2. And then use it to rehydrate the dependencies at the beginning of the build, if available

A similar approach should work for any tool that downloads dependencies. The trick though is figuring out where each tool places the dependencies and knowing what to save to Cloud storage and back. 

Here is an approach for Gradle and Maven.

Each step of the cloud build loads the exact same volume:

    volumes:
      - name: caching.home
        path: /cachinghome

A step then explodes the cached content from cloud storage into this volume:

    dir: /cachinghome
    entrypoint: bash
    args:
      - -c
      - |
        (
          gsutil cp gs://${_GCS_CACHE_BUCKET}/gradle-cache.tar.gz /tmp/gradle-cache.tar.gz &&
          tar -xzf /tmp/gradle-cache.tar.gz
        ) || echo 'Cache not found'
    volumes:
      - name: caching.home
        path: /cachinghome

Now, Gradle and Maven store the dependencies in a ".gradle" and a ".m2" folder respectively in a user's home directory. The trick then is to link the $USER_HOME/.gradle and $USER_HOME/.m2 folders to the exploded directory:


  - name: openjdk:11
    id: test
    entrypoint: "/bin/bash"
    args:
      - '-c'
      - |-
        export CACHING_HOME="/cachinghome"
        USER_HOME="/root"
        GRADLE_HOME="$${USER_HOME}/.gradle"
        GRADLE_CACHE="$${CACHING_HOME}/gradle"

        mkdir -p $${GRADLE_CACHE}

        [[ -d "$${GRADLE_CACHE}" && ! -d "$${GRADLE_HOME}" ]] && ln -s "$${GRADLE_CACHE}" "$${GRADLE_HOME}"
        ./gradlew check
    volumes:
      - name: caching.home
        path: /cachinghome

The gradle tasks should now use the cached content if available or create the cached content if it is being run for the first time. 


It may be simpler to see a sample build configuration which is here - https://github.com/bijukunjummen/hello-cloud-build/blob/main/cloudbuild.yaml


Friday, August 20, 2021

Cloud Build - CI/CD for a Java Project

In a previous blog post I went over the basics of what it takes to create a configuration for Cloud Build. This post will expand on it by creating a functional CI/CD pipeline for a java project using Cloud Build. Note that I am claiming the pipeline will be functional but far from optimal; a follow up post at some point will go over potential optimizations to the pipeline.


Continuous Integration

The objective of Continuous integration is to ensure that developers regularly merge quality code into a common place. The quality is ascertained using automation, which is where a tool like Cloud Build comes in during the CI process.

Consider a flow where developers work on feature branches and, when ready, send a pull request to the main branch.

Now to ensure quality, checks should be run on the developer's feature branch before it is allowed to be merged into the "main" branch. This means two things:

1. Running quality checks on the developer's feature branch
2. Merges to the main branch should not be permitted until checks are run.


Let's start with Point 1 - Running quality checks on a feature branch

Running quality checks on a feature branch

This is where integration of Cloud Build with the repo comes into play. I am using this repository - https://github.com/bijukunjummen/hello-cloud-build, to demonstrate this integration with Cloud Build. If you have access to a Google Cloud environment, a new integration of Cloud Build with a repository looks something like this:



Once this integration is in place, a Cloud Build "trigger" should be created to act on a new pull request to the repository:




Here is where the Cloud Build configuration comes into play: it specifies what needs to happen when a Pull Request is made to the repository. This is a Java based project with gradle as the build tool. I want to run tests and other checks, which is normally done through a gradle task called "check", and a build configuration which does this is simple:




steps:
  - name: openjdk:11
    id: test
    entrypoint: "./gradlew"
    args: [ "check" ]

Onto the next objective - Merges to the main branch should not be allowed until the checks are clean

Merges to main branch only with a clean build

This is done on the repository side on github, through settings that look like this - 

These settings protect the "main" branch by only allowing in merges after the checks in the PR branch are clean. They also prevent checking in code directly to the main branch.


These two measures, checking the feature branch before merges are allowed and allowing merges to the "main" branch only after the checks are clean, should ensure that quality code gets into the "main" branch. 

Onto the Continuous Deployment side of the house. 


Continuous Deployment

So now presumably clean code has made its way to the main branch and we want to deploy it to an environment. 

In Cloud Build this translates to a "trigger", that acts on commits to specific branches and looks like this for me:


and again the steps are expressed as a Cloud Build configuration, which re-runs the checks and deploys the code to Cloud Run:


steps:
  - name: openjdk:11
    id: test
    entrypoint: "/bin/bash"
    args:
      - '-c'
      - |-
        ./gradlew check

  - name: openjdk:11
    id: build-image
    entrypoint: "/bin/bash"
    args:
      - '-c'
      - |-
        ./gradlew jib --image=gcr.io/$PROJECT_ID/hello-cloud-build:$SHORT_SHA
 
  - name: 'gcr.io/cloud-builders/gcloud'
    id: deploy
    args: [ 'run', 'deploy', "--image=gcr.io/$PROJECT_ID/hello-cloud-build:$SHORT_SHA", '--platform=managed', '--project=$PROJECT_ID', '--region=us-central1', '--allow-unauthenticated', '--memory=256Mi', '--set-env-vars=SPRING_PROFILES_ACTIVE=gcp', 'hello-cloud-build' ]

Here I am using Jib to create the image.

Wrapup


With this tooling in place, a developer flow looks like this. A PR triggers checks and shows up like this on the github side:


and once checks are complete, allows the branch to be merged in:


After merge the code gets cleanly deployed.


Tuesday, August 10, 2021

Google Cloud Build - Hello World

I have been exploring Google Cloud Build recently and this post is a simple introduction to this product. You can think of it as a tool that enables automation of deployments. This post though will not go as far as automating deployments; instead it covers just the basics of what is involved in getting a pipeline going. A follow up post will show a continuous deployment pipeline for a java application. 

Steps

The basic steps to set up Cloud Build in your GCP project are explained here. Assuming that Cloud Build has been set up, I will be using this github project to create a pipeline.

A Cloud Build pipeline is typically placed as a yaml configuration in a file named by convention "cloudbuild.yaml". The pipeline is described as a series of steps; each step runs in a docker container and the name of the step points to the docker image. So, for example, a step which echoes a message looks like this:

Here the name "bash" points to the docker image named "bash" in docker hub.

The project does not need to be configured in Google Cloud Build to run it; instead a utility called "cloud-build-local" can be used for running the build file. 


git clone git@github.com:bijukunjummen/hello-cloud-build.git
cd hello-cloud-build
cloud-build-local .
Alright, now to add a few more steps. Consider a build file with 2 steps:

Here the two steps will run serially, first Step A, then Step B. A sample output looks like this on my machine:

Starting Step #0 - "A"
Step #0 - "A": Already have image (with digest): bash
Step #0 - "A": Step A
Finished Step #0 - "A"
2021/08/10 12:50:23 Step Step #0 - "A" finished
Starting Step #1 - "B"
Step #1 - "B": Already have image (with digest): bash
Step #1 - "B": Step B
Finished Step #1 - "B"
2021/08/10 12:50:25 Step Step #1 - "B" finished
2021/08/10 12:50:26 status changed to "DONE"

Concurrent Steps

A little more complex: say I wanted to execute a few steps concurrently, the way to do it is using the waitFor property of a step.


Here a "waitFor" value of "-" indicates the start of the build, so essentially Steps A and B will run concurrently, and the output on my machine looks like this:

Starting Step #1 - "B"
Starting Step #0 - "A"
Step #1 - "B": Already have image (with digest): bash
Step #0 - "A": Already have image (with digest): bash
Step #1 - "B": Step B
Step #0 - "A": Step A
Finished Step #1 - "B"
2021/08/10 12:54:21 Step Step #1 - "B" finished
Finished Step #0 - "A"
2021/08/10 12:54:21 Step Step #0 - "A" finished

One more example where Step A is executed first and then Step B and Step C concurrently:
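The ordering rules can be modelled outside Cloud Build to check one's understanding. The following Go sketch is only a model: the "step" type and "waves" function are made up for illustration, it assumes that a step without waitFor waits for all steps defined before it, that "-" means "start when the build starts", and that waitFor only names steps defined earlier. It groups steps into rounds that can run concurrently.

```go
package main

import "fmt"

// step mirrors the two fields of a build step that matter for ordering.
// A nil waitFor models an omitted "waitFor"; []string{"-"} models "-".
type step struct {
	id      string
	waitFor []string
}

// waves groups step ids by the earliest round in which each step can start.
func waves(steps []step) [][]string {
	round := map[string]int{}
	var out [][]string
	for i, s := range steps {
		deps := s.waitFor
		if deps == nil {
			// Default: wait for every step defined before this one.
			for _, prev := range steps[:i] {
				deps = append(deps, prev.id)
			}
		}
		r := 0
		for _, d := range deps {
			if d == "-" {
				continue // "-" adds no dependency
			}
			if round[d]+1 > r {
				r = round[d] + 1
			}
		}
		round[s.id] = r
		for len(out) <= r {
			out = append(out, nil)
		}
		out[r] = append(out[r], s.id)
	}
	return out
}

func main() {
	// Two steps without waitFor run serially.
	fmt.Println(waves([]step{{id: "A"}, {id: "B"}})) // [[A] [B]]
	// Both steps wait for "-", so they start together.
	fmt.Println(waves([]step{{"A", []string{"-"}}, {"B", []string{"-"}}})) // [[A B]]
	// A first, then B and C together.
	fmt.Println(waves([]step{{id: "A"}, {"B", []string{"A"}}, {"C", []string{"A"}}})) // [[A] [B C]]
}
```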

Passing Data

A root volume at path "/workspace" carries through the build, so if a step wants to pass data to another step then it can be passed through this "/workspace" folder. Here Step A is writing to a file and Step B is reading from the same file.

Conclusion

This covers the basics of the steps in a Cloud Build configuration file. In a subsequent post I will be using these to create a pipeline to deploy a java based application to Google Cloud Run.

Friday, July 2, 2021

Kotlin "Result" type for functional exception handling

In a previous post I had gone over how a "Try" type can be created in Kotlin from scratch to handle exceptions in a functional way. There is no need however to create such a type in Kotlin, a type called "Result" already handles the behavior of "Try" and this post will go over how it works. I will be taking the scenario from my previous post, having two steps:
  1. Parsing a Url
  2. Fetching from the Url
Either of these steps can fail:
  • the URL may not be well formed, and 
  • fetching from a remote url may have network issues
So onto the basics of how such a call can be made using the Result type. You can imagine that parsing a URL can return this Result type, capturing any exception that may result from such a call:
fun parseUrl(url: String): Result<URL> = 
        kotlin.runCatching { URL(url) }

Kotlin provides the "runCatching" function which accepts the block that can result in an exception and traps the result OR the exception in the "Result" type. Now that a "Result" is available, some basic checks can be made on it, I can check that the call succeeded using the "isSuccess" and "isFailure" properties:
val urlResult: Result<URL> = parseUrl("http://someurl")
urlResult.isSuccess == true
urlResult.isFailure == false

I can get the value using various "get*" methods:
urlResult.getOrNull() // Returns null if the block completed with an exception
urlResult.getOrDefault(URL("http://somedefault")) // Returns a default if the block completed with an exception
urlResult.getOrThrow() // Throws an exception if the block completed with an exception

The true power of the "Result" type, however, is in chaining operations on it. For example, if you wanted to retrieve the host name given the url:
val urlResult: Result<URL> = parseUrl("http://someurl")
val hostResult: Result<String> = urlResult.map { url -> url.host }

Or a variant "mapCatching" which can trap any exception when using map operation and capture that as a "Result":
val getResult: Result<String> = urlResult.mapCatching { url -> throw RuntimeException("something failed!") }

All very neat! One nit that I have with the current "Result" is a missing "flatMap" operation. For example, consider a case where I have these two functions:
fun parseUrl(url: String): Result<URL> =
    kotlin.runCatching { URL(url) }
    
fun getFromARemoteUrl(url: URL): Result<String> {
    return kotlin.runCatching { "a result" }
}

I would have liked to be able to chain these two operations, along these lines:
val urlResult: Result<URL> = parseUrl("http://someurl")
val getResult: Result<String> = urlResult.flatMap { url -> getFromARemoteUrl(url)}

but an operator like "flatMap" does not exist (so far, as of Kotlin 1.5.20).

What I can do today is a bit of a hack:
val urlResult: Result<URL> = parseUrl("http://someurl")
val getResult: Result<String> = urlResult.mapCatching { url -> getFromARemoteUrl(url).getOrThrow() }
Or, even better, create an extension function that makes "flatMap" available on the "Result" type and use it:
fun <T, R> Result<T>.flatMap(block: (T) -> (Result<R>)): Result<R> {
    return this.mapCatching {
        block(it).getOrThrow()
    }
}
val urlResult: Result<URL> = parseUrl("http://someurl")
val getResult: Result<String> = urlResult.flatMap { url -> getFromARemoteUrl(url)}
This concludes my exploration of the Result type and the ways to use it. I have found it to be an excellent type to have in my toolbelt.

Thursday, June 3, 2021

Spring Endpoint to handle Json Patch and Json Merge Patch

In a previous blog post I went over the basics of Json Patch and Json Merge Patch and what code that performs these operations looks like. In this post I will go over the details of how to expose a Spring based endpoint that accepts a Json Patch or Json Merge Patch body, then patches and saves an entity. The entity that I want to update is a Book, and a sample book looks like this in json form:
{
  "title": "Goodbye!",
  "author": {
    "givenName": "John",
    "familyName": "Doe"
  },
  "tags": [
    "example",
    "sample"
  ],
  "content": "This will be unchanged"
}
A kotlin representation of this entity is the following:
data class Book(
    val title: String,
    val author: Author,
    val tags: List<String>,
    val content: String,
    val phoneNumber: String? = null
)

data class Author(
    val givenName: String,
    val familyName: String? = null
)
Let's start with an endpoint that performs a Json Patch. The endpoint should accept the patch in the request body, with a content type of "application/json-patch+json". Sample Kotlin code for such an endpoint looks like this:
import com.github.fge.jsonpatch.JsonPatch
...
...
@PatchMapping(path = ["/{id}"], consumes = ["application/json-patch+json"])
fun jsonPatchBook(
    @PathVariable id: String,
    @RequestBody patch: JsonNode
): Mono<ResponseEntity<Book>> {
    return Mono.fromSupplier {
        val jsonPatch: JsonPatch = JsonPatch.fromJson(patch)
        val original: JsonNode = objectMapper.valueToTree(getBook(id))
        val patched: JsonNode = jsonPatch.apply(original)
        val patchedBook: Book =
            objectMapper.treeToValue(patched) ?: throw RuntimeException("Could not convert json back to book")
        updateBook(patchedBook)
        ResponseEntity.ok(patchedBook)
    }
}
All that is involved is to:
  1. Take in the Json Patch body and convert it into the JsonPatch type 
  2. Retrieve the Book entity for the identifier 
  3. Convert the Book entity into a Json representation 
  4. Apply the patch and convert the resulting json back into the Book entity

For an endpoint that performs Json Merge patch, along the same lines, the endpoint should accept the json merge patch request body with a content type of "application/merge-patch+json":


@PatchMapping(path = ["/{id}"], consumes = ["application/merge-patch+json"])
fun jsonMergePatchBook(
    @PathVariable id: String,
    @RequestBody patch: JsonNode
): Mono<ResponseEntity<Book>> {
    return Mono.fromSupplier {
        val original: JsonNode = objectMapper.valueToTree(getBook(id))
        val patched: JsonNode = JsonMergePatch.fromJson(patch).apply(original)
        val patchedBook: Book =
            objectMapper.treeToValue(patched) ?: throw RuntimeException("Could not convert json back to book")
        updateBook(patchedBook)
        ResponseEntity.ok(patchedBook)
    }
}
Steps are:
  1. Take in the Json Merge Patch body
  2. Retrieve the Book entity for the identifier 
  3. Convert the Book entity into a Json representation 
  4. Apply the merge patch and convert the resulting json back into the Book entity
All fairly straightforward thanks to the easy way that Spring Web allows to expose an endpoint and the way json-patch library provides support for the Json Patch and Json Merge Patch operations. If you need a complete working example with all the dependencies pulled in, here is a sample in my github repository - https://github.com/bijukunjummen/coroutine-cities-demo/blob/main/src/test/kotlin/samples/geo/patch/BookController.kt

Saturday, May 8, 2021

Json Patch and Json Merge Patch in Java

 Json Patch and Json Merge Patch both do one job well - a way to represent a change to a source json structure.  

Json Patch does it as a series of operations which transforms a source document and Json Merge Patch represents the change as a lite version of the source document.

It is easier to show these as an example, and this is straight from the Json Merge Patch's RFC.

Let's start with a source document:

{
  "title": "Goodbye!",
  "author": {
    "givenName": "John",
    "familyName": "Doe"
  },
  "tags": [
    "example",
    "sample"
  ],
  "content": "This will be unchanged"
}
and the objective is to transform it to this document:
{
  "title": "Hello!",
  "author": {
    "givenName": "John"
  },
  "tags": [
    "example"
  ],
  "content": "This will be unchanged",
  "phoneNumber": "+01-123-456-7890"
}
Which may be easier to visualize in a diff view:


The consolidated set of changes are:
  1. The title is being changed
  2. Author/familyName is removed
  3. One of the tags is removed
  4. A phone number is added

Json Patch

This change can be represented the following way using Json Patch document:
[
  { "op": "replace", "path": "/title", "value": "Hello!"},
  { "op": "remove", "path": "/author/familyName"},
  { "op": "add", "path": "/phoneNumber", "value": "+01-123-456-7890"},
  { "op": "replace", "path": "/tags", "value": ["example"]}
]
A series of operations transforms the source document into the target document. An operation can be one of "add", "remove", "replace", "move", "copy" or "test" and in the example exactly matches the diff.
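To make the mechanics concrete, here is a minimal Java sketch of applying such a list of operations, assuming the document is held as nested maps. The JsonPatchSketch class, its Op type and the sample helpers are hypothetical names used only for illustration. The sketch handles only "add", "replace" and "remove" on object members (no array indices, no pointer escaping), so it is a teaching aid rather than a replacement for a real library.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: JSON objects are modelled as Map<String, Object>.
class JsonPatchSketch {

    // One patch operation; "value" is unused for "remove".
    static final class Op {
        final String op;
        final String path;
        final Object value;

        Op(String op, String path, Object value) {
            this.op = op;
            this.path = path;
            this.value = value;
        }
    }

    // Copies nested maps so that the source document is left untouched.
    @SuppressWarnings("unchecked")
    static Map<String, Object> deepCopy(Map<String, Object> source) {
        Map<String, Object> copy = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : source.entrySet()) {
            Object v = e.getValue();
            copy.put(e.getKey(), v instanceof Map ? deepCopy((Map<String, Object>) v) : v);
        }
        return copy;
    }

    // Applies the operations in order and returns the patched copy.
    @SuppressWarnings("unchecked")
    static Map<String, Object> apply(Map<String, Object> source, List<Op> ops) {
        Map<String, Object> doc = deepCopy(source);
        for (Op op : ops) {
            // "/author/familyName" becomes ["author", "familyName"]
            String[] tokens = op.path.substring(1).split("/");
            Map<String, Object> parent = doc;
            for (int i = 0; i < tokens.length - 1; i++) {
                Object child = parent.get(tokens[i]);
                if (!(child instanceof Map)) {
                    throw new IllegalArgumentException("No object at " + op.path);
                }
                parent = (Map<String, Object>) child;
            }
            String key = tokens[tokens.length - 1];
            switch (op.op) {
                case "add":
                    parent.put(key, op.value);
                    break;
                case "replace":
                    if (!parent.containsKey(key)) {
                        throw new IllegalArgumentException("Nothing to replace at " + op.path);
                    }
                    parent.put(key, op.value);
                    break;
                case "remove":
                    if (!parent.containsKey(key)) {
                        throw new IllegalArgumentException("Nothing to remove at " + op.path);
                    }
                    parent.remove(key);
                    break;
                default:
                    throw new UnsupportedOperationException(op.op);
            }
        }
        return doc;
    }

    // The source document from the example above.
    static Map<String, Object> sampleSource() {
        Map<String, Object> author = new LinkedHashMap<>();
        author.put("givenName", "John");
        author.put("familyName", "Doe");
        Map<String, Object> source = new LinkedHashMap<>();
        source.put("title", "Goodbye!");
        source.put("author", author);
        source.put("tags", List.of("example", "sample"));
        source.put("content", "This will be unchanged");
        return source;
    }

    // The four operations from the Json Patch document above.
    static List<Op> sampleOps() {
        return List.of(
                new Op("replace", "/title", "Hello!"),
                new Op("remove", "/author/familyName", null),
                new Op("add", "/phoneNumber", "+01-123-456-7890"),
                new Op("replace", "/tags", List.of("example")));
    }

    public static void main(String[] args) {
        System.out.println(apply(sampleSource(), sampleOps()));
    }
}
```

Each operation is self-contained and they are applied strictly in order, which is what makes a Json Patch read like a small script against the document.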

 

Json Merge Patch

A Json merge patch for the change looks like this:
{
  "title": "Hello!",
  "author": {
    "familyName": null
  },
  "phoneNumber": "+01-123-456-7890",
  "tags": [
    "example"
  ]
}   
There is a little bit of interpretation required on how the change gets applied, though it is very intuitive:
  1. The presence of "title" with a new value indicates that the title needs to be changed.
  2. An explicit "null" for the family name indicates that the field should be removed.
  3. A phoneNumber field indicates that a new field needs to be added.
  4. Updated tags indicate that the tags need to be modified.
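The interpretation rules above follow the recursive merge procedure described in the Json Merge Patch RFC. Below is a minimal Java sketch of that procedure, assuming JSON objects are modelled as maps, arrays as lists and JSON null as Java null. MergePatchSketch and its helper methods are hypothetical names for illustration, not part of any library.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: objects are Map<String, Object>, JSON null is Java null.
class MergePatchSketch {

    @SuppressWarnings("unchecked")
    static Object mergePatch(Object target, Object patch) {
        if (!(patch instanceof Map)) {
            // A non-object patch (string, number, array, null) replaces the target outright.
            return patch;
        }
        // Start from a copy of the target if it is an object, otherwise from an empty object.
        Map<String, Object> result = new LinkedHashMap<>();
        if (target instanceof Map) {
            result.putAll((Map<String, Object>) target);
        }
        for (Map.Entry<String, Object> entry : ((Map<String, Object>) patch).entrySet()) {
            if (entry.getValue() == null) {
                // An explicit null removes the member.
                result.remove(entry.getKey());
            } else {
                // Anything else is merged recursively into the existing member.
                result.put(entry.getKey(), mergePatch(result.get(entry.getKey()), entry.getValue()));
            }
        }
        return result;
    }

    // The source document from the example above.
    static Map<String, Object> sampleSource() {
        Map<String, Object> author = new LinkedHashMap<>();
        author.put("givenName", "John");
        author.put("familyName", "Doe");
        Map<String, Object> source = new LinkedHashMap<>();
        source.put("title", "Goodbye!");
        source.put("author", author);
        source.put("tags", List.of("example", "sample"));
        source.put("content", "This will be unchanged");
        return source;
    }

    // The merge patch from the example above.
    static Map<String, Object> samplePatch() {
        Map<String, Object> author = new LinkedHashMap<>();
        author.put("familyName", null);
        Map<String, Object> patch = new LinkedHashMap<>();
        patch.put("title", "Hello!");
        patch.put("author", author);
        patch.put("phoneNumber", "+01-123-456-7890");
        patch.put("tags", List.of("example"));
        return patch;
    }

    public static void main(String[] args) {
        System.out.println(mergePatch(sampleSource(), samplePatch()));
    }
}
```

Note that arrays are replaced wholesale: the "tags" member ends up as the patch's list, since a merge patch has no way to address individual array elements.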
 

Using Json Patch with Java

json-patch is an awesome java library that provides support for both Json Patch and Json Merge Patch. It integrates with the excellent Jackson library and provides patch tooling on top of it. The sample is in kotlin:
val s = """
{
    "title": "Goodbye!",
    "author": {
      "givenName": "John",
      "familyName": "Doe"
    },
    "tags": [
      "example",
      "sample"
    ],
    "content": "This will be unchanged"
}        
""".trimIndent()


val patch = """
    [
        { "op": "replace", "path": "/title", "value": "Hello!"},
        { "op": "remove", "path": "/author/familyName"},
        { "op": "add", "path": "/phoneNumber", "value": "+01-123-456-7890"},
        { "op": "replace", "path": "/tags", "value": ["example"]}
    ]
""".trimIndent()
val jsonPatch: JsonPatch = JsonPatch.fromJson(objectMapper.readTree(patch))
val target = jsonPatch.apply(objectMapper.readTree(s))

Using Json Merge Patch with Java

The library makes using Json Merge patch equally easy:
val s = """
{
    "title": "Goodbye!",
    "author": {
      "givenName": "John",
      "familyName": "Doe"
    },
    "tags": [
      "example",
      "sample"
    ],
    "content": "This will be unchanged"
}        
""".trimIndent()


val patch = """
{
    "title": "Hello!",
    "author": {
      "familyName": null
    },
    "phoneNumber": "+01-123-456-7890",
    "tags": ["example"]
}   
""".trimIndent()

val jsonMergePatch: JsonMergePatch = JsonMergePatch.fromJson(objectMapper.readTree(patch))
val target = jsonMergePatch.apply(objectMapper.readTree(s))

Conclusion

Json Patch and Json Merge Patch are ways to represent a change to a json document. Both approaches do it a little differently but both are equally intuitive.

Tuesday, March 23, 2021

Project reactor and Caching with Caffeine

So you have a function which takes a key and returns a project reactor Mono type. 

Mono<String> get(String key) {
    Random random = ThreadLocalRandom.current();
    return Mono.fromSupplier(() -> key + random.nextInt());
}
And you want to cache the retrieval of this Mono type by key, a good way to do that is to use the excellent Caffeine library. Caffeine natively does not support reactor types however, but it is fairly easy to use Caffeine with reactor the following way:
public static <T> Function<String, Mono<T>> ofMono(@NotNull Duration duration,
                                                    @NotNull Function<String, Mono<T>> fn) {
    final Cache<String, T> cache = Caffeine.newBuilder()
            .expireAfterWrite(duration)
            .recordStats()
            .build();

    return key -> {
        T result = cache.getIfPresent(key);
        if (result != null) {
            return Mono.just(result);
        } else {
            return fn.apply(key).doOnNext(n -> cache.put(key, n));
        }
    };
}
It essentially wraps a function returning the Mono, and uses Caffeine to get the value from a cache held in a closure. If the value is present in the cache it is returned; otherwise, when the Mono emits a value, the cache is populated from it. So how can this be used? Here is a test with this utility:
Function<String, Mono<String>> fn = (k) -> get(k);
Function<String, Mono<String>> wrappedFn = CachingUtils.ofMono(Duration.ofSeconds(10), fn);
StepVerifier.create(wrappedFn.apply("key1"))
        .assertNext(result1 -> {
            StepVerifier.create(wrappedFn.apply("key1"))
                    .assertNext(result2 -> {
                        assertThat(result2).isEqualTo(result1);
                    })
                    .verifyComplete();
            StepVerifier.create(wrappedFn.apply("key1"))
                    .assertNext(result2 -> {
                        assertThat(result2).isEqualTo(result1);
                    })
                    .verifyComplete();

            StepVerifier.create(wrappedFn.apply("key2"))
                    .assertNext(result2 -> {
                        assertThat(result2).isNotEqualTo(result1);
                    })
                    .verifyComplete();
        })
        .verifyComplete();
Here I am using Project Reactor's StepVerifier utility to run a test on this wrapped function and ensure that the cached value is indeed returned for repeated keys. The full sample is available in this gist

Coroutine based Spring boot webflux application

I have worked with Spring Framework for ages and it still manages to surprise me with how cutting edge it continues to be but at the same time enabling a developer to put together a fairly sane app.

The most recent surprise was with how it enables programming a web application with Kotlin coroutines. Coroutines is a fairly complicated concept to get my head around, but it is starting to click now and while trying out some samples I thought it may be a good idea to put an end to end web application in place. 

Thanks to the excellent Spring Boot starters it was not difficult at all. Along the way I also decided to experiment with r2dbc which is another involved technology to interact with a database using reactive streams. Combining reactive streams for interacting with the database but using coroutines in the rest of the layers was not difficult at all. In this post I will not be covering the nuances of what I had to do to get the sample to work, but will cover one thin slice of what it looks like. The sample is here in my github repo and should be fairly self explanatory. 


I have to acknowledge that Nicolas Frankel's blog post provided me a lot of pointers in getting the working code just right


A Slice of functionality

The slice of functionality that I will consider in this post is to return a list of entities and an entity from an embedded database that I have used for the application. 

Let's start from bottom up. So at the lowest level I have to query the database and return a list of entities, this is made dirt simple using the Spring Data based repositories. This is the entirety of the repository code that returns coroutine types.


import org.springframework.data.repository.kotlin.CoroutineCrudRepository
import samples.geo.domain.City

interface CityRepo : CoroutineCrudRepository<City, Long>
Just by doing this the CRUD operations now become suspendable functions. So to return a list of an entity or a specific entity, the signature looks something like this:


fun getCities(): Flow<City> {
    return cityRepo.findAll()
}

suspend fun getCity(id: Long): City? {
    return cityRepo.findById(id)
}
Any list operations now return the Coroutine Flow type and getting an entity is a suspendable function.


Moving to the web layer (I have a service layer, but it is just a passthrough to the repo in this instance), I like to have a handler for handling the Webflux ServerRequest and ServerResponse types the following way:
suspend fun getCities(request: ServerRequest): ServerResponse {
    val cities = cityService.getCities()
        .toList()
    return ServerResponse.ok().bodyValueAndAwait(cities)
}

suspend fun getCity(request: ServerRequest): ServerResponse {
    val id = request.pathVariable("id").toLong()
    val city = cityService.getCity(id)

    return city
        ?.let { ServerResponse.ok().bodyValueAndAwait(it) }
        ?: ServerResponse.notFound().buildAndAwait()
}
which is then composed at the web layer the following way:
object AppRoutes {
    fun routes(cityHandler: CityHandler): RouterFunction<*> = coRouter {
        accept(MediaType.APPLICATION_JSON).nest {
            GET("/cities", cityHandler::getCities)
            GET("/cities/{id}", cityHandler::getCity)
            ...
        }
    }
}
The "coRouter" dsl above provides the functionality to convert the Kotlin coroutine types to the Spring WebFlux RouterFunction type. This is essentially it. The code and tests for the fairly sophisticated set of technologies (r2dbc, coroutines, webflux, reactive streams etc) that this encompasses are fairly small, as can be seen from the github repository.

Conclusion

Getting a working end to end web application with Coroutines and Spring Webflux is just a "Spring" board to further exploration of Coroutines for me and I hope to gain deeper insights into this fascinating technology over time. Having been involved in the Spring community for so long, it is fascinating to note that it continues to remain one of the best frameworks to develop applications in, mainly because of the constant innovation and its focus on developer happiness.

Sunday, March 14, 2021

Deriving a Kotlin "Try" type

Functional programming languages like Scala often have a type called "Try" to hold the result of a computation if successful or to capture an exception on failure. 

This is an incredibly useful type, allowing a caller to pointedly control how to handle an exceptional scenario. In this post I will try and create such a type from scratch.

As an example, I will be using the scenario from Daniel Westheide's excellent introduction to the Try type in Scala

So my objective is to call a remote URL and return the content as a string. A few things can go wrong - 

  • The url can be badly formed
  • The url may be wrong and may not have anything to retrieve content from

Let's start with the first one, the URL being badly formed, an API call using the "Try" type would look something like this:

fun parseUrl(url: String): Try<URL> {
    return Try.of {
        URL(url)
    }
}
Here a URL is being parsed and the result is a valid url or an exception. So a Try type that can implement this much, would look something like this:
sealed class Try<out T> {

    class Success<T>(private val result: T) : Try<T>()

    class Failure<T>(private val throwable: Throwable) : Try<T>()


    companion object {
        fun <T> of(block: () -> T): Try<T> = try {
            Success(block())
        } catch (e: Throwable) {
            Failure(e)
        }
    }
}

"Try" type has two sub types - A "Success" wrapping a successful result and a "Failure" capturing an exception from the call. 

With the two subtypes in place, let's extend the use of the Try type:

val urlResult: Try<URL> = parseUrl("htt://somewrongurl")
assertThat(urlResult.isFailure()).isTrue()
assertThat(urlResult.isSuccess()).isFalse()

Calling it with a badly formatted url like the one above, with a wrong scheme "htt" instead of "http", should result in a failure. So let's implement the "isFailure" and "isSuccess" behavior:

sealed class Try<out T> {

    abstract fun isSuccess(): Boolean
    fun isFailure(): Boolean = !isSuccess()

    class Success<T>(private val result: T) : Try<T>() {
        override fun isSuccess(): Boolean = true
    }

    class Failure<T>(private val throwable: Throwable) : Try<T>() {
        override fun isSuccess(): Boolean = false
    }

    ...
}
That works nicely. Now that a url is available, hopefully valid, let's get some content from the URL:


val uriResult: Try<URL> = parseUrl("http://someurl")
val getResult: Try<String> = getFromARemoteUrl(uriResult.get())
assertThat(getResult.get()).isEqualTo("a result")
which means that our "Try" type should have a "get()" method to retrieve the result if successful and can be implemented like this:


sealed class Try<out T> {
    ...
    abstract fun get(): T

    class Success<T>(private val result: T) : Try<T>() {
        ...
        override fun get(): T = result
    }

    class Failure<T>(private val throwable: Throwable) : Try<T>() {
        ...
        override fun get(): T = throw throwable
    }
}
The Success path simply returns the result and the Failure path propagates the wrapped exception.

map Operation

Let's take it a small step forward. Given a url say you want to return the host of the url
val uriResult: Try<URL> = parseUrl("http://myhost")
assertThat(uriResult.get().host).isEqualTo("myhost")
While this works, the problem with the approach is that the "get()" call for an invalid url would result in an exception if the url is not valid to start with, so a better approach is to retrieve the host name only if the url is valid. Traditionally this is done using a "map" operator and a usage looks like this:
val urlResult: Try<URL> = parseUrl("http://myhost")
val hostResult: Try<String> = urlResult.map { url -> url.host }
assertThat(hostResult).isEqualTo(Try.success("myhost"))
So let's add in a "map" operator to the "Try" type:
sealed class Try<out T> {
    ...
    abstract fun <R> map(block: (T) -> R): Try<R>

    abstract fun get(): T

    data class Success<T>(private val result: T) : Try<T>() {
        ...
        override fun <R> map(block: (T) -> R): Try<R> {
            return of {
                block(result)
            }
        }
    }

    data class Failure<T>(private val throwable: Throwable) : Try<T>() {
        ...
        override fun <R> map(block: (T) -> R): Try<R> {
            return this as Failure<R>
        }
    }
}
and it behaves as expected.

flatMap Operation

Along the lines of the "map" operation, let's now get back to the original scenario of validating the url and then attempting to get the content. The call to get content can also fail, so you would want that to be wrapped with a Try type also.

val urlResult: Try<URL> = parseUrl("http://someurl")
val getResult: Try<String> = getFromARemoteUrl(urlResult.get())

The two calls need to be chained together, and the "map" operation may appear to be the right operator to use:
 
val urlResult: Try<URL> = parseUrl("http://someurl")
val getResult: Try<Try<String>> = urlResult.map { url -> getFromARemoteUrl(url) }

If you look at the response type now, it does not really line up: it is a "Try<Try<String>>" and not a "Try<String>". This is exactly the mismatch that a flatMap operation resolves. It takes a valid URL and returns just the inner wrapped result.
A test using it would look like this:
val urlResult: Try<URL> = parseUrl("http://someurl")
val getResult: Try<String> = urlResult.flatMap { url -> getFromARemoteUrl(url) }
assertThat(getResult).isEqualTo(Try.success("a result"))
So how can "flatMap" be implemented? With fairly simple code that looks like this:
sealed class Try<out T> {
    ...
    abstract fun <R> flatMap(tryBlock: (T) -> Try<R>): Try<R>

    data class Success<T>(private val result: T) : Try<T>() {
        ...
        override fun <R> flatMap(tryBlock: (T) -> Try<R>): Try<R> {
            return try {
                tryBlock(result)
            } catch (e: Throwable) {
                failure(e)
            }
        }
    }

    data class Failure<T>(private val throwable: Throwable) : Try<T>() {
        ...
        override fun <R> flatMap(tryBlock: (T) -> Try<R>): Try<R> {
            return this as Failure<R>
        }
    }
}
One more small feature, given that Try type has two subtypes is to deconstruct the contents when required:
val urlResult: Try<URL> = parseUrl("http://someurl")
val getResult: Try<String> = urlResult.flatMap { url -> getFromARemoteUrl(url) }
when (getResult) {
    is Try.Success -> {
        val (s) = getResult
        println("Got a clean result: $s")
    }
    is Try.Failure -> {
        val (e) = getResult
        println("An exception: $e")
    }
}

This assumes that the user knows the subtypes which may be an okay assumption to make for this type. 


Conclusion

A type like "Try" is incredibly useful in capturing a result cleanly or with an exception, and provides a neat alternative to using a normal try..catch block. Here I showed a way to write such a type from scratch. This may be overkill, however: a better way to get such a type is to simply use an excellent library like vavr, which has the Try type already built in. I feel it is instructive to create such a type from scratch though.

Tuesday, March 9, 2021

Jackson Kotlin extension and reified types

Jackson Kotlin module library is a pleasure to use. It greatly simplifies gnarly code, specifically code involving TypeReference.

Consider a sample json which looks like this:
{
    "a" : ["b", "c"],
    "b" : ["a", "c"],
    "c" : ["a", "b"]
}
This content can be represented as a "Map<String, List<String>>" type in Java.

So now, if I were to use straight Java to convert the string to the appropriate type, the code would look like this:
Map<String, List<String>> result = objectMapper.readValue(json, new TypeReference<>() {
});
What exactly is that "TypeReference" doing there? Think of it as a way of making the type parameters of the generic type "Map", which are "String" and "List<String>", available at runtime. Without it the types are erased at runtime and Java would not know that it has to create a "Map<String, List<String>>".
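The general technique behind this, often called a super type token, can be sketched with the Java standard library alone. TypeCapture below is a hypothetical stand-in written for illustration and is not Jackson's actual code: an anonymous subclass keeps its generic superclass in its class metadata, and reflection can read the type argument back from it.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;

class TypeCaptureDemo {

    // Creates an anonymous subclass so that the full generic type is recorded.
    static Type captured() {
        return new TypeCapture<Map<String, List<String>>>() {}.getType();
    }

    public static void main(String[] args) {
        System.out.println(captured().getTypeName());
    }
}

// Hypothetical stand-in for a type-capturing class, for illustration only.
abstract class TypeCapture<T> {
    private final Type type;

    protected TypeCapture() {
        // For an anonymous subclass this is TypeCapture<...> with the real type argument.
        Type superClass = getClass().getGenericSuperclass();
        if (!(superClass instanceof ParameterizedType)) {
            throw new IllegalArgumentException("TypeCapture must be created with a type argument");
        }
        this.type = ((ParameterizedType) superClass).getActualTypeArguments()[0];
    }

    Type getType() {
        return type;
    }
}
```

The empty braces after the constructor call are what create the anonymous subclass. Without them there is no subclass to carry the type argument, which is why the "new TypeReference<>() {}" form is needed in the Java snippet above.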

Kotlin can hide this detail behind an extension function which if defined from scratch would look like this:
inline fun <reified T> ObjectMapper.readValue(src: String): T = readValue(src, object : TypeReference<T>() {})
and a code using such an extension function:
val result: Map<String, List<String>> = objectMapper.readValue(json)
See how all the TypeReference related code is well hidden in the extension function. 

This is the kind of capability that is provided by the Jackson Kotlin Module. With the right packages imported, sample code with this module looks like this:
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
val objectMapper = jacksonObjectMapper()
val result: Map<String, List<String>> = objectMapper.readValue(json)
This is just one of the simplifications offered by the use of the module. It also supports features like Kotlin Data classes, Kotlin built-in types like Pair, Triple etc out of the box.

Highly recommended if you are using Jackson with Kotlin.

Sunday, February 28, 2021

Top "n" using a Priority Queue

If you ever need to capture the smallest or largest "n" from a stream of data, the approach more often than not will be to use a simple data-structure called the Priority Queue. 

Priority Queues do one thing very well - once a bunch of data is added, it can return the lowest value (or the highest value) in constant time. 

How is this useful to answer a top or bottom "n" type question. Let's see. 

Consider this hypothetical stream of data:




And you have to answer the smallest 2 at any point as this stream of data comes in. The trick is to use a Priority Queue 

  • with a size limit of 2
  • which returns the largest of these 2 when asked for (sometimes referred to as a Max Priority Queue)

Two considerations as data streams in:
  • if the size of the priority queue is less than 2 then add the value to the priority queue
  • if the size of the priority queue is equal to 2, then compare the value from the stream with the largest in the queue
    • if less then remove the largest and add the new value
    • if more then do nothing

At the bottom is the state of the Priority Queue as each data in the stream is processed:



See how it always holds the bottom 2. 

Similarly for the largest 3, a Priority Queue with a max capacity of 3 which returns the smallest (referred to as a Min Priority Queue) can be used the following way:
  • if the size of the priority queue is less than 3, then add to the priority queue
  • if the size is equal to 3, then compare the value from the stream with the smallest in the queue
    • if more then remove smallest add the value from stream and ignore otherwise



Implementation

Here is a simple kotlin based implementation that uses the built in PriorityQueue in Java standard library.


import java.util.Comparator
import java.util.PriorityQueue

fun findNSmallestAndLargest(nums: List<Int>, n: Int): Pair<List<Int>, List<Int>> {
    val minFirst: Comparator<Int> = Comparator.naturalOrder<Int>()
    val maxFirst: Comparator<Int> = minFirst.reversed()
    val minPq: PriorityQueue<Int> = PriorityQueue(minFirst)
    val maxPq: PriorityQueue<Int> = PriorityQueue(maxFirst)
    for (num in nums) {
        checkAndAddIfSmallest(maxPq, n, num)
        checkAndAddIfLargest(minPq, n, num)
    }
    return maxPq.toList() to minPq.toList()
}

private fun checkAndAddIfSmallest(maxPq: PriorityQueue<Int>, n: Int, num: Int) {
    if (maxPq.size < n) {
        maxPq.add(num)
    } else if (num < maxPq.peek()) {
        maxPq.poll()
        maxPq.add(num)
    }
}

private fun checkAndAddIfLargest(minPq: PriorityQueue<Int>, n: Int, num: Int) {
    if (minPq.size < n) {
        minPq.add(num)
    } else if (num > minPq.peek()) {
        minPq.poll()
        minPq.add(num)
    }
}

The implementation is very straightforward and follows the outlined algorithm to the letter.
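To see the bounded queue at work on concrete numbers, here is a small Java sketch of the "smallest n" half of the algorithm. The class name SmallestNSketch and the sample values are assumptions made up for this illustration. The result is sorted before returning, since a priority queue does not promise any particular iteration order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

class SmallestNSketch {

    // Keeps at most n values in a max priority queue; its head is the largest of the kept values.
    static List<Integer> smallest(List<Integer> nums, int n) {
        if (n <= 0) {
            return new ArrayList<>();
        }
        PriorityQueue<Integer> maxPq = new PriorityQueue<>(Comparator.reverseOrder());
        for (int num : nums) {
            if (maxPq.size() < n) {
                // Still filling up: every value is kept.
                maxPq.add(num);
            } else if (num < maxPq.peek()) {
                // The new value beats the largest kept value, so it takes its place.
                maxPq.poll();
                maxPq.add(num);
            }
        }
        List<Integer> result = new ArrayList<>(maxPq);
        Collections.sort(result);
        return result;
    }

    public static void main(String[] args) {
        // Sample stream invented for this sketch.
        System.out.println(smallest(List.of(9, 4, 7, 1, 8, 3), 2)); // prints [1, 3]
    }
}
```

For the sample stream the queue holds {9}, {9, 4}, {7, 4}, {4, 1}, {4, 1} and finally {3, 1} as each value arrives, so at every point it contains the two smallest values seen so far.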

Monday, January 18, 2021

Generating a stream of Fibonacci numbers

A Java stream represents potentially an infinite sequence of data. This is a simple post that will go into the mechanics involved in generating a simple stream of Fibonacci numbers.


The simplest way to get this stream of data is to use the generate method of Stream.

As you can imagine to generate a specific Fibonacci number in this sequence, the previous two numbers are required, which means the state of the previous two numbers need to be maintained somewhere. The two solutions that I will be describing here both maintain this state, however they do it differently.



Mutable State

In the first approach, I am just going to maintain state this way:


class FibState {
    private long[] prev = new long[2];
    private int index = 0;

    public long nextFib() {
        long result = (index == 0) ? 1
                : ((index == 1) ? 1 : prev[0] + prev[1]);
        prev[0] = prev[1];
        prev[1] = result;
        index++;
        return result;
    }
}


with index keeping track of the index of the current fibonacci number in the sequence and prev capturing the most recent two in the sequence. So the next in the series is generated by incrementing the index and updating the array holding the recent values. Given this state, how do we generate the stream? Using code which looks like this:

   
Stream<Long> streamOfFib() {
    FibState fibState = new FibState();
    return Stream.generate(() -> fibState.nextFib());
}

This is using a closure to capture the fibState and mutating it repeatedly as the stream of numbers is generated. The approach works well, though the thought of mutating one value probably should induce a level of dread - is it thread safe (probably not), will it work for parallel streams(likely not), but should suffice for cases where the access is strictly sequential. A far better approach is to get a version of state that is immutable.

Immutable State


class FibState {
    private final long[] prev;
    private final int index;

    public FibState() {
        this(new long[]{-1, 1}, 0);
    }

    public FibState(long[] prev, int index) {
        this.prev = prev;
        this.index = index;
    }

    public FibState nextFib() {
        int nextIndex = index + 1;
        long result = (nextIndex == 1) ? 1 : prev[0] + prev[1];
        return new FibState(new long[]{prev[1], result}, nextIndex);
    }

    public long getValue() {
        return prev[1];
    }
}
Instead of mutating the state, it returns the next immutable state. Alright, so now that this version of state is available, how can it be used - by using the "iterate" function of Stream, like this:
Stream<Long> streamOfFib() {
    return Stream
            .iterate(new FibState(), fibState -> fibState.nextFib())
            .map(fibState -> fibState.getValue());
}

This function takes two parameters: the initial state and a function that generates the next state. To return numbers from it, I am mapping this "state" type to a number in the "map" operation.
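Since the stream is infinite, a consumer has to bound it, for instance with "limit". As a compact variation on the same idea, the sketch below carries the state as a plain two element array instead of a dedicated class. FibPairSketch and firstFib are hypothetical names used only for this illustration.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class FibPairSketch {

    // Each state is {current, next}; a fresh array is created for every step, nothing is mutated.
    static List<Long> firstFib(int count) {
        return Stream
                .iterate(new long[]{1, 1}, pair -> new long[]{pair[1], pair[0] + pair[1]})
                .map(pair -> pair[0])
                .limit(count)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(firstFib(10)); // prints [1, 1, 2, 3, 5, 8, 13, 21, 34, 55]
    }
}
```

The shape is the same as with the FibState class: a seed, a function from one state to the next, and a "map" that projects each state to the number to emit.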


Conclusion

This is generally instructive on how to generate a stream using Fibonacci sequence as an example. The approach will be fairly similar for any stream that you may need to generate. My personal preference is for the Immutable state variation as that delegates most of the mechanism of generating the stream to the excellent "iterate" helper function of Stream.