Home > other >  How to run Asynchronous Logic Synchronously
How to run Asynchronous Logic Synchronously

Time:12-06

I have the following method that uses Kafka to produce data into a topic:

public void Send(
    ProducerMessage<TKey, TValue> producerMessage, 
    string topic, 
    Action<McFlowProducerResult<TKey, TValue>> callback = default)
    {
       try
       {
        var kafkaProducerMessage = new Message<string, string>();
        
        // DeliveryHanlder logic is skipped?
        _producer.Produce(
            topic,
            kafkaProducerMessage,
            deliveryReport => DeliveryHandler(deliveryReport)); // TODO: How can I ensure the DeliveryHandler logic is executed without using async await Task?
       }
       catch (Exception ex)
       {
           // Some exception logic
       }
}

The DeliveryHandler logic is as follows:

        // TODO: Execution never makes it into this function
        private async Task DeliveryHandler(DeliveryReport<string, string> deliveryReport)
        {
            var producerResult = new ProducerResult<string, string>(deliveryReport);
            
            if (!deliveryReport.Error.IsError)
            {
                _logger.LogError("Message Sent successfully to DLQ TOPIC");
                return;
            }

            _logger.LogError("Unable to send the message to DLQ TOPIC: {0}. Error Reason :{1}", 
                deliveryReport.Topic, deliveryReport.Error.Reason);
            
            if (deliveryReport.Error.Code == ErrorCode.NetworkException)
            {
                _logger.LogError("Sending message to DynamoDb");
                
                await _fatalErrorHandler.HandleError(producerResult);
            }
        }

And I have the following unit test:

[Fact]
public void ValidateDeliveryHandlerIsInvoked()
{
    var producerMessage = new ProducerMessage<string, string>(
        "aKey",
        "aValue",
        new Headers(),
        Timestamp.Default,
        0
    );
    
    ProducerResult<string, string> callbackResult = null;
    
    _mcFlowDlqProducer.Send(producerMessage, _topicName,
        (mcFlowProducerResult) =>
        {
            callbackResult = mcFlowProducerResult;
        });
    
    Assert.NotEmpty(callbackResult.Topic);
}
}

Kafka's Send() method receives 3 parameters: the topic to produce to, the kafkaProducerMessage which is the data to be sent, and an optional Action<DeliveryReport<TKey, TValue>> deliveryHandler which allows the user to retrieve the results of a produce operation.

My problem is with that 3rd parameter -- the DeliveryHandler (which is an async method). When I run my above unit test, execution never makes it to the DeliveryHandler because the call is not awaited.

I cannot modify my void Send(...) method's signature because I need to have a Synchronous implementation of this method; so I cannot replace the void keyword with async Task.

How can I ensure that execution enters the DeliveryHandler method so that the DeliveryHandler logic gets executed without using async Task?

I've tried modifying the call to the DeliveryHandler to:

DeliveryHandler(deliveryReport).GetAwaiter().GetResult()

But my debugger tells me that execution still never enters the DeliveryHandler block.

CodePudding user response:

Try adding GetAwaiter().GetResult() to the end, as in:

_producer.Produce(
            topic,
            kafkaProducerMessage,
            deliveryReport => DeliveryHandler(deliveryReport).GetAwaiter().GetResult());
      

CodePudding user response:

I believe the best solution would be to create an overload for Send:

public Task SendAsync(
    ProducerMessage<TKey, TValue> producerMessage, 
    string topic, 
    Func<McFlowProducerResult<TKey, TValue>, Task> callback = default)

and then you can call SendAsync when you have an asynchronous callback.

  • Related