In this article, we will use a Python-based messaging client to connect and subscribe to a topic with a durable subscription in the Apache ActiveMQ Artemis broker. We will use the text-based STOMP protocol to connect and subscribe to the broker. STOMP clients can communicate with any STOMP message broker to provide messaging interoperability among many languages, platforms, and brokers.
If you need to brush up on the difference between persistence and durability in messaging, check Mary Cochran's article on developers.redhat.com/blog.
A similar process can be used with Red Hat AMQ 7. The broker in Red Hat AMQ 7 is based on the Apache ActiveMQ Artemis project. See the overview on developers.redhat.com for more information.
Setting Up the Project
In the following example, we are using one client, both to publish and subscribe to a topic. You can find the code at my personal GitHub repo. We have two receiver_queue.py and receiver_topic.py Python messaging clients. While receiver_queue.py
is a Python client based on the STOMP protocol for point-to-point (queue) connection to the broker, receiver_topic.py
is a Python client based on the STOMP protocol for durable subscription against a topic to the broker.
Here is the code:
import time import sys import stomp class MyListener(stomp.ConnectionListener): def on_error(self, headers, message): print('received an error "%s"' % message) def on_message(self, headers, message): print('received a message "%s"' % message) hosts = [('localhost', 61616)] conn = stomp.Connection(host_and_ports=hosts) conn.set_listener('', MyListener()) conn.start() conn.connect('admin', 'admin', wait=True,headers = {'client-id': 'clientname'} ) conn.subscribe(destination='A.B.C.D', id=1, ack='auto',headers = {'subscription-type': 'MULTICAST','durable-subscription-name':'someValue'}) conn.send(body=' '.join(sys.argv[1:]), destination='A.B.C.D') time.sleep(2) conn.disconnect()
The following are tasks performed by this code:
- To receive messages from the messaging system, we need to set up a listener on a connection, and then later subscribe to the destination.
- We are establishing a connection to the broker available locally on port 61616. The first parameter to a
Connection
ishost_and_ports
. This contains an IP address and the port where the message broker is listening for STOMP connections. - The
start
method creates a socket connection to the broker. - Then we use the
connect
method with credentials to access the broker and we use theheaders
client-id
to ensure that the subscription that is created is durable. - Once a connection is established to the broker with
subscribe
method, we are subscribing to destinationA.B.C.D
using acknowledgment modeauto
. Also, we must provide theheaders
subscription-type asMULTICAST
anddurable-subscription-name
as some text value. - To create a durable subscription, the
client-id
header must be set on theCONNECT
frame and thedurable-subscription-name
must be set on theSUBSCRIBE
frame. The combination of these two headers will form the identity of the durable subscription. - After a connection is established to the broker, we can use the
send
method to send/produce messages to the destination A.B.C.D. Here the first argument is to accept the text/string value from the command line, and the second argument is destination name or topic name.
How to Execute the Python Client
- Make sure the Apache ActiveMQ Artemis broker is configured to support the STOMP protocol. By default, port 61616 is configured to support almost all messaging protocols.
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
- To run the client using the STOMP protocol, we first need the
stomp
module so that components of the STOMP API, such asconnect
,start
,send
,subscribe
, anddisconnect
, are available. So install thestomp
module first.
pip install stomp.py
- Once the
stomp
module is installed, we can easily run the client in the following way:
[cpandey@vm254-231 python_stomp_example]$ python receiver_topic.py "Hello World" received a message "Hello World" [cpandey@vm254-231 python_stomp_example]$
- We can check the results using the following commands from the Apache ActiveMQ Artemis broker:
[cpandey@vm254-231 bin]$ ./artemis address show A.B.C.D DLQ [cpandey@vm254-231 bin]$ ./artemis queue stat --user admin --password admin --url tcp://localhost:61616 OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N |NAME |ADDRESS |CONSUMER_COUNT |MESSAGE_COUNT |MESSAGES_ADDED |DELIVERING_COUNT |MESSAGES_ACKED | |DLQ |DLQ |0 |0 |0 |0 |0 | |ExpiryQueue |ExpiryQueue |0 |0 |0 |0 |0 | |clientname.someValue |A.B.C.D |0 |0 |1 |0 |1 | [cpandey@vm254-231 bin]$
Note: A.B.C.D is the Address
created and the durable subscription is created as queue clientname.someValue
.
- If we read the network dumps using Wireshark, the following is the complete stream:
STOMP accept-version:1.1 client-id:clientname login:admin passcode:admin .CONNECTED version:1.1 session:4c98c896 server:ActiveMQ-Artemis/2.4.0.amq-711002-redhat-1 ActiveMQ Artemis Messaging Engine . SUBSCRIBE ack:auto destination:A.B.C.D durable-subscription-name:someValue id:1 subscription-type:MULTICAST .SEND content-length:4 destination:A.B.C.D abcd.MESSAGE subscription:1 content-length:4 message-id:30 destination:A.B.C.D expires:0 redelivered:false priority:4 persistent:false timestamp:1528858440363 abcd. DISCONNECT receipt:6a8bc1fd-0c8b-4e13-871f-fbc9c8c4df9d .RECEIPT receipt-id:6a8bc1fd-0c8b-4e13-871f-fbc9c8c4df9d
That's it. I hope this helps you to have a basic understanding of using the STOMP protocol with the Apache ActiveMQ Artemis or Red Hat AMQ 7.
Last updated: September 3, 2019