Home > Back-end >  Azure service bus using ServiceBusReceiverAsyncClient not consuming concurrently Java
Azure service bus using ServiceBusReceiverAsyncClient not consuming concurrently Java

Time:12-20

I am creating Azure service bus listener using ServiceBusReceiverAsyncClient in order to process messages from Azure service bus queue like this (ASB class):

DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
           .build();
// Create an instance of the processor through the ServiceBusClientBuilder
ServiceBusReceiverAsyncClient asyncClient = new ServiceBusClientBuilder()
    .credential(credential)
    .connectionString(connectionString)
    .receiver()
    .queueName(queueName)
    .buildAsyncClient();

Disposable subscription = asyncClient.receiveMessages()
    .subscribe(ASB::processMessage,
               ASB::processError,
               () -> System.out.println("Receiving complete."));

private static void processMessage(ServiceBusReceivedMessage message) {
   System.out.println("Processing message.");
   System.out.printf("Processed message. Session: %s, Sequence #: %s. Contents: %s%n", message.getMessageId(),
                      message.getSequenceNumber(), message.getBody());
    }

I have a tool to send few thousands messages to the service bus queue so I can test if it is asynchronously processing the messages. However, this is what I see in the log:

Processing message.
Processed message. Session: 3151fe4ef7734ea19658bcb3459d4cea, Sequence #: 2945. Contents: test 330
2022-12-14 18:51:11.379  INFO 91776 --- [oundedElastic-2] c.a.m.s.i.ServiceBusReceiveLinkProcessor : {"az.sdk.message":"Adding credits.","prefetch":0,"requested":2,"linkCredits":0,"expectedTotalCredit":2,"queuedMessages":1,"creditsToAdd":1,"messageQueueSize":0}
Processing message.
Processed message. Session: a304b4fbf379436e8c0e95c5a2a8ec00, Sequence #: 2946. Contents: test 331
2022-12-14 18:51:11.545  INFO 91776 --- [oundedElastic-2] c.a.m.s.i.ServiceBusReceiveLinkProcessor : {"az.sdk.message":"Adding credits.","prefetch":0,"requested":2,"linkCredits":0,"expectedTotalCredit":2,"queuedMessages":1,"creditsToAdd":1,"messageQueueSize":0}
Processing message.
Processed message. Session: b2ca6cd5dc1a456d99eb469c2de37090, Sequence #: 2947. Contents: test 332
2022-12-14 18:51:11.638  INFO 91776 --- [oundedElastic-2] c.a.m.s.i.ServiceBusReceiveLinkProcessor : {"az.sdk.message":"Adding credits.","prefetch":0,"requested":2,"linkCredits":0,"expectedTotalCredit":2,"queuedMessages":1,"creditsToAdd":1,"messageQueueSize":0}
Processing message.
Processed message. Session: 39decd757a544fb09aecf62f11deef3d, Sequence #: 2948. Contents: test 333
2022-12-14 18:51:11.765  INFO 91776 --- [oundedElastic-2] c.a.m.s.i.ServiceBusReceiveLinkProcessor : {"az.sdk.message":"Adding credits.","prefetch":0,"requested":2,"linkCredits":0,"expectedTotalCredit":2,"queuedMessages":1,"creditsToAdd":1,"messageQueueSize":0}
Processing message.
Processed message. Session: 79bb1a6adaa542448075868ec190b7dd, Sequence #: 2949. Contents: test 334
2022-12-14 18:51:11.881  INFO 91776 --- [oundedElastic-2] c.a.m.s.i.ServiceBusReceiveLinkProcessor : {"az.sdk.message":"Adding credits.","prefetch":0,"requested":2,"linkCredits":0,"expectedTotalCredit":2,"queuedMessages":1,"creditsToAdd":1,"messageQueueSize":0}
Processing message.
Processed message. Session: e891896a85a548968de904f3ee6f07a0, Sequence #: 2950. Contents: test 335
2022-12-14 18:51:12.008  INFO 91776 --- [oundedElastic-2] c.a.m.s.i.ServiceBusReceiveLinkProcessor : {"az.sdk.message":"Adding credits.","prefetch":0,"requested":2,"linkCredits":0,"expectedTotalCredit":2,"queuedMessages":1,"creditsToAdd":1,"messageQueueSize":0}
Processing message.
Processed message. Session: b0326bcc0ebd40b2b32d9e9ceb88d965, Sequence #: 2951. Contents: test 336
2022-12-14 18:51:12.121  INFO 91776 --- [oundedElastic-2] c.a.m.s.i.ServiceBusReceiveLinkProcessor : {"az.sdk.message":"Adding credits.","prefetch":0,"requested":2,"linkCredits":0,"expectedTotalCredit":2,"queuedMessages":1,"creditsToAdd":1,"messageQueueSize":0}
Processing message.
Processed message. Session: b55d635672c449ec924d2121bb8575b8, Sequence #: 2952. Contents: test 337
2022-12-14 18:51:12.268  INFO 91776 --- [oundedElastic-2] c.a.m.s.i.ServiceBusReceiveLinkProcessor : {"az.sdk.message":"Adding credits.","prefetch":0,"requested":2,"linkCredits":0,"expectedTotalCredit":2,"queuedMessages":1,"creditsToAdd":1,"messageQueueSize":0}
Processing message.

I am not sure if I can conclude this, but it seems like the code is processing messages in sequential order, one by one, meaning it is doing it synchronously. I also tried using Thread.sleep(10000) to see if ServiceBusReceiverAsyncClient is spawning a new process for each messages, but it seems like it is processing on one thread.

Am I miss understanding anything? or is there a configuration like c# that I have to set in order for it to process concurrently? Any help would be great!

CodePudding user response:

I believe that you're conflating asynchronous with concurrent. The receiver returns a single stream of messages that you're subscribing to. Flux dispatches messages from this stream 1-by-1, waiting for your processMessage function to return. The subscription is asynchronous, as it does not block the thread that called receiveMessages.

What I believe that you may be looking for is the ServiceBusProcessorClient, which manages concurrency for you. It allows setting maxConcurrentCalls when building.

Java's ServiceBusProcessorClient is equivalent to .NET's ServiceBusProcessor type - which is "configuration in C#" that you referred to.

  • Related