I have the following method that uses Kafka to produce data into a topic:
public void Send(
    ProducerMessage<TKey, TValue> producerMessage,
    string topic,
    Action<McFlowProducerResult<TKey, TValue>> callback = default)
{
    try
    {
        var kafkaProducerMessage = new Message<string, string>();
        // DeliveryHandler logic is skipped?
        // TODO: How can I ensure the DeliveryHandler logic is executed without using async await Task?
        _producer.Produce(
            topic,
            kafkaProducerMessage,
            deliveryReport => DeliveryHandler(deliveryReport));
    }
    catch (Exception ex)
    {
        // Some exception logic
    }
}
The DeliveryHandler logic is as follows:
// TODO: Execution never makes it into this function
private async Task DeliveryHandler(DeliveryReport<string, string> deliveryReport)
{
    var producerResult = new ProducerResult<string, string>(deliveryReport);
    if (!deliveryReport.Error.IsError)
    {
        _logger.LogError("Message Sent successfully to DLQ TOPIC");
        return;
    }
    _logger.LogError("Unable to send the message to DLQ TOPIC: {0}. Error Reason :{1}",
        deliveryReport.Topic, deliveryReport.Error.Reason);
    if (deliveryReport.Error.Code == ErrorCode.NetworkException)
    {
        _logger.LogError("Sending message to DynamoDb");
        await _fatalErrorHandler.HandleError(producerResult);
    }
}
And I have the following unit test:
[Fact]
public void ValidateDeliveryHandlerIsInvoked()
{
    var producerMessage = new ProducerMessage<string, string>(
        "aKey",
        "aValue",
        new Headers(),
        Timestamp.Default,
        0
    );
    ProducerResult<string, string> callbackResult = null;
    _mcFlowDlqProducer.Send(producerMessage, _topicName,
        (mcFlowProducerResult) =>
        {
            callbackResult = mcFlowProducerResult;
        });
    Assert.NotEmpty(callbackResult.Topic);
}
}
Kafka's Produce() method takes three parameters: the topic to produce to, the kafkaProducerMessage containing the data to be sent, and an optional Action<DeliveryReport<TKey, TValue>> deliveryHandler that lets the caller retrieve the result of a produce operation.
My problem is with that third parameter, the DeliveryHandler (which is an async method). When I run the unit test above, execution never reaches the DeliveryHandler because the call is not awaited.
I cannot modify the signature of my void Send(...) method because I need a synchronous implementation of it, so I cannot replace the void keyword with async Task.
How can I ensure that execution enters the DeliveryHandler method, so that its logic gets executed, without using async Task?
I've tried modifying the call to the DeliveryHandler to:
DeliveryHandler(deliveryReport).GetAwaiter().GetResult()
But my debugger tells me that execution still never enters the DeliveryHandler block.
CodePudding user response:
Try adding GetAwaiter().GetResult() to the end, as in:
_producer.Produce(
    topic,
    kafkaProducerMessage,
    deliveryReport => DeliveryHandler(deliveryReport).GetAwaiter().GetResult());
CodePudding user response:
I believe the best solution would be to create an asynchronous counterpart to Send:
public Task SendAsync(
    ProducerMessage<TKey, TValue> producerMessage,
    string topic,
    Func<McFlowProducerResult<TKey, TValue>, Task> callback = default)
and then you can call SendAsync when you have an asynchronous callback.
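A minimal sketch of how such a SendAsync could look, and how the synchronous Send could be built on top of it. To keep it runnable without a broker, FakeProducer, FakeDeliveryReport and DlqProducer are hypothetical stand-ins, not the Confluent.Kafka types or the asker's classes. The fake producer invokes the delivery handler later on a background thread, which is my assumption about how the real client behaves as well. A TaskCompletionSource bridges the callback into a Task.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for a delivery report; not the Confluent.Kafka type.
public sealed class FakeDeliveryReport
{
    public FakeDeliveryReport(string topic, bool isError)
    {
        Topic = topic;
        IsError = isError;
    }

    public string Topic { get; }
    public bool IsError { get; }
}

// Hypothetical stand-in for the producer. Assumption: the delivery handler is
// invoked some time after Produce() returns, on a background thread.
public sealed class FakeProducer
{
    public void Produce(string topic, string value, Action<FakeDeliveryReport> deliveryHandler)
    {
        Task.Run(() => deliveryHandler(new FakeDeliveryReport(topic, false)));
    }
}

public sealed class DlqProducer
{
    private readonly FakeProducer _producer = new FakeProducer();

    // Asynchronous variant: completes once the delivery report has arrived
    // and the asynchronous callback has finished.
    public async Task SendAsync(string topic, string value, Func<FakeDeliveryReport, Task> callback)
    {
        var tcs = new TaskCompletionSource<FakeDeliveryReport>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        // The delivery handler only hands the report over; no async work runs inside it.
        _producer.Produce(topic, value, report => tcs.TrySetResult(report));

        var deliveryReport = await tcs.Task.ConfigureAwait(false);
        await callback(deliveryReport).ConfigureAwait(false);
    }

    // Synchronous variant: keeps the void signature and blocks the caller
    // until SendAsync has completed.
    public void Send(string topic, string value, Action<FakeDeliveryReport> callback)
    {
        SendAsync(topic, value, report =>
        {
            callback(report);
            return Task.CompletedTask;
        }).GetAwaiter().GetResult();
    }
}
```

With this shape, a synchronous test can call Send and assert on the captured report right after it returns, because Send does not return before the callback has run. If the producer in the unit test is a mock, the mock still has to invoke the delivery handler itself, otherwise no amount of waiting will reach it.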