On my latest engagement we were asked to set up and configure JBoss Fuse Service Works, which can be configured with either HornetQ (out of the box) or ActiveMQ as the message server. At the moment ActiveMQ requires the Karaf container, and we couldn't convince ourselves it was the right approach for this client.
Anyhow, for Part 1, I wanted to focus on HornetQ and how easy it is to test your message server with STOMP (in Part 2, I plan similar fun with ActiveMQ). I stumbled upon STOMP while looking for a HornetQ client for Python and was intrigued by its simplicity. Did I mention this protocol is supported by ActiveMQ as well? Cool. I mean, with telnet and a few commands you're in business, subscribing to, producing and consuming messages, which is perfect for testing. Using telnet is a nice approach, but scripting something repeatable with it can get messy, so I decided it best to stick with the Python library. There is a slew of STOMP clients available in various languages.
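To give a feel for what those "few commands" over telnet look like on the wire, here is a minimal sketch that serializes STOMP frames by hand. The helper name `build_frame` and the `guest`/`guest` credentials are my own placeholders, not anything HornetQ or stomp.py provides; the frame layout (command line, `key:value` headers, blank line, body, NUL terminator) follows the STOMP specification.

```python
def build_frame(command, headers=None, body=''):
    """Serialize one STOMP frame: command, headers, blank line, body, NUL."""
    lines = [command]
    for key, value in (headers or {}).items():
        lines.append('%s:%s' % (key, value))
    # A blank line separates the headers from the body; a NUL byte ends the frame.
    return ('\n'.join(lines) + '\n\n' + body + '\x00').encode('utf-8')


# Hypothetical credentials and the topic name used later in this post.
connect_frame = build_frame('CONNECT', {'login': 'guest', 'passcode': 'guest'})
send_frame = build_frame('SEND', {'destination': 'jms.topic.spin'}, 'hello')
print(repr(send_frame))
```

These are the same bytes you would type into a telnet session (ending each frame with Ctrl-@ for the NUL), which is why the protocol is so handy for quick checks.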
Configure JBoss EAP HornetQ Server for STOMP via CLI - I'm not going to go through setting up HornetQ from scratch, but rather show the configuration needed to enable STOMP.
Add socket-binding group
/socket-binding-group=standard-sockets/socket-binding=messaging-stomp:add(port=61613)
Add STOMP Acceptor
/subsystem=messaging/hornetq-server=default/remote-acceptor=stomp-acceptor:add(socket-binding=messaging-stomp)
Add parameters: protocol, ttl and port
/subsystem=messaging/hornetq-server=default/remote-acceptor=stomp-acceptor/param=protocol:add(value=stomp)
/subsystem=messaging/hornetq-server=default/remote-acceptor=stomp-acceptor/param=connection-ttl:add(value=30000)
/subsystem=messaging/hornetq-server=default/remote-acceptor=stomp-acceptor/param=port:add(value=61613)
- Verify STOMP protocol
# cat standalone/log/server.log | grep STOMP
12:20:09,315 INFO [org.hornetq.core.server] (MSC service thread 1-3) HQ221020: Started Netty Acceptor version 3.6.6.Final-redhat-1-fd3c6b7 0.0.0.0:61613 for STOMP protocol
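Besides grepping the log, a quick way to confirm the acceptor is reachable from the client machine is a plain TCP connect. This is only a sketch: the function name `stomp_port_open` is mine, `localhost` is a placeholder for your HornetQ host, and a successful connect only shows that something is listening on the port, not that it speaks STOMP.

```python
import socket


def stomp_port_open(host, port=61613, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        conn = socket.create_connection((host, port), timeout=timeout)
    except (socket.error, socket.timeout):
        return False
    conn.close()
    return True


if __name__ == '__main__':
    # 'localhost' is a placeholder; use the HornetQ server address instead.
    print(stomp_port_open('localhost'))
```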
Configure STOMP client
Download the STOMP Python client (stomp.py)
git clone git@github.com:jasonrbriggs/stomp.py.git
Add to Python libraries
cd /../stomp.py
sudo python2.7 setup.py install
Producer script
import stomp
import time

# add server event listeners
class HornetqListener(object):
    def on_connecting(self, host_and_port):
        print('on_connecting %s %s' % host_and_port)

    def on_error(self, headers, message):
        print('received an error %s %s' % (headers, message))

    def on_message(self, headers, body):
        print('on_message %s %s' % (headers, body))

    def on_heartbeat(self):
        print('on_heartbeat')

    def on_send(self, frame):
        print('on_send %s %s %s' % (frame.cmd, frame.headers, frame.body))

    def on_connected(self, headers, body):
        print('on_connected %s %s' % (headers, body))

    def on_disconnected(self):
        print('on_disconnected')

    def on_heartbeat_timeout(self):
        print('on_heartbeat_timeout')

    def on_before_message(self, headers, body):
        print('on_before_message %s %s' % (headers, body))
        return (headers, body)

# params (replace the <...> placeholders with your own values)
dest = 'jms.topic.spin'
server = '<hornetq-server-ip>'
port = 61613
# optional, based on hornetq config
user = '<message-queue-username>'
passwd = '<message-queue-password>'

# make STOMP server connection
conn = stomp.Connection(host_and_ports=[(server, port)], prefer_localhost=False,
                        keepalive=True, vhost='irs.example.com')
# instantiate listener
conn.set_listener('', HornetqListener())
conn.start()
conn.connect(username=user, passcode=passwd, wait=True)
# subscribe to the destination (a topic here) so we see our own messages
conn.subscribe(destination=dest, id=1)

# send messages with timestamp
for x in range(5):
    # send message to the destination
    conn.send(body='the current time is: ' + time.strftime('%X'),
              destination=dest, persistent='true')
    time.sleep(4)

# give the last message time to come back before disconnecting
time.sleep(4)
conn.disconnect()
Subscriber script
import stomp
import time

# add listeners for server events
class HornetqListener(object):
    def on_connecting(self, host_and_port):
        print('on_connecting %s %s' % host_and_port)

    def on_error(self, headers, message):
        print('received an error %s %s' % (headers, message))

    def on_message(self, headers, body):
        print('on_message %s %s' % (headers, body))

    def on_heartbeat(self):
        print('on_heartbeat')

    def on_send(self, frame):
        print('on_send %s %s %s' % (frame.cmd, frame.headers, frame.body))

    def on_connected(self, headers, body):
        print('on_connected %s %s' % (headers, body))

    def on_disconnected(self):
        print('on_disconnected')

    def on_heartbeat_timeout(self):
        print('on_heartbeat_timeout')

    def on_before_message(self, headers, body):
        print('on_before_message %s %s' % (headers, body))
        return (headers, body)

# params (replace the <...> placeholders with your own values)
dest = 'jms.topic.spin'
server = '<hornetq-server-ip>'
port = 61613
# optional, based on hornetq config
user = '<message-queue-username>'
passwd = '<message-queue-password>'

# make STOMP server connection
conn = stomp.Connection(host_and_ports=[(server, port)], prefer_localhost=False,
                        keepalive=True, vhost='irs.example.com', heartbeats=(20, 20))
# instantiate listener
conn.set_listener('', HornetqListener())
conn.start()
conn.connect(username=user, passcode=passwd, wait=True)
# subscribe to the destination (a topic here)
conn.subscribe(destination=dest, id=1)

# stay connected for 20 minutes while the listener prints incoming messages
time.sleep(1200)
conn.disconnect()
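The listeners above only print what they see, which is fine for eyeballing but awkward for a repeatable test. Here is a minimal sketch of a listener that collects message bodies so a script can assert on them. The class name `CollectingListener` is my own invention, and it assumes the same `on_message(headers, body)` callback signature used by the scripts above (newer stomp.py releases may pass a single frame object instead).

```python
import threading


class CollectingListener(object):
    """Stores message bodies and signals once `expected` messages have arrived."""

    def __init__(self, expected):
        self.expected = expected
        self.bodies = []
        self.done = threading.Event()
        self._lock = threading.Lock()

    def on_message(self, headers, body):
        # The client library calls listeners from its receiver thread,
        # so guard the shared list with a lock.
        with self._lock:
            self.bodies.append(body)
            if len(self.bodies) >= self.expected:
                self.done.set()

    def wait(self, timeout):
        """Block until all expected messages arrive; return True on success."""
        return self.done.wait(timeout)
```

You would register it next to the printing listener, for example `conn.set_listener('collector', CollectingListener(expected=5))`, and then replace the long `time.sleep(1200)` with a call to `wait(...)` on that listener.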
Execute scripts
- test sending messages to the topic
python msg-send.py
- test receiving messages from the topic
python msg-subscribe.py
Gotchas
There is one gotcha I ran into. If you want to test failover with an active/passive server topology, the messages won't persist. The protocol is not officially supported, and this feature hasn't been added for the STOMP protocol.
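If you still want to see how many messages go missing during a failover test, one rough approach is to number the messages and look for gaps on the receiving side. The sketch below assumes each message carries a sequence number, for example in a hypothetical `seq` header that the producer adds itself; neither the header nor the helper `missing_sequence_numbers` comes from HornetQ or stomp.py.

```python
def missing_sequence_numbers(received, expected_count):
    """Return the sorted sequence numbers in range(expected_count) never received."""
    # Header values arrive as strings, so normalize to int before comparing.
    seen = set(int(n) for n in received)
    return [n for n in range(expected_count) if n not in seen]


# Example: five messages were sent (0..4) but only three arrived.
print(missing_sequence_numbers(['0', '1', '4'], 5))  # prints [2, 3]
```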
Happy STOMPing!