I have a firehose of messages coming through a Redis pub/sub subscription, and I need to distribute them to a number of WebSocket connections. Whenever a message arrives from Redis, it needs to be sent to every WebSocket connection.
I want multiple consumers, and each of them should receive all the messages.
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(1)
{
    FullMode = BoundedChannelFullMode.Wait,
    SingleReader = false
});
var cts = new CancellationTokenSource();
// Producer: writes an increasing counter every 250 ms until cancelled.
var producer = Task.Run(async () =>
{
    int i = 0;
    while (!cts.IsCancellationRequested)
    {
        channel.Writer.TryWrite(i++); // increment so each message is distinct
        await Task.Delay(TimeSpan.FromMilliseconds(250));
    }
});
// Two readers compete for items on the same channel.
var readerOneTask = Task.Run(async () =>
{
    await foreach (var i in channel.Reader.ReadAllAsync())
    {
        Console.WriteLine($"Reader one: {i}");
    }
});
var readerTwoTask = Task.Run(async () =>
{
    await foreach (var i in channel.Reader.ReadAllAsync())
    {
        Console.WriteLine($"Reader two: {i}");
    }
});
cts.CancelAfter(TimeSpan.FromSeconds(5));
Console.ReadLine();
Answer:
A single Channel<T> cannot broadcast messages to multiple consumers. Every time a message is read from the channel, it is consumed, and no other consumer will get it. If you want to broadcast all messages to all consumers, you'll have to create one dedicated Channel<T> per consumer.
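As a rough illustration of the one-channel-per-consumer idea, here is a minimal sketch. It assumes a fixed number of consumers and uses unbounded channels for simplicity; the names consumerCount, channels, and received are made up for this example, and the for loop stands in for the Redis subscription callback.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

const int consumerCount = 2; // hypothetical: one per WebSocket connection

// One dedicated channel per consumer.
var channels = Enumerable.Range(0, consumerCount)
    .Select(_ => Channel.CreateUnbounded<int>())
    .ToArray();

// Records what each consumer saw, only so the result can be inspected.
var received = channels.Select(_ => new List<int>()).ToArray();

// Each consumer drains its own channel, so nothing is taken away from the others.
var consumers = channels.Select((ch, index) => Task.Run(async () =>
{
    await foreach (var item in ch.Reader.ReadAllAsync())
    {
        received[index].Add(item);
        Console.WriteLine($"Reader {index + 1}: {item}");
    }
})).ToArray();

// Producer: copy every message into every channel.
for (int i = 0; i < 5; i++)
{
    foreach (var ch in channels)
    {
        await ch.Writer.WriteAsync(i);
    }
}

// Completing the writers lets ReadAllAsync finish on the consumer side.
foreach (var ch in channels)
{
    ch.Writer.Complete();
}
await Task.WhenAll(consumers);
```

Each reader sees 0 through 4 in order, although the lines from the two readers may interleave on the console. For a real firehose, an unbounded channel can grow without limit if one WebSocket is slow; a bounded channel with BoundedChannelFullMode.DropOldest might be a better fit if dropping stale messages is acceptable.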
You might find this question interesting: Factory for IAsyncEnumerable or IAsyncEnumerator. It shows various ways to implement a source/controller for an IAsyncEnumerable<T> sequence, including channels and Rx subjects.