Red Hat Enterprise Linux

In this article, we will use a Python-based messaging client to connect and subscribe to a topic with a durable subscription in the Apache ActiveMQ Artemis broker. We will use the text-based STOMP protocol to connect and subscribe to the broker. STOMP clients can communicate with any STOMP message broker to provide messaging interoperability among many languages, platforms, and brokers.

If you need to brush up on the difference between persistence and durability in messaging, check Mary Cochran's article on developers.redhat.com/blog.

A similar process can be used with Red Hat AMQ 7. The broker in Red Hat AMQ 7 is based on the Apache ActiveMQ Artemis project. See the overview on developers.redhat.com for more information.

Setting Up the Project

In the following example, we are using one client, both to publish and subscribe to a topic. You can find the code at my personal GitHub repo. We have two receiver_queue.py and receiver_topic.py Python messaging clients. While receiver_queue.py is a Python client based on the STOMP protocol for point-to-point (queue) connection to the broker, receiver_topic.py is a Python client based on the STOMP protocol for durable subscription against a topic to the broker.

Here is the code:

import time
import sys

import stomp

class MyListener(stomp.ConnectionListener):
 def on_error(self, headers, message):
 print('received an error "%s"' % message)
 def on_message(self, headers, message):
 print('received a message "%s"' % message)
hosts = [('localhost', 61616)]

conn = stomp.Connection(host_and_ports=hosts)
conn.set_listener('', MyListener())
conn.start()
conn.connect('admin', 'admin', wait=True,headers = {'client-id': 'clientname'} )
conn.subscribe(destination='A.B.C.D', id=1, ack='auto',headers = {'subscription-type': 'MULTICAST','durable-subscription-name':'someValue'})

conn.send(body=' '.join(sys.argv[1:]), destination='A.B.C.D')

time.sleep(2)
conn.disconnect()

The following are tasks performed by this code:

  • To receive messages from the messaging system, we need to set up a listener on a connection, and then later subscribe to the destination.
  • We are establishing a connection to the broker available locally on port 61616. The first parameter to a Connection is host_and_ports. This contains an IP address and the port where the message broker is listening for STOMP connections.
  • The start method creates a socket connection to the broker.
  • Then we use the connect method with credentials to access the broker and we use the headers client-id to ensure that the subscription that is created is durable.
  • Once a connection is established to the broker with subscribe method, we are subscribing to destination A.B.C.D using acknowledgment mode auto. Also, we must provide the headers subscription-type as MULTICAST and durable-subscription-name as some text value.
  • To create a durable subscription, the client-id header must be set on the CONNECT frame and the durable-subscription-name must be set on the SUBSCRIBE frame. The combination of these two headers will form the identity of the durable subscription.
  • After a connection is established to the broker, we can use the send method to send/produce messages to the destination A.B.C.D. Here the first argument is to accept the text/string value from the command line, and the second argument is destination name or topic name.

How to Execute the Python Client

  • Make sure the  Apache ActiveMQ Artemis broker is configured to support the STOMP protocol. By default, port 61616 is configured to support almost all messaging protocols.
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
  • To run the client using the STOMP protocol, we first need the stomp module so that components of the STOMP API, such as connect, start,  send, subscribe, and disconnect, are available. So install the stomp module first.
pip install stomp.py
  • Once the stomp module is installed, we can easily run the client in the following way:
[cpandey@vm254-231 python_stomp_example]$ python receiver_topic.py "Hello World"
received a message "Hello World"
[cpandey@vm254-231 python_stomp_example]$
  • We can check the results using the following commands from the Apache ActiveMQ Artemis broker:
[cpandey@vm254-231 bin]$ ./artemis address show

A.B.C.D
DLQ

[cpandey@vm254-231 bin]$ ./artemis queue stat --user admin --password admin --url tcp://localhost:61616
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
|NAME |ADDRESS |CONSUMER_COUNT |MESSAGE_COUNT |MESSAGES_ADDED |DELIVERING_COUNT |MESSAGES_ACKED |
|DLQ |DLQ |0 |0 |0 |0 |0 |
|ExpiryQueue |ExpiryQueue |0 |0 |0 |0 |0 |
|clientname.someValue |A.B.C.D |0 |0 |1 |0 |1 |
[cpandey@vm254-231 bin]$

Note: A.B.C.D is the Address created and the durable subscription is created as queue clientname.someValue.

  • If we read the network dumps using Wireshark, the following is the complete stream:
STOMP
accept-version:1.1
client-id:clientname
login:admin
passcode:admin

.CONNECTED
version:1.1
session:4c98c896
server:ActiveMQ-Artemis/2.4.0.amq-711002-redhat-1 ActiveMQ Artemis Messaging Engine

.
SUBSCRIBE
ack:auto
destination:A.B.C.D
durable-subscription-name:someValue
id:1
subscription-type:MULTICAST

.SEND
content-length:4
destination:A.B.C.D

abcd.MESSAGE
subscription:1
content-length:4
message-id:30
destination:A.B.C.D
expires:0
redelivered:false
priority:4
persistent:false
timestamp:1528858440363

abcd.
DISCONNECT
receipt:6a8bc1fd-0c8b-4e13-871f-fbc9c8c4df9d

.RECEIPT
receipt-id:6a8bc1fd-0c8b-4e13-871f-fbc9c8c4df9d

That's it. I hope this helps you to have a basic understanding of using the STOMP protocol with the Apache ActiveMQ Artemis or Red Hat AMQ 7.

Last updated: September 3, 2019