Cloud-native messaging on Red Hat OpenShift with Quarkus and AMQ Online

Quarkus is a Kubernetes-native Java stack tailored for GraalVM and OpenJDK HotSpot, crafted from best-of-breed Java libraries and standards, according to the project website. Starting with the 0.17.0 release, Quarkus supports the Advanced Message Queuing Protocol (AMQP), an open standard for passing business messages between applications or organizations.
Red Hat AMQ Online is a Red Hat OpenShift-based mechanism for delivering messaging as a managed service. Previously, we have seen how to use AMQ Online to provision messaging. In this article, we will combine AMQ Online and Quarkus to show how you can create a modern messaging setup on OpenShift using two new technologies from the messaging space.
The guide assumes you have an installation of AMQ Online on OpenShift. Read the installation guide for more information. AMQ Online is based on the EnMasse open source project.
We will start by creating a Quarkus application that uses reactive messaging: a simple order processing system. It includes an order generator that sends orders to a messaging queue at fixed intervals, and an order processor that consumes orders from the queue and produces confirmations, which can be viewed in an HTML page.
Once the application is created, we will show how to inject messaging configuration into the application and use AMQ Online to provision the messaging resources that we need.
Quarkus application
Our Quarkus application will run on OpenShift and is a modified version of the amqp-quickstart. The full example client can be found here.
Order generator
The order generator sends monotonically increasing order identifiers to an “orders” address every 5 seconds.
@ApplicationScoped
public class OrderGenerator {

    private int orderId = 1;

    @Outgoing("orders")
    public Flowable<Integer> generate() {
        return Flowable.interval(5, TimeUnit.SECONDS)
                .map(tick -> orderId++);
    }
}
Order processor
The order processor is even simpler: it consumes each order from the “orders” address and returns a confirmation ID to the “confirmations” address.
@ApplicationScoped
public class OrderProcessor {

    @Incoming("orders")
    @Outgoing("confirmations")
    public Integer process(Integer order) {
        // Confirmation id is twice the order id 🙂
        return order * 2;
    }
}
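To make the data flow concrete, here is a minimal JDK-only sketch of what the two beans compute together, without Quarkus or a broker. The class `OrderFlowSketch` and its methods are hypothetical and exist only for illustration. In the real application the same two steps are connected through the “orders” and “confirmations” channels.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical, JDK-only illustration of the order -> confirmation flow.
// It is not part of the example application and does no messaging at all.
final class OrderFlowSketch {

    // Mirrors OrderGenerator: order ids start at 1 and increase by one.
    static List<Integer> generateOrders(int count) {
        return IntStream.rangeClosed(1, count).boxed().collect(Collectors.toList());
    }

    // Mirrors OrderProcessor.process: the confirmation id is twice the order id.
    static int process(int order) {
        return order * 2;
    }

    // Applies the processor to every generated order, preserving order.
    static List<Integer> confirmationsFor(int count) {
        return generateOrders(count).stream()
                .map(OrderFlowSketch::process)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Prints [2, 4, 6] for the first three orders.
        System.out.println(confirmationsFor(3));
    }
}
```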
Confirmations resource
The confirmations resource is an HTTP endpoint for listing the confirmations that have been produced.
@Path("/confirmations")
public class ConfirmationResource {

    @Inject
    @Stream("confirmations")
    Publisher<Integer> orders;

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    public String hello() {
        return "hello";
    }

    @GET
    @Path("/stream")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    public Publisher<Integer> stream() {
        return orders;
    }
}
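The `/confirmations/stream` endpoint produces server-sent events, where each event's payload travels on a `data:` line and events are separated by blank lines. As a rough sketch of what a client sees, the hypothetical helper below extracts the integer confirmations from such lines. It assumes one integer per event and ignores multi-line events; the class name `ConfirmationEventParser` is not part of the example application.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the example application.
final class ConfirmationEventParser {

    private static final String DATA_PREFIX = "data:";

    // Extracts integer payloads from the "data:" lines of an SSE stream.
    // Blank lines (event separators), comment lines starting with ':' and
    // any other fields are skipped.
    static List<Integer> parse(List<String> lines) {
        List<Integer> confirmations = new ArrayList<>();
        for (String line : lines) {
            if (!line.startsWith(DATA_PREFIX)) {
                continue;
            }
            String payload = line.substring(DATA_PREFIX.length()).trim();
            if (!payload.isEmpty()) {
                confirmations.add(Integer.parseInt(payload));
            }
        }
        return confirmations;
    }
}
```

In a browser, the same stream would typically be consumed with the standard `EventSource` API rather than parsed by hand.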
Configuration
Our application needs some configuration information in order to connect to AMQ Online. The Quarkus connector configuration, AMQP endpoint information, and client credentials all need to be supplied. Although it is good practice to keep configuration in one place, we will split things up a bit to show the options you have for configuring a Quarkus application.
Connectors
The connector configuration can be supplied at compile time using the application properties file:
mp.messaging.outgoing.orders.connector=smallrye-amqp
mp.messaging.incoming.orders.connector=smallrye-amqp
To keep things simple, we will only use a messaging queue for the “orders” address. The “confirmations” address will use an in-memory queue in the example application.
AMQP endpoint
The AMQP endpoint hostname and port information is not known at compile time and must be injected. The endpoint information can be provided in a configmap created by AMQ Online; thus, we will set them as environment variables in the application manifest:
spec:
  template:
    spec:
      containers:
      - env:
        - name: AMQP_HOST
          valueFrom:
            configMapKeyRef:
              name: quarkus-config
              key: service.host
        - name: AMQP_PORT
          valueFrom:
            configMapKeyRef:
              name: quarkus-config
              key: service.port.amqp
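The variable names `AMQP_HOST` and `AMQP_PORT` line up with the connector's `amqp-host` and `amqp-port` properties. MicroProfile Config can resolve a property from an environment variable whose name is the property name with every non-alphanumeric character replaced by an underscore and then converted to upper case. The sketch below only illustrates that naming rule; the class `EnvNameSketch` is hypothetical and is not how the runtime itself is implemented.

```java
import java.util.Locale;

// Hypothetical illustration of the property-name to environment-variable
// naming rule; it does not read any configuration itself.
final class EnvNameSketch {

    // Replaces every character that is not a letter or digit with '_'
    // and upper-cases the result, e.g. "amqp-host" becomes "AMQP_HOST".
    static String toEnvName(String propertyName) {
        return propertyName
                .replaceAll("[^A-Za-z0-9]", "_")
                .toUpperCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        System.out.println(toEnvName("amqp-host")); // AMQP_HOST
        System.out.println(toEnvName("amqp-port")); // AMQP_PORT
    }
}
```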
Credentials
We want to use service account tokens to authenticate our messaging application on OpenShift. To do this, we need to create a custom ConfigSource that reads the authentication token from the pod filesystem:
public class MessagingCredentialsConfigSource implements ConfigSource {
    private static final Set<String> propertyNames;

    static {
        propertyNames = new HashSet<>();
        propertyNames.add("amqp-username");
        propertyNames.add("amqp-password");
    }

    @Override
    public Set<String> getPropertyNames() {
        return propertyNames;
    }

    @Override
    public Map<String, String> getProperties() {
        try {
            Map<String, String> properties = new HashMap<>();
            properties.put("amqp-username", "@@serviceaccount@@");
            properties.put("amqp-password", readTokenFromFile());
            return properties;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public String getValue(String key) {
        if ("amqp-username".equals(key)) {
            return "@@serviceaccount@@";
        }
        if ("amqp-password".equals(key)) {
            try {
                return readTokenFromFile();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return null;
    }

    @Override
    public String getName() {
        return "messaging-credentials-config";
    }

    private static String readTokenFromFile() throws IOException {
        return new String(Files.readAllBytes(Paths.get("/var/run/secrets/kubernetes.io/serviceaccount/token")), StandardCharsets.UTF_8);
    }
}
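Because the token path is hard-coded, the class above is awkward to exercise outside a pod. One possible variation, sketched here under assumptions, passes the path in so the read can be tested against a temporary file. The class `TokenReaderSketch` is hypothetical, and trimming surrounding whitespace is an addition of this sketch; the original returns the file content unchanged.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical variation of readTokenFromFile(), not part of the example.
final class TokenReaderSketch {

    // Same location the ConfigSource above reads from inside the pod.
    static final Path DEFAULT_TOKEN_PATH =
            Paths.get("/var/run/secrets/kubernetes.io/serviceaccount/token");

    // Reads the token as UTF-8 text. Trimming is an assumption of this
    // sketch, added in case the file ends with a newline.
    static String readToken(Path tokenPath) throws IOException {
        byte[] bytes = Files.readAllBytes(tokenPath);
        return new String(bytes, StandardCharsets.UTF_8).trim();
    }
}
```

A custom ConfigSource also has to be discoverable by MicroProfile Config. The standard mechanism is a `META-INF/services/org.eclipse.microprofile.config.spi.ConfigSource` file that lists the implementing class.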
Building and deploying the application
To deploy the example application, you need GraalVM to perform native compilation of the application. Follow the steps in the Quarkus Guide for setting up your environment.
Then, follow these instructions to download the source, build, and deploy the example application:
git clone https://github.com/EnMasseProject/enmasse-example-clients
cd enmasse-example-clients/quarkus-example-client
oc new-project myapp
mvn -Pnative -Dfabric8.mode=openshift -Dfabric8.build.strategy=docker package fabric8:build fabric8:resource fabric8:apply
The application will be deployed but will not start until we configure AMQ Online with the messaging resources we need.
Configuring messaging
The remaining step is to configure the messaging resources needed by our application. We need to create an address space to provision a messaging endpoint, an address for the “orders” queue, and a messaging user to configure client credentials.
Address space
An AMQ Online AddressSpace is a group of addresses that share connection endpoints as well as authentication and authorization policies. When creating an AddressSpace, you can configure how your messaging endpoints are exposed:
apiVersion: enmasse.io/v1beta1
kind: AddressSpace
metadata:
  name: quarkus-example
spec:
  type: brokered
  plan: brokered-single-broker
  endpoints:
  - name: messaging
    service: messaging
    exports:
    - name: quarkus-config
      kind: configmap
Address
Messages are sent to and received from an address. An address has a type that determines its semantics and a plan that determines the amount of resources reserved for this address. An address can be defined like this:
apiVersion: enmasse.io/v1beta1
kind: Address
metadata:
  name: quarkus-example.orders
spec:
  address: orders
  type: queue
  plan: brokered-queue
Messaging user
To ensure that only trusted applications are able to send messages to and receive messages from your addresses, a messaging user must be created. For applications running on-cluster, you can authenticate clients using an OpenShift service account. A “serviceaccount” user can be defined like this:
apiVersion: user.enmasse.io/v1beta1
kind: MessagingUser
metadata:
  name: quarkus-example.app
spec:
  username: system:serviceaccount:myapp:default
  authentication:
    type: serviceaccount
  authorization:
  - operations: ["send", "recv"]
    addresses: ["orders"]
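The `username` in this manifest follows the Kubernetes naming convention for service accounts, `system:serviceaccount:<namespace>:<name>`. Here `myapp` is the project created earlier with `oc new-project myapp`, and `default` is the service account a pod runs as when none is specified. As a small sketch, the hypothetical helper below builds such a name, which can be handy if the application is deployed to a different project.

```java
// Hypothetical helper for building the MessagingUser username;
// not part of the example application.
final class ServiceAccountNames {

    private static final String PREFIX = "system:serviceaccount:";

    // Builds "system:serviceaccount:<namespace>:<serviceAccount>".
    static String username(String namespace, String serviceAccount) {
        if (namespace == null || namespace.isEmpty()
                || serviceAccount == null || serviceAccount.isEmpty()) {
            throw new IllegalArgumentException("namespace and service account are required");
        }
        return PREFIX + namespace + ":" + serviceAccount;
    }

    public static void main(String[] args) {
        // Prints system:serviceaccount:myapp:default
        System.out.println(username("myapp", "default"));
    }
}
```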
Application configuration permissions
To allow AMQ Online to create the configmap used to inject AMQP endpoint information, we also need to define a Role and RoleBinding:
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: quarkus-config
rules:
- apiGroups: [ "" ]
  resources: [ "configmaps" ]
  verbs: [ "create" ]
- apiGroups: [ "" ]
  resources: [ "configmaps" ]
  resourceNames: [ "quarkus-config" ]
  verbs: [ "get", "update", "patch" ]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: quarkus-config
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: quarkus-config
subjects:
- kind: ServiceAccount
  name: address-space-controller
  namespace: amq-online-infra
Applying configuration
You can apply the messaging configuration directly from the example source:
cd enmasse-example-clients/quarkus-example-client
oc project myapp
oc apply -f src/main/resources/k8s/addressspace
oc apply -f src/main/resources/k8s/address
Verifying application
To verify that the application is running, first ensure that addresses are created and active:
until [[ `oc get address quarkus-example.orders -o jsonpath='{.status.phase}'` == "Active" ]]; do echo "Not yet ready"; sleep 5; done
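The shell loop above waits indefinitely. If the same check were scripted from Java, for example in an integration test, a bounded variant might look like the sketch below. The class `PhasePoller` is hypothetical, and the `Supplier` is a stand-in for however the phase is actually obtained; the sketch itself does not talk to the cluster.

```java
import java.util.function.Supplier;

// Hypothetical bounded polling helper; not part of the example application.
final class PhasePoller {

    // Polls phaseSupplier until it returns the wanted phase or maxAttempts
    // is exhausted, sleeping sleepMillis between attempts.
    // Returns true if the wanted phase was observed, false otherwise.
    static boolean awaitPhase(Supplier<String> phaseSupplier, String wanted,
                              int maxAttempts, long sleepMillis)
            throws InterruptedException {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (wanted.equals(phaseSupplier.get())) {
                return true;
            }
            if (attempt < maxAttempts) {
                Thread.sleep(sleepMillis);
            }
        }
        return false;
    }
}
```

Returning `false` instead of looping forever lets the caller decide whether to fail, retry, or print diagnostics.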
Then, retrieve the application route URL (open the echoed URL in your browser):
echo "http://$(oc get route quarkus-example-client -o jsonpath='{.spec.host}')/prices.html"
You should now see a ticker that updates periodically based on the messages sent to and received from AMQ Online.
Summary
In this article, we showed how to write a Quarkus application that uses AMQP for messaging, configured this application to run on Red Hat OpenShift, and injected the application configuration derived from the AMQ Online configuration. We then created the manifests needed to provision messaging for the application.