Integrating Spring Boot with Apicurio Service Registry

Many new cloud-native applications and microservices are designed around an event-driven architecture (EDA), reacting in real time by sending and receiving information about individual events. This kind of architecture relies on asynchronous, non-blocking communication between event producers and consumers through an event streaming backbone such as Red Hat AMQ Streams running on top of Red Hat OpenShift. In scenarios where many different events are being managed, defining a governance model where each event is defined as an API is critical. That way, producers and consumers can produce and consume checked and validated events. We can use a service registry as a datastore for events defined as APIs.

From my field experience working with many clients, I've found that the most typical architecture combines Spring Boot applications acting as event producers and consumers, an event streaming backbone such as AMQ Streams, and a service registry that governs the event schemas shared between them.

In this article, you will learn how to easily integrate your Spring Boot applications with Red Hat Integration Service Registry, which is based on the open source Apicurio Registry.

Red Hat Integration Service Registry

Service Registry is a datastore for sharing standard event schemas and API designs across APIs and event-driven architectures. Service Registry decouples the structure of your data from your client applications so that you can share and manage your data types and API descriptions at runtime. It also reduces costs by decreasing the overall message size, and it creates efficiencies by increasing the consistent reuse of schemas and API designs across your organization.

Some of the most common use cases for Service Registry are:

  • Client applications that dynamically push or pull the latest schema updates to or from the service registry at runtime without redeploying.
  • Developer teams that query the registry for existing schemas required for services already deployed in production.
  • Developer teams that register new schemas required for new services in development or rolling to production.
  • Stored schemas used to serialize and deserialize messages. Client applications can reference the stored schemas to ensure the messages they send and receive are compatible with the schemas.

Service Registry provides the following main features:

  • Support for multiple payload formats for standard event schemas and API specifications.
  • Pluggable storage options, including AMQ Streams, an embedded Infinispan in-memory data grid, and a PostgreSQL database.
  • Registry content management using a web console, REST API commands, Maven plug-ins, or a Java client.
  • Rules for content validation and version compatibility to govern how registry content evolves.
  • Support for the Apache Kafka schema registry, including Kafka Connect integration for external systems.
  • A client serializer/deserializer (SerDes) to validate Kafka and other message types at runtime.
  • A cloud-native Quarkus Java runtime for low memory footprint and fast deployment times.
  • Compatibility with existing Confluent schema registry client applications.
  • Operator-based installation on OpenShift.

Introducing Service Registry for client applications

The typical workflow for introducing a new service registry to our architecture is to:

  1. Declare the event schema using a common data format such as Apache Avro, JSON Schema, Google Protocol Buffers, OpenAPI, AsyncAPI, GraphQL, Kafka Connect schemas, WSDL, or XML Schema (XSD).
  2. Register the schema as an artifact in the service registry using the Service Registry UI, REST API, Maven plug-in, or a Java client. Client applications can then use the schema to validate that messages conform to the correct data structure at runtime.
  3. Use Kafka producer applications and serializers to encode messages that conform to a specific event schema.
  4. Use Kafka consumer applications and deserializers to validate that messages have been serialized using the correct schema based on a specific schema ID.

This workflow ensures consistent schema use and helps to prevent data errors at runtime.

The next sections discuss this workflow at a high level using a Spring Boot application. See the article's GitHub repository for the complete sample application source code.

Register an Avro schema in Service Registry

Avro provides a JSON schema specification to declare a variety of data structures. This simple example defines a message event:

{
  "name": "Message",
  "namespace": "com.rmarting.kafka.schema.avro",
  "type": "record",
  "doc": "Schema for a Message.",
  "fields": [
    {
      "name": "timestamp",
      "type": "long",
      "doc": "Message timestamp."
    },
    {
      "name": "content",
      "type": "string",
      "doc": "Message content."
    }
  ]
}

Avro also provides a Maven plug-in to autogenerate Java classes based on the provided schema definitions (.avsc files).
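
For example, a minimal plug-in configuration (a sketch, assuming the standard org.apache.avro:avro-maven-plugin coordinates, an ${avro.version} property defined in the project, and the schema location used in the registration step below) could look like this:

<plugin>
   <groupId>org.apache.avro</groupId>
   <artifactId>avro-maven-plugin</artifactId>
   <version>${avro.version}</version>
   <executions>
      <execution>
         <phase>generate-sources</phase>
         <goals>
            <goal>schema</goal>
         </goals>
         <configuration>
            <!-- Where the .avsc files live and where the generated Java classes are written -->
            <sourceDirectory>${project.basedir}/src/main/resources/schemas</sourceDirectory>
            <outputDirectory>${project.build.directory}/generated-sources/avro</outputDirectory>
         </configuration>
      </execution>
   </executions>
</plugin>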

Once we have a schema, we can publish it in Service Registry. Publishing a schema in the registry makes it ready for client applications to use at runtime. The Apicurio Registry Maven plug-in makes it easy to publish our schema to Service Registry. We add a simple definition in our pom.xml file:

<plugin>
   <groupId>io.apicurio</groupId>
   <artifactId>apicurio-registry-maven-plugin</artifactId>
   <version>${apicurio.version}</version>
   <executions>
      <execution>
         <phase>generate-sources</phase>
         <goals>
            <goal>register</goal>
         </goals>
         <configuration>
            <registryUrl>${apicurio.registry.url}</registryUrl>
            <artifactType>AVRO</artifactType>
            <artifacts>
               <!-- Schema definition for TopicIdStrategy strategy -->
               <messages-value>${project.basedir}/src/main/resources/schemas/message.avsc</messages-value>
            </artifacts>
         </configuration>
      </execution>
   </executions>
</plugin>
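
Because this execution is bound to the generate-sources phase, running mvn generate-sources with apicurio.registry.url pointing to a reachable registry instance is enough to publish the artifact.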

Note: With the Apicurio Registry Maven plug-in, we could use the Maven build lifecycle to define or extend our application lifecycle management and CI/CD pipelines. For example, we could extend the lifecycle to publish or update schemas whenever a new version is released. Explaining how to do that is beyond the scope of this article, but it's worth exploring.

As soon as we publish our schema to Service Registry, we can manage it from the Service Registry UI shown in Figure 1.

Figure 1: Manage your new schema in the Service Registry UI.

Integrate Spring Boot, Apache Kafka, and AMQ Streams

Spring Boot provides the Spring Kafka project to produce and consume messages to and from Apache Kafka. Using it is straightforward once we add the following dependency in our pom.xml file:

<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
</dependency>

Adding the following property to your application.properties file connects your application with the AMQ Streams cluster:

spring.kafka.bootstrap-servers = ${kafka.bootstrap-servers}

Service Registry provides Kafka client serializers/deserializers for Kafka producer and consumer applications. Include the following dependency to add them to your application:

<dependency>
   <groupId>io.apicurio</groupId>
   <artifactId>apicurio-registry-utils-serde</artifactId>
   <version>${apicurio.version}</version>
</dependency>

Produce messages from Spring Boot

Spring Kafka provides a set of properties and beans to declare Kafka producers to send messages (Avro schema instances, in this case) to Apache Kafka. The two most important properties are spring.kafka.producer.key-serializer, which identifies the serializer class to serialize the Kafka record's key, and spring.kafka.producer.value-serializer, which identifies the serializer class to serialize the Kafka record's value.

We have to set specific values in these properties so that the serializer uses Avro schemas registered in Service Registry:

  • The serializer class for Avro schemas, provided by Apicurio's SerDes library: io.apicurio.registry.utils.serde.AvroKafkaSerializer.
  • The Apicurio service registry endpoint used to validate schemas: apicurio.registry.url.
  • The Apicurio artifact ID strategy used to look up the schema definition: apicurio.registry.artifact-id.

Here's a sample configuration for a producer template:

# Spring Kafka Producer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=io.apicurio.registry.utils.serde.AvroKafkaSerializer
spring.kafka.producer.properties.apicurio.registry.url = ${apicurio.registry.url}
spring.kafka.producer.properties.apicurio.registry.artifact-id = io.apicurio.registry.utils.serde.strategy.TopicIdStrategy
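
Note that the TopicIdStrategy strategy resolves the artifact ID from the topic name plus a -key or -value suffix, which is why the Maven plug-in registered our schema under the messages-value artifact ID earlier: it matches the value schema of the messages topic.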

With these properties in place, we can declare a KafkaTemplate bean to send messages (based on our Message schema):

@Bean
public ProducerFactory<String, Message> producerFactory(KafkaProperties kafkaProperties) {
    Map<String, Object> configProps = kafkaProperties.buildProducerProperties();

    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, Message> kafkaTemplate(KafkaProperties kafkaProperties) {
    return new KafkaTemplate<>(producerFactory(kafkaProperties));
}

Finally, we can send messages to Apache Kafka using the schema registered in Service Registry:

@Autowired
private KafkaTemplate<String, Message> kafkaTemplate;

public SendResult<String, Message> sendMessage(String topicName, Message message) throws Exception {
    // Send the record and block until the broker acknowledges it
    return kafkaTemplate.send(topicName, message).get();
}
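
The Message instance itself typically comes from the builder that the Avro Maven plug-in generates from message.avsc. A minimal sketch (the content value is just an example):

// Build a Message using the builder generated from message.avsc
Message message = Message.newBuilder()
        .setTimestamp(System.currentTimeMillis())
        .setContent("Hello from Spring Boot!")
        .build();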

The message will be serialized with the global ID of the schema used for this record added to it. Kafka consumer applications will use that global ID later to look up the schema when deserializing.

Consume messages from Spring Boot

Spring Kafka also provides properties and beans to declare Kafka consumers to consume messages (Avro schema instances) from the Apache Kafka cluster. The most important properties are spring.kafka.consumer.key-deserializer, which identifies the deserializer class to deserialize the Kafka record's key, and spring.kafka.consumer.value-deserializer, which identifies the deserializer class to deserialize the Kafka record's value.

Once again, we have to set specific values, in this case so that the deserializer can resolve the Avro schemas registered in Service Registry:

  • The deserializer class for Avro schemas, provided by Apicurio's SerDes library: io.apicurio.registry.utils.serde.AvroKafkaDeserializer.
  • The Apicurio service registry endpoint used to get valid schemas: apicurio.registry.url.

Here's a sample configuration for a consumer template:

# Spring Kafka Consumer
spring.kafka.listener.ack-mode = manual
spring.kafka.consumer.group-id = spring-kafka-clients-sb-sample-group
spring.kafka.consumer.auto-offset-reset = earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer = io.apicurio.registry.utils.serde.AvroKafkaDeserializer
spring.kafka.consumer.properties.apicurio.registry.url = ${apicurio.registry.url}
# Use Specific Avro classes instead of the GenericRecord class definition
spring.kafka.consumer.properties.apicurio.registry.use-specific-avro-reader = true

We can declare a KafkaListener to consume messages (based on our Message schema) as follows:

@KafkaListener(topics = {"messages"})
public void handleMessages(@Payload Message message,
                           @Headers Map<String, Object> headers,
                           Acknowledgment acknowledgment) {
    LOGGER.info("Received record from Topic-Partition '{}-{}' with Offset '{}' -> Key: '{}' - Value '{}'",
            headers.get(KafkaHeaders.RECEIVED_TOPIC), headers.get(KafkaHeaders.RECEIVED_PARTITION_ID),
            headers.get(KafkaHeaders.OFFSET), headers.get(KafkaHeaders.MESSAGE_KEY),
            message.get("content"));

    // Commit message
    acknowledgment.acknowledge();
}
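
This example relies on Spring Boot's auto-configuration to build the listener container from the properties above. If you prefer explicit beans mirroring the producer factory, an optional minimal sketch (using the standard Spring Kafka factory classes) could look like this:

@Bean
public ConsumerFactory<String, Message> consumerFactory(KafkaProperties kafkaProperties) {
    // Reuse the consumer properties from application.properties, including the Apicurio deserializer settings
    return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Message> kafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
    ConcurrentKafkaListenerContainerFactory<String, Message> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory(kafkaProperties));
    // Manual acknowledgment, matching spring.kafka.listener.ack-mode=manual
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    return factory;
}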

The schema is retrieved by the deserializer, which uses the global ID written into the message being consumed. With that, we're done!

Summary

In this article, you've seen how to integrate Spring Boot applications with Red Hat Integration Service Registry and AMQ Streams to build your event-driven architecture. Using these components together gives you the following benefits:

  • Consistent schema use between client applications.
  • Help with preventing data errors at runtime.
  • A defined governance model in your data schemas (such as for versions, rules, and validations).
  • Easy integration with Java client applications and components.

See the Red Hat Integration homepage and Service Registry documentation for more about these components. Also, see First look at the new Apicurio Registry UI and Operator for more about using Service Registry in any Kubernetes or OpenShift cluster.
