Decoupling your Camel K Enterprise Integrations with the Knative Event Mesh

In the previous blog post, we defined more than one business logic in a single integration.

There are multiple drawbacks to this approach:

  • It makes the process synchronous: the last part of the Integration sends CSV files to S3 one by one
  • The Integration YAML definition is heavy
  • It makes it harder to reuse integrations for the same business logic

Our current integration also highlights two distinct areas:

  • On one side, we have a piece of business logic that knows where the original file is, what format it has, and how to split its rows into individual messages
  • On the other side, we have a piece of business logic that simply expects a message whose body contains the id of a row plus some content, and knows how to write those bytes to S3 as individual JSON files

We would probably like to decouple these two pieces of logic like this:


Welcome Knative Eventing!

Knative Eventing can be described as an Event Mesh and is articulated around four principles:

  • Knative Eventing resources are loosely coupled
  • Event producers and consumers are independent (producers can send events before consumers exist, and vice versa)
  • Other services can be connected to the Eventing system
  • Cross-service interoperability is ensured by following the CloudEvents specification
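
To make that last point concrete: every event in the mesh is a CloudEvent, which in its JSON form is just an envelope of a few required attributes (specversion, id, source, type) plus the payload. The values below are purely illustrative:

```json
{
  "specversion": "1.0",
  "id": "a1b2c3d4",
  "source": "/my/event/producer",
  "type": "csv",
  "datacontenttype": "application/json",
  "data": { "id": "42" }
}
```

Any producer that emits this envelope, and any consumer that understands it, can plug into the mesh without knowing about each other.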

There are multiple constructs that form Knative Eventing, such as brokers, channels, subscriptions, triggers, sources and sinks.

The following image tries to summarise most of them, but I highly suggest you take some time to read the Knative Eventing docs if you want to know more (and I will likely blog about Knative again soon).
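
To give a quick taste of two of these constructs, a channel and a subscription can be declared with a few lines of YAML. This is a sketch with illustrative names; `kn-s3-processor` stands in for whatever service consumes the events:

```yaml
# A named channel that buffers and fans out events (the in-memory flavour
# is fine for development but not durable)
apiVersion: messaging.knative.dev/v1
kind: InMemoryChannel
metadata:
  name: csv
---
# A subscription delivering every event on the channel to a service
apiVersion: messaging.knative.dev/v1
kind: Subscription
metadata:
  name: csv-to-processor
spec:
  channel:
    apiVersion: messaging.knative.dev/v1
    kind: InMemoryChannel
    name: csv
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: kn-s3-processor
```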

If you want to have Knative running on your local Minikube, I suggest you follow the instructions in my getting-started with camel-k and knative repository.


Camel K on Knative

Camel K has full support for Knative, both Eventing and Serving. As we saw in the previous blog post, when an integration is deployed the Camel K operator takes care of a lot of tasks, and even more of them are handled when Knative is available on your Kubernetes cluster.

Let’s see it in action and decouple our integration. The overall flow will look like the following:

Let’s first extract the part of our integration business logic that writes JSON body bytes to an S3 bucket:

apiVersion: camel.apache.org/v1
kind: Integration
metadata:
  name: kn-s3-processor
spec:
  dependencies:
    - "camel:jackson"
  configuration:
    - type: configmap
      value: rss-to-s3-configmap
    - type: secret
      value: aws
  flows:
  - from:
      uri: "knative:event/csv"
      steps:
        - unmarshal:
            json: {}
        - set-header:
            name: CamelAwsS3Key
            simple: "articles/converted/${body[id]}.json"
        - marshal:
            json: {}
        - log: "${header.CamelAwsS3Key}"
        - to:
            uri: aws-s3://{{BUCKET}}
            parameters:
                accessKey: "{{ACCESS_KEY}}"
                secretKey: "{{SECRET_KEY}}"
                region: "{{REGION}}"

And let’s start the integration:

kubectl apply -f s3-processor.yml
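
A quick note on the `knative:event/csv` endpoint in the integration above: because the integration consumes events of type `csv`, the Camel K operator wires it into the event mesh for us. Conceptually, this is equivalent to a Knative Trigger along these lines (a sketch; the resource the operator actually generates may differ in naming and details):

```yaml
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: kn-s3-processor
spec:
  broker: default            # the mesh entry point events flow through
  filter:
    attributes:
      type: csv              # only deliver CloudEvents of type "csv"
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: kn-s3-processor  # the integration deployed above
```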

We did not cover what Knative Services are in the Knative diagram above, as it would make this blog post too long. You just need to know that if Knative Serving is available, Camel K will deploy integrations as Knative Services, which allows scaling integrations down to zero as well as HTTP-based autoscaling.
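
For the curious, a Knative Service looks a lot like a Deployment, with autoscaling bounds expressed as annotations. Here is a simplified sketch of what gets created for our integration; the real resource is generated by the Camel K operator, so names and values below are illustrative:

```yaml
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: kn-s3-processor
spec:
  template:
    metadata:
      annotations:
        autoscaling.knative.dev/min-scale: "0"   # may scale down to zero when idle
        autoscaling.knative.dev/max-scale: "10"  # upper bound for the autoscaler
    spec:
      containers:
        - name: integration
          image: example.org/kn-s3-processor:1.0.0  # built by the Camel K operator
```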

Let’s see if the integration pod is running:

kubectl get po

....
kn-s3-processor-00001-deployment-6d5668f944-b5w5k   2/2     Running   0          48s

And if you wait a couple of seconds more, the integration is torn down to 0 pods: no messages are coming in, so Knative decides to scale the service down to zero. Isn’t it just amazing?

Some people call this Serverless! So, Camel K and Knative offer you Serverless Enterprise Integrations!

Ok, let’s go back to business and start the source integration, which reads the huge CSV file from S3.

apiVersion: camel.apache.org/v1
kind: Integration
metadata:
  name: kn-csv-reader
spec:
  dependencies:
    - "camel:jackson"
    - "camel:csv"
  configuration:
    - type: configmap
      value: rss-to-s3-configmap
    - type: secret
      value: aws
  flows:
  - from:
      uri: aws-s3://hume-datasets
      parameters:
        prefix: articles
        accessKey: "{{ACCESS_KEY}}"
        secretKey: "{{SECRET_KEY}}"
        region: "{{REGION}}"
        deleteAfterRead: false
      steps:
        - unmarshal:
            csv:
              use-maps: true
              allow-missing-column-names: true
        - split:
            simple: "${body}"
            streaming: true
        - log: "${body}"
        - remove-headers:
            pattern: "*"
        - marshal:
            json: {}
        - to: "knative:event/csv"

And apply it:

kubectl apply -f s3-csv-source.yml

Inspect the pods:

kubectl get po

kn-csv-reader-5479c4cd6c-tdk4q      1/1     Running       0          5s

And once the CSV file has been downloaded from S3, the reader will start producing events to the csv channel, and the s3-processor integration pods will wake up:

kn-csv-reader-5479c4cd6c-tdk4q                      1/1     Running   0          79s
kn-s3-processor-00001-deployment-6d5668f944-2lbpf   2/2     Running   0          40s
kn-s3-processor-00001-deployment-6d5668f944-2qlr6   2/2     Running   0          43s
kn-s3-processor-00001-deployment-6d5668f944-cdxmf   2/2     Running   0          38s
kn-s3-processor-00001-deployment-6d5668f944-gn724   2/2     Running   0          34s
kn-s3-processor-00001-deployment-6d5668f944-pdmkn   2/2     Running   0          30s
kn-s3-processor-00001-deployment-6d5668f944-rhfvt   2/2     Running   0          36s
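
Each CSV row the reader marshals and sends to `knative:event/csv` travels through the mesh as a CloudEvent whose data is the JSON map of that row. For a row with id `42`, the processor would receive a payload shaped roughly like this (values illustrative) and write its data to `articles/converted/42.json`:

```json
{
  "specversion": "1.0",
  "type": "csv",
  "source": "kn-csv-reader",
  "id": "event-0001",
  "datacontenttype": "application/json",
  "data": { "id": "42", "title": "an article title" }
}
```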

Summary

  • Knative addresses common needs people have when running on Kubernetes, such as Eventing and Serving; we could only cover a small portion of it in this blog post
  • Camel K offers native support for Knative
  • Decoupling your Integrations with an Event Mesh offers a lot of benefits, such as autoscaling of consumers

You can find the code for the integrations on my camel-k-integrations-examples repository.