Data Streaming with WildFly Swarm and Apache Kafka

At the beginning of October, I attended JavaOne in San Francisco to present on WildFly Swarm and Apache Kafka. For those of you who weren't able to attend the session, or for those who did and saw firsthand the issues with the demo, I will be covering all the details of how the demo should work!

The material presented at JavaOne can be found here, and all the code for the demos is on GitHub.

MiniShift Setup

To get started with the demo, we need to install MiniShift. We also need the oc binary on our path, which MiniShift provides.

Once installed, we want to start it with a bit more than the minimum:

minishift start --cpus 3 --memory 4GB

Once MiniShift has started and OpenShift is running, open up the console. You can either create a new project or use the default one created for you.
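
If you prefer to stay in the terminal, MiniShift can open the web console for you:

minishift console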

Apache Kafka Setup

The first task is to set up Apache Kafka. Luckily for us, the EnMasse project has a handy OpenShift template we can use. Select Add to Project and then Import YAML/JSON. Paste in the raw text from the OpenShift template and click Create. When asked what you want to do with the template, select Process the template.
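
If you'd rather do this from the CLI, the same template can be processed and created in one step; a sketch, assuming the template has been saved locally as kafka-template.yaml (a hypothetical filename):

oc process -f kafka-template.yaml | oc create -f -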

Head over to a terminal window and run:

oc login

entering developer and password as the credentials.
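
The credentials can also be passed directly on the command line:

oc login -u developer -p password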

Once logged into OpenShift from the terminal run:

oc get services

This lists the details of all running services within OpenShift. If you don't see anything the first time, go back and check the console to see whether the Kafka and ZooKeeper pods have started OK.

The important service for us is zookeeper: its cluster IP address is required when creating a Kafka topic.

Kafka Topic

To see all the running pods in the OpenShift console, select Applications and then Pods in the UI. Select a Kafka instance and then select Terminal. A terminal window for the instance should now be visible in the web browser. We can create a topic with the following command:

./bin/kafka-topics.sh --create --topic fruit_topic --replication-factor 2 --partitions 3 --zookeeper 172.30.123.92:2181

The ZooKeeper address in the command above is the cluster IP we saw from oc get services.
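
To double-check that the topic was created with the expected partitions and replicas, we can describe it from the same terminal:

./bin/kafka-topics.sh --describe --topic fruit_topic --zookeeper 172.30.123.92:2181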

We've now successfully configured Kafka for data streaming; next we need some services to interact with it.

WildFly Swarm

We won't cover all aspects of the services we're creating, as they're detailed on GitHub. We will focus on the integration with Kafka.

Let's start with a simple RESTful service to store fruit names in a database. FruitResource is a simple JAX-RS resource class that provides the RESTful endpoints for GET, POST, and PUT. Each method interacts only with the data within the database.
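
The full class is in the GitHub repo, but as a rough sketch it looks something like the following (the persistence unit name, named query, and method bodies here are illustrative, not the exact demo code):

import java.util.List;

import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import javax.transaction.Transactional;
import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;

@Path("/fruits")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public class FruitResource {

    // "FruitPU" is an assumed persistence unit name
    @PersistenceContext(unitName = "FruitPU")
    private EntityManager em;

    @GET
    public List<Fruit> all() {
        // "Fruit.findAll" is an assumed named query on the Fruit entity
        return em.createNamedQuery("Fruit.findAll", Fruit.class).getResultList();
    }

    @POST
    @Transactional
    public Fruit create(Fruit fruit) {
        em.persist(fruit);
        return fruit;
    }

    @PUT
    @Path("/{id}")
    @Transactional
    public Fruit update(@PathParam("id") Integer id, Fruit fruit) {
        fruit.setId(id);
        return em.merge(fruit);
    }
}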

To make it more interesting, we want to send an Event to Kafka. In Kafka, an Event is a combination of key, value, and timestamp. Each Event is persisted and cannot be altered.

We will use a CDI extension from AeroGear to help us integrate with Kafka. It's fairly new but is being actively enhanced. First, we need to make it available to our project with the following Maven dependency:

<dependency>
  <groupId>net.wessendorf.kafka</groupId>
  <artifactId>kafka-cdi-extension</artifactId>
  <version>0.0.11</version>
</dependency>

Produce an Event

For our JAX-RS Resource to be able to send an event to Kafka, we need to provide some configuration:

@KafkaConfig(bootstrapServers = "#{KAFKA_SERVICE_HOST}:#{KAFKA_SERVICE_PORT}")

Here we use OpenShift environment variables to discover where Kafka is located, which keeps the configuration simple.

With Kafka configured, we now need a Producer with which to send events:

@Producer
private SimpleKafkaProducer<Integer, Fruit> producer;

Since we're dealing with Fruit instances, we want to send an event that has an Integer key and a Fruit instance as its value. We can then send an event when a fruit is created:

producer.send("fruit_topic", fruit.getId(), fruit);

Note that the topic name we pass to send() must match the topic name we created in Kafka earlier.
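
Putting the pieces together, the Kafka side of the resource looks roughly like this (a sketch with the persistence code elided; the import paths follow the extension's conventions, so double-check them against the version you're using):

import javax.ws.rs.POST;
import javax.ws.rs.Path;

import net.wessendorf.kafka.SimpleKafkaProducer;
import net.wessendorf.kafka.cdi.annotation.KafkaConfig;
import net.wessendorf.kafka.cdi.annotation.Producer;

@Path("/fruits")
@KafkaConfig(bootstrapServers = "#{KAFKA_SERVICE_HOST}:#{KAFKA_SERVICE_PORT}")
public class FruitResource {

    @Producer
    private SimpleKafkaProducer<Integer, Fruit> producer;

    @POST
    public Fruit create(Fruit fruit) {
        // persist the fruit first so it has a generated id (persistence elided)
        producer.send("fruit_topic", fruit.getId(), fruit);
        return fruit;
    }
}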

Finally, let's run our service in MiniShift! Navigate to /rest-data and run:

mvn clean fabric8:deploy -Popenshift

Once deployed, we can go to the OpenShift console and open the route that was created for our service. On the web page, we will see a list of fruit whose names we can add or update.

Consume an Event

First, we configure our consumer the same way we configured our producer, with @KafkaConfig.

We then need a way to consume the events we receive from Kafka:

@Consumer(topics = "fruit_topic", keyType = Integer.class, groupId = "fruit_processor")
public void processFruit(final Integer key, final Fruit fruitData) {
  logger.error("We received: " + fruitData);
}

The key points in our use of @Consumer are that we specify the same topic name as our producer, so we receive the correct events, and that we provide a unique consumer group ID for Kafka.
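
In context, the consumer is just a CDI bean; a sketch, assuming an SLF4J logger and the same import conventions as the producer:

import javax.enterprise.context.ApplicationScoped;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import net.wessendorf.kafka.cdi.annotation.Consumer;
import net.wessendorf.kafka.cdi.annotation.KafkaConfig;

@ApplicationScoped
@KafkaConfig(bootstrapServers = "#{KAFKA_SERVICE_HOST}:#{KAFKA_SERVICE_PORT}")
public class FruitProcessor {

    private final Logger logger = LoggerFactory.getLogger(FruitProcessor.class);

    // Receives every event published to fruit_topic and logs its payload
    @Consumer(topics = "fruit_topic", keyType = Integer.class, groupId = "fruit_processor")
    public void processFruit(final Integer key, final Fruit fruitData) {
        logger.error("We received: " + fruitData);
    }
}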

Finally, let's run our service in MiniShift! Navigate to /log-consumer and run:

mvn clean fabric8:deploy -Popenshift

All we're doing is logging the details of the Fruit instance we received, which can be viewed in the logs of the log-consumer service within the OpenShift console.

You will notice that you only see messages in the log for changes made to rest-data after log-consumer is running. That's because our consumer defaults to reading only messages that arrive after its initialization.

It's also possible to replay every existing event on the topic by adding offset = "earliest" to the @Consumer annotation.
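
For example, the same consumer as above, now replaying the topic from the beginning:

@Consumer(topics = "fruit_topic", keyType = Integer.class, groupId = "fruit_processor", offset = "earliest")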

Conclusion

You've just experienced a whirlwind tour of integrating Apache Kafka into your WildFly Swarm microservices!

I hope it's opened your eyes to what can be achieved by integrating services with event-driven systems.

Please provide feedback on the Kafka CDI extension, along with ideas and suggestions on how to improve it.


To build your own Java EE microservice, visit WildFly Swarm and download the cheat sheet.
