How to optimize and speed up an asynchronous method with database calls

Time:07-08

Hello everyone, and thanks in advance for your help. The following question might sound stupid or misguided, but I'm a beginner at this.

I have a method that reads some information from my database and sends it to an external database using a POST call, plus a PATCH call in case the information has changed. I use Entity Framework. The database table has at least 165k rows.

My question is the following: is there a way to optimize and speed up the whole process? Maybe using multithreading or parallelism? I'm a beginner at this and I hope some of you can help me understand.

The method is the following:

public async Task<List<dynamic>> SyncOrdersTaskAsync(int PageSize)
{
    int PageIndex = 0;

    if (PageSize <= 0) PageSize = 100;

    const string phrase = "The fields order, task_code must make a unique set";

    var sorting = new SortingCriteria {
        Properties = new string[] { "WkOpenDate ASC" } };

    List<dynamic> listTest = new List<dynamic>();

    using (var uow = this.Factory.BeginUnitOfWork())
    {
        var repo = uow.GetRepository<IWorkOrderRepository>();

        var count = await repo.CountAllAsync();

        count = 150;

        for (PageIndex = 0; PageIndex <= count / PageSize; PageIndex++)
        {
            var paging = new PagingCriteria
            {
                PageIndex = PageIndex,
                PageSize = PageSize
            };

            var rows = await repo.GetByCriteriaAsync(
                "new {WkID, CompanyID, JobNo, JobTaskNo ,WkNumber, WkYear," +
                "WkYard,WkCustomerID,CuName,WkDivisionID,DvName,BusinessUnit," +
                "BusinessUnitManagerID,BusinessUnitManager,WkWorkTypeID,WtName," +
                "WkActivityID,WkActivityDescription,NoteDescrLavoro,WkWOManagerID," +
                "ProjectManager,IDMaster,ProjectCoordinator,WkOpenDate," +
                "WkDataChiusa,Prov,CodiceSito,CodiceOffice,CodiceLavorazione," +
                "CodiceNodo,DescrizioneNodo,WkPrevisionalStartDate,WkRealStartDate," +
                "WkPrevisionalEndDate,WkRealEndDate,NumeroOrdine," +
                "WkPrevisionalLabourAmount,TotaleCosti,SumOvertimeHours," +
                "SumTravelHours,SumNormalHours,WkProgressPercentage,Stato,CUP,CIG," +
                "TotaleManodopera,TotalePrestazioni,TotaleNoli,TotaleMateriali," +
                "SumAuxiliaryHours,TipoCommessa,TotaleOrdine, WkPreventivoData," +
                "WkConsuntivoData,TotaleFatturato,AggregateTotaleFatturato," +
                "AggregateTotalePrestazioni,Contract,CustomerOrderNumber," +
                "XmeWBECode,LastUpdateDate,PreGestWkID,CommercialNotes,Mandant," +
                "GammaProjectName,WkInventoryDate,WkCloseFlag,WkNote," +
                "TotalRegisteredLabour,TotalRegisteredPerformances," +
                "TotalRegisteredLeasings,TotalRegisteredMaterials,FlagFinalBalance," +
                "FinalBalance,OrderDate,TotalOrderDivision,SearchDescription," +
                "TotaleBefToBeApproved,TotaleBefToBeApprovedLeasings," +
                "TotaleLabourToBeApproved,AggregateLevel, AggregateTotalLabour," +
                "AggregateTotalLeasings,AggregateTotalMaterials," +
                "AggregateTotalRegisteredLabour," +
                "AggregateTotalRegisteredPerformances," +
                "AggregateTotalRegisteredLeasings," +
                "AggregateTotalRegisteredMaterials," +
                "AggregateTotalCost,AggregateSumNormalHours," +
                "AggregateSumAuxiliaryHours,AggregateSumRainHours," +
                "AggregateSumTravelHours,AggregateSumOvertimeHours," +
                "AggregateWkPrevisionalLabourAmount,AggregateFinalBalance," +
                "AggregateTotalOrder,AggregateTotalOrderDivision," +
                "AggregateTotalBefToBeApproved," +
                "AggregateTotalBefToBeApprovedLeasings," +
                "AggregateTotalLabourToBeApproved,TotalProduction," +
                "AggregateTotalProduction,JobTaskDescription}", paging, sorting);

            String url = appSettings.Value.UrlV1 + "order_tasks/";

            using (var httpClient = new HttpClient())
            {
                httpClient.DefaultRequestHeaders.Add("Authorization", "Token " +
                    await this.GetApiKey(true));
                if (rows.Count() > 0)
                {
                    foreach (var row in rows)
                    {
                        var testWork = (Model.WorkOrderCompleteInfo)Mapper
                            .MapWkOrdersCompleteInfo(row);
                        var orderIdDiv = await this.GetOrderForSyncing(httpClient,
                            testWork.JobNo);
                        var jsonTest = new JObject();
                        jsonTest["task_code"] = testWork.JobTaskNo;
                        jsonTest["description"] = testWork.JobTaskDescription;
                        jsonTest["order"] = orderIdDiv.Id;
                        jsonTest["order_date"] = testWork.OrderDate.HasValue
                            ? testWork.OrderDate.Value.ToString("yyyy-MM-dd")
                            : string.IsNullOrEmpty(testWork.OrderDate.ToString())
                                ? "1970-01-01"
                                : testWork.OrderDate.ToString().Substring(0, 10);
                        jsonTest["progress"] = testWork.WkProgressPercentage;

                        var content = new StringContent(jsonTest.ToString(),
                            Encoding.UTF8, "application/json");
                        var result = await httpClient.PostAsync(url, content);
                        if (result.Content != null)
                        {
                            var responseContent = await result.Content
                                .ReadAsStringAsync();
                            bool alreadyExists = phrase.All(responseContent.Contains);

                            if (alreadyExists)
                            {
                                var taskCase = await GetTaskForSyncing(httpClient,
                                    testWork.JobTaskNo, orderIdDiv.Id.ToString());
                                var idCase = taskCase.Id;
                                String urlPatch = appSettings.Value.UrlV1 +
                                    "order_tasks/" + idCase + "/";
                                bool isSame = taskCase.Equals(testWork
                                    .toSolOrderTask());
                                if (!isSame)
                                {
                                    var resultPatch = await httpClient.PatchAsync(
                                        urlPatch, content);
                                    if (resultPatch != null)
                                    {
                                        var responsePatchContent = await resultPatch
                                            .Content.ReadAsStringAsync();
                                        var jsonPatchContent = JsonConvert
                                            .DeserializeObject<dynamic>(
                                            responsePatchContent);
                                        listTest.Add(jsonPatchContent);
                                    }
                                }
                                else
                                {
                                    listTest.Add(taskCase.JobTaskNo_ +
                                        " is already updated!");
                                }
                            }
                            else
                            {
                                var jsonContent = JsonConvert
                                    .DeserializeObject<dynamic>(responseContent);
                                listTest.Add(jsonContent);
                            }
                        }
                    }
                }
            }
        }

        return listTest;
    }
}

Maybe I need to apply parallelism in the for loop?

Again, many thanks to everyone in advance, and I hope I was clear :)

CodePudding user response:

The most handy tool that is currently available for parallelizing asynchronous work is the Parallel.ForEachAsync method. It was introduced in .NET 6. Your code is quite complex though, and deciding where to put this loop is not obvious.

Ideally you would like to call the Parallel.ForEachAsync only once, so that it parallelizes your work with a single configurable degree of parallelism from start to finish. Generally you don't want to put this method inside an outer for/foreach loop, because then the degree of parallelism will fluctuate during the whole operation. But since your code is complex, I would go the easy way and do just that. I would replace this code:

foreach (var row in rows)
{
    //...
}

...with this:

ParallelOptions options = new() { MaxDegreeOfParallelism = 2 };
await Parallel.ForEachAsync(rows, options, async (row, _) =>
{
    //...
});

You have to make one more change. The List<T> is not thread-safe, so it will get corrupted if you call Add from multiple threads without synchronization. You can either add a lock (listTest) around each listTest.Add, or replace the list with a concurrent collection. My suggestion is to do the latter:

ConcurrentQueue<dynamic> listTest = new();
//...
listTest.Enqueue(jsonContent);
//...
return listTest.ToList();
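
To see the two changes working together, here is a minimal self-contained sketch. It is not your actual method: the rows are plain integers, and SendRowAsync is a hypothetical stand-in that only simulates the POST/PATCH round trip with a delay. The class and method names are made up for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ParallelSyncSketch
{
    // Hypothetical stand-in for the POST/PATCH round trip of one row.
    private static async Task<string> SendRowAsync(int row, CancellationToken ct)
    {
        await Task.Delay(50, ct); // simulated network latency
        return $"row {row} synced";
    }

    public static async Task<List<string>> SyncRowsAsync(
        IEnumerable<int> rows, int maxDegreeOfParallelism)
    {
        // Thread-safe collection, so concurrent Enqueue calls need no lock.
        ConcurrentQueue<string> results = new();

        ParallelOptions options = new()
        {
            MaxDegreeOfParallelism = maxDegreeOfParallelism
        };

        await Parallel.ForEachAsync(rows, options, async (row, ct) =>
        {
            string result = await SendRowAsync(row, ct);
            results.Enqueue(result);
        });

        // Rows finish in no particular order, so the results are not
        // guaranteed to follow the order of the input.
        return results.ToList();
    }
}
```

Calling `await ParallelSyncSketch.SyncRowsAsync(Enumerable.Range(1, 10), 2)` processes at most two rows at a time. Note that the results come back in completion order, so if the caller depends on the original row order you would have to carry an index along and sort at the end.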

After making these changes, hopefully your code will still work correctly, and it will run a bit faster. Then you'll have to experiment with the MaxDegreeOfParallelism setting until you find the value that yields the best performance. Don't go crazy with large values like 100 or 1000. In most cases overparallelizing is harmful, and might yield worse performance than not parallelizing at all.
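
One simple way to run that experiment is to time the same batch with a few different settings. The sketch below is only an illustration: FakeRequestAsync is a hypothetical placeholder that waits 20 ms instead of calling a real API, and the candidate values 1, 2, 4 and 8 are arbitrary. With a real remote service the numbers will look different, because the server and the network become the limiting factor.

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class DegreeOfParallelismProbe
{
    // Hypothetical stand-in for one POST/PATCH round trip.
    private static async ValueTask FakeRequestAsync(int row, CancellationToken ct)
    {
        await Task.Delay(20, ct);
    }

    // Runs the same batch with the given degree and returns the elapsed time.
    public static async Task<TimeSpan> MeasureAsync(int rowCount, int degree)
    {
        ParallelOptions options = new() { MaxDegreeOfParallelism = degree };
        Stopwatch stopwatch = Stopwatch.StartNew();

        await Parallel.ForEachAsync(
            Enumerable.Range(0, rowCount),
            options,
            (row, ct) => FakeRequestAsync(row, ct));

        stopwatch.Stop();
        return stopwatch.Elapsed;
    }

    // Prints one timing line per candidate degree of parallelism.
    public static async Task RunAsync()
    {
        foreach (int degree in new[] { 1, 2, 4, 8 })
        {
            TimeSpan elapsed = await MeasureAsync(40, degree);
            Console.WriteLine(
                $"MaxDegreeOfParallelism={degree}: {elapsed.TotalMilliseconds:F0} ms");
        }
    }
}
```

When you repeat this against the real endpoint, measure on a realistic number of rows and stop increasing the value as soon as the elapsed time stops improving or the remote API starts returning errors.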
