In the previous blog post, we packed more than one piece of business logic into a single integration.

This approach has several drawbacks:
- It makes the process synchronous: the last part of the integration sends files to S3 one by one
- The Integration YAML definition is heavy
- It makes it harder to re-use integrations for the same business logic
Our current integration also highlights two areas:
- On one side, we have a piece of business logic that knows where the original file is, what format it has, and how to split the rows into individual messages
- On the other side, we have a piece of business logic that only expects a message whose body contains the id of the row plus any content, and that knows how to write bytes to S3 as individual JSON files
We would probably like to decouple these two pieces of logic, like this:

Welcome Knative Eventing!
Knative Eventing can be described as an Event Mesh and is built around four principles:
- Knative Eventing resources are loosely coupled
- Event producers and consumers are independent (producers can send events before consumers exist, and vice versa)
- Other services can be connected to the Eventing system
- Cross-service interoperability is ensured by staying consistent with the CloudEvents specification
There are multiple constructs that form Knative Eventing, such as brokers, channels, subscriptions, triggers, sources and sinks.
The following image tries to summarise most of them, but I highly suggest you take some time to read the Knative Eventing docs if you want to know more (and I will likely blog about Knative again soon).

If you want to have Knative running on your local Minikube, I suggest you follow the instructions in my getting-started with camel-k and knative repository.
Camel K on Knative
Camel-K has full support for Knative, both Eventing and Serving. As we saw in the previous blog post, when an integration is deployed the Camel-K operator takes care of a lot of tasks, and it takes care of even more of them when Knative is available on your Kubernetes cluster.
Let’s see it in action and decouple our integration. The overall flow will look like the following:

Let’s first extract the part of our integration’s business logic that writes JSON body bytes to an S3 bucket:
    apiVersion: camel.apache.org/v1
    kind: Integration
    metadata:
      name: kn-s3-processor
    spec:
      dependencies:
        - "camel:jackson"
      configuration:
        - type: configmap
          value: rss-to-s3-configmap
        - type: secret
          value: aws
      flows:
        - from:
            uri: "knative:event/csv"
            steps:
              - unmarshal:
                  json: {}
              - set-header:
                  name: CamelAwsS3Key
                  simple: "articles/converted/${body[id]}.json"
              - marshal:
                  json: {}
              - log: "${header.CamelAwsS3Key}"
              - to:
                  uri: aws-s3://{{BUCKET}}
                  parameters:
                    accessKey: "{{ACCESS_KEY}}"
                    secretKey: "{{SECRET_KEY}}"
                    region: "{{REGION}}"
And let’s start the integration :
kubectl apply -f s3-processor.yml
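For the curious, the `knative:event/csv` consumer endpoint has to be wired to the Eventing layer somehow. The operator handles this for you. As a rough sketch only, assuming events flow through a Broker named `default` in the same namespace and that the event type is `csv`, a hand-written Trigger doing equivalent routing could look like the one below. The Trigger name is hypothetical, and the resources the operator actually generates may differ.

```yaml
# Sketch, not operator output: routes CloudEvents of type "csv"
# from an assumed Broker called "default" to the integration's Knative Service.
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: kn-s3-processor-csv   # hypothetical name
spec:
  broker: default             # assumption: the default Broker is used
  filter:
    attributes:
      type: csv               # only deliver events whose CloudEvents "type" is csv
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: kn-s3-processor   # the consumer integration defined above
```

The filter is what keeps the consumer independent of the producer: any integration emitting events of that type would reach the same subscriber.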
We did not cover what Knative Services are in the Knative diagram above, as it would make this blog post too long. You just need to know that if Knative Serving is available, Camel-K will deploy integrations as Knative Services, which allows scale-to-zero of integrations as well as HTTP-based autoscaling.
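To give an idea of what scale-to-zero and HTTP-based autoscaling mean in practice, here is a minimal hand-written Knative Service sketch. It is not what Camel-K generates: the image name and the annotation values are assumptions chosen for illustration.

```yaml
# Sketch of a Knative Service with explicit autoscaling bounds.
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: kn-s3-processor
spec:
  template:
    metadata:
      annotations:
        # Allow the service to scale down to zero pods when idle.
        autoscaling.knative.dev/min-scale: "0"
        # Upper bound on the number of pods (illustrative value).
        autoscaling.knative.dev/max-scale: "10"
        # Target number of concurrent requests per pod (illustrative value).
        autoscaling.knative.dev/target: "50"
    spec:
      containers:
        - image: example.registry/kn-s3-processor:latest  # hypothetical image
```

Older Knative releases spell the first two annotations `minScale` and `maxScale`, so check the version running on your cluster.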
Let’s see if the integration pod is running
    kubectl get po
    ....
    kn-s3-processor-00001-deployment-6d5668f944-b5w5k   2/2   Running   0   48s
And if you wait a couple of seconds more, the integration is torn down to 0 pods: no messages are coming in, so Knative decides to scale the service down to zero. Isn’t it just amazing?
Some people call this Serverless! So, Camel-K and Knative offer you Serverless Enterprise Integrations!
Ok, let’s go back to business. And let’s start the source integration, reading the huge CSV file from S3.
    apiVersion: camel.apache.org/v1
    kind: Integration
    metadata:
      name: kn-csv-reader
    spec:
      dependencies:
        - "camel:jackson"
        - "camel:csv"
      configuration:
        - type: configmap
          value: rss-to-s3-configmap
        - type: secret
          value: aws
      flows:
        - from:
            uri: aws-s3://hume-datasets
            parameters:
              prefix: articles
              accessKey: "{{ACCESS_KEY}}"
              secretKey: "{{SECRET_KEY}}"
              region: "{{REGION}}"
              deleteAfterRead: false
            steps:
              - unmarshal:
                  csv:
                    use-maps: true
                    allow-missing-column-names: true
              - split:
                  simple: "${body}"
                  streaming: true
              - log: "${body}"
              - remove-headers:
                  pattern: "*"
              - marshal:
                  json: {}
              - to: "knative:event/csv"
kubectl apply -f s3-csv-source.yml
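On the producing side, the integration needs to know where to send its events. One mechanism Knative provides for this is a SinkBinding, which injects the sink address into a workload as the `K_SINK` environment variable. The sketch below is an assumption about how such a binding could look for this integration (a Deployment named `kn-csv-reader` and a Broker named `default`), not a dump of what the operator creates.

```yaml
# Sketch: bind the producer workload to an assumed "default" Broker.
apiVersion: sources.knative.dev/v1
kind: SinkBinding
metadata:
  name: kn-csv-reader          # hypothetical name
spec:
  subject:                     # the workload that produces events
    apiVersion: apps/v1
    kind: Deployment
    name: kn-csv-reader        # assumption: the integration runs as a Deployment
  sink:                        # where the events should be delivered
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: default            # assumption: the default Broker is used
```

Because the producer only knows about the sink, consumers can be added, removed or scaled without touching the CSV reader.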
Inspect the pods
    kubectl get po
    kn-csv-reader-5479c4cd6c-tdk4q   1/1   Running   0   5s
Once the CSV file has been downloaded from S3, the reader starts producing events to the csv channel and the s3-processor integration pods wake up:
    kn-csv-reader-5479c4cd6c-tdk4q                      1/1   Running   0   79s
    kn-s3-processor-00001-deployment-6d5668f944-2lbpf   2/2   Running   0   40s
    kn-s3-processor-00001-deployment-6d5668f944-2qlr6   2/2   Running   0   43s
    kn-s3-processor-00001-deployment-6d5668f944-cdxmf   2/2   Running   0   38s
    kn-s3-processor-00001-deployment-6d5668f944-gn724   2/2   Running   0   34s
    kn-s3-processor-00001-deployment-6d5668f944-pdmkn   2/2   Running   0   30s
    kn-s3-processor-00001-deployment-6d5668f944-rhfvt   2/2   Running   0   36s
Summary
- Knative covers common needs people have when running on Kubernetes, such as Eventing and Serving; we could only cover a small portion of it in this blog post
- Camel-K offers native support for Knative
- Decoupling your Integrations with an Event Mesh offers a lot of benefits, such as autoscaling of consumers
You can find the code for the integrations in my camel-k-integrations-examples repository.