Home > front end >  C# How to test Received event handler - using RabbitMqClient
C# How to test Received event handler - using RabbitMqClient

Time:07-26

I am wanting to write some mock tests for the consumer.Received event handler within the Start method, the issues I am confused by is how to write a test for the Received block of code using Mock data.

How would I trigger this, I'm a bit rusty with this type of thing, so if anyone can explain what I need to do then that would be great. I was wondering if I need to extract it to another method or class but cant figure out how to hook this up.

public class DataOutConsumer : IDataOutConsumer { private readonly IRabbitMqConnectionDetails _mqConnectionDetails; private readonly IRabbitMqConnectionFactory _rabbitMqConnectionFactory;

private readonly IDictionary<string, IMessageFormatter> _formatters;
private readonly ILogger _logger;
private readonly IMessageRepository _repository;

private IConnection _connection;
private IModel _channel;

public DataOutConsumer(
    IRabbitMqConnectionFactory rabbitMqConnectionFactory,
    IRabbitMqConnectionDetails mqConnectionDetails,
    IMessageRepository repository,
    IEnumerable<IMessageFormatter> formatters,
    ILogger logger)
{
    _rabbitMqConnectionFactory = rabbitMqConnectionFactory;
    _mqConnectionDetails = mqConnectionDetails;
    _repository = repository;
    _formatters = formatters.Select(f => new { f.Source, f }).ToDictionary(fs => fs.Source, fs => fs.f);
    _logger = logger;
}

/// <summary>
/// Start the consumer that will process messages on the DataOut queue
/// </summary>
public void Start()
{
    _connection = _rabbitMqConnectionFactory.CreateConnection();

    var channel = _connection.CreateModel(); 
    channel.BasicQos(0, 1, false);

    var consumer = new EventingBasicConsumer(channel);
    consumer.Received  = (model, ea) =>
    {
        try
        {
            var properties = ea.BasicProperties;

            var headers = properties.Headers;

            var endpoint = headers.ReadHeaderValue("NServiceBus.OriginatingEndpoint");
            var formatter = _formatters[endpoint];
            var formattedMessage = formatter.Format(Encoding.UTF8.GetString(ea.Body.ToArray()));

            _logger.Information($"PreAddMessage: Originating Endpoint: {endpoint}, Enclosed Message Types: {headers.ReadHeaderValue("NServiceBus.EnclosedMessageTypes")}, Message Endpoint: DataOut, Payload: {formattedMessage.Payload}");

            var task = _repository.AddMessage(
                formattedMessage.Id,
                "DataOut",
                endpoint,
                headers.ReadHeaderValue("NServiceBus.EnclosedMessageTypes"),
                formattedMessage.Payload,
                formattedMessage.DateQueued);

            task.GetAwaiter().GetResult();

            // Acknowledge that the message has been processed
            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
        }
        catch (Exception e)
        {
            // Mark as Nack'ed
            // These seem to be sent to the error queue at least, also logs as an error
            consumer.Model.BasicNack(ea.DeliveryTag, false, false);
            _logger.Error(e.Message, e);
        }
    };
    channel.BasicConsume(queue: _mqConnectionDetails.Endpoint,
        autoAck: false,
        consumer: consumer);
}

/// <summary>
/// delete the channel and connection, don't believe this ever needs calling
/// </summary>
public void Stop()
{
    try
    {
        _channel.Close();
        _connection.Close();
    }
    catch (Exception e)
    {
        _logger.Error(e.Message, e);
    }
}

}

CodePudding user response:

I managed to extract it to a method as follows then call it with the following:

consumer.Received  = (model, ea) =>
{
    HandleMessage(model, ea, _channel);
};

The extracted method which can now be tested.

public void HandleMessage(object sender, BasicDeliverEventArgs ea, IModel channel)
{
    try
    {
        var properties = ea.BasicProperties;

        var headers = properties.Headers;

        var endpoint = headers.ReadHeaderValue("NServiceBus.OriginatingEndpoint");
        var formatter = _formatters[endpoint];
        var formattedMessage = formatter.Format(Encoding.UTF8.GetString(ea.Body.ToArray()));

        _logger.Information($"PreAddMessage: Originating Endpoint: {endpoint}, Enclosed Message Types: {headers.ReadHeaderValue("NServiceBus.EnclosedMessageTypes")}, Message Endpoint: DataOut, Payload: {formattedMessage.Payload}");

        var task = _repository.AddMessage(
            formattedMessage.Id,
            "DataOut",
            endpoint,
            headers.ReadHeaderValue("NServiceBus.EnclosedMessageTypes"),
            formattedMessage.Payload,
            formattedMessage.DateQueued);

        task.GetAwaiter().GetResult();

        // Acknowledge that the message has been processed
        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
    }
    catch (Exception e)
    {
        // Mark as Nack'ed
        // These seem to be sent to the error queue at least, also logs as an error
        channel.BasicNack(ea.DeliveryTag, false, false);
        _logger.Error(e.Message, e);
    }
}
  • Related