Distributed Aggregate EIP on Kubernetes with Camel-K and Hazelcast In-Memory Data Grid

I recently encountered the need to aggregate CSV rows based on a cell containing an entity id. This is what the flow would look like when using a single instance of Camel:


Now let’s see how it would flow when running integrations with Camel-K on Kubernetes:

We can immediately spot the issue: since the scaled integrations do not communicate with each other, each one has its own view of the aggregation, which is the default behaviour when using an in-memory aggregation repository.

To solve this problem, we need a persistent AggregationRepository that is also distributed across the Kubernetes cluster.
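To make the issue concrete, here is a plain-Java sketch (illustrative only, not Camel code) of what happens when each replica keeps its own in-memory aggregation map: rows belonging to the same entity id end up as two partial aggregates instead of one complete one.

```java
import java.util.*;

public class SplitAggregationDemo {

    // Spread rows round-robin over `replicaCount` in-memory maps,
    // the way a load balancer spreads messages over scaled integration pods.
    static List<Map<String, List<String>>> distribute(String[][] rows, int replicaCount) {
        List<Map<String, List<String>>> replicas = new ArrayList<>();
        for (int i = 0; i < replicaCount; i++) replicas.add(new HashMap<>());
        for (int i = 0; i < rows.length; i++) {
            replicas.get(i % replicaCount)
                    .computeIfAbsent(rows[i][0], k -> new ArrayList<>())
                    .add(rows[i][1]);
        }
        return replicas;
    }

    public static void main(String[] args) {
        String[][] rows = { {"42", "a"}, {"42", "b"}, {"42", "c"}, {"42", "d"} };
        List<Map<String, List<String>>> replicas = distribute(rows, 2);
        // Entity 42 ends up with two partial aggregates instead of one:
        System.out.println(replicas.get(0).get("42")); // [a, c]
        System.out.println(replicas.get(1).get("42")); // [b, d]
    }
}
```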

Hazelcast to the rescue

Hazelcast IMDG is a highly scalable in-memory data grid offering distributed implementations of maps, collections and more, and Camel supports Hazelcast as a backend for the AggregationRepository interface.

To do so, you need to install Hazelcast on your Kubernetes cluster (instructions are provided in the repository linked at the end of the blog post), configure the HazelcastInstance in your integration, register it in the Camel registry, and then reference it in the aggregationRepositoryRef of your EIP.

The Hazelcast client can leverage information from the Kubernetes cluster itself: all you have to do is provide the namespace and the Hazelcast service name.

This is what your integration route would look like:

    @BindToRegistry("hzr")
    public HazelcastAggregationRepository hazelcastRepository() {
        // Discover the Hazelcast cluster through the Kubernetes API:
        // only the namespace and the Hazelcast service name are needed.
        ClientConfig clientConfig = new ClientConfig();
        clientConfig.getNetworkConfig().getKubernetesConfig().setEnabled(true)
                .setProperty("namespace", "default")
                .setProperty("service-name", "my-release-hazelcast");

        HazelcastInstance hazelcastInstance = HazelcastClient.newHazelcastClient(clientConfig);

        return new HazelcastAggregationRepository(
            "kamel-demo", hazelcastInstance
        );
    }

    @Override
    public void configure() throws Exception {

        from("knative:event/sourcecsv")
            .log("${body}")
            .unmarshal().json()
            // Group messages by entity_id, storing pending groups in Hazelcast
            .aggregate(new ArrayListAggregationStrategy())
                .jsonpath("$.entity_id").completionTimeout(1500).aggregationRepositoryRef("hzr")
        .log("${body}");

    }
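The route references an ArrayListAggregationStrategy that is not shown above. A minimal sketch, assuming Camel 3's org.apache.camel.AggregationStrategy interface, could look like this: it simply collects the incoming message bodies into a list.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.camel.AggregationStrategy;
import org.apache.camel.Exchange;

public class ArrayListAggregationStrategy implements AggregationStrategy {

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        Object newBody = newExchange.getIn().getBody();
        if (oldExchange == null) {
            // First message of the group: start a new list.
            List<Object> list = new ArrayList<>();
            list.add(newBody);
            newExchange.getIn().setBody(list);
            return newExchange;
        }
        // Subsequent messages: append to the existing list.
        List<Object> list = oldExchange.getIn().getBody(List.class);
        list.add(newBody);
        return oldExchange;
    }
}
```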

And the new representation of the flow:

You can find all the necessary code and steps in my Camel K Integrations Examples repository.

Decoupling your Camel K Enterprise Integrations with the Knative Event Mesh

In the previous blog post, we defined more than one business logic in a single integration.

There are multiple drawbacks to this type of approach:

  • It makes the process synchronous: the last part of the integration sends CSV files to S3 one by one
  • The Integration YAML definition is heavy
  • It makes it harder to re-use integrations implementing the same business logic

Our current integration also highlights two distinct areas:

  • On one side, we have a piece of business logic that knows where the original file is, what format it is in, and how to split the rows into individual messages
  • On the other side, we have a piece of business logic that simply expects a message whose body contains the row id and some content, and knows how to write those bytes to S3 as individual JSON files

We probably would like to decouple these two pieces of logic like this:


Welcome Knative Eventing!

Knative Eventing can be described as an Event Mesh and is built around four principles:

  • Knative Eventing resources are loosely coupled
  • Event producers and consumers are independent (producers can send events before consumers exist, and vice versa)
  • Other services can be connected to the Eventing system
  • Cross-service interoperability is ensured by consistency with the CloudEvents specification
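For reference, a CloudEvent is just a small envelope of metadata around the event payload. An illustrative JSON representation (all attribute values here are made up) looks like:

```json
{
  "specversion": "1.0",
  "type": "event.csv",
  "source": "/integrations/kn-csv-reader",
  "id": "8a0fae0a-0001",
  "datacontenttype": "application/json",
  "data": { "entity_id": "42", "title": "hello" }
}
```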

There are multiple constructs that form Knative Eventing, such as brokers, channels, subscriptions, triggers, sources and sinks.

The following image tries to summarise most of them, but I highly suggest you take some time to read the Knative Eventing docs if you want to know more (and I will likely blog about Knative again soon).

If you want to have Knative running on your local Minikube, I suggest you follow the instructions in my getting-started with camel-k and knative repository.


Camel K on Knative

Camel-K has full support for Knative, both Eventing and Serving. As we saw in the previous blog post, when an integration is deployed the Camel-K operator takes care of a lot of tasks, and even more of them are handled when Knative is available on your Kubernetes cluster.

Let’s see it in action and decouple our integration. The overall flow will look like the following:

Let’s first extract the part of our integration business logic that writes the JSON body bytes to an S3 bucket:

apiVersion: camel.apache.org/v1
kind: Integration
metadata:
  name: kn-s3-processor
spec:
  dependencies:
    - "camel:jackson"
  configuration:
    - type: configmap
      value: rss-to-s3-configmap
    - type: secret
      value: aws
  flows:
  - from:
      uri: "knative:event/csv"
      steps:
        - unmarshal:
            json: {}
        - set-header:
            name: CamelAwsS3Key
            simple: "articles/converted/${body[id]}.json"
        - marshal:
            json: {}
        - log: "${header.CamelAwsS3Key}"
        - to:
            uri: aws-s3://{{BUCKET}}
            parameters:
                accessKey: "{{ACCESS_KEY}}"
                secretKey: "{{SECRET_KEY}}"
                region: "{{REGION}}"

And let’s start the integration:

kubectl apply -f s3-processor.yml

We did not cover what Knative Services are in the Knative diagram above, as it would make this blog post too long. You just need to know that if Knative Serving is available, Camel-K will deploy integrations as Knative Services, which allows scaling integrations down to zero as well as HTTP-based autoscaling.
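For context, a Knative Service is a higher-level resource that manages the Deployment, routing and autoscaling for you. What the Camel-K operator creates for the integration is conceptually similar to this hand-written sketch (the image name is illustrative, not what the operator actually produces):

```yaml
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: kn-s3-processor
spec:
  template:
    spec:
      containers:
        # The integration image built by the Camel-K operator (name is made up)
        - image: registry.example.com/camel-k/kn-s3-processor:1
```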

Let’s see if the integration pod is running

kubectl get po

....
kn-s3-processor-00001-deployment-6d5668f944-b5w5k   2/2     Running   0          48s

And if you wait a couple of seconds more, the integration is torn down to zero pods: no messages are coming, so Knative decides to scale the service down to zero. Isn’t it just amazing?

Some people call this Serverless! So, Camel-K and Knative offer you Serverless Enterprise Integrations!

Ok, let’s get back to business and start the source integration, reading the huge CSV file from S3.

apiVersion: camel.apache.org/v1
kind: Integration
metadata:
  name: kn-csv-reader
spec:
  dependencies:
    - "camel:jackson"
    - "camel:csv"
  configuration:
    - type: configmap
      value: rss-to-s3-configmap
    - type: secret
      value: aws
  flows:
   - from:
      uri: aws-s3://hume-datasets
      parameters:
        prefix: articles
        accessKey: "{{ACCESS_KEY}}"
        secretKey: "{{SECRET_KEY}}"
        region: "{{REGION}}"
        deleteAfterRead: false
      steps:
        - unmarshal:
            csv:
              use-maps: true
              allow-missing-column-names: true
        - split:
            simple: "${body}"
            streaming: true
        - log: "${body}"
        - remove-headers:
            pattern: "*"
        - marshal:
            json: {}
        - to: "knative:event/csv"
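With use-maps: true, the CSV step produces one map per row, keyed by the column names, and the split then turns each map into its own message. In plain Java terms, the transformation looks roughly like this (a simplified sketch that ignores quoting and other CSV subtleties):

```java
import java.util.*;

public class CsvToRowMaps {

    // Parse a tiny CSV into one map per row, keyed by the header names -
    // the shape that `unmarshal csv` with `use-maps: true` yields.
    static List<Map<String, String>> parse(String csv) {
        String[] lines = csv.strip().split("\n");
        String[] headers = lines[0].split(",");
        List<Map<String, String>> rows = new ArrayList<>();
        for (int i = 1; i < lines.length; i++) {
            String[] cells = lines[i].split(",");
            Map<String, String> row = new LinkedHashMap<>();
            for (int c = 0; c < headers.length; c++) {
                row.put(headers[c], cells[c]);
            }
            rows.add(row);
        }
        return rows;
    }

    public static void main(String[] args) {
        String csv = "id,title\n1,hello\n2,world\n";
        for (Map<String, String> row : parse(csv)) {
            System.out.println(row); // each row map becomes its own message body
        }
    }
}
```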
And start it:

kubectl apply -f s3-csv-source.yml

Inspect the pods

kubectl get po

kn-csv-reader-5479c4cd6c-tdk4q      1/1     Running       0          5s

And once the CSV file has been downloaded from S3, the integration will start producing events to the csv channel, and the s3-processor integration pods will wake up:

kn-csv-reader-5479c4cd6c-tdk4q                      1/1     Running   0          79s
kn-s3-processor-00001-deployment-6d5668f944-2lbpf   2/2     Running   0          40s
kn-s3-processor-00001-deployment-6d5668f944-2qlr6   2/2     Running   0          43s
kn-s3-processor-00001-deployment-6d5668f944-cdxmf   2/2     Running   0          38s
kn-s3-processor-00001-deployment-6d5668f944-gn724   2/2     Running   0          34s
kn-s3-processor-00001-deployment-6d5668f944-pdmkn   2/2     Running   0          30s
kn-s3-processor-00001-deployment-6d5668f944-rhfvt   2/2     Running   0          36s

Summary

  • Knative addresses common needs people have when running on Kubernetes, such as Eventing and Serving; we could only cover a small portion of it in this blog post
  • Camel-K offers native support for Knative
  • Decoupling your Integrations with an Event Mesh offers a lot of benefits, such as autoscaling of consumers

You can find the code for the integrations on my camel-k-integrations-examples repository.