I recently encountered the need to aggregate CSV rows based on a cell containing an entity id. This is what the flow would look like when using a single instance of Camel:
Now let’s see how it would flow when running integrations with Camel-K on Kubernetes:
We can immediately spot the issue: since the scaled integrations do not communicate with each other, each one has its own view of the aggregation. This is the default behaviour when using an in-memory aggregation repository.
To solve this problem we need an AggregationRepository that is both persistent and distributed across the Kubernetes cluster.
Hazelcast to the rescue
Hazelcast IMDG is a highly scalable in-memory data grid offering distributed implementations of maps, collections and more, and Camel supports Hazelcast as a backend for the AggregationRepository interface.
To do so, you need to install Hazelcast on your Kubernetes cluster (instructions are provided in the repository linked at the end of this blog post), configure the Hazelcast instance in your integration, register it in the registry, and then reference it in the aggregationRepositoryRef of your EIP.
The Hazelcast client can leverage information from the Kubernetes cluster itself: all you have to do is provide the namespace name and the Hazelcast service name.
This is what your integration route would look like:
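As a minimal sketch (bucket, endpoint and bean names here are hypothetical, and exact YAML DSL attribute names may vary slightly between Camel versions), an aggregation step backed by Hazelcast could look like this, assuming a HazelcastAggregationRepository bean named hazelcastRepo has been registered in the Camel registry and wired to the cluster through the namespace and Hazelcast service name:

```yaml
# Hypothetical sketch: aggregate CSV rows per entity id using a
# distributed Hazelcast-backed repository instead of the in-memory default.
# The "hazelcastRepo" bean (a HazelcastAggregationRepository discovering
# the Hazelcast members via namespace + service name) is assumed to be
# registered in the Camel registry beforehand.
- from:
    uri: "direct:rows"
    steps:
      - aggregate:
          correlationExpression:
            simple: "${header.entityId}"   # entity id extracted from the CSV cell
          completionSize: 100              # arbitrary completion condition for the example
          aggregationRepositoryRef: "hazelcastRepo"
          steps:
            - to: "direct:aggregated"
```

With the repository living in Hazelcast rather than in each pod’s memory, every scaled replica of the integration shares the same view of the aggregation.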
In the previous blog post, we defined more than one business logic in a single integration.
There are multiple drawbacks to this type of approach:
It makes the process synchronous: the last part of the integration sends the CSV files to S3 one by one
The Integration YAML definition is heavy
It makes it harder to re-use integrations for the same business logic
Our current integration also highlights two distinct areas:
On one side, we have a piece of business logic that knows where the original file is, which format it has, and how to split its rows into individual messages
On the other side, we have a piece of business logic that just expects a message whose body contains the id of the row plus some content, and knows how to write those bytes to S3 as individual JSON files
We would probably like to decouple these two pieces of logic like this:
Welcome Knative Eventing!
Knative Eventing can be described as an Event Mesh and is articulated around 4 principles:
Knative Eventing services are loosely coupled and can be developed and deployed independently
Event producers and consumers are independent (producers can send events before consumers exist, and vice versa)
Other services can be connected to the Eventing system
Cross-service interoperability is ensured by consistency with the CloudEvents specification
There are multiple constructs that form Knative Eventing, such as broker, channel, subscriptions, triggers, sources and sinks.
The following image tries to summarise most of them, but I highly suggest you take some time reading the Knative Eventing docs if you want to know more (and I will likely blog about Knative again soon).
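To make the broker/trigger pair concrete, here is a minimal (hypothetical) definition: a Broker receives events, and a Trigger filters them and forwards the matching ones to a subscriber. The event type and subscriber name below are placeholders, not values from this post:

```yaml
# Hypothetical sketch of a Broker and a Trigger.
# Events of type "csv.row" delivered to the "default" broker
# are routed to the "row-processor" Knative Service.
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  name: default
---
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: csv-rows
spec:
  broker: default
  filter:
    attributes:
      type: csv.row          # only events with this CloudEvents type attribute
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: row-processor    # placeholder consumer
```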
Camel-K has full support for Knative, both Eventing and Serving. As we saw in the previous blog post, when an integration is deployed the Camel-K operator takes care of a lot of tasks, and even more of them are handled automatically when Knative is available on your Kubernetes cluster.
Let’s see it in action and decouple our integration. The overall flow will look like the following:
Let’s first extract the piece of our integration business logic that writes JSON body bytes to an S3 bucket.
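A sketch of that consumer side could look like the following Integration. The channel name, bucket name and header name are assumptions for illustration, and the YAML DSL attribute spelling may vary slightly between Camel versions:

```yaml
# Hypothetical sketch: consume row messages from a Knative channel and
# write each body to S3 as an individual JSON file. The "rows" channel,
# "my-bucket" bucket and "entityId" header are placeholder names.
apiVersion: camel.apache.org/v1
kind: Integration
metadata:
  name: kn-s3-processor
spec:
  flows:
    - from:
        uri: "knative:channel/rows"
        steps:
          # toD evaluates the simple expression in the URI per message,
          # so each row lands in its own object key.
          - toD: "aws2-s3://my-bucket?keyName=${header.entityId}.json"
```

Note that this integration knows nothing about CSV files or splitting; it only expects a message with an id header and some content, which is exactly the decoupling we were after.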
We did not cover what Knative Services are in the Knative diagram above, as it would make this blog post too long. You just need to know that if Knative Serving is available, Camel-K will deploy integrations as Knative Services, which enables scale-to-zero of integrations as well as HTTP-based autoscaling.
Let’s check that the integration pod is running:
kubectl get po
NAME                                                READY   STATUS    RESTARTS   AGE
kn-s3-processor-00001-deployment-6d5668f944-b5w5k   2/2     Running   0          48s
And if you wait a couple of seconds more, the integration is torn down to 0 pods: no messages are coming in, so Knative decides to scale the service down to zero. Isn’t it just amazing?
Some people call this Serverless! So, Camel-K and Knative offer you Serverless Enterprise Integrations!
OK, let’s go back to business and start the source integration, reading the huge CSV file from S3.
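The producer side can be sketched like this: read the CSV object from S3, split it into one message per row in streaming mode, and push each row to the Knative channel the processor listens on. The file, bucket and channel names are placeholders, and the exact split attribute names may differ slightly between Camel versions:

```yaml
# Hypothetical sketch: stream a large CSV file from S3 and publish
# one message per row to a Knative channel. "my-bucket", "huge.csv"
# and the "rows" channel are placeholder names.
apiVersion: camel.apache.org/v1
kind: Integration
metadata:
  name: csv-source
spec:
  flows:
    - from:
        uri: "aws2-s3://my-bucket?fileName=huge.csv&deleteAfterRead=false"
        steps:
          - split:
              tokenize: "\n"     # one message per CSV line
              streaming: true    # avoid loading the whole file in memory
              steps:
                - to: "knative:channel/rows"
```

Because the two integrations only share the channel, each side can now be developed, deployed and scaled independently.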