Apache Kafka has become a popular backbone for modern event-driven applications. For a better cloud-native experience with Apache Kafka, it's highly recommended to check out Red Hat AMQ Streams, which offers easy installation and management of an Apache Kafka cluster on Red Hat OpenShift.
This article shows how the Kafka-CDI library can handle difficult setup tasks and make creating Kafka-powered event-driven applications for MicroProfile and Jakarta EE very easy.
Vanilla Apache Kafka Consumers
While the concepts of Kafka producers and consumers are simple, writing the actual code can be quite cumbersome and requires boilerplate such as the following:
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-cluster-kafka:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
...
consumer.subscribe(Collections.singleton("topic"));
...
final ConsumerRecords<String, String> records = consumer.poll(500);
for (final ConsumerRecord<String, String> record : records) {
    logger.info(record.value());
}
The above code shows a simple configuration of a KafkaConsumer, which still leaves the developer with a few tasks, such as manually defining the actual types of the key and the value of the consumed records. Besides that, Kafka consumers are not thread-safe, so the developer has to take care of that using Java's concurrency APIs.
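As a minimal sketch of what that means in practice (plain Kafka client API, not Kafka-CDI), one common pattern is to confine the consumer to a single dedicated thread. The executor and shutdown flag below are illustrative assumptions; props and logger are reused from the snippet above, and imports are omitted as in the other snippets:

// Minimal sketch: confine the non-thread-safe KafkaConsumer to one thread
final ExecutorService pollingThread = Executors.newSingleThreadExecutor();
final AtomicBoolean running = new AtomicBoolean(true);

pollingThread.submit(() -> {
    try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
        consumer.subscribe(Collections.singleton("topic"));
        while (running.get()) {
            final ConsumerRecords<String, String> records = consumer.poll(500);
            for (final ConsumerRecord<String, String> record : records) {
                logger.info(record.value());
            }
        }
    }
});

// Later, when shutting down, stop the loop and the executor:
running.set(false);
pollingThread.shutdown();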
CDI Extensions to the Rescue
In various enterprise Java communities, such as Eclipse MicroProfile or Jakarta EE, CDI is the natural choice for managing dependencies and their configuration. The API also allows developers to create extensions that can leverage the entire "Java EE" lifecycle as well as its powerful platform APIs.
The Kafka-CDI library from the AeroGear project is such an extension, and it makes creating Kafka-powered applications for MicroProfile or Jakarta EE very easy.
Kafka-CDI in Action
The setup is simple and requires just a few lines of Maven coordinates:
<dependency>
  <groupId>org.aerogear.kafka</groupId>
  <artifactId>kafka-cdi-extension</artifactId>
  <version>0.1.0</version>
</dependency>
Consumers with CDI
Creating CDI-managed beans that act as message consumers is now quite easy and requires only a small bit of code:
@KafkaConfig(bootstrapServers = "#{KAFKA_SERVICE_HOST}:#{KAFKA_SERVICE_PORT}")
public class MyAwesomeConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(MyAwesomeConsumer.class);

    @Consumer(topics = "topic", groupId = "demo-group")
    public void onMessage(final String key, final String value) {
        LOGGER.info("We got this value: " + value);
    }
}
With a single @Consumer annotation, the bean is turned into a message consumer. The Kafka-CDI extension handles all the configuration, such as the type deserialization for the key and value of the Kafka record, as well as the threading, behind the scenes. For each application, one @KafkaConfig annotation is needed to identify the list of available bootstrap servers. The values for topics, groupId, and bootstrapServers can, as in this example, be resolved using environment variables or system properties.
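For a local run outside of OpenShift, one hypothetical way to satisfy the #{KAFKA_SERVICE_HOST} and #{KAFKA_SERVICE_PORT} placeholders is to provide them as system properties before the container starts; the main method below is only an illustration of that idea:

public static void main(final String[] args) {
    // Illustrative assumption: no KAFKA_SERVICE_* environment variables are set,
    // so the placeholders are resolved from system properties instead
    System.setProperty("KAFKA_SERVICE_HOST", "localhost");
    System.setProperty("KAFKA_SERVICE_PORT", "9092");

    // ... bootstrap the MicroProfile/Jakarta EE runtime here
}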
Producers with CDI
Writing producers is easy as well:
@Producer
private SimpleKafkaProducer<String, String> myproducer;
...
myproducer.send("topic", myKey, myValue);
Any bean can be injected with a convenient producer (SimpleKafkaProducer) that can be used in any method for sending messages to the Kafka cluster. Similar to the consumer example, the CDI extension handles the type serialization for the key and value.
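As a sketch of how this might look in a typical application, the hypothetical JAX-RS resource below injects the producer and forwards incoming payloads to Kafka; the class name, resource path, and query parameter are made up for illustration, and imports are omitted as in the other snippets:

@Path("/messages")
public class MessageResource {

    // Injected and configured by the Kafka-CDI extension
    @Producer
    private SimpleKafkaProducer<String, String> producer;

    @POST
    @Consumes(MediaType.TEXT_PLAIN)
    public Response publish(@QueryParam("key") final String key, final String payload) {
        // The topic name matches the consumer example above
        producer.send("topic", key, payload);
        return Response.ok().build();
    }
}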
KStreams with CDI
Last but not least, the library has some initial support for working with the Kafka Streams API:
@KafkaStream(input = "input.topic", output = "output.topic")
public KTable<String, Long> successfulMessagesPerJobTransformer(final KStream<String, String> source) {
    return source
        .filter((key, value) -> value.equals("Success"))
        .groupByKey()
        .count("successCounter");
}
When a method is annotated with @KafkaStream, the library processes the passed KStream object by executing the annotated method. The input and output attributes of the @KafkaStream annotation tell the library which topic to read from and which topic the result of the stream processing job should be written to. The type serialization and deserialization are again handled by the framework for the convenience of the developer.
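To round out the picture, a hypothetical consumer of the stream's output topic could look like the following; the group id is made up, and it assumes the KTable<String, Long> changelog arrives on output.topic as String keys and Long values for which the extension can resolve a deserializer:

// Hypothetical consumer of the stream's output topic
@Consumer(topics = "output.topic", groupId = "stream-results-group")
public void onSuccessCount(final String job, final Long successCount) {
    LOGGER.info("Job " + job + " has " + successCount + " successful messages so far");
}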
Supported Types
The library supports various ways of JSON serialization and deserialization. It comes with a Serializer and a Deserializer for the javax.json.Json type, and it has a fallback mechanism that automatically uses the Jackson library for any unknown type. Support for Apache Avro is not yet implemented, but it is on the roadmap.
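As a sketch of the Jackson fallback, a plain POJO could be consumed directly; the Order class, topic, and group id below are made up for illustration, assuming the extension falls back to Jackson because the type is otherwise unknown to it:

// Hypothetical payload type, (de)serialized via the Jackson fallback
public class Order {
    public String id;
    public int quantity;
}

// Hypothetical consumer method: the parameter type drives deserializer selection
@Consumer(topics = "orders", groupId = "order-group")
public void onOrder(final String key, final Order order) {
    LOGGER.info("Received order " + order.id + " with quantity " + order.quantity);
}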
Conclusion
This article was an introduction to the Kafka-CDI library in action. Only a few lines of code are required to get started with the library. The focus is on user simplicity, while all the difficult setup tasks are handled by the CDI extension.
Using the library, it's a no-brainer to write event-driven applications with MicroProfile implementations such as Thorntail.
Thanks to Gunnar Morling for valuable input!