Apache ActiveMQ is a highly popular message broker that features persistence, guaranteed message delivery, and high throughput. Apache ActiveMQ Artemis streamlines the classic message broker implementation for microservices architectures. This article is for developers transitioning from ActiveMQ Classic to ActiveMQ Artemis. We will show you how to get the two versions working together using Apache Camel K. Our example is based on Red Hat AMQ versions 6 and 7, and we will perform the steps on Red Hat OpenShift 4. Our code is written in Java. The integration process and techniques should be applicable to many other scenarios.

Note: See Implementing Apache ActiveMQ-style broker meshes with Apache Artemis for a discussion of the differences between Red Hat AMQ 6 and 7.

The Camel K integration workflow

Camel K is an integration framework that allows developers to exchange data easily between many common data processing tools. The demonstration in this article creates two ActiveMQ brokers, one using Red Hat AMQ 6 and the other using Red Hat AMQ 7. We then create two Camel K integrations: one that consumes messages from the AMQ 7 broker, and another that produces messages to the AMQ 6 broker. Finally, we create a third integration that functions as a message bridge between the two brokers.

Figure 1 illustrates the Camel K integration workflow.

Figure 1. A message bridge connects two ActiveMQ brokers.

You can retrieve the files for the demonstration from the GitHub repository associated with this article.

Prerequisites

This article assumes you have a basic knowledge of the OpenShift oc command-line interface and the OpenShift web console. You need a running OpenShift 4 cluster with administrator access. You will also need to install the Camel K command-line client (kamel) on your local system.

Set up the cluster

Create a namespace on your OpenShift instance:

$ oc new-project camelk-ssl

As the cluster administrator, install the Red Hat Camel K Operator and Red Hat AMQ 7.7 Operator on your OpenShift instance from the OpenShift OperatorHub.

Now, create certificates to enable secure SSL communication. Make sure you set the common name CN on the broker to your server domain name:


export CLIENT_KEYSTORE_PASSWORD=password
export CLIENT_TRUSTSTORE_PASSWORD=password
export BROKER_KEYSTORE_PASSWORD=password
export BROKER_TRUSTSTORE_PASSWORD=password

#Client Keystore
keytool -genkey -alias client -keyalg RSA -keystore client.ks -storepass $CLIENT_KEYSTORE_PASSWORD -keypass $CLIENT_KEYSTORE_PASSWORD -dname "CN=camelssl-example, O=RedHat, C=UK"

#Broker Keystore
keytool -genkey -alias broker -keyalg RSA -keystore broker.ks -storepass $BROKER_KEYSTORE_PASSWORD -keypass $BROKER_KEYSTORE_PASSWORD -dname "CN=*.apps.cluster-hhl6r.hhl6r.sandbox55.opentlc.com, O=RedHat, C=UK"

#Export Client PublicKey
keytool -export -alias client -keystore client.ks -storepass $CLIENT_KEYSTORE_PASSWORD -file client.cert

#Export Server PublicKey
keytool -export -alias broker -keystore broker.ks -storepass $BROKER_KEYSTORE_PASSWORD -file broker.cert

#Import Server PublicKey into Client Truststore
keytool -import -alias broker -keystore client.ts -file broker.cert -storepass $CLIENT_TRUSTSTORE_PASSWORD -trustcacerts -noprompt

#Import Client PublicKey into Server Truststore
keytool -import -alias client -keystore broker.ts -file client.cert -storepass $BROKER_TRUSTSTORE_PASSWORD -trustcacerts -noprompt

#Import Server PublicKey into Server Truststore (i.e., the broker trusts itself)
keytool -import -alias broker -keystore broker.ts -file broker.cert -storepass $BROKER_TRUSTSTORE_PASSWORD -trustcacerts -noprompt
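
Before uploading the stores to the cluster, it can help to confirm that each one holds the entries you expect. The following is a minimal Java sketch and is not part of the original demo: the class name KeystoreAliases and its method are hypothetical, and it assumes Java 9 or later, where KeyStore.getInstance(File, char[]) detects the store type.

```java
import java.io.File;
import java.security.KeyStore;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper (not from the demo repository): lists the aliases in a keystore or truststore.
final class KeystoreAliases {

  // Loads the store (type auto-detected, Java 9+) and returns its aliases in sorted order.
  static List<String> aliases(File store, char[] password) {
    try {
      KeyStore keyStore = KeyStore.getInstance(store, password);
      List<String> result = new ArrayList<>(Collections.list(keyStore.aliases()));
      Collections.sort(result);
      return result;
    } catch (Exception e) {
      throw new IllegalStateException("Cannot read store " + store, e);
    }
  }

  public static void main(String[] args) {
    if (args.length != 2) {
      System.err.println("Usage: java KeystoreAliases.java <store-file> <password>");
      return;
    }
    System.out.println(aliases(new File(args[0]), args[1].toCharArray()));
  }
}
```

Given the keytool commands above, client.ts should list the broker alias, and broker.ts should list both broker and client.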

Set up the ActiveMQ consumer (AMQ 7)

Create a secret for the broker keystore and truststore for AMQ 7:

$ oc create secret generic example-amq-secret --from-file=broker.ks --from-literal=keyStorePassword=password --from-file=client.ts=broker.ts --from-literal=trustStorePassword=password

Create an AMQ 7 broker using the AMQ broker operator:


apiVersion: broker.amq.io/v2alpha4
kind: ActiveMQArtemis
metadata:
  name: example-amq
  application: example-amq
  namespace: camelk-ssl
spec:
  deploymentPlan:
    size: 1
    image: registry.redhat.io/amq7/amq-broker:7.6
    requireLogin: false
  adminUser: admin
  adminPassword: admin
  console:
    expose: true
  acceptors:
  - name: amqp
    protocols: amqp
    port: 5672
    sslEnabled: true
    sslSecret: example-amq-secret
    verifyHost: false
    expose: true

Create a new AMQ broker address. This configuration also creates a route and service for the broker:

apiVersion: broker.amq.io/v2alpha2
kind: ActiveMQArtemisAddress
metadata:
  name: example-testaddress
spec:
  addressName: test
  queueName: test
  routingType: anycast 

To test your AMQ 7 broker, you have to download ActiveMQ Artemis. Once the download is completed, extract it and open the /bin folder, where you will find the Apache Artemis script that is used for testing. You can produce and consume messages using the new AMQ 7 broker from your local machine; just make sure to change the URL and truststore location.

Here is a sample command to produce messages:

$ ./artemis producer --url 'amqps://example-amq7-amqp-0-svc-rte-camelk-ssl.apps.cluster-hhl6r.hhl6r.sandbox55.opentlc.com:443?jms.username=admin&jms.password=admin&transport.trustStoreLocation=/home/marslan/work/camelkssl/client.ts&transport.trustStorePassword=password&transport.verifyHost=false' --threads 1 --protocol amqp --message-count 10 --destination 'queue://test'

Here is a sample command to consume messages:

$ ./artemis consumer --url 'amqps://example-amq7-amqp-0-svc-rte-camelk-ssl.apps.cluster-hhl6r.hhl6r.sandbox55.opentlc.com:443?jms.username=admin&jms.password=admin&transport.trustStoreLocation=/home/marslan/work/camelkssl/client.ts&transport.trustStorePassword=password&transport.verifyHost=false' --threads 1 --protocol amqp --message-count 10 --destination 'queue://test'
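
Because the broker URL is long and must be adapted to your own route host and truststore path, it may be easier to assemble it from its parts. The sketch below is illustrative only: the class AmqpsUrlBuilder and the host broker.example.com are hypothetical, while the option names (transport.trustStoreLocation and so on) are the ones used in the commands above. Values are appended as-is, without URL encoding.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: assembles an amqps:// URL like the one passed to the artemis script.
final class AmqpsUrlBuilder {

  // Joins host, port, and options into amqps://host:port?key=value&key=value.
  // Option values are not URL-encoded, so they must not contain '&' or '?'.
  static String build(String host, int port, Map<String, String> options) {
    StringBuilder url = new StringBuilder("amqps://").append(host).append(':').append(port);
    char separator = '?';
    for (Map.Entry<String, String> option : options.entrySet()) {
      url.append(separator).append(option.getKey()).append('=').append(option.getValue());
      separator = '&';
    }
    return url.toString();
  }

  public static void main(String[] args) {
    Map<String, String> options = new LinkedHashMap<>();
    options.put("jms.username", "admin");
    options.put("jms.password", "admin");
    options.put("transport.trustStoreLocation", "/path/to/client.ts"); // placeholder path
    options.put("transport.trustStorePassword", "password");
    options.put("transport.verifyHost", "false");
    // broker.example.com stands in for your own route host
    System.out.println(build("broker.example.com", 443, options));
  }
}
```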

Set up the ActiveMQ producer (AMQ 6)

Next, create a secret for the broker keystore and truststore for AMQ 6:

$ oc create secret generic example-amq6-secret --from-file=broker.ks --from-literal=keyStorePassword=password --from-file=broker.ts --from-literal=trustStorePassword=password 

Create an AMQ 6 broker from the command line:

$ oc new-app amq63-ssl -p APPLICATION_NAME=amq6-broker -p MQ_QUEUES=test -p MQ_TOPICS=test -p MQ_USERNAME=admin \
    -p MQ_PASSWORD=admin -p AMQ_SECRET=example-amq6-secret -p AMQ_TRUSTSTORE=broker.ts -p AMQ_TRUSTSTORE_PASSWORD=password \
    -p AMQ_KEYSTORE=broker.ks -p AMQ_KEYSTORE_PASSWORD=password -n camelk-ssl

Create a route for the broker in the OpenShift console, using the amq6-broker-tcp-ssl service.

To test your AMQ 6 broker, you can download ActiveMQ. Once the download is completed, extract it and open the /bin folder, where you will find the ActiveMQ script that is used for testing. You can produce and consume messages using the new AMQ 6 broker from your local machine. Just make sure to change the URL and truststore location.

Enter the following command to produce messages:

$ ./activemq producer \
    -Djavax.net.ssl.trustStore=/home/marslan/work/camelkssl/client.ts \
    -Djavax.net.ssl.trustStorePassword=password \
    -Djavax.net.ssl.keyStore=/home/marslan/work/camelkssl/client.ks \
    -Djavax.net.ssl.keyStorePassword=password \
    --brokerUrl ssl://test-camelk-ssl.apps.cluster-hhl6r.hhl6r.sandbox55.opentlc.com:443 \
    --user admin \
    --password admin \
    --destination queue://test \
    --messageCount 1000 \
    --message HelloWorld

Enter the following command to consume messages:

$ ./activemq consumer \
    -Djavax.net.ssl.trustStore=/home/marslan/work/camelkssl/client.ts \
    -Djavax.net.ssl.trustStorePassword=password \
    -Djavax.net.ssl.keyStore=/home/marslan/work/camelkssl/client.ks \
    -Djavax.net.ssl.keyStorePassword=password \
    --brokerUrl ssl://test-camelk-ssl.apps.cluster-hhl6r.hhl6r.sandbox55.opentlc.com:443 \
    --user admin \
    --password admin \
    --destination queue://test

Test Camel K

Now, we will import a simple Java application and use it to ensure that Camel K is working correctly. The HelloCamelK.java program is available in the Camel K SSL Demo repository on GitHub.

Run the file with the following command (you may delete this integration after the test):

$ kamel run HelloCamelK.java

Create the AMQ 7 consumer

Create folders for AMQ 6 and AMQ 7:

$ mkdir amq6
$ mkdir amq7

Create a new configuration directory for AMQ 7:

$ mkdir amq7/configs

In the configuration directory, create an application.properties file for the AMQ 7 integration, with the following contents:

quarkus.qpid-jms.url=amqps://example-amq7-amqp-0-svc-rte-camelk-ssl.apps.cluster-hhl6r.hhl6r.sandbox55.opentlc.com:443?transport.keyStoreLocation=/etc/ssl/example-amq-secret/broker.ks&transport.keyStorePassword=password&transport.trustStoreLocation=/etc/ssl/example-amq-secret/client.ts&transport.trustStorePassword=password&transport.verifyHost=false
quarkus.qpid-jms.username=admin
quarkus.qpid-jms.password=admin
jms.destinationType=queue
jms.destinationName=test

Within the amq7 folder, create a program named amq7consumer.java for the AMQ 7 consumer integration:

// camel-k: language=java
// camel-k: dependency=mvn:org.amqphub.quarkus:quarkus-qpid-jms

import org.apache.camel.builder.RouteBuilder;

public class amq7consumer extends RouteBuilder {

  @Override
  public void configure() throws Exception {
    // Consume from the configured destination and log each message
    from("jms:{{jms.destinationType}}:{{jms.destinationName}}").to("log:info");
  }
}

Let's look at a few parts of the code:

  • camel-k: language=java tells Camel K to treat the file as Java source. Camel K reads comments like this one to work out how to run the file and which dependencies it needs.
  • camel-k: dependency=mvn:org.amqphub.quarkus:quarkus-qpid-jms adds the Quarkus Qpid JMS extension, which supports out-of-the-box configuration for SSL, without requiring any further customization. You can find more information in the Apache SSL documentation.
  • The route resolves the jms.destinationType and jms.destinationName properties from the application.properties file, consumes messages from that destination, and writes each message to the log. The connection itself is configured through the quarkus.qpid-jms properties in the same file.
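
To make the {{...}} placeholders more concrete, here is a simplified sketch of how such a URI could be resolved against a set of properties. It only illustrates the idea and is not how Camel implements property resolution; the class PlaceholderDemo is hypothetical.

```java
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration of {{key}} substitution; Camel's own resolver is more capable.
final class PlaceholderDemo {

  private static final Pattern PLACEHOLDER = Pattern.compile("\\{\\{([^}]+)\\}\\}");

  // Replaces every {{key}} in the URI with the matching property value.
  static String resolve(String uri, Properties properties) {
    Matcher matcher = PLACEHOLDER.matcher(uri);
    StringBuffer resolved = new StringBuffer();
    while (matcher.find()) {
      String key = matcher.group(1).trim();
      String value = properties.getProperty(key);
      if (value == null) {
        throw new IllegalArgumentException("Missing property: " + key);
      }
      matcher.appendReplacement(resolved, Matcher.quoteReplacement(value));
    }
    matcher.appendTail(resolved);
    return resolved.toString();
  }

  public static void main(String[] args) {
    Properties properties = new Properties();
    properties.setProperty("jms.destinationType", "queue");
    properties.setProperty("jms.destinationName", "test");
    System.out.println(resolve("jms:{{jms.destinationType}}:{{jms.destinationName}}", properties));
  }
}
```

With the values from application.properties, the endpoint in the consumer route resolves to jms:queue:test.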

The Java file is imported in the next step, where we run Camel K:

$ kamel run -n camelk-ssl --property file:./amq7/configs/application.properties --resource secret:example-amq-secret@/etc/ssl/example-amq-secret amq7/amq7consumer.java 

In the kamel run command:

  • The --property file option imports properties to Camel.
  • The --resource option adds resources to the cluster pod. In this command, we are adding example-amq-secret and placing its contents into the /etc/ssl/example-amq-secret folder within the pod.
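
If the integration fails to start, one thing worth checking is whether the secret's files are present at the mount path. The following sketch shows one way to do that from Java; the class MountedSecretCheck is hypothetical, and it assumes that each key of the secret (broker.ks and client.ts for example-amq-secret) appears as a file in the mount directory.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: reports which expected secret files are not readable in a mount directory.
final class MountedSecretCheck {

  // Returns the names that are missing or unreadable under mountDir, in the order given.
  static List<String> missing(Path mountDir, String... expectedNames) {
    List<String> missing = new ArrayList<>();
    for (String name : expectedNames) {
      if (!Files.isReadable(mountDir.resolve(name))) {
        missing.add(name);
      }
    }
    return missing;
  }

  public static void main(String[] args) {
    Path mountDir = Paths.get("/etc/ssl/example-amq-secret");
    System.out.println("Missing files: " + missing(mountDir, "broker.ks", "client.ts"));
  }
}
```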

To debug Camel K, add the following option to the kamel run command:

--trait jvm.options=-Djavax.net.debug=all

Create the AMQ 6 producer

Create a new configuration directory for AMQ 6:

$ mkdir amq6/configs

In the configuration directory, create an application.properties file for the AMQ 6 integration, with the following contents:

activemq.destination.brokerURL=ssl://test-camelk-ssl.apps.cluster-hhl6r.hhl6r.sandbox55.opentlc.com:443
activemq.destination.username=admin
activemq.destination.password=admin
activemq.destination.ssl.keyStorePassword=password
activemq.destination.ssl.keyStoreLocation=/etc/ssl/example-amq6-secret/broker.ks
activemq.destination.ssl.trustStorePassword=password
activemq.destination.ssl.trustStoreLocation=/etc/ssl/example-amq6-secret/broker.ts
activemq.destination.type=queue
activemq.destination.name=test
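
A typo in a property key tends to surface only when the integration starts, so a quick local check of the file can save a deployment round trip. This is a minimal sketch using java.util.Properties; the class RequiredProperties is hypothetical, and the list of required keys is taken from the file above.

```java
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper: reports required keys that are absent or blank in a properties source.
final class RequiredProperties {

  static List<String> missingKeys(Reader source, String... requiredKeys) {
    Properties properties = new Properties();
    try {
      properties.load(source);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    List<String> missing = new ArrayList<>();
    for (String key : requiredKeys) {
      String value = properties.getProperty(key);
      if (value == null || value.trim().isEmpty()) {
        missing.add(key);
      }
    }
    return missing;
  }

  public static void main(String[] args) throws IOException {
    try (Reader reader = new FileReader("amq6/configs/application.properties")) {
      System.out.println("Missing keys: " + missingKeys(reader,
          "activemq.destination.brokerURL",
          "activemq.destination.username",
          "activemq.destination.password",
          "activemq.destination.ssl.keyStoreLocation",
          "activemq.destination.ssl.keyStorePassword",
          "activemq.destination.ssl.trustStoreLocation",
          "activemq.destination.ssl.trustStorePassword"));
    }
  }
}
```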

Within the amq6 folder, create a program named amq6SSLProducer.java for the AMQ 6 producer integration (the file name must match the public class name):

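// camel-k: dependency=camel:camel-quarkus-activemq
// camel-k: dependency=camel:camel-quarkus-timer
// Assumed Maven coordinates for the Faker library used below; adjust to the version you use
// camel-k: dependency=mvn:com.github.javafaker:javafaker:1.0.2
// camel-k: property=period=5000

import javax.enterprise.context.ApplicationScoped;

import com.github.javafaker.Faker;
import org.apache.activemq.ActiveMQSslConnectionFactory;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.apache.camel.BindToRegistry;
import org.apache.camel.PropertyInject;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.activemq.ActiveMQComponent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class amq6SSLProducer extends RouteBuilder {

  private static final Logger logger = LoggerFactory.getLogger(amq6SSLProducer.class);

  // Property bindings reconstructed from application.properties (an assumption; the listing omitted them)
  @PropertyInject("activemq.destination.brokerURL")
  private String destinationBrokerURL;
  @PropertyInject("activemq.destination.username")
  private String destinationUserName;
  @PropertyInject("activemq.destination.password")
  private String destinationPassword;
  @PropertyInject("activemq.destination.ssl.keyStoreLocation")
  private String destinationKeyStoreLocation;
  @PropertyInject("activemq.destination.ssl.keyStorePassword")
  private String destinationKeystorePassword;
  @PropertyInject("activemq.destination.ssl.trustStoreLocation")
  private String destinationTrustStoreLocation;
  @PropertyInject("activemq.destination.ssl.trustStorePassword")
  private String destinationTruststorePassword;

  @Override
  public void configure() throws Exception {
    // Every {{period}} milliseconds, generate a message, log it, and send it to the AMQ 6 destination
    from("timer:foo?fixedRate=true&period={{period}}").bean(this, "generateFakePerson()").to("log:info")
        .to("activemq:{{activemq.destination.type}}:{{activemq.destination.name}}?connectionFactory=#pooledConnectionFactory");
  }

  public String generateFakePerson() {
    Faker faker = new Faker();
    return faker.name().fullName() + " lives on " + faker.address().streetAddress();
  }

  @ApplicationScoped
  public ActiveMQComponent activeMq(PooledConnectionFactory pooledConnectionFactory) {
    ActiveMQComponent activeMqComponent = new ActiveMQComponent();
    activeMqComponent.setConnectionFactory(pooledConnectionFactory);
    activeMqComponent.setCacheLevelName("CACHE_CONSUMER");

    return activeMqComponent;
  }

  // Registered under the name referenced by #pooledConnectionFactory in the route
  @BindToRegistry
  public PooledConnectionFactory pooledConnectionFactory() throws Exception {
    return new PooledConnectionFactory(sslConnectionFactory());
  }

  // Builds the SSL connection factory from the keystore and truststore mounted from the secret
  private ActiveMQSslConnectionFactory sslConnectionFactory() throws Exception {
    ActiveMQSslConnectionFactory connectionFactory = new ActiveMQSslConnectionFactory();
    logger.info("BrokerURL: " + destinationBrokerURL);
    connectionFactory.setBrokerURL(destinationBrokerURL);
    connectionFactory.setUserName(destinationUserName);
    connectionFactory.setPassword(destinationPassword);
    connectionFactory.setTrustStore(destinationTrustStoreLocation);
    connectionFactory.setTrustStorePassword(destinationTruststorePassword);
    connectionFactory.setKeyStore(destinationKeyStoreLocation);
    connectionFactory.setKeyStorePassword(destinationKeystorePassword);

    return connectionFactory;
  }
}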
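
For readers who want to see what the keystore and truststore settings amount to at the JDK level, the sketch below builds an SSLContext from two store files using only standard JSSE classes. It is an illustration under stated assumptions, not the implementation of ActiveMQSslConnectionFactory: the class SslContextSketch is hypothetical, and it assumes Java 9 or later for KeyStore.getInstance(File, char[]).

```java
import java.io.File;
import java.security.KeyStore;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

// Hypothetical illustration: turns a keystore and a truststore into a TLS context.
final class SslContextSketch {

  static SSLContext build(File keyStoreFile, char[] keyStorePassword,
                          File trustStoreFile, char[] trustStorePassword) {
    try {
      // The keystore supplies this side's own certificate and private key
      KeyStore keyStore = KeyStore.getInstance(keyStoreFile, keyStorePassword);
      KeyManagerFactory keyManagers =
          KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
      keyManagers.init(keyStore, keyStorePassword);

      // The truststore lists the certificates this side is willing to trust
      KeyStore trustStore = KeyStore.getInstance(trustStoreFile, trustStorePassword);
      TrustManagerFactory trustManagers =
          TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
      trustManagers.init(trustStore);

      SSLContext context = SSLContext.getInstance("TLS");
      context.init(keyManagers.getKeyManagers(), trustManagers.getTrustManagers(), null);
      return context;
    } catch (Exception e) {
      throw new IllegalStateException("Cannot build SSL context", e);
    }
  }

  public static void main(String[] args) {
    char[] password = "password".toCharArray();
    SSLContext context = build(new File("client.ks"), password, new File("client.ts"), password);
    System.out.println("Created context for protocol " + context.getProtocol());
  }
}
```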

Run the producer using Camel K:

$ kamel run -n camelk-ssl \
    --property file:./amq6/configs/application.properties \
    --resource secret:example-amq6-secret@/etc/ssl/example-amq6-secret \
    amq6/amq6SSLProducer.java

Create a message bridge between AMQ 6 and AMQ 7

Create new folders for the message bridge and its configuration:

$ mkdir message-bridge
$ mkdir message-bridge/configs

In the configuration directory, create an application.properties file for the message bridge, with the following contents:

#amq6
activemq.source.brokerURL=ssl://test-camelk-ssl.apps.cluster-hhl6r.hhl6r.sandbox55.opentlc.com:443
activemq.source.username=admin
activemq.source.password=admin
activemq.source.ssl.keyStorePassword=password
activemq.source.ssl.keyStoreLocation=/etc/ssl/example-amq6-secret/broker.ks
activemq.source.ssl.trustStorePassword=password
activemq.source.ssl.trustStoreLocation=/etc/ssl/example-amq6-secret/broker.ts
activemq.source.type=queue
activemq.source.name=test

#amq7
quarkus.qpid-jms.url=amqps://example-amq7-amqp-0-svc-rte-camelk-ssl.apps.cluster-hhl6r.hhl6r.sandbox55.opentlc.com:443?transport.keyStoreLocation=/etc/ssl/example-amq-secret/broker.ks&transport.keyStorePassword=password&transport.trustStoreLocation=/etc/ssl/example-amq-secret/client.ts&transport.trustStorePassword=password&transport.verifyHost=false
quarkus.qpid-jms.username=admin
quarkus.qpid-jms.password=admin
jms.destinationType=queue
jms.destinationName=test

In the message-bridge folder, create a program named sixToSevenBridge.java, with the following contents:

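// Dependencies as declared in the producer and consumer integrations
// camel-k: dependency=camel:camel-quarkus-activemq
// camel-k: dependency=mvn:org.amqphub.quarkus:quarkus-qpid-jms

import org.apache.activemq.ActiveMQSslConnectionFactory;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.apache.camel.BindToRegistry;
import org.apache.camel.PropertyInject;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.activemq.ActiveMQComponent;
import org.apache.qpid.jms.JmsConnectionFactory;

public class sixToSevenBridge extends RouteBuilder {

  // Property bindings reconstructed from application.properties (an assumption; the listing omitted them)
  @PropertyInject("activemq.source.brokerURL") private String sourceBrokerURL;
  @PropertyInject("activemq.source.username") private String sourceUserName;
  @PropertyInject("activemq.source.password") private String sourcePassword;
  @PropertyInject("activemq.source.ssl.keyStoreLocation") private String sourceKeyStoreLocation;
  @PropertyInject("activemq.source.ssl.keyStorePassword") private String sourceKeystorePassword;
  @PropertyInject("activemq.source.ssl.trustStoreLocation") private String sourceTrustStoreLocation;
  @PropertyInject("activemq.source.ssl.trustStorePassword") private String sourceTruststorePassword;
  @PropertyInject("quarkus.qpid-jms.url") private String destinationBrokerURL;

  @Override
  public void configure() throws Exception {
    // Consume from AMQ 6, log each message, and forward it to AMQ 7
    from("activeMQSource:{{activemq.source.type}}:{{activemq.source.name}}").to("log:info")
    .to("jms:{{jms.destinationType}}:{{jms.destinationName}}?connectionFactory=#artemisConnectionFactory");
  }

  @BindToRegistry("artemisConnectionFactory")
  public JmsConnectionFactory connectionFactory() throws Exception {
    return new JmsConnectionFactory(destinationBrokerURL);
  }

  @BindToRegistry("activeMQSource")
  public ActiveMQComponent activeMQSource() throws Exception {
    ActiveMQComponent activeMqComponent = new ActiveMQComponent();
    activeMqComponent.setConnectionFactory(pooledConnectionFactorySource());
    activeMqComponent.setCacheLevelName("CACHE_CONSUMER");
    return activeMqComponent;
  }

  @BindToRegistry("pooledConnectionFactorySource")
  public PooledConnectionFactory pooledConnectionFactorySource() throws Exception {
    return new PooledConnectionFactory(sslConnectionFactorySource());
  }

  private ActiveMQSslConnectionFactory sslConnectionFactorySource() throws Exception {
    ActiveMQSslConnectionFactory connectionFactory = new ActiveMQSslConnectionFactory();
    System.out.println("BrokerURL: " + sourceBrokerURL);
    connectionFactory.setBrokerURL(sourceBrokerURL);
    connectionFactory.setUserName(sourceUserName);
    connectionFactory.setPassword(sourcePassword);
    connectionFactory.setTrustStore(sourceTrustStoreLocation);
    connectionFactory.setTrustStorePassword(sourceTruststorePassword);
    connectionFactory.setKeyStore(sourceKeyStoreLocation);
    connectionFactory.setKeyStorePassword(sourceKeystorePassword);

    return connectionFactory;
  }
}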
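
Stripped of brokers and SSL, the bridge follows a simple pattern: take each message from the source, log it, and put it on the destination. The sketch below models that pattern with in-memory queues standing in for the AMQ 6 and AMQ 7 queues. It is purely conceptual; the class BridgeSketch is hypothetical and involves no Camel or JMS code.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;

// Hypothetical in-memory model of the bridge: source queue -> log -> destination queue.
final class BridgeSketch {

  // Moves every message currently in the source to the destination, preserving order.
  // Returns the number of messages moved.
  static int drain(Queue<String> source, Queue<String> destination) {
    int moved = 0;
    String message;
    while ((message = source.poll()) != null) {
      System.out.println("Bridging message: " + message);
      destination.add(message);
      moved++;
    }
    return moved;
  }

  public static void main(String[] args) {
    Queue<String> amq6Queue = new ArrayDeque<>(Arrays.asList("first", "second"));
    Queue<String> amq7Queue = new ArrayDeque<>();
    int moved = drain(amq6Queue, amq7Queue);
    System.out.println("Moved " + moved + " messages; destination now holds " + amq7Queue);
  }
}
```

In the real integration, Camel keeps consuming for as long as the route runs rather than draining once, and the JMS connections take the place of the two queues.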

Run the message bridge:

$ kamel run -n camelk-ssl \
    --property file:./message-bridge/configs/application.properties message-bridge/sixToSevenBridge.java \
    --resource secret:example-amq6-secret@/etc/ssl/example-amq6-secret \
    --resource secret:example-amq-secret@/etc/ssl/example-amq-secret

View the logs to see whether the message bridge worked:

$ oc logs -f {AMQ7 consumer pod} 

Clean up

Delete the project, along with the brokers and integrations created in it, using the OpenShift command line:

$ oc delete project camelk-ssl

Conclusion

This article showed you how to use Camel K to connect different versions of the ActiveMQ message broker using Red Hat AMQ on OpenShift 4. Camel K is a highly effective integration framework that runs natively on Kubernetes. This article showcased just a few of its capabilities. Camel K allows you to build and containerize applications easily and quickly. Its integrations use very lightweight pods, thus consuming fewer resources.

Get more information from the Camel K documentation. You might also enjoy the article Six reasons to love Camel K and our series of related Camel K learning courses.

Last updated: March 18, 2024