I want to write some mock-based tests for the consumer.Received event handler inside the Start method. What confuses me is how to write a test for the Received block of code using mock data.
How would I trigger the handler? I'm a bit rusty with this type of thing, so if anyone can explain what I need to do, that would be great. I was wondering whether I need to extract it to another method or class, but I can't figure out how to hook this up.
/// <summary>
/// Consumes messages from the RabbitMQ queue named by the connection details, formats each
/// message with the formatter registered for its originating endpoint, and stores the
/// result through the message repository.
/// </summary>
public class DataOutConsumer : IDataOutConsumer
{
    private readonly IRabbitMqConnectionDetails _mqConnectionDetails;
    private readonly IRabbitMqConnectionFactory _rabbitMqConnectionFactory;
    private readonly IDictionary<string, IMessageFormatter> _formatters;
    private readonly ILogger _logger;
    private readonly IMessageRepository _repository;
    private IConnection _connection;
    private IModel _channel;

    /// <summary>
    /// Creates the consumer. No connection is opened until <see cref="Start"/> is called.
    /// </summary>
    /// <param name="rabbitMqConnectionFactory">Factory used by <see cref="Start"/> to open the connection.</param>
    /// <param name="mqConnectionDetails">Supplies the name of the queue to consume from.</param>
    /// <param name="repository">Repository each formatted message is added to.</param>
    /// <param name="formatters">Formatters, looked up by their <c>Source</c> value.</param>
    /// <param name="logger">Logger for informational and error output.</param>
    public DataOutConsumer(
        IRabbitMqConnectionFactory rabbitMqConnectionFactory,
        IRabbitMqConnectionDetails mqConnectionDetails,
        IMessageRepository repository,
        IEnumerable<IMessageFormatter> formatters,
        ILogger logger)
    {
        _rabbitMqConnectionFactory = rabbitMqConnectionFactory;
        _mqConnectionDetails = mqConnectionDetails;
        _repository = repository;
        // Index the formatters by Source; ToDictionary throws if two formatters share a Source.
        _formatters = formatters.ToDictionary(f => f.Source);
        _logger = logger;
    }

    /// <summary>
    /// Start the consumer that will process messages on the DataOut queue
    /// </summary>
    public void Start()
    {
        _connection = _rabbitMqConnectionFactory.CreateConnection();

        // Store the channel in the field so Stop() has something to close.
        _channel = _connection.CreateModel();
        var channel = _channel;
        channel.BasicQos(0, 1, false);

        var consumer = new EventingBasicConsumer(channel);

        // Received is an event, so handlers must be attached with += (plain assignment
        // from outside the declaring type does not compile).
        consumer.Received += (model, ea) =>
        {
            try
            {
                var headers = ea.BasicProperties.Headers;
                var endpoint = headers.ReadHeaderValue("NServiceBus.OriginatingEndpoint");
                var formatter = _formatters[endpoint];
                var formattedMessage = formatter.Format(Encoding.UTF8.GetString(ea.Body.ToArray()));
                var enclosedMessageTypes = headers.ReadHeaderValue("NServiceBus.EnclosedMessageTypes");

                _logger.Information($"PreAddMessage: Originating Endpoint: {endpoint}, Enclosed Message Types: {enclosedMessageTypes}, Message Endpoint: DataOut, Payload: {formattedMessage.Payload}");

                // The event handler is synchronous, so block until the repository call finishes.
                _repository.AddMessage(
                        formattedMessage.Id,
                        "DataOut",
                        endpoint,
                        enclosedMessageTypes,
                        formattedMessage.Payload,
                        formattedMessage.DateQueued)
                    .GetAwaiter()
                    .GetResult();

                // Acknowledge that the message has been processed
                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
            }
            catch (Exception e)
            {
                // Log first so the processing failure is recorded even if the nack itself throws.
                // NOTE(review): if ILogger is Serilog's, the exception-first overload
                // Error(Exception, string) is probably what is intended here - confirm.
                _logger.Error(e.Message, e);

                // Nack without requeue. The original author observed that such messages
                // seem to end up on the error queue.
                channel.BasicNack(ea.DeliveryTag, false, false);
            }
        };

        channel.BasicConsume(
            queue: _mqConnectionDetails.Endpoint,
            autoAck: false,
            consumer: consumer);
    }

    /// <summary>
    /// Closes the channel and the connection opened by <see cref="Start"/>. Safe to call when
    /// Start was never called; any exception raised while closing is logged, not rethrown.
    /// </summary>
    public void Stop()
    {
        try
        {
            _channel?.Close();
            _connection?.Close();
        }
        catch (Exception e)
        {
            _logger.Error(e.Message, e);
        }
    }
}
CodePudding user response:
I managed to extract the handler body into a method, which I then call from the event handler as follows:
// Received is an event, so the handler is attached with += (it cannot be assigned with =).
// The consumer's own channel is passed so the handler never receives an unassigned
// _channel field.
consumer.Received += (model, ea) => HandleMessage(model, ea, consumer.Model);
The extracted method, which can now be tested directly:
/// <summary>
/// Processes one delivered message: picks the formatter for the originating endpoint,
/// formats the UTF-8 body, stores the result through the repository, and acks the delivery.
/// Any exception is logged and the delivery is nack'ed without requeue.
/// </summary>
/// <param name="sender">Event sender; not used by this method.</param>
/// <param name="ea">Delivery details: headers, body, and delivery tag.</param>
/// <param name="channel">Channel on which the delivery is acked or nack'ed.</param>
public void HandleMessage(object sender, BasicDeliverEventArgs ea, IModel channel)
{
    try
    {
        var headers = ea.BasicProperties.Headers;
        var endpoint = headers.ReadHeaderValue("NServiceBus.OriginatingEndpoint");
        var formatter = _formatters[endpoint];
        var formattedMessage = formatter.Format(Encoding.UTF8.GetString(ea.Body.ToArray()));
        var enclosedMessageTypes = headers.ReadHeaderValue("NServiceBus.EnclosedMessageTypes");

        _logger.Information($"PreAddMessage: Originating Endpoint: {endpoint}, Enclosed Message Types: {enclosedMessageTypes}, Message Endpoint: DataOut, Payload: {formattedMessage.Payload}");

        // This method is called from a synchronous event handler, so block until the
        // repository call finishes.
        _repository.AddMessage(
                formattedMessage.Id,
                "DataOut",
                endpoint,
                enclosedMessageTypes,
                formattedMessage.Payload,
                formattedMessage.DateQueued)
            .GetAwaiter()
            .GetResult();

        // Acknowledge that the message has been processed
        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
    }
    catch (Exception e)
    {
        // Log first so the processing failure is recorded even if the nack itself throws.
        // NOTE(review): if ILogger is Serilog's, the exception-first overload
        // Error(Exception, string) is probably what is intended here - confirm.
        _logger.Error(e.Message, e);

        // Nack without requeue. The original author observed that such messages seem to
        // end up on the error queue.
        channel.BasicNack(ea.DeliveryTag, false, false);
    }
}