In this article, I will discuss how to capture incoming and outgoing messages for Red Hat AMQ 7 (RHAMQ 7). This can be useful when you need to log the traffic entering or leaving a broker, or during development and testing when you want to see every message. You may also need to modify messages in transit. RHAMQ 7 interceptors let you intercept traffic to and from the broker and modify messages along the way.
On my personal GitHub page, there is one example of using the interceptor, which works with the core protocol of RHAMQ 7.
The first step in creating an interceptor is to implement the Interceptor interface:
package org.apache.activemq.artemis.api.core;

public interface Interceptor {
    boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException;
}
On the GitHub page mentioned above, there is a simple Java class, SimpleInterceptor.java, which implements the Interceptor interface and intercepts messages using the following code:
public boolean intercept(final Packet packet, final RemotingConnection connection) throws ActiveMQException {
    if (packet instanceof SessionSendMessage) {
        System.out.println("SimpleInterceptor gets called!.... Packet: " + packet.getClass().getName()
                + "RemotingConnection: " + connection.getRemoteAddress());
        SessionSendMessage realPacket = (SessionSendMessage) packet;
        Message msg = realPacket.getMessage();
        if ((msg.getTimestamp() > 0) && msg.getUserID() != null) {
            System.out.println("Msg: " + msg.toString());
        }
    } else if (packet instanceof SessionReceiveMessage) {
        System.out.println("SimpleInterceptor gets called!.... Packet: " + packet.getClass().getName()
                + "RemotingConnection: " + connection.getRemoteAddress());
        SessionReceiveMessage realPacket = (SessionReceiveMessage) packet;
        Message msg = realPacket.getMessage();
        if ((msg.getTimestamp() > 0) && msg.getUserID() != null) {
            System.out.println("Msg: " + msg.toString());
        }
    }
    return true;
}
Note that a packet that is an instance of SessionSendMessage is an incoming message: a producer is sending it to the broker. A packet that is an instance of SessionReceiveMessage is an outgoing message that the broker is delivering to a consumer/subscriber for further processing.
Returning true passes the packet to the next interceptor (if any) or to the target. Returning false aborts further processing: the next interceptor is not called and the message is not sent to any target.
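The return-value contract can be sketched outside the broker. The following is a minimal, self-contained model and not Artemis code: SketchPacket, SketchInterceptor, and InterceptorChainSketch are hypothetical stand-ins for the broker's packet type, the Interceptor interface, and the broker's dispatch logic. Under those assumptions, it shows one interceptor that modifies a message in transit and one that vetoes it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a broker packet: a body plus string properties.
class SketchPacket {
    final String body;
    final Map<String, String> properties = new HashMap<>();

    SketchPacket(String body) {
        this.body = body;
    }
}

// Hypothetical stand-in mirroring the shape of Interceptor.intercept():
// true lets processing continue, false aborts it.
interface SketchInterceptor {
    boolean intercept(SketchPacket packet);
}

// Models how a chain of interceptors is assumed to be walked.
class InterceptorChainSketch {
    private final List<SketchInterceptor> interceptors = new ArrayList<>();

    InterceptorChainSketch add(SketchInterceptor interceptor) {
        interceptors.add(interceptor);
        return this;
    }

    // Returns true only if every interceptor allowed the packet through,
    // that is, if the packet would reach its target.
    boolean dispatch(SketchPacket packet) {
        for (SketchInterceptor interceptor : interceptors) {
            if (!interceptor.intercept(packet)) {
                return false; // later interceptors and the target are skipped
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Modifies the message in transit, then lets it continue.
        SketchInterceptor tagger = p -> {
            p.properties.put("audited", "true");
            return true;
        };
        // Vetoes empty messages.
        SketchInterceptor rejectEmpty = p -> !p.body.isEmpty();

        InterceptorChainSketch chain = new InterceptorChainSketch().add(tagger).add(rejectEmpty);

        SketchPacket ok = new SketchPacket("hello");
        SketchPacket empty = new SketchPacket("");
        System.out.println("delivered=" + chain.dispatch(ok) + " properties=" + ok.properties);
        System.out.println("delivered=" + chain.dispatch(empty));
    }
}
```

In a real interceptor, the same decision is made inside intercept(Packet, RemotingConnection): return true after logging or modifying the message, or return false to drop it.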
Build the project using mvn package and copy the JAR to amq_broker_home/lib.
In the broker configuration file broker.xml, specify this interceptor for both incoming and outgoing messages as follows:
<core...>
    <remoting-incoming-interceptors>
        <class-name>com.mycompany.interceptor.SimpleInterceptor</class-name>
    </remoting-incoming-interceptors>
    <remoting-outgoing-interceptors>
        <class-name>com.mycompany.interceptor.SimpleInterceptor</class-name>
    </remoting-outgoing-interceptors>
</core>
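Before restarting the broker, it can help to confirm which interceptor classes the configuration actually names. The following is a minimal sketch using only the JDK's XML parser; the class InterceptorConfigCheck and its classNames method are hypothetical helpers, not part of RHAMQ 7, and the inline XML is a simplified fragment rather than a complete broker.xml.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

// Hypothetical helper: lists the interceptor classes named in a broker.xml fragment.
class InterceptorConfigCheck {

    // Returns the <class-name> values found under the given list element,
    // for example "remoting-incoming-interceptors".
    static List<String> classNames(String xml, String listElement) {
        try {
            Document doc = DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            List<String> names = new ArrayList<>();
            NodeList lists = doc.getElementsByTagName(listElement);
            for (int i = 0; i < lists.getLength(); i++) {
                NodeList entries = ((Element) lists.item(i)).getElementsByTagName("class-name");
                for (int j = 0; j < entries.getLength(); j++) {
                    names.add(entries.item(j).getTextContent().trim());
                }
            }
            return names;
        } catch (Exception e) {
            throw new IllegalStateException("Could not parse configuration", e);
        }
    }

    public static void main(String[] args) {
        // Simplified fragment; a real broker.xml contains many more elements.
        String xml = "<core>"
                + "<remoting-incoming-interceptors>"
                + "<class-name>com.mycompany.interceptor.SimpleInterceptor</class-name>"
                + "</remoting-incoming-interceptors>"
                + "<remoting-outgoing-interceptors>"
                + "<class-name>com.mycompany.interceptor.SimpleInterceptor</class-name>"
                + "</remoting-outgoing-interceptors>"
                + "</core>";
        System.out.println("incoming: " + classNames(xml, "remoting-incoming-interceptors"));
        System.out.println("outgoing: " + classNames(xml, "remoting-outgoing-interceptors"));
    }
}
```

Each listed class name must match a class in a JAR under amq_broker_home/lib, otherwise the broker cannot load the interceptor.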
At this point, we can start the RHAMQ 7 broker and begin testing. Run the following commands from the directory amq_broker_home/bin:
./artemis consumer --url tcp://localhost:61616 --user admin --password admin --destination queue://TESTCP --verbose
./artemis producer --url tcp://localhost:61616 --user admin --password admin --destination queue://TESTCP --message-count 1
You should see the following results:
#For Sender
SimpleInterceptor gets called!.... Packet: org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessageRemotingConnection: /127.0.0.1:49896
Msg: CoreMessage[messageID=0,durable=true,userID=4e094a2a-d6e0-11e8-a444-e8b1fc466329,priority=4, timestamp=Tue Oct 23 21:55:50 IST 2018,expiration=0, durable=true, address=exampleQueue,size=270,properties=TypedProperties[__AMQ_CID=4df09207-d6e0-11e8-a444-e8b1fc466329,_AMQ_ROUTING_TYPE=1]]@183519025

#For Receiver
SimpleInterceptor gets called!.... Packet: org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessageRemotingConnection: /127.0.0.1:49896
Msg: CoreMessage[messageID=3333,durable=true,userID=4e094a2a-d6e0-11e8-a444-e8b1fc466329,priority=4, timestamp=Tue Oct 23 21:55:50 IST 2018,expiration=0, durable=true, address=exampleQueue,size=270,properties=TypedProperties[__AMQ_CID=4df09207-d6e0-11e8-a444-e8b1fc466329,_AMQ_ROUTING_TYPE=1]]@1835190256
We can also get destination (queue/topic) statistics by running the following command from amq_broker_home/bin:
./artemis queue stat --url tcp://localhost:61621 --user admin --password admin --queueName TESTCP --verbose
As mentioned earlier, this interceptor only logs messages sent over the core protocol. For the STOMP protocol, an interceptor should implement the StompFrameInterceptor interface:
package org.apache.activemq.artemis.core.protocol.stomp;

public interface StompFrameInterceptor extends BaseInterceptor {
    boolean intercept(StompFrame stompFrame, RemotingConnection connection);
}
Similarly, for the MQTT protocol, an interceptor should implement the MQTTInterceptor interface:
package org.apache.activemq.artemis.core.protocol.mqtt;

public interface MQTTInterceptor extends BaseInterceptor {
    boolean intercept(MqttMessage mqttMessage, RemotingConnection connection);
}
That's it! I hope this article will help you in setting up an RHAMQ 7 interceptor and logging or modifying the incoming or outgoing messages.
Last updated: September 3, 2019