overlapping a task after await foreach() starts

Time:05-11

Here's my scenario. In an IHostedService I need to subscribe to a gRPC stream and process its messages using await foreach().

    var channel = GrpcChannel.ForAddress("https://localhost:5001");
    var client = new Messenger.MessengerClient(channel);

    var messages = client.SubscribeToMessages();

    await foreach (var message in messages.ResponseStream.ReadAllAsync())
    {
        // do something with message
    }

Since I'm subscribing to a stream there is a chance that I have missed messages that the server sent prior to my client subscribing. The MessengerClient has a method where I can get a snapshot of all messages for the current day:

    var snapshotMessages = client.GetTodaysMessages();

I'd like to call client.GetTodaysMessages() after subscribing to the stream. That way the two overlap and I am guaranteed to get everything.

I was thinking about starting a timer right before my await foreach(...) and calling client.GetTodaysMessages() in the timer's callback. What do you think of this approach?

One requirement is that if I get disconnected from the gRPC stream, I need to resubscribe and call client.GetTodaysMessages() again.

I'm open to any suggestions you may have.

CodePudding user response:

I'm thinking you'd do something like this:

var channel = GrpcChannel.ForAddress("https://localhost:5001");
var client = new Messenger.MessengerClient(channel);

var messages = client.SubscribeToMessages();

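// SubscribeToMessages() above has already opened the call. ReadAllAsync() only
// wraps the response stream in a lazy IAsyncEnumerable (despite the name below, it is not a Task).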
var messagesEnumerableTask = messages.ResponseStream.ReadAllAsync();

// Get today's messages
var snapshotMessages = await client.GetTodaysMessages(); // I'm assuming this is actually awaitable?

// Process today's messages
foreach (var message in snapshotMessages)
{
    if (!IsProcessed(message))
    {
        Process(message);
    } 
}

// Process the stream
await foreach (var message in messagesEnumerableTask)
{
    if (!IsProcessed(message))
    {
        Process(message);
    }
}

I imagine you'll have some kind of way of determining if a message has already been seen (e.g. a method called IsProcessed(message)).
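
A minimal sketch of how the overlap, the duplicate check, and the resubscribe requirement could fit together. Everything here is an assumption made for illustration: the Message record with a unique Id, the OverlapProcessor class, and the two delegates that stand in for client.SubscribeToMessages() and client.GetTodaysMessages(). The fake stream and snapshot only simulate a server, and the retry delay is arbitrary.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical message shape; the real type is generated from the .proto file.
public sealed record Message(string Id, string Body);

public sealed class OverlapProcessor
{
    // IDs already handled; kept across reconnects so repeats are skipped.
    private readonly HashSet<string> _seen = new();

    public List<Message> Processed { get; } = new();

    private void ProcessOnce(Message message)
    {
        if (_seen.Add(message.Id))      // Add returns false for a duplicate
        {
            Processed.Add(message);     // stand-in for the real work
        }
    }

    public async Task RunAsync(
        Func<IAsyncEnumerable<Message>> subscribe,          // stands in for SubscribeToMessages()
        Func<Task<IReadOnlyList<Message>>> getSnapshot,     // stands in for GetTodaysMessages()
        int maxAttempts,
        CancellationToken ct = default)
    {
        for (var attempt = 1; attempt <= maxAttempts; attempt++)
        {
            try
            {
                var live = subscribe();                                  // 1. subscribe first
                foreach (var m in await getSnapshot()) ProcessOnce(m);   // 2. then the snapshot
                await foreach (var m in live.WithCancellation(ct))       // 3. then the live stream
                {
                    ProcessOnce(m);
                }
                return; // the stream completed normally
            }
            // With grpc-dotnet a broken stream typically surfaces as RpcException.
            // The failure on the last attempt is not caught and propagates.
            catch (Exception ex) when (ex is not OperationCanceledException && attempt < maxAttempts)
            {
                Console.WriteLine($"Stream failed ({ex.Message}); resubscribing.");
                await Task.Delay(TimeSpan.FromMilliseconds(100), ct);
            }
        }
    }
}

public static class Demo
{
    // Fake stream: repeats message 2, fails on the first attempt, then sends 3.
    public static async IAsyncEnumerable<Message> FakeStream(int attempt)
    {
        yield return new Message("2", "b");
        await Task.Yield();
        if (attempt == 1) throw new InvalidOperationException("simulated disconnect");
        yield return new Message("3", "c");
    }

    public static Task<IReadOnlyList<Message>> FakeSnapshot() =>
        Task.FromResult<IReadOnlyList<Message>>(
            new[] { new Message("1", "a"), new Message("2", "b") });

    public static async Task Main()
    {
        var attempt = 0;
        var processor = new OverlapProcessor();
        await processor.RunAsync(() => FakeStream(++attempt), FakeSnapshot, maxAttempts: 3);
        Console.WriteLine(string.Join(",", processor.Processed.Select(m => m.Id))); // prints 1,2,3
    }
}
```

The set of seen IDs grows for as long as the processor lives, so a long-running service would probably want to clear it at the day boundary or bound it some other way. Note also that the fake stream is lazy, whereas a real gRPC call is opened when the client method is invoked.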

CodePudding user response:

Microsoft has a document called Asynchronous programming with async and await which may help you here.

Essentially, instead of awaiting a call immediately, you can assign it to a Task, which runs while you perform other work. You can then await the Task once you are ready for its result. A Task<TResult> also has a Result property that holds the value once the task has completed.
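
As a rough illustration of that pattern, here is a small self-contained sketch. FetchCountAsync is a made-up stand-in for any slow Task-returning call, and the delay and return value are arbitrary. Note that this applies to methods that return a Task; an IAsyncEnumerable is consumed with await foreach instead.

```csharp
using System;
using System.Threading.Tasks;

public static class TaskOverlapDemo
{
    // Hypothetical stand-in for a slow asynchronous call.
    public static async Task<int> FetchCountAsync()
    {
        await Task.Delay(50);
        return 42;
    }

    public static async Task<int> RunAsync()
    {
        // The call starts here; keeping the Task lets us await it later.
        Task<int> pending = FetchCountAsync();

        Console.WriteLine("Doing other work while the call is in flight");

        // Awaiting hands back the value once the task has completed.
        int count = await pending;
        return count;
    }

    public static async Task Main()
    {
        Console.WriteLine(await RunAsync());
    }
}
```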

There is a really helpful way to wait on multiple tasks: call Task.WhenAny() on a collection of the tasks you are waiting on. Here is a bit of code from Microsoft (from the Asynchronous programming guide mentioned above) where they create several breakfast-related tasks and then handle each one as it finishes:

var breakfastTasks = new List<Task> { eggsTask, baconTask, toastTask };
while (breakfastTasks.Count > 0)
{
    Task finishedTask = await Task.WhenAny(breakfastTasks);
    if (finishedTask == eggsTask)
    {
        Console.WriteLine("Eggs are ready");
    }
    else if (finishedTask == baconTask)
    {
        Console.WriteLine("Bacon is ready");
    }
    else if (finishedTask == toastTask)
    {
        Console.WriteLine("Toast is ready");
    }
    breakfastTasks.Remove(finishedTask);
}

You could overlap the two calls along these lines. Note that ReadAllAsync() returns an IAsyncEnumerable<T>, not a Task, so it is consumed with await foreach rather than with await and .Result (this was written off the cuff and I didn't try to compile it, but it may help you):

var stream = messages.ResponseStream.ReadAllAsync();

// do other work here; the subscription was already opened by SubscribeToMessages()
var snapshotMessages = client.GetTodaysMessages();

// and then deal with the message stream
await foreach (var message in stream)
{
    // process message
}


CodePudding user response:

XY problem.

The solution to your problem is to not have your problem.

"Since I'm subscribing to a stream there is a chance that I have missed messages".

This is a FEATURE of gRPC.

Choose a technology where "Since I am using X, I have no chance of missing messages". You want to use a Message Queue instead of the Observable pattern. Look at

  • MSMQ
  • RabbitMQ
  • MQTT
  • MassTransit
  • Redis
  • Tibco
  • SimpleMQ
  • etc

A message broker would solve your problem with zero code.
