I'm trying to send and receive messages with Apache ActiveMQ Artemis 2.24, sending from Python over STOMP and receiving in Java. The stomp.py version is 8.0.1, the Python version is 3.10.4, and the Java version is 1.8.0_342.
If I run my Java-based message consumer and send a message from Java code, everything works fine. But if I send a message to the queue using stomp.py, I get the following exception on the receiving side:
Exception in thread "main" java.lang.IndexOutOfBoundsException: Error reading in simpleString, length=1864388729 is greater than readableBytes=10
at org.apache.activemq.artemis.api.core.SimpleString.readSimpleString(SimpleString.java:183)
at org.apache.activemq.artemis.api.core.SimpleString.readSimpleString(SimpleString.java:171)
at org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper.readStringInternal(ChannelBufferWrapper.java:103)
at org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper.readString(ChannelBufferWrapper.java:88)
at org.fogbeam.experimental.bosworth.activemq.ActiveMQConsumerMain.main(ActiveMQConsumerMain.java:54)
The message consumer looks like this:
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
public class ActiveMQConsumerMain
{
    public static void main(String[] args) throws Exception
    {
        ServerLocator locator = ActiveMQClient.createServerLocator("tcp://172.16.1.141:61616");
        ClientSessionFactory factory = locator.createSessionFactory();
        ClientSession session = factory.createSession( "username", "password", false, true, true, false, 4096 );
        // We need a queue attached to the address ...
        try
        {
            session.createQueue("example", RoutingType.ANYCAST, "example", true);
        }
        catch( ActiveMQQueueExistsException amqe )
        {
            if( amqe.getMessage().contains("already exists" ))
            {
                // no problem, our queue already exists
                System.out.println( "Queue already exists on server!" );
            }
            else
            {
                amqe.printStackTrace();
            }
        }
        // And a consumer attached to the queue ...
        ClientConsumer consumer = session.createConsumer("example");
        session.start();
        while( true )
        {
            System.out.println( "Listening..." );
            ClientMessage msgReceived = consumer.receive();
            System.out.println("message = " + msgReceived.getBodyBuffer().readString());
            msgReceived.acknowledge();
            session.commit();
        }
    }
}
The Java message producer that works looks like this:
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
public class ActiveMQProducerMain
{
    public static void main(String[] args) throws Exception
    {
        ServerLocator locator = ActiveMQClient.createServerLocator("tcp://172.16.1.141:61616");
        ClientSessionFactory factory = locator.createSessionFactory();
        ClientSession session = factory.createSession( "username", "password", false, true, true, false, 4096 );
        ClientProducer producer = session.createProducer("example");
        ClientMessage message = session.createMessage(true);
        message.getBodyBuffer().writeString("Hello world!!!");
        // We need a queue attached to the address ...
        try
        {
            session.createQueue("example", RoutingType.ANYCAST, "example", true);
        }
        catch( ActiveMQQueueExistsException amqe )
        {
            if( amqe.getMessage().contains("already exists" ))
            {
                // no problem, our queue already exists
                System.out.println( "Queue already exists on server!" );
            }
            else
            {
                amqe.printStackTrace();
            }
        }
        // Once we have a queue, we can send the message ...
        producer.send(message);
    }
}
And the Python message producer code is as follows:
import time
import stomp

def main():
    print( "Sending ActiveMQ message using STOMP client!\n" )
    conn = stomp.Connection( [('172.16.1.141', 61613)] )
    conn.connect( wait=True, headers={'consumerWindowSize': 0})
    conn.send(body='Hello Python World', destination='example')
    time.sleep(5)
    conn.disconnect()
    exit()

if __name__ == "__main__":
    main()
EDIT 1:
Tried something else: I set up a stomp.py message consumer, and when I run that and send the message with stomp.py, everything works fine. So it seems that both the Java and Python client libraries fundamentally "work", but something about the intersection of the two (e.g., sending from Python, receiving in Java) is breaking.
EDIT 2:
Also, if I run the Python based message consumer, and send using the Java message producer, the Python code receives a message, but the content of the message appears to be empty. So, again, it looks like there is some weird mismatch between what's happening in "Java land" and what's happening in "Python land."
Any thoughts on what could be causing this?
EDIT 3:
I tried switching to readNullableSimpleString() per the answer from Justin below, so now my code looks like this:
System.out.println("message = " + msgReceived.getBodyBuffer().readNullableSimpleString() );
and now I get this:
java.lang.IndexOutOfBoundsException: Error reading in simpleString, length=1701604463 is greater than readableBytes=13
at org.apache.activemq.artemis.api.core.SimpleString.readSimpleString(SimpleString.java:183)
at org.apache.activemq.artemis.api.core.SimpleString.readSimpleString(SimpleString.java:171)
at org.apache.activemq.artemis.api.core.SimpleString.readNullableSimpleString(SimpleString.java:158)
at org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper.readNullableSimpleString(ChannelBufferWrapper.java:69)
at org.fogbeam.experimental.bosworth.activemq.ActiveMQConsumerMain.main(ActiveMQConsumerMain.java:64)
Looks like a call to readNullableSimpleString still ultimately triggers a call to readSimpleString() which errors out.
EDIT 4:
I still don't understand the "why" of exactly what's happening under the hood, but I did find a way to get my message successfully in the consumer. This mechanism works:
int bodySize = msgReceived.getBodySize();
byte[] bytes = new byte[bodySize];
msgReceived.getBodyBuffer().readBytes( bytes );
System.out.println("message = " + new String( bytes ) );
Looks like the STOMP message is being turned into a BytesMessage and not a TextMessage under the hood? Or something close to that. And some older discussion I found suggests that this does, in turn, relate to the presence or absence of the content-length header. Weird. But for now I'm happy that I at least have a path forward, even if it's not the perfect path forward.
Answer 1:
When the broker receives a STOMP message with no content-length header, it encodes the body as a nullable SimpleString using the writeNullableSimpleString method on org.apache.activemq.artemis.api.core.ActiveMQBuffer. Therefore, when you receive that message as a core message, you need to read that data as a nullable SimpleString using the readNullableSimpleString method on org.apache.activemq.artemis.api.core.ActiveMQBuffer. This is outlined in the documentation.
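To make the encoding concrete, here is a minimal Python sketch of what a nullable SimpleString is assumed to look like on the wire: one flag byte, a big-endian 4-byte length in bytes, then two bytes per character with the low byte first. The flag values and the helper names are assumptions made for illustration, not the Artemis API, so check them against the Artemis source before relying on them.

```python
import struct

NULL_FLAG = 0      # assumed marker byte for a null string
NOT_NULL_FLAG = 1  # assumed marker byte for a present string


def write_nullable_simple_string(text):
    """Encode text as: flag byte, 4-byte big-endian byte length, UTF-16LE data."""
    if text is None:
        return bytes([NULL_FLAG])
    data = text.encode("utf-16-le")  # two bytes per char, low byte first
    return bytes([NOT_NULL_FLAG]) + struct.pack(">i", len(data)) + data


def read_nullable_simple_string(buf):
    """Decode a buffer laid out as in write_nullable_simple_string."""
    if buf[0] == NULL_FLAG:
        return None
    (length,) = struct.unpack_from(">i", buf, 1)
    readable = len(buf) - 5  # bytes left after the flag and the length
    if length > readable:
        raise IndexError(f"length={length} is greater than readableBytes={readable}")
    return buf[5:5 + length].decode("utf-16-le")


encoded = write_nullable_simple_string("Hello")
print(encoded.hex(" "))                      # 01 00 00 00 0a 48 00 65 00 6c 00 6c 00 6f 00
print(read_nullable_simple_string(encoded))  # Hello
```

The point of the sketch is that the reader trusts the length prefix completely, so it only works when the writer used the same layout.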
The reason this is working with your core producer and core consumer is that they are using writeString and readString respectively.
Likewise, it doesn't work with your core producer and Python consumer because the core producer is using writeString and the message is being converted into a STOMP MESSAGE frame using readNullableSimpleString which, of course, doesn't work.
Keep in mind that the body of a message is ultimately just an array of bytes. The broker has no way of knowing what kind of data is in the array. It could be human readable text or it could be binary data, and even if it was text it could be encoded any number of ways. In order for clients to exchange messages with each other they must use a common format. In this case, Java clients using the core API must use a nullable SimpleString.
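The odd lengths in the two exceptions from the question can be reproduced by treating the raw text bytes as length prefixes. The sketch below assumes the body arrived as the 18 raw bytes of "Hello Python World", and that the readers consume big-endian 4-byte ints in the order suggested by the stack traces. That reading order is an inference from the traces, not something confirmed against the Artemis source.

```python
import struct

body = b"Hello Python World"  # 18 raw bytes, assumed to be the message body


def read_int(buf, offset):
    """Read a big-endian signed 32-bit int at the given offset."""
    return struct.unpack_from(">i", buf, offset)[0]


# readString(): a first int ("Hell") is read, and because it is huge the reader
# appears to fall through to readSimpleString(), which reads a second int
# ("o Py") as the string length, leaving 18 - 8 = 10 readable bytes.
first = read_int(body, 0)
second = read_int(body, 4)
print(first, second, len(body) - 8)   # 1214606444 1864388729 10

# readNullableSimpleString(): one flag byte ("H", non-zero, so "not null"),
# then the next four bytes ("ello") are taken as the length, leaving 13 bytes.
nullable_len = read_int(body, 1)
print(nullable_len, len(body) - 5)    # 1701604463 13
```

Both results match the reported errors (length=1864388729 with readableBytes=10, and length=1701604463 with readableBytes=13), which suggests the consumer was decoding plain text bytes as if they were a length-prefixed string.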
Answer 2:
OK, I think I finally figured out what's going on here. There are a couple of different issues at play, and the interaction between them made this confusing early on.
My current understanding:
stomp.py has a setting "auto_content_length" that controls whether or not the library automatically adds a content-length header to the message. The ActiveMQ broker then creates different Message types from the STOMP message depending on whether or not that header is present. If the header is present, you get a BytesMessage. Otherwise you get a TextMessage. auto_content_length is True by default, so when all this started I was getting BytesMessages when I naively expected Text (hey, I'm sending a String, right?).
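To show what that header changes on the wire, here is a small sketch that builds a STOMP SEND frame by hand, with and without content-length. The function build_send_frame is a hypothetical helper written for illustration. It is not part of stomp.py and does not reflect how the library assembles frames internally.

```python
def build_send_frame(destination, body, include_content_length):
    """Build a STOMP SEND frame by hand (illustration only, not stomp.py code)."""
    payload = body.encode("utf-8")
    headers = [f"destination:{destination}"]
    if include_content_length:
        # The header carries the body size in bytes.
        headers.append(f"content-length:{len(payload)}")
    head = "SEND\n" + "\n".join(headers) + "\n\n"
    # A frame is terminated by a single NUL byte.
    return head.encode("utf-8") + payload + b"\x00"


with_length = build_send_frame("example", "Hello Python World", True)
without_length = build_send_frame("example", "Hello Python World", False)
print(with_length)
print(without_length)
```

In stomp.py itself the switch should be available as a constructor argument, along the lines of stomp.Connection([('172.16.1.141', 61613)], auto_content_length=False), but verify that against the version you have installed.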
Now on the Java side, you have to use the API differently depending on which type of Message you get. If you get a BytesMessage, you call getBodyBuffer() and then read the bytes using readBytes(). I was originally getting BytesMessage and calling getBodyBuffer(), but mistakenly trying to use the various readString() methods, which was wrong.
Also on the Java side, if you get a TextMessage (what you get if you turn the content-length header off on the Python/STOMP side) then you have to do things a little differently. You call getDataBuffer() on the Message object, and then you use readNullableSimpleString() to read the underlying text.
So this will successfully read the message contents when there is no content-length header:
ActiveMQBuffer tempBuff = msgReceived.getDataBuffer();
System.out.println( "tempBuff: " + tempBuff.readNullableSimpleString() );
So in the end, my problem was a combination of:
- Not knowing about that auto_content_length setting
- Not originally understanding how ActiveMQ uses the content-length header to change the Message type under the hood
- Not realizing that you have to call either getBodyBuffer() / readBytes(), or getDataBuffer() / readNullableSimpleString(), based on which type of Message you are handling.
With that understanding in hand, I can now read messages on the Java side with or without the content-length header being set on the Python side.