I am creating an Azure Service Bus listener using ServiceBusReceiverAsyncClient
to process messages from an Azure Service Bus queue, like this (ASB class):
DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
    .build();
// Create an asynchronous receiver through the ServiceBusClientBuilder
ServiceBusReceiverAsyncClient asyncClient = new ServiceBusClientBuilder()
    .credential(credential)
    .connectionString(connectionString)
    .receiver()
    .queueName(queueName)
    .buildAsyncClient();
// subscribe() starts message delivery without blocking the calling thread
Disposable subscription = asyncClient.receiveMessages()
    .subscribe(ASB::processMessage,
        ASB::processError,
        () -> System.out.println("Receiving complete."));

private static void processMessage(ServiceBusReceivedMessage message) {
    System.out.println("Processing message.");
    // Note: the value labelled "Session" in this output is actually the message ID
    System.out.printf("Processed message. Session: %s, Sequence #: %s. Contents: %s%n", message.getMessageId(),
        message.getSequenceNumber(), message.getBody());
}
I have a tool that sends a few thousand messages to the Service Bus queue so I can test whether the messages are processed asynchronously. However, this is what I see in the log:
Processing message.
Processed message. Session: 3151fe4ef7734ea19658bcb3459d4cea, Sequence #: 2945. Contents: test 330
2022-12-14 18:51:11.379 INFO 91776 --- [oundedElastic-2] c.a.m.s.i.ServiceBusReceiveLinkProcessor : {"az.sdk.message":"Adding credits.","prefetch":0,"requested":2,"linkCredits":0,"expectedTotalCredit":2,"queuedMessages":1,"creditsToAdd":1,"messageQueueSize":0}
Processing message.
Processed message. Session: a304b4fbf379436e8c0e95c5a2a8ec00, Sequence #: 2946. Contents: test 331
2022-12-14 18:51:11.545 INFO 91776 --- [oundedElastic-2] c.a.m.s.i.ServiceBusReceiveLinkProcessor : {"az.sdk.message":"Adding credits.","prefetch":0,"requested":2,"linkCredits":0,"expectedTotalCredit":2,"queuedMessages":1,"creditsToAdd":1,"messageQueueSize":0}
Processing message.
Processed message. Session: b2ca6cd5dc1a456d99eb469c2de37090, Sequence #: 2947. Contents: test 332
2022-12-14 18:51:11.638 INFO 91776 --- [oundedElastic-2] c.a.m.s.i.ServiceBusReceiveLinkProcessor : {"az.sdk.message":"Adding credits.","prefetch":0,"requested":2,"linkCredits":0,"expectedTotalCredit":2,"queuedMessages":1,"creditsToAdd":1,"messageQueueSize":0}
Processing message.
Processed message. Session: 39decd757a544fb09aecf62f11deef3d, Sequence #: 2948. Contents: test 333
2022-12-14 18:51:11.765 INFO 91776 --- [oundedElastic-2] c.a.m.s.i.ServiceBusReceiveLinkProcessor : {"az.sdk.message":"Adding credits.","prefetch":0,"requested":2,"linkCredits":0,"expectedTotalCredit":2,"queuedMessages":1,"creditsToAdd":1,"messageQueueSize":0}
Processing message.
Processed message. Session: 79bb1a6adaa542448075868ec190b7dd, Sequence #: 2949. Contents: test 334
2022-12-14 18:51:11.881 INFO 91776 --- [oundedElastic-2] c.a.m.s.i.ServiceBusReceiveLinkProcessor : {"az.sdk.message":"Adding credits.","prefetch":0,"requested":2,"linkCredits":0,"expectedTotalCredit":2,"queuedMessages":1,"creditsToAdd":1,"messageQueueSize":0}
Processing message.
Processed message. Session: e891896a85a548968de904f3ee6f07a0, Sequence #: 2950. Contents: test 335
2022-12-14 18:51:12.008 INFO 91776 --- [oundedElastic-2] c.a.m.s.i.ServiceBusReceiveLinkProcessor : {"az.sdk.message":"Adding credits.","prefetch":0,"requested":2,"linkCredits":0,"expectedTotalCredit":2,"queuedMessages":1,"creditsToAdd":1,"messageQueueSize":0}
Processing message.
Processed message. Session: b0326bcc0ebd40b2b32d9e9ceb88d965, Sequence #: 2951. Contents: test 336
2022-12-14 18:51:12.121 INFO 91776 --- [oundedElastic-2] c.a.m.s.i.ServiceBusReceiveLinkProcessor : {"az.sdk.message":"Adding credits.","prefetch":0,"requested":2,"linkCredits":0,"expectedTotalCredit":2,"queuedMessages":1,"creditsToAdd":1,"messageQueueSize":0}
Processing message.
Processed message. Session: b55d635672c449ec924d2121bb8575b8, Sequence #: 2952. Contents: test 337
2022-12-14 18:51:12.268 INFO 91776 --- [oundedElastic-2] c.a.m.s.i.ServiceBusReceiveLinkProcessor : {"az.sdk.message":"Adding credits.","prefetch":0,"requested":2,"linkCredits":0,"expectedTotalCredit":2,"queuedMessages":1,"creditsToAdd":1,"messageQueueSize":0}
Processing message.
I am not sure if I can conclude this, but it seems like the code is processing messages in sequential order, one by one, meaning it is doing it synchronously.
I also tried adding Thread.sleep(10000) to the handler to see if ServiceBusReceiverAsyncClient is spawning a new thread for each message, but it seems to be processing everything on one thread.
Am I misunderstanding anything? Or is there a configuration, like in C#, that I have to set in order for it to process messages concurrently? Any help would be great!
CodePudding user response:
I believe you're conflating asynchronous with concurrent. The receiver returns a single stream of messages (a Flux) that you're subscribing to. Flux dispatches messages from this stream one by one, waiting for your processMessage function to return before delivering the next. The subscription is asynchronous in the sense that it does not block the thread that called receiveMessages.
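The difference can be illustrated without the Service Bus SDK at all. The following is a minimal sketch using only JDK executors, not the SDK's internals: the class name AsyncVsConcurrent and the helper names dispatch and pause are made up for illustration. A single worker thread stands in for the one-by-one Flux dispatch (the caller is never blocked, yet handlers run sequentially), and a pool of four threads stands in for concurrent processing.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AsyncVsConcurrent {

    // Hands every message to the executor, waits for all handlers to finish,
    // and returns the names of the threads that actually ran a handler.
    static Set<String> dispatch(ExecutorService executor, List<String> messages) {
        Set<String> handlerThreads = ConcurrentHashMap.newKeySet();
        for (String message : messages) {
            // execute() returns immediately, so the caller is not blocked (asynchronous).
            executor.execute(() -> {
                handlerThreads.add(Thread.currentThread().getName());
                pause(50); // simulated slow message handler
            });
        }
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handlerThreads;
    }

    // Sleeps for the given number of milliseconds, restoring the interrupt flag if interrupted.
    static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        List<String> messages = List.of("test 330", "test 331", "test 332", "test 333");

        // Asynchronous but sequential: one worker handles the messages one after another.
        Set<String> sequential = dispatch(Executors.newSingleThreadExecutor(), messages);
        System.out.println("Single worker used " + sequential.size() + " thread(s)");

        // Asynchronous and concurrent: several workers handle messages at the same time.
        Set<String> concurrent = dispatch(Executors.newFixedThreadPool(4), messages);
        System.out.println("Pool of four used " + concurrent.size() + " thread(s)");
    }
}
```

In both cases the submitting thread returns immediately; only the second setup lets handlers overlap, which is the behaviour the question is actually asking for.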
What I believe you may be looking for is the ServiceBusProcessorClient, which manages concurrency for you. It allows setting maxConcurrentCalls when building.
Java's ServiceBusProcessorClient is equivalent to .NET's ServiceBusProcessor type, which is the "configuration in C#" that you referred to.
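As a rough configuration sketch of the processor approach, assuming the azure-messaging-servicebus dependency is on the classpath: the class name ConcurrentAsb, the environment variable names, the concurrency value of 8, and the 60-second run time are all placeholders chosen for illustration, not values from the question.

```java
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusErrorContext;
import com.azure.messaging.servicebus.ServiceBusProcessorClient;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext;

public class ConcurrentAsb {

    public static void main(String[] args) throws InterruptedException {
        // Hypothetical environment variable names; supply these however your app is configured.
        String connectionString = System.getenv("SERVICEBUS_CONNECTION_STRING");
        String queueName = System.getenv("SERVICEBUS_QUEUE_NAME");

        ServiceBusProcessorClient processor = new ServiceBusClientBuilder()
            .connectionString(connectionString)
            .processor()
            .queueName(queueName)
            // Upper bound on handler invocations running at the same time (8 is arbitrary).
            .maxConcurrentCalls(8)
            .processMessage(ConcurrentAsb::processMessage)
            .processError(ConcurrentAsb::processError)
            .buildProcessorClient();

        // start() returns immediately; handlers run on background threads.
        processor.start();

        // Keep the demo alive for a while, then shut the processor down.
        Thread.sleep(60_000);
        processor.close();
    }

    private static void processMessage(ServiceBusReceivedMessageContext context) {
        ServiceBusReceivedMessage message = context.getMessage();
        // Printing the thread name makes overlapping handler calls visible in the log.
        System.out.printf("[%s] Id: %s, Sequence #: %s. Contents: %s%n",
            Thread.currentThread().getName(), message.getMessageId(),
            message.getSequenceNumber(), message.getBody());
    }

    private static void processError(ServiceBusErrorContext context) {
        System.err.println("Error while receiving: " + context.getException());
    }
}
```

With a setup along these lines, repeating the Thread.sleep experiment inside processMessage should show several handler invocations in flight at once rather than one at a time.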