I wrote these code lines, but I couldn't figure out how I can print some values on console in consumer.receive
. This code is working because I checked some values on RabbitMQ CloudAMQP, but I cannot see any changes on console.
Problem is here:
Console.WriteLine(Encoding.UTF8.GetString(e.Body.ToArray()) " received");
Full code:
// publisher
ConnectionFactory factory = new ConnectionFactory();
factory.Uri = new Uri("amqps://guest:guest@localhost");
using (IConnection connection = factory.CreateConnection())
using (IModel channel = connection.CreateModel())
{
channel.ExchangeDeclare("kuyruk", type: ExchangeType.Fanout);
for (int i = 1; i <= 100; i )
{
byte [] bytemessage = Encoding.UTF8.GetBytes($"is - {i}");
IBasicProperties properties = channel.CreateBasicProperties();
properties.Persistent = true;
channel.BasicPublish(exchange: "kuyruk", routingKey: "", basicProperties: properties, body: bytemessage);
}
}
// Consumer
ConnectionFactory factory = new ConnectionFactory();
factory.Uri = new Uri("amqps://guest:guest@localhost");
using (IConnection connection = factory.CreateConnection())
using (IModel channel = connection.CreateModel())
{
channel.ExchangeDeclare("kuyruk", type: ExchangeType.Fanout);
// Here consumer İçin Oluşturulacak Kuyruklara Random İsim Oluşturma
string queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName, exchange: "kuyruk", routingKey: "");
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
channel.BasicConsume(queueName, false, consumer);
consumer.Received = (sender, e) =>
{
Thread.Sleep(500);
Console.WriteLine(Encoding.UTF8.GetString(e.Body.ToArray()) " received");
channel.BasicAck(e.DeliveryTag, false);
};
Console.Read();
}
CodePudding user response:
First you have to create the queue. When you publish a message after this point, it will be queued. If you publish the message first, the message will disappear because there is no queue to send the message to.
Another bug in your code is this: you are creating a consumer channel. You define the action to be taken when the message arrives. Then, when your code run into to the publishing process below, you exit the using block which is wrap your consumer channel. For this reason, consumer is closed and cannot consume message.
//consumer
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
var factory = new ConnectionFactory
{
Uri = new Uri("amqp://guest:guest@localhost")
};
using (IConnection connection = factory.CreateConnection())
using (IModel consumeChannel = connection.CreateModel())
{
consumeChannel.ExchangeDeclare("kuyruk", type: ExchangeType.Fanout);
//#region Her Consumer İçin Oluşturulacak Kuyruklara Random İsim Oluşturma
string queueName = consumeChannel.QueueDeclare().QueueName;
consumeChannel.QueueBind(queue: queueName, exchange: "kuyruk", routingKey: "");
//#endregion
consumeChannel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
var consumer = new EventingBasicConsumer(consumeChannel);
consumeChannel.BasicConsume(queueName, false, consumer);
consumer.Received = (sender, e) =>
{
Thread.Sleep(500);
Console.WriteLine(Encoding.UTF8.GetString(e.Body.ToArray()) " recieved");
consumeChannel.BasicAck(e.DeliveryTag, false);
};
//publisher
using (IModel publishChannel = connection.CreateModel())
{
publishChannel.ExchangeDeclare("kuyruk", type: ExchangeType.Fanout);
for (int i = 1; i <= 100; i )
{
var str = $"is - {i}";
byte[] byteMessage = Encoding.UTF8.GetBytes(str);
IBasicProperties properties = publishChannel.CreateBasicProperties();
properties.Persistent = true;
publishChannel.BasicPublish(exchange: "kuyruk", routingKey: "", basicProperties: properties, body: byteMessage);
}
}
Console.Read();
}