Multiple async tasks causing duplicates on SQL Server insert stored procedure

Time:08-12

I have a multithreaded application that loops through queues, grabs the data, and sends it to a stored procedure that inserts it into my table. The problem is that occasionally the same data is inserted at the exact same time, which produces a duplicate row. These rows do have IDs as primary keys; however, all of the other columns contain exactly the same data.

Here is my loop that spawns up to 20 threads.

var task = new Task();

foreach(job in jobList)
{
    task = Task.Run(() => ProcessJobs(job)); 
}

Task.WaitAll(task);

Each thread reads its own separate queue. I then process each message and add it to a HashSet to make sure there are no duplicates:

private async Task<string> ProcessJobs(Job job)
{
     var messageData = getMessageFromQueue(message);
     HashSet<UserInfo> list = new HashSet<UserInfo>();

     foreach(var message in messageData)
     {
         list.Add(BuildMessage(message));
     }

     InsertIntoDB(list);
}

// Maps one queue message to the row shape that gets inserted.
public UserInfo BuildMessage(MessageData messageData)
{
     return new UserInfo
                {
                    UserName = messageData.UserName,
                    Address = messageData.Address,
                    AccountType = messageData.Campaign?.AccountType == "G" ? "Type1" : "Type2",
                    AccountNumber = messageData.AccountList != null ? messageData.AccountList[0].ToString() : string.Empty
                };
}

public struct UserInfo
{
    public string UserName { get; set; }
    public string Address { get; set; }
    public string AccountType { get; set; }
    public string AccountNumber { get; set; }
}

Each set of processed messages is sent to the database as a table-valued parameter for the insert statement:

public async Task<int> InsertIntoDB(HashSet<UserInfo> list)
{
    // First convert the hashset to a DataTable
    var dataTable = list.ToDataTable();

    // Convert to a TVP ("params" is a reserved word in C#, so the variable is named "parameters")
    var parameters = new DynamicParameters();
    parameters.Add("@TVP_UserInfo", dataTable.AsTableValuedParameter("[dbo].[InsertUserInfo]"));

    using (var conn = new SqlConnection(ConfigurationManager.AppSettings["DatabaseConnection"]))
    {
        // Returns the number of rows affected
        return await conn.ExecuteAsync("InsertStoredProcedure", parameters, commandType: CommandType.StoredProcedure);
    }
}

public static DataTable ToDataTable<T>(this HashSet<T> iHashSet)
{
    DataTable dataTable = new DataTable();
    PropertyDescriptorCollection props = TypeDescriptor.GetProperties(typeof(T));

    for (int i = 0; i < props.Count; i++)
    {
        PropertyDescriptor propertyDescriptor = props[i];
        Type type = propertyDescriptor.PropertyType;

        if (type.IsGenericType && type.GetGenericTypeDefinition() == typeof(Nullable<>))
            type = Nullable.GetUnderlyingType(type);

        dataTable.Columns.Add(propertyDescriptor.Name, type);
    }

    object[] values = new object[props.Count];

    foreach (T iListItem in iHashSet)
    {
        for (int i = 0; i < values.Length; i++)
        {
             values[i] = props[i].GetValue(iListItem);
        }

        dataTable.Rows.Add(values);
    }

    return dataTable;
}

The stored procedure reads the TVP and inserts its rows:

CREATE PROCEDURE [InsertStoredProcedure]
    (@TVP_UserInfo dbo.TVP_UserInfo READONLY)
AS
BEGIN
    DECLARE @currentDate datetime = CURRENT_TIMESTAMP

    INSERT INTO MyTable (UserName, Address, 
                         AccountType, AccountNumber, AccountDisplay,
                         CreatedDate)
        SELECT
            UserName, Address, 
            AccountType, AccountNumber, 
            CASE 
                WHEN AccountNumber IS NULL
                    THEN '' 
                    ELSE 'Anonymous' 
            END,
            @currentDate 
        FROM
            @TVP_UserInfo
END

Here is the UDT creation

CREATE TYPE [dbo].[TVP_UserInfo] 
    AS TABLE
       (
           UserName,
           Address, 
           AccountType,
           AccountNumber
       )

I occasionally get duplicates, and I have no idea how or where they are coming from, since each message should be unique because I am using a HashSet.

I was thinking the multithreading was causing it; however, if I run just a single task I still get duplicates sometimes. Notice that the created date is exactly the same, all the way down to the millisecond. The Id (primary key) is different, but the rest of the row data is an exact duplicate.

The results look like this:

ID  UserName  Address       AccountNumber  AccountDisplay  CreatedDate
1   Joe       JoesAddress1  123456         Anonymous       2022-08-01 01:45:52:352
1   Joe       JoesAddress1  123456         Anonymous       2022-08-01 01:45:52:352

CodePudding user response:

Is UserName allowed to have duplicates in your database? If it cannot contain duplicates, I would suggest adding a unique index on that column (at least in development). That may assist you in catching the code that is causing duplicates.

CodePudding user response:

There are a few things I can see. First, you need to wait for all of the tasks, not just the last one:

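// Keep every task, not just the most recent one.
var tasks = new List<Task>();

foreach (var job in jobList)
{
    tasks.Add(Task.Run(() => ProcessJobs(job)));
}

// Blocks until all of the tasks have completed.
Task.WaitAll(tasks.ToArray());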
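
If the calling code is itself async, the same idea can be written without blocking a thread by awaiting Task.WhenAll. The following is only a minimal sketch: JobRunner, ProcessJobAsync, and RunAllAsync are hypothetical names, and the job is reduced to an int id with a short delay standing in for the real work.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class JobRunner
{
    // Hypothetical stand-in for ProcessJobs: simulates async work for one job.
    public static async Task<string> ProcessJobAsync(int jobId)
    {
        await Task.Delay(10);
        return $"job {jobId} done";
    }

    // Starts one task per job and completes only when every task has finished.
    public static async Task<string[]> RunAllAsync(IEnumerable<int> jobIds)
    {
        // ToList() forces every task to start now and keeps a reference to each one.
        List<Task<string>> tasks = jobIds.Select(id => ProcessJobAsync(id)).ToList();

        // WhenAll returns the results in the same order as the tasks.
        return await Task.WhenAll(tasks);
    }
}
```

A caller would use it as `string[] results = await JobRunner.RunAllAsync(new[] { 1, 2, 3 });`. If any job throws, awaiting the combined task rethrows, so a failed insert is not silently lost.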

Second, I cannot see how the ProcessJobs block of code will work:

  • The message variable is out of scope.
  • The call to InsertIntoDB is not awaited.
  • The method is declared to return a string but never returns one.
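
To make those three points concrete, here is a rough sketch of what a corrected ProcessJobs could look like. Everything in it is an assumption for illustration: Job.QueueName, GetMessagesFromQueue, and InsertIntoDbAsync are hypothetical stand-ins (the real queue reader and Dapper call are not reproduced), and messages are plain strings rather than UserInfo.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public sealed class Job
{
    // Hypothetical property identifying which queue this job reads.
    public string QueueName { get; set; } = "";
}

public static class JobProcessor
{
    // Hypothetical stand-in for getMessageFromQueue; returns a fixed batch with one repeat.
    public static IEnumerable<string> GetMessagesFromQueue(Job job) =>
        new[] { "a", "b", "a" };

    // Hypothetical stand-in for InsertIntoDB; reports how many rows it would insert.
    public static async Task<int> InsertIntoDbAsync(HashSet<string> rows)
    {
        await Task.Yield();
        return rows.Count;
    }

    public static async Task<string> ProcessJobAsync(Job job)
    {
        // The queue is read using the job that was passed in, so nothing is out of scope.
        var rows = new HashSet<string>(GetMessagesFromQueue(job));

        // Awaiting the insert means this task is not finished until the insert is.
        int inserted = await InsertIntoDbAsync(rows);

        // The declared Task<string> now actually produces a string.
        return $"{job.QueueName}: {inserted} rows";
    }
}
```

Note that the HashSet only removes repeats within one batch. Two separate calls that receive the same message will each insert it.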

However, I think the problem you are having is that multiple threads are going to call getMessageFromQueue. So, are it and its dependencies re-entrant and thread-safe? If all the work is synchronous, you can use a lock object to throttle it down. If you have other async work going on, it is best to use SemaphoreSlim rather than lock, but the lock will give you the idea.

Example of a lock:

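// A single shared object that every thread locks on.
private readonly object lockObj = new object();

private async Task<string> ProcessJobs(Job job)
{
    // Declared outside the lock so it is still in scope afterwards.
    // The element type is assumed from BuildMessage's parameter in the question.
    IEnumerable<MessageData> messageData;

    lock (lockObj)
    {
        // Only one thread at a time can read from the queue.
        // "message" is copied from the question; see the scope note above.
        messageData = getMessageFromQueue(message);
    }

    // rest of your code .... and return value
}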
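
For comparison, here is a minimal sketch of the SemaphoreSlim variant, which is the one to reach for when the guarded section contains an await (a lock block cannot). ThrottledReader, ReadQueueAsync, and MaxObserved are hypothetical names, and the delay stands in for the real queue read. The counter exists only to show that at most one caller is ever inside the guarded section.

```csharp
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledReader
{
    // One permit: at most one caller may read the queue at a time.
    private static readonly SemaphoreSlim Gate = new SemaphoreSlim(1, 1);

    private static int _active;

    // Highest number of callers seen inside the guarded section at once.
    public static int MaxObserved { get; private set; }

    // Hypothetical stand-in for an async getMessageFromQueue.
    private static async Task<string> ReadQueueAsync(int jobId)
    {
        int now = Interlocked.Increment(ref _active);
        if (now > MaxObserved) MaxObserved = now;

        await Task.Delay(5);

        Interlocked.Decrement(ref _active);
        return $"messages for job {jobId}";
    }

    public static async Task<string> ProcessJobAsync(int jobId)
    {
        // Waits asynchronously for the permit instead of blocking a thread.
        await Gate.WaitAsync();
        try
        {
            return await ReadQueueAsync(jobId);
        }
        finally
        {
            // Always hand the permit back, even if the read throws.
            Gate.Release();
        }
    }
}
```

Creating the semaphore with a larger count, for example `new SemaphoreSlim(4, 4)`, would allow up to four concurrent readers instead of one.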