Here's my scenario. In an IHostedService I need to subscribe to a GRPC channel and process messages using await foreach().
var channel = GrpcChannel.ForAddress("https://localhost:5001");
var client = new Messenger.MessengerClient(channel);
var messages= client.SubscribeToMessages();
await foreach (var message in messages.ResponseStream.ReadAllAsync())
{
// do something with message
}
Since I'm subscribing to a stream there is a chance that I have missed messages that the server sent prior to my client subscribing. The MessengerClient has a method where I can get a snapshot of all messages for the current day:
var snapshotMessages = client.GetTodaysMessages();
I'd like to call the client.GetTodaysMessages() after subscribing to the stream that way there is an overlap and I am guaranteed to get everything.
I was thinking about starting a timer right before my await foreach(...) and calling client.GetTodaysMessages() in the timer's callback. What do you think of this approach?
One requirement is that if I get disconnected from the GRPC stream I need to resubscribe and call the client.GetTodaysMessages() again.
I'm open for suggestions that you may have
CodePudding user response:
I'm thinking you'd do something like this:
var channel = GrpcChannel.ForAddress("https://localhost:5001");
var client = new Messenger.MessengerClient(channel);
var messages = client.SubscribeToMessages();
// Get the stream going so that you don't miss any
var messagesEnumerableTask = messages.ResponseStream.ReadAllAsync();
// Get today's messages
var snapshotMessages = await client.GetTodaysMessages(); // I'm asuming this is actually awaitable?
// Process today's messages
foreach (var message in snapshotMessages)
{
if (!IsProcessed(message))
{
Process(message);
}
}
// Process the stream
await foreach (var message in messagesEnumerableTask)
{
if (!IsProcessed(message))
{
Process(message);
}
}
I imagine you'll have some kind of way of determining if a message has already been seen (e.g. a method called IsProcessed(message)
).
CodePudding user response:
Microsoft has a document called Asynchronous programming with async and await which may help you here.
Essentially, instead of calling await
, you can instead assign your call to a Task
, which will run while you are able to perform other actions. You can then wait for the Task
to complete once you are ready. Task
s have a Result
property as well which contains the result from your awaited code.
There is a really helpful way to await multiple tasks by calling Task.WhenAny()
on an array of tasks you are waiting on. Here is a bit of code from Microsoft (from the Asynchronous programming guide I linked above) where they are able to create several breakfast related tasks, and then await
them all:
var breakfastTasks = new List<Task> { eggsTask, baconTask, toastTask };
while (breakfastTasks.Count > 0)
{
Task finishedTask = await Task.WhenAny(breakfastTasks);
if (finishedTask == eggsTask)
{
Console.WriteLine("Eggs are ready");
}
else if (finishedTask == baconTask)
{
Console.WriteLine("Bacon is ready");
}
else if (finishedTask == toastTask)
{
Console.WriteLine("Toast is ready");
}
breakfastTasks.Remove(finishedTask);
}
You could try making your foreach a Task
like (This was written off the cuff, I didn't try to compile this, but it may help you):
var task = messages.ResponseStream.ReadAllAsync();
// do other work here, while the task runs
var snapshotMessages = client.GetTodaysMessages();
// and then deal with the result of the message stream
await task;
foreach(var message in task.Result){
// process message
}
CodePudding user response:
XY problem.
The solution to your problem is to not have your problem.
"Since I'm subscribing to a stream there is a chance that I have missed messages".
This is a FEATURE of gRPC.
Choose a technology where "Since I am using X, I have no chance of missing messages". You want to use a Message Queue instead of the Observable pattern. Look at
- MSMQ
- Rabbit MQ
- MQTT
- MassTransit
- Redis
- Tibco
- SimpleMQ
- etc
A message broker would solve your problem with zero code.