Distributed Aggregate EIP on Kubernetes with Camel-K and Hazelcast In-Memory Data Grid

I recently needed to aggregate CSV rows based on a cell containing an entity id. This is what the flow looks like with a single instance of Camel:


Now let’s see how it flows when running integrations with Camel-K on Kubernetes:

The issue is immediately visible: the scaled integrations do not communicate with each other, so each one holds its own partial view of an aggregation. This is the default behaviour with an in-memory aggregation repository.

To solve this problem, we need a persistent AggregationRepository that is itself distributed across the Kubernetes cluster.
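To make the difference concrete, here is a minimal plain-Java sketch of the problem. It is a toy model, not Camel code: the `Instance` class, the `run` helper, and the sample rows are all made up for illustration, and a `HashMap` stands in for the aggregation repository.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (not Camel code): each "instance" appends rows to a repository keyed by entity id.
class AggregationDemo {

    /** Stand-in for one integration pod: groups rows into the repository it was given. */
    static final class Instance {
        private final Map<String, List<String>> repository;

        Instance(Map<String, List<String>> repository) {
            this.repository = repository;
        }

        void onRow(String entityId, String row) {
            repository.computeIfAbsent(entityId, k -> new ArrayList<>()).add(row);
        }
    }

    /** Round-robins sample rows over two instances and returns the repositories they end up with. */
    static List<Map<String, List<String>>> run(boolean shared) {
        Map<String, List<String>> repoA = new HashMap<>();
        // With shared == true both instances write to the same map (the "distributed" case).
        Map<String, List<String>> repoB = shared ? repoA : new HashMap<>();
        Instance[] pods = { new Instance(repoA), new Instance(repoB) };

        // Hypothetical rows: {entity id, payload}.
        String[][] rows = { {"42", "a"}, {"42", "b"}, {"7", "c"}, {"42", "d"} };
        for (int i = 0; i < rows.length; i++) {
            pods[i % 2].onRow(rows[i][0], rows[i][1]);
        }
        return List.of(repoA, repoB);
    }

    public static void main(String[] args) {
        System.out.println("local:  " + run(false));
        System.out.println("shared: " + run(true).get(0));
    }
}
```

With local repositories, the rows of entity 42 end up split between the two instances (`[a]` on one, `[b, d]` on the other). With a shared repository, they are collected into a single group (`[a, b, d]`), which is the behaviour a distributed AggregationRepository is meant to provide.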

Hazelcast to the rescue

Hazelcast IMDG is a highly scalable in-memory data grid offering distributed implementations of maps, collections, … and Camel supports Hazelcast as a backend for the AggregationRepository interface.

To do so, you need to install Hazelcast on your Kubernetes cluster (instructions are provided in the repository linked at the end of this post), configure a HazelcastInstance in your integration, register the resulting repository in the Camel registry, and then reference it through aggregationRepositoryRef in your EIP.

The Hazelcast client can discover the grid from the Kubernetes cluster itself; all you have to provide is the namespace and the name of the Hazelcast service.

This is what your integration route would look like:

    // Registers the Hazelcast-backed repository in the Camel registry under the name "hzr".
    @BindToRegistry("hzr")
    public HazelcastAggregationRepository hazelcastRepository() {
        // Enable Kubernetes discovery; only the namespace and service name are needed.
        ClientConfig clientConfig = new ClientConfig();
        clientConfig.getNetworkConfig().getKubernetesConfig().setEnabled(true)
                .setProperty("namespace", "default")
                .setProperty("service-name", "my-release-hazelcast");

        HazelcastInstance hazelcastInstance = HazelcastClient.newHazelcastClient(clientConfig);

        // "kamel-demo" is the repository name shared by all integration instances.
        return new HazelcastAggregationRepository("kamel-demo", hazelcastInstance);
    }

    @Override
    public void configure() throws Exception {
        from("knative:event/sourcecsv")
            .log("${body}")
            .unmarshal().json()
            // Correlate rows on entity_id and keep partial aggregations in "hzr".
            .aggregate(new ArrayListAggregationStrategy())
                .jsonpath("$.entity_id")
                .completionTimeout(1500)
                .aggregationRepositoryRef("hzr")
            .log("${body}");
    }
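The route relies on an ArrayListAggregationStrategy whose source is not shown in this post. As a rough idea of what such a strategy typically does, here is a plain-Java sketch of the fold it performs. It deliberately uses no Camel types: `ListCollector` and its `aggregate` method are hypothetical names, and the only thing borrowed from Camel is the convention that the first message of a correlation group arrives with no previous aggregate (a null old exchange in Camel's AggregationStrategy).

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the fold a list-collecting strategy performs; no Camel types involved.
class ListCollector {

    /**
     * Mirrors the shape of aggregate(oldExchange, newExchange):
     * 'aggregated' is null for the first message of a correlation group.
     */
    static List<Object> aggregate(List<Object> aggregated, Object incoming) {
        if (aggregated == null) {
            aggregated = new ArrayList<>();
        }
        aggregated.add(incoming);
        return aggregated;
    }

    public static void main(String[] args) {
        List<Object> group = null;
        for (String row : new String[] {"row-1", "row-2", "row-3"}) {
            group = aggregate(group, row);
        }
        System.out.println(group); // prints [row-1, row-2, row-3]
    }
}
```

In the real integration, the list would live in the body of the aggregated exchange, and the repository (here the Hazelcast-backed one) would store that exchange between calls. The actual class in the repository may differ from this sketch.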

And the new representation of the flow:

You can find all the necessary code and steps in my Camel K Integrations Examples repository.