According to the project website, Quarkus is "a Kubernetes-native Java stack tailored for GraalVM and OpenJDK HotSpot, crafted from the best-of-breed Java libraries and standards." Starting with the 0.17.0 release, Quarkus supports the Advanced Message Queuing Protocol (AMQP), an open standard for passing business messages between applications or organizations.
Red Hat AMQ Online is a Red Hat OpenShift-based mechanism for delivering messaging as a managed service. Previously, we saw how to use AMQ Online to provision messaging. In this article, we combine AMQ Online and Quarkus to show how you can create a modern messaging setup on OpenShift using two new technologies from the messaging space.
The guide assumes you have an installation of AMQ Online on OpenShift. Read the installation guide for more information. AMQ Online is based on the EnMasse open source project.
We will start by creating a Quarkus application that uses reactive messaging: a simple order processing system. It includes an order generator that sends orders to a messaging queue at fixed intervals, and an order processor that consumes orders from the queue and produces confirmations that can be viewed in an HTML page.
Once the application is created, we will show how to inject messaging configuration into the application and use AMQ Online to provision the messaging resources that we need.
Quarkus application
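Our Quarkus application will run on OpenShift and is a modified version of the amqp-quickstart. The full example client is available in the enmasse-example-clients repository on GitHub (https://github.com/EnMasseProject/enmasse-example-clients).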
Order generator
The order generator sends monotonically increasing order identifiers to an "orders" address every 5 seconds.
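import java.util.concurrent.TimeUnit;
import javax.enterprise.context.ApplicationScoped;
import io.reactivex.Flowable;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

@ApplicationScoped
public class OrderGenerator {
    private int orderId = 1;

    // Emits the next order id to the "orders" channel every 5 seconds.
    @Outgoing("orders")
    public Flowable<Integer> generate() {
        return Flowable.interval(5, TimeUnit.SECONDS)
                .map(tick -> orderId++);
    }
}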
Order processor
The order processor is even simpler; it just returns a confirmation id to the "confirmations" address.
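import javax.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

@ApplicationScoped
public class OrderProcessor {
    // Consumes each order from "orders" and publishes a confirmation id.
    @Incoming("orders")
    @Outgoing("confirmations")
    public Integer process(Integer order) {
        // Confirmation id is twice the order id :)
        return order * 2;
    }
}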
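The generator and processor logic can also be exercised without a broker or the reactive messaging runtime. The following is a minimal plain-Java sketch of that data flow; the class name OrderPipelineSketch and its methods are hypothetical and are not part of the example client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-alone sketch; not part of the example client.
class OrderPipelineSketch {
    private final AtomicInteger nextOrderId = new AtomicInteger(1);

    // Mirrors OrderGenerator: each call yields the next order id.
    int nextOrder() {
        return nextOrderId.getAndIncrement();
    }

    // Mirrors OrderProcessor: the confirmation id is twice the order id.
    static int process(int order) {
        return order * 2;
    }

    // Sends `count` orders through the processor and collects the confirmations.
    List<Integer> run(int count) {
        List<Integer> confirmations = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            confirmations.add(process(nextOrder()));
        }
        return confirmations;
    }

    public static void main(String[] args) {
        System.out.println(new OrderPipelineSketch().run(3)); // prints [2, 4, 6]
    }
}
```

In the real application the two steps are decoupled by the "orders" queue, so the generator and processor could run in separate processes; the sketch only shows what each step computes.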
Confirmation resource
The confirmations resource is an HTTP endpoint for listing the confirmations that have been produced.
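import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import io.smallrye.reactive.messaging.annotations.Stream;
import org.reactivestreams.Publisher;

@Path("/confirmations")
public class ConfirmationResource {
    // In-memory stream of confirmation ids produced by OrderProcessor.
    @Inject
    @Stream("confirmations") Publisher<Integer> confirmations;

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    public String hello() {
        return "hello";
    }

    // Pushes each confirmation to the browser as a server-sent event.
    @GET
    @Path("/stream")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    public Publisher<Integer> stream() {
        return confirmations;
    }
}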
Configuration
Our application needs some configuration information in order to connect to AMQ Online. The Quarkus connector configuration, AMQP endpoint information, and client credentials all need to be supplied. Although it is good practice to keep configuration in one place, we will split things up a bit to show the options you have for configuring a Quarkus application.
Connectors
The connector configuration can be supplied at compile time using the application properties file:
mp.messaging.outgoing.orders.connector=smallrye-amqp
mp.messaging.incoming.orders.connector=smallrye-amqp
To keep things simple, we will only use a messaging queue for the "orders" address. The "confirmations" address will use an in-memory queue in the example application.
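The two property keys above follow the MicroProfile Reactive Messaging naming pattern mp.messaging.<direction>.<channel>.<attribute>. As a small illustration of how such keys are composed, here is a sketch; the helper class ChannelKeys is hypothetical and exists only for this example.

```java
// Hypothetical helper that composes MicroProfile Reactive Messaging property keys.
class ChannelKeys {
    static String key(String direction, String channel, String attribute) {
        // Only two directions exist: messages flowing in or out of the application.
        if (!direction.equals("incoming") && !direction.equals("outgoing")) {
            throw new IllegalArgumentException("direction must be 'incoming' or 'outgoing'");
        }
        return "mp.messaging." + direction + "." + channel + "." + attribute;
    }

    public static void main(String[] args) {
        // prints mp.messaging.outgoing.orders.connector
        System.out.println(key("outgoing", "orders", "connector"));
        // prints mp.messaging.incoming.orders.connector
        System.out.println(key("incoming", "orders", "connector"));
    }
}
```

Because no connector is configured for "confirmations", that channel has no such keys and stays in memory.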
AMQP endpoint
The AMQP endpoint hostname and port are not known at compile time and must be injected. AMQ Online can export the endpoint information to a configmap, so we reference that configmap from environment variables in the application manifest:
spec:
  template:
    spec:
      containers:
      - env:
        - name: AMQP_HOST
          valueFrom:
            configMapKeyRef:
              name: quarkus-config
              key: service.host
        - name: AMQP_PORT
          valueFrom:
            configMapKeyRef:
              name: quarkus-config
              key: service.port.amqp
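The variable names AMQP_HOST and AMQP_PORT are chosen so that they resolve to configuration properties. Assuming the connector reads properties named amqp-host and amqp-port (consistent with the amqp-username and amqp-password properties used later), and assuming the MicroProfile Config rule that maps a property name to an environment variable by replacing non-alphanumeric characters with underscores and upper-casing the result, the mapping can be sketched as follows. The class EnvNameSketch is hypothetical.

```java
import java.util.Locale;

// Hypothetical sketch of the property-name to environment-variable mapping.
class EnvNameSketch {
    static String toEnvName(String property) {
        // Replace every character that is not a letter, digit, or underscore, then upper-case.
        return property.replaceAll("[^A-Za-z0-9_]", "_").toUpperCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        System.out.println(toEnvName("amqp-host")); // prints AMQP_HOST
        System.out.println(toEnvName("amqp-port")); // prints AMQP_PORT
    }
}
```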
Credentials
We want to authenticate our messaging application on OpenShift using its service account token. To do this, we need a custom ConfigSource that reads the authentication token from the pod filesystem:
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.eclipse.microprofile.config.spi.ConfigSource;

public class MessagingCredentialsConfigSource implements ConfigSource {
    private static final Set<String> propertyNames;

    static {
        propertyNames = new HashSet<>();
        propertyNames.add("amqp-username");
        propertyNames.add("amqp-password");
    }

    @Override
    public Set<String> getPropertyNames() {
        return propertyNames;
    }

    @Override
    public Map<String, String> getProperties() {
        try {
            Map<String, String> properties = new HashMap<>();
            // Special username that is paired with the service account token as the password.
            properties.put("amqp-username", "@@serviceaccount@@");
            properties.put("amqp-password", readTokenFromFile());
            return properties;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public String getValue(String key) {
        if ("amqp-username".equals(key)) {
            return "@@serviceaccount@@";
        }
        if ("amqp-password".equals(key)) {
            try {
                return readTokenFromFile();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        // Any other key is not provided by this source.
        return null;
    }

    @Override
    public String getName() {
        return "messaging-credentials-config";
    }

    // Reads the token that OpenShift mounts into every pod for its service account.
    private static String readTokenFromFile() throws IOException {
        return new String(Files.readAllBytes(Paths.get("/var/run/secrets/kubernetes.io/serviceaccount/token")), StandardCharsets.UTF_8);
    }
}
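The token path is hardcoded above, which makes the class hard to try outside a pod. One possible variation, shown here only as a sketch, is to read the token from a path passed as a parameter so that the logic can be checked against a temporary file. The class TokenReaderSketch is hypothetical, and trimming surrounding whitespace is an added assumption rather than behavior of the original class.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical variation of the token-reading logic with an injectable path.
class TokenReaderSketch {
    static String readToken(Path tokenFile) {
        try {
            // Assumption: strip surrounding whitespace such as a trailing newline.
            return new String(Files.readAllBytes(tokenFile), StandardCharsets.UTF_8).trim();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for the mounted service account token file.
        Path tmp = Files.createTempFile("token", ".txt");
        Files.write(tmp, "example-token\n".getBytes(StandardCharsets.UTF_8));
        System.out.println(readToken(tmp)); // prints example-token
        Files.delete(tmp);
    }
}
```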
Building and deploying the application
To deploy the example application, you need GraalVM to perform native compilation of the application. Follow the steps in the Quarkus Guide for setting up your environment.
Then, follow these instructions to download the source, build, and deploy the example application:
git clone https://github.com/EnMasseProject/enmasse-example-clients
cd enmasse-example-clients/quarkus-example-client
oc new-project myapp
mvn -Pnative -Dfabric8.mode=openshift -Dfabric8.build.strategy=docker package fabric8:build fabric8:resource fabric8:apply
The application will be deployed but will not start until we configure AMQ Online with the messaging resources we need.
Configuring messaging
The remaining step is to configure the messaging resources needed by our application. We need an address space to provision a messaging endpoint, an address for the "orders" queue, and a messaging user to configure client credentials.
Address space
An AMQ Online AddressSpace is a group of addresses that share connection endpoints as well as authentication and authorization policies. When creating an AddressSpace, you can configure how your messaging endpoints are exposed:
apiVersion: enmasse.io/v1beta1
kind: AddressSpace
metadata:
  name: quarkus-example
spec:
  type: brokered
  plan: brokered-single-broker
  endpoints:
  - name: messaging
    service: messaging
    exports:
    - name: quarkus-config
      kind: configmap
Address
Messages are sent to and received from an address. An address has a type that determines its semantics and a plan that determines the amount of resources reserved for it. An address can be defined like this:
apiVersion: enmasse.io/v1beta1
kind: Address
metadata:
  name: quarkus-example.orders
spec:
  address: orders
  type: queue
  plan: brokered-queue
Messaging user
To ensure that only trusted applications can send messages to and receive messages from your addresses, a messaging user must be created. For applications running on-cluster, you can authenticate clients using an OpenShift service account. A "serviceaccount" user can be defined like this:
apiVersion: user.enmasse.io/v1beta1
kind: MessagingUser
metadata:
  name: quarkus-example.app
spec:
  username: system:serviceaccount:myapp:default
  authentication:
    type: serviceaccount
  authorization:
  - operations: ["send", "recv"]
    addresses: ["orders"]
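The username in this manifest follows the Kubernetes convention for service account identities, system:serviceaccount:<namespace>:<name>. As a small sketch of how the value is derived for the myapp project and its default service account (the helper class ServiceAccountNames is hypothetical):

```java
// Hypothetical helper that builds the identity string of a service account.
class ServiceAccountNames {
    static String username(String namespace, String serviceAccount) {
        return "system:serviceaccount:" + namespace + ":" + serviceAccount;
    }

    public static void main(String[] args) {
        // prints system:serviceaccount:myapp:default
        System.out.println(username("myapp", "default"));
    }
}
```

If the application is deployed to a different project or runs under a dedicated service account, the username in the MessagingUser must change accordingly.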
Application configuration permissions
To allow AMQ Online to create the configmap used to inject AMQP endpoint information, we also need to define a Role and RoleBinding:
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: quarkus-config
rules:
- apiGroups: [ "" ]
  resources: [ "configmaps" ]
  verbs: [ "create" ]
- apiGroups: [ "" ]
  resources: [ "configmaps" ]
  resourceNames: [ "quarkus-config" ]
  verbs: [ "get", "update", "patch" ]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: quarkus-config
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: quarkus-config
subjects:
- kind: ServiceAccount
  name: address-space-controller
  namespace: amq-online-infra
Applying configuration
You can apply the messaging configuration directly from the example source:
cd enmasse-example-clients/quarkus-example-client
oc project myapp
oc apply -f src/main/resources/k8s/addressspace
oc apply -f src/main/resources/k8s/address
Verifying application
To verify that the application is running, first ensure that the address has been created and is active:
until [[ "$(oc get address quarkus-example.orders -o jsonpath='{.status.phase}')" == "Active" ]]; do echo "Not yet ready"; sleep 5; done
Then, retrieve the application route URL (open the echoed URL in your browser):
echo "http://$(oc get route quarkus-example-client -o jsonpath='{.spec.host}')/prices.html"
You should now see a ticker that updates periodically based on the messages sent to and received from AMQ Online.
Summary
In this article, we showed how to write a Quarkus application that uses AMQP for messaging, configured this application to run on Red Hat OpenShift, and injected the application configuration derived from the AMQ Online configuration. We then created the manifests needed to provision messaging for the application.
Last updated: July 25, 2023