I recently encountered the need to aggregate CSV rows based on a cell containing an entity id. This is what the flow would look like when using a single instance of Camel :
Now let’s see how it would flow when running integrations with Camel-K on Kubernetes :
We can immediately spot the issue: since the scaled integrations do not communicate with each other, each one has its own view of the aggregation, which is the default behaviour when using an in-memory aggregation repository.
To solve this problem we need a persistent AggregationRepository that is also distributed across the Kubernetes cluster.
Hazelcast to the rescue
Hazelcast IMDG is a highly scalable in-memory data grid offering distributed implementations of maps, collections and more, and Camel supports Hazelcast as a backend for the AggregationRepository interface.
To do so, you need to install Hazelcast on your Kubernetes cluster (instructions provided in the repository at the end of the blog post), configure the HazelcastInstance in your integration, register it in the registry and then reference it in the aggregationRepositoryRef of your EIP.
The Hazelcast client can leverage information from the Kubernetes cluster itself: all you have to do is provide the namespace as well as the Hazelcast service name.
This is what your integration route would look like :
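The exact route is not reproduced here, but a minimal sketch in the Camel Java DSL could look like the following. The namespace, the Hazelcast service name, the entityId correlation header and the endpoints are placeholders, and for brevity the repository is passed to the EIP directly instead of being registered in the registry and referenced through aggregationRepositoryRef (the repository class comes from the camel-hazelcast dependency).

import com.hazelcast.client.HazelcastClient;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.core.HazelcastInstance;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.processor.aggregate.hazelcast.HazelcastAggregationRepository;

public class CsvAggregator extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        // Discover the Hazelcast members through the Kubernetes API:
        // only the namespace and the Hazelcast service name are needed.
        ClientConfig clientConfig = new ClientConfig();
        clientConfig.getNetworkConfig().getKubernetesConfig()
                .setEnabled(true)
                .setProperty("namespace", "default")        // placeholder namespace
                .setProperty("service-name", "hazelcast");  // placeholder service name
        HazelcastInstance hz = HazelcastClient.newHazelcastClient(clientConfig);

        // Backed by a distributed Hazelcast map, so every replica of the
        // integration shares the same view of the ongoing aggregations.
        HazelcastAggregationRepository repository =
                new HazelcastAggregationRepository("csv-aggregation", hz);

        from("direct:rows")                                   // placeholder input endpoint
            .aggregate(header("entityId"), (oldExchange, newExchange) -> {
                // Naive strategy: append the new row to the aggregated body.
                if (oldExchange == null) {
                    return newExchange;
                }
                String merged = oldExchange.getIn().getBody(String.class)
                        + "\n" + newExchange.getIn().getBody(String.class);
                oldExchange.getIn().setBody(merged);
                return oldExchange;
            })
            .aggregationRepository(repository)
            .completionTimeout(5000)
            .to("log:aggregated");                            // placeholder output endpoint
    }
}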
In the previous blog post, we defined more than one piece of business logic in a single integration.
There are multiple drawbacks to this type of approach :
It makes the process synchronous: the last part of the integration sends files to S3 one by one
The integration YAML definition is heavy
It makes it harder to re-use the same business logic across integrations
Our current integration also highlights two distinct areas :
On one side, we have a piece of business logic that knows where the original file is, what format it has and how to split the rows into individual messages
On the other side, we have a piece of business logic that just expects a message whose body contains the id of the row plus some content, and knows how to write those bytes to S3 as individual JSON files
We probably would like to decouple these two pieces of logic like this
Welcome Knative Eventing !
Knative Eventing can be described as an Event Mesh and is articulated around 4 principles :
Services are loosely coupled and can be developed and deployed independently
Event producers and consumers are independent (producers can send events before consumers exist, and vice versa)
Other services can be connected to the Eventing system
Cross-service interoperability is ensured by being consistent with the CloudEvents specification
There are multiple constructs that form Knative Eventing, such as brokers, channels, subscriptions, triggers, sources and sinks.
The following image tries to summarise most of them, but I highly suggest you take some time reading the Knative Eventing docs if you want to know more (and I will likely blog again about Knative soon).
Camel-K has full support for Knative, both Eventing and Serving. As we saw in the previous blog post, when an integration is deployed the Camel-K operator takes care of a lot of tasks, and even more of them are handled when Knative is available on your Kubernetes cluster.
Let’s see it in action and decouple our integration. The overall flow will look like the following :
Let’s now first extract the piece of business logic that writes JSON body bytes to an S3 bucket.
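The extracted integration is not shown in full here; a minimal sketch in the Camel Java DSL could look like the one below. The csv channel name comes from the flow above, while the property placeholders and the assumption that each event body is a JSON object carrying an id field are mine.

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.dataformat.JsonLibrary;

public class KnS3Processor extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        // Consume events from the Knative csv channel and write each body
        // to S3 as an individual JSON file named after the row id.
        from("knative:channel/csv")
            .unmarshal().json(JsonLibrary.Jackson)
            .removeHeaders("*")
            .setHeader("CamelAwsS3Key", simple("articles/converted/${body[id]}.json"))
            .marshal().json(JsonLibrary.Jackson)
            .to("aws-s3://{{BUCKET}}?accessKey={{ACCESS_KEY}}&secretKey={{SECRET_KEY}}&region={{REGION}}");
    }
}

Run with kamel run KnS3Processor.java, it should show up as the kn-s3-processor integration used in the rest of this post.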
We did not cover what Knative Services are in the Knative diagram above, as it would make this blog post too long. You just need to know that if Knative Serving is available, Camel-K will deploy integrations as Knative Services, which allows scaling integrations down to zero as well as HTTP-based autoscaling.
Let’s see if the integration pod is running
kubectl get po
....
kn-s3-processor-00001-deployment-6d5668f944-b5w5k 2/2 Running 0 48s
And if you wait a couple more seconds the integration is torn down to 0 pods, as no messages are coming in and Knative decides to scale the service down to 0. Isn’t it just amazing ?
Some people call this Serverless ! So, Camel-K and Knative offer you Serverless Enterprise Integrations !!!!
Ok, let’s go back to business. And let’s start the source integration, reading the huge CSV file from S3.
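As another hedged sketch, the source side could keep the same CSV handling as the earlier YAML definition but publish every row to the csv channel instead of writing to S3 directly (the property placeholders are the same as before, and the camel-csv dependency is assumed):

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.dataformat.csv.CsvDataFormat;
import org.apache.camel.model.dataformat.JsonLibrary;

public class KnCsvReader extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        // Each CSV row becomes a key/value map keyed by the column headers.
        CsvDataFormat csv = new CsvDataFormat();
        csv.setUseMaps(true);

        from("aws-s3://{{BUCKET}}?prefix=articles&accessKey={{ACCESS_KEY}}&secretKey={{SECRET_KEY}}&region={{REGION}}&deleteAfterRead=false")
            .unmarshal(csv)
            .split(body()).streaming()
                // Publish every row as an individual event on the csv channel.
                .marshal().json(JsonLibrary.Jackson)
                .to("knative:channel/csv")
            .end();
    }
}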
kubectl get po
kn-csv-reader-5479c4cd6c-tdk4q 1/1 Running 0 5s
And once the CSV file has been downloaded from S3, the integration will start producing events to the csv channel and the s3-processor integration pods will wake up.
Knative addresses common needs people have when running on Kubernetes, such as Eventing and Serving; we could only cover a small portion of it in this blog post
Camel-K offers native support for Knative
Decoupling your Integrations with an Event Mesh offers a lot of benefits, such as autoscaling of consumers
Apache Camel is a Java framework which simplifies the life of developers when it comes to integration (be it data or systems).
It is a fairly mature and robust project, heavily based on the Enterprise Integration Patterns, with a beautiful DSL. It also has a very wide list of ready-to-use connectors (300+).
The following code snippet shows that moving files from Dropbox to your local filesystem really comes down to writing a few lines of integration DSL :
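The snippet itself is not reproduced here, but a minimal sketch using the camel-dropbox component could look like this (the access token, remote folder and local target directory are placeholders, and option names may vary slightly between Camel versions):

import org.apache.camel.builder.RouteBuilder;

public class DropboxToLocal extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        // Poll a Dropbox folder and write every downloaded file to a local directory.
        from("dropbox://get?accessToken={{DROPBOX_TOKEN}}&remotePath=/camel")
            .to("file:target/downloads");
    }
}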
Developers are still left with packaging, configuring, deploying, running and monitoring their enterprise integration applications, which can be a big chunk of work as well when you run on Kubernetes.
Welcome Apache Camel K
From the Camel K website, Camel K is a lightweight integration framework built from Apache Camel and specifically designed for serverless and microservices architectures.
It leverages recent advances in the Java ecosystem, such as Quarkus and the ease of running Java on Kubernetes nowadays.
The promise is that developers focus on writing only the integration and Camel K will take care of the rest. As a matter of fact, the following snippet saved as a Groovy file
from('timer:tick?period=3000')
    .setBody().constant('Hello world from Camel K')
    .to('log:info')
is the only thing you need to write. Running kamel run myfile.groovy will package and deploy your integration on the configured Kubernetes cluster.
No surprise of course that, with Red Hat being one of the main contributors to Apache Camel, all the Camel K integrations run smoothly on an OpenShift cluster.
A Concrete Example
I wanted to use Camel K for a tiny but realistic example, rather than just printing Hello on the console. I found myself with a CSV file containing 100k news articles and realised the format wasn’t really fit for a friendly, performant and reusable usage of this dataset.
So, how hard could it be to write this with a Camel K integration ?
Installing Camel K on minikube
I’ll not go over the whole minikube installation with Knative and Istio; you can just follow the tutorial here (which covers the Camel K installation as well). Installing Camel K really comes down to :
Download the Camel K binary
Run kamel install
Creating your first Integration
Integrations can be written with the DSL in one of the following languages :
Groovy
Java
Javascript
Kotlin
XML
YAML
You can also choose to deploy integrations by creating Integration definitions in YAML, like you would write Pod or Service definitions.
I’ve chosen to go for the latter.
apiVersion: camel.apache.org/v1
kind: Integration
metadata:
  name: s3-to-s3
spec:
  dependencies:
    - "camel:camel-csv"
  configuration:
    - type: configmap
      value: rss-to-s3-configmap
    - type: secret
      value: aws
  flows:
    - from:
        uri: aws-s3://{{BUCKET}}
        parameters:
          prefix: articles
          accessKey: "{{ACCESS_KEY}}"
          secretKey: "{{SECRET_KEY}}"
          region: "{{REGION}}"
          deleteAfterRead: false
        steps:
          - unmarshal:
              csv:
                use-maps: true
                allow-missing-column-names: true
          - split:
              simple: "${body}"
              streaming: true
          # Remove the headers on the exchange, otherwise it will try to write on the same bucket,
          # but also the original content length and target content length are different
          - remove-headers:
              pattern: "*"
          - set-header:
              name: CamelAwsS3Key
              simple: "articles/converted/${body[id]}.json"
          - marshal:
              json: {}
          - to:
              uri: aws-s3://{{BUCKET}}
              parameters:
                accessKey: "{{ACCESS_KEY}}"
                secretKey: "{{SECRET_KEY}}"
                region: "{{REGION}}"
Let’s analyse the flows part of the definition first. The integration will, step by step :
Read files from the given bucket
Transform each row into a key/value map where the keys are the column headers
Split on each row, in streaming mode
Remove the message headers in order to avoid propagating the source headers to the destination
Define the filename each row will have on the S3 bucket, namely the id column value with a .json extension
Transform it to JSON
Write it to a new destination on S3
The property placeholders (BUCKET, ACCESS_KEY, SECRET_KEY, REGION) are resolved from the ConfigMap and Secret referenced in the configuration section.
The only thing that is left is deploying the integration :
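Assuming the definition above is saved as s3-to-s3.yaml, an Integration is just another custom resource, so deploying it is a plain kubectl apply :
kubectl apply -f s3-to-s3.yaml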
And this is the output of kamel log s3-to-s3, along with the logs of the Camel K operator :
How does this really work ?
In a nutshell, when you install Camel K, it actually installs a Kubernetes operator that reconciles the state of your integrations for any event related to Camel Integration resources.
It will :
Build a Camel Quarkus application with the provided flow as configuration
Generate a container image from it and push it to the cluster registry
Create a Deployment for the integration
Apply the Deployment
Crazy cool, isn’t it ?
Summary
The Apache Camel team has been pushing hard to bring Camel forward onto modern infrastructure, and the Camel K project, while still in its early days, is very promising, with for example a native integration with Knative Eventing and Messaging.
The reality is that the project still needs to mature: most of the examples and documentation you will find focus on the developer experience, while I’m looking more for production-ready and CI/CD-based deployments. It’s also unclear how to test integrations.
There is however one feature of Camel K that deserves a dedicated blog post : Kamelets. Kamelets are route templates, directly available in integrations, and I think they will be a killer feature for organisations that want to give the data mesh architectural paradigm a spin. Stay tuned 😉
If you’re interested in discovering more about Apache Camel K, I suggest you follow the awesome-camel-k repository updates.
Spring Cloud Function is a project which helps developers bring the power of Spring Boot features (auto-configuration, metrics, ...) to serverless platforms, and beyond.
More importantly, it abstracts all the details concerning the infrastructure, the transport and the provider.
The following function :
@Bean
public Function<String, String> uppercase() {
return String::toUpperCase;
}
can be exposed as an http endpoint, a message consumer or a task processor and be run locally or on any of the serverless platform providers like AWS or even as a Knative service which will take care of scaling down your function app to zero in case of inactivity.
High Level Concepts
In a nutshell, the Spring Cloud Function project will detect and wrap your beans of type Function, Consumer or Supplier and expose them as HTTP endpoints and/or message stream listeners/publishers with the most common message brokers, such as RabbitMQ or Kafka.
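For example, a Supplier and a Consumer are declared the same way as a Function; the beans below are purely illustrative (the names are made up) and would live in the same Application class :

import java.util.function.Consumer;
import java.util.function.Supplier;

// A Supplier produces data on request (for example behind an HTTP GET endpoint),
// while a Consumer only receives data and returns nothing.
@Bean
public Supplier<String> greet() {
    return () -> "hello from a supplier";
}

@Bean
public Consumer<String> print() {
    return value -> System.out.println("received: " + value);
}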
Getting Started
The best way to get started is to initiate a project on start.spring.io. We will expose our function to the web layer, so you will need to select the following dependencies :
Function
Spring Web
Click on generate, download the zipfile and extract it.
The content is a standard Spring Boot application, and you can immediately write your function as a bean and start the application.
Amend the main Application class and add the following bean :
@Bean
public Function<String, String> uppercase() {
return String::toUpperCase;
}
Launch the application :
mvn spring-boot:run
In another terminal window, you can make an http request passing a string to be uppercased by the function :
It will automatically be mapped under an endpoint matching the function name, /uppercase in this case. Let’s try it out :
echo 'hello' | http localhost:8080/uppercase
2021-01-16 21:31:10.845 INFO 48020 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
2021-01-16 21:31:10.846 INFO 48020 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 1 ms
HELLO
Type Conversion
There are default message converters that transform the input coming from the transport layer into the type declared as the function input. For example, let’s take the following function :
@Bean
public Function<Person, String> fullName() {
return person -> String.format("%s %s", person.getFirstName(), person.getLastName());
}
Where the Person class is :
package com.cwillemsen.cloudnative.springcloudfunction;
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@AllArgsConstructor
public class Person {
private String firstName;
private String lastName;
}
The following http request will return the desired output from the function :
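With httpie as before, and the function mapped under its name (/fullName), the request could look like this (the person data is just an example) :
echo '{"firstName": "John", "lastName": "Doe"}' | http localhost:8080/fullName
which should return John Doe.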
We’ve seen the basic concepts of the Spring Cloud Function project and how it enables developers to focus only on writing the business logic by implementing it as functions.
In further blog posts, we’ll see how to route, compose and deploy functions.
You can find the source code used in this post here.