Durable Orchestrator gets stuck after Task.WhenAll

Time: 05-10

I have an orchestrator with 4 activities:

  1. PromoDataFromActpm - a single activity that downloads some data from an API.
  2. PromoDataExport - a single activity that sends the data from activity #1 to Azure Storage.
  3. SavePromoProductFromACPSActivity - parallel activities that, for each item from activity #1, call another API and download some data.
  4. TableToBlobPromoProductActivity - parallel activities that write the items from activity #3 to blob storage.

For activity #3, each item in the collection is one activity call. For activity #4, the items are batched into collections of 50, one batch per activity call. Both sets of calls are awaited with Task.WhenAll.

Everything works fine in the local environment, but on Azure the orchestrator stops processing after the Task.WhenAll for activity #3. I see many requests for SavePromoProductFromACPSActivity in the logs, as expected, but after some time they stop and TableToBlobPromoProductActivity is never called. I only occasionally get "Executing XYZ orchestrator" and, a few minutes later, "Executed XYZ orchestrator", with no activity calls between those messages.

I've been fighting with it for a while but without success. Any ideas?

Here is the code:

    var orchestrationId = context.InstanceId.Replace(":", "");

    var promoData = await context.CallActivityAsync<PromotionExportModel[]>(FunctionNamesExport.Activity.PromoDataFromActpm, null);
    var exportResult = await context.CallActivityAsync<OperationResponse>(FunctionNamesExport.Activity.PromoDataExport, promoData);

    var acpsTasks = new List<Task<List<PromotedProductExportModel>>>();
    var acpsPromos = new List<PromotedProductExportModel>();
    foreach (var promo in promoData)
    {
        acpsTasks.Add(context.CallActivityAsync<List<PromotedProductExportModel>>(FunctionNamesExport.Activity.SavePromoProductFromACPSActivity, promo));
    }
    await Task.WhenAll(acpsTasks);
    acpsTasks.ForEach(x => acpsPromos.AddRange(x.Result));

    var promoDataBatched = acpsPromos.Batch(50);
    var tasks = new List<Task>();
    foreach (var arr in promoDataBatched)
    {
        var promoBlob = new PromotionExportSubModel
        {
            PromotionExportModel = arr.ToArray(),
            blockId = Convert.ToBase64String(Guid.NewGuid().ToByteArray()),
            orchestrationId = orchestrationId
        };
        tasks.Add(context.CallActivityAsync(FunctionNamesExport.Activity.TableToBlobPromoProductActivity, promoBlob));
    }
    await Task.WhenAll(tasks);

Answer:

Since you mentioned in the comments that you are launching 3000 activities in parallel in step 3, I have an idea of what could be happening. Remember that each activity adds rows to the history table, and all of those rows need to be loaded on each replay of the orchestrator (which occurs after each activity returns). So the load time keeps going up, and so does memory usage.
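To put rough numbers on this, here is a back-of-the-envelope model in C#. It is a simplification under stated assumptions, not how the Durable Task Framework actually accounts for work: it assumes each activity adds two history rows (scheduled and completed) and that every completion triggers one replay that reloads the instance's whole history. For comparison, it also splits the same 3000 activities across 30 instances of 100 activities each, plus a parent that only tracks those 30 calls.

```csharp
using System;

// Simplified model (assumption): each activity adds one "scheduled" and one
// "completed" history row, and every completion triggers one replay that
// loads the instance's entire history. Real replays can batch several
// completions, so treat the result as an order-of-magnitude comparison.
static long RowsLoaded(int activities)
{
    long total = 0;
    for (int completed = 1; completed <= activities; completed++)
    {
        // All activities were scheduled up front; `completed` of them have finished.
        total += activities + completed;
    }
    return total;
}

// One orchestrator fanning out 3000 activities directly.
long flat = RowsLoaded(3000);

// 30 instances with 100 activities each, plus a parent instance whose
// history only holds rows for its 30 child calls.
long split = 30 * RowsLoaded(100) + RowsLoaded(30);

Console.WriteLine($"flat:  {flat} history rows loaded");
Console.WriteLine($"split: {split} history rows loaded");
```

Under these assumptions the flat fan-out loads 13,501,500 rows in total while the split version loads 452,865, roughly 30 times fewer, because each replay only pays for the size of its own instance's history.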

A typical solution I've used for this is to split the work into sub-orchestrators. Break the data into batches of, for example, 100 items and launch a sub-orchestrator for each batch so they run in parallel. Each sub-orchestrator then runs the actual step 3 activities for its batch. This way each sub-orchestrator instance's history is capped at the rows required for 100 activities, and the main orchestrator only receives ~30 results. You can do the same thing for step 4.
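A minimal sketch of that split for step 3, assuming the in-process Durable Functions C# API (`IDurableOrchestrationContext`, `CallSubOrchestratorAsync`, `GetInput`) and .NET 6 or later for `Enumerable.Chunk`. `PromoExportOrchestrators`, `SavePromoProductBatchOrchestrator` and `FanOutStep3Async` are hypothetical names introduced for illustration; the model types and `FunctionNamesExport` come from the question's code. It needs the Durable Functions package and the question's types to compile.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;

public static class PromoExportOrchestrators
{
    // Hypothetical function name for the new sub-orchestrator (not in the original code).
    private const string SavePromoProductBatchOrchestrator = "SavePromoProductBatchOrchestrator";

    // Parent side: called from the main orchestrator in place of the step 3 loop.
    // Starts one sub-orchestrator per batch of 100 promos and flattens the results,
    // so the parent's history only grows by one call per batch.
    public static async Task<List<PromotedProductExportModel>> FanOutStep3Async(
        IDurableOrchestrationContext context, PromotionExportModel[] promoData)
    {
        var subTasks = new List<Task<List<PromotedProductExportModel>>>();
        foreach (var batch in promoData.Chunk(100)) // Enumerable.Chunk needs .NET 6+
        {
            subTasks.Add(context.CallSubOrchestratorAsync<List<PromotedProductExportModel>>(
                SavePromoProductBatchOrchestrator, batch));
        }

        var batchResults = await Task.WhenAll(subTasks);
        return batchResults.SelectMany(r => r).ToList();
    }

    // Child side: the same step 3 fan-out as before, limited to one batch,
    // so this instance's history holds rows for at most 100 activities.
    [FunctionName(SavePromoProductBatchOrchestrator)]
    public static async Task<List<PromotedProductExportModel>> RunBatch(
        [OrchestrationTrigger] IDurableOrchestrationContext context)
    {
        var batch = context.GetInput<PromotionExportModel[]>();

        var tasks = batch
            .Select(promo => context.CallActivityAsync<List<PromotedProductExportModel>>(
                FunctionNamesExport.Activity.SavePromoProductFromACPSActivity, promo))
            .ToList();

        var results = await Task.WhenAll(tasks);
        return results.SelectMany(r => r).ToList();
    }
}
```

In the main orchestrator, the step 3 loop and its Task.WhenAll would then collapse to `var acpsPromos = await PromoExportOrchestrators.FanOutStep3Async(context, promoData);`. The helper is a plain method rather than a function, which should be fine as long as it only schedules work through the orchestration context.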

Answer:

My guess is that your attempt to parallelize those tasks is to blame here. The easy way to check is to await each one as you produce it:

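    foreach (var promo in promoData)
    {
        // Await each activity before scheduling the next one, and keep its result for step 4.
        var products = await context.CallActivityAsync<List<PromotedProductExportModel>>(
            FunctionNamesExport.Activity.SavePromoProductFromACPSActivity, promo);
        acpsPromos.AddRange(products);
    }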

If that works, your code is most likely deadlocking itself: the continuation happens to be scheduled on the main thread, while the main thread is blocked waiting for the tasks to finish, so they never do. If you want parallel execution, you can wrap your tasks in a Task.Run call:

    foreach (var promo in promoData)
    {
        acpsTasks.Add(
            Task.Run(async () => await context.CallActivityAsync<List<PromotedProductExportModel>>(
                FunctionNamesExport.Activity.SavePromoProductFromACPSActivity, promo)));
    }

But if that code is just calling an endpoint without much data processing, doing so might be detrimental.
