I have a multithreaded application that loops through queues, grabs the data, and sends it to a stored procedure, where it is inserted into my table. The problem is that occasionally this data is inserted at the exact same time, which causes a duplicate row to be inserted. These rows do have unique primary key IDs; however, all of the other columns contain the exact same data.
Here is my loop that spawns up to 20 threads:
var task = new Task();
foreach (var job in jobList)
{
task = Task.Run(() => ProcessJobs(job));
}
Task.WaitAll(task);
Each thread reads its own separate queue; then I process each message and add it to a HashSet to make sure there are no duplicates:
private async Task<string> ProcessJobs(Job job)
{
var messageData = getMessageFromQueue(message);
HashSet<UserInfo> list = new HashSet<UserInfo>();
foreach(var message in messageData)
{
list.Add(BuildMessage(message));
}
InsertIntoDB(list);
}
public UserInfo BuildMessage(MessageData messageData)
{
    return new UserInfo
    {
        UserName = messageData.UserName,
        Address = messageData.Address,
        AccountType = messageData.Campaign?.AccountType == "G" ? "Type1" : "Type2",
        AccountNumber = messageData.AccountList != null ? messageData.AccountList[0].ToString() : string.Empty
    };
}
public struct UserInfo
{
public string UserName { get; set; }
public string Address { get; set; }
public string AccountType { get; set; }
public string AccountNumber { get; set; }
}
Each message gets processed and sent to the database as a table-valued parameter to an insert statement:
public async Task<int> InsertIntoDB(HashSet<UserInfo> list)
{
    // First convert the HashSet to a DataTable
    var dataTable = list.ToDataTable();
    // Convert to a TVP
    var parameters = new DynamicParameters();
    parameters.Add("@TVP_UserInfo", dataTable.AsTableValuedParameter("[dbo].[TVP_UserInfo]"));
    using (var conn = new SqlConnection(ConfigurationManager.AppSettings["DatabaseConnection"]))
    {
        return await conn.ExecuteAsync("InsertStoredProcedure", parameters, commandType: CommandType.StoredProcedure);
    }
}
public static DataTable ToDataTable<T>(this HashSet<T> iHashSet)
{
DataTable dataTable = new DataTable();
PropertyDescriptorCollection props = TypeDescriptor.GetProperties(typeof(T));
for (int i = 0; i < props.Count; i++)
{
PropertyDescriptor propertyDescriptor = props[i];
Type type = propertyDescriptor.PropertyType;
if (type.IsGenericType && type.GetGenericTypeDefinition() == typeof(Nullable<>))
type = Nullable.GetUnderlyingType(type);
dataTable.Columns.Add(propertyDescriptor.Name, type);
}
object[] values = new object[props.Count];
foreach (T iListItem in iHashSet)
{
for (int i = 0; i < values.Length; i++)
{
values[i] = props[i].GetValue(iListItem);
}
dataTable.Rows.Add(values);
}
return dataTable;
}
The stored procedure reads the TVP and inserts:
CREATE PROCEDURE [InsertStoredProcedure]
(@TVP_UserInfo dbo.TVP_UserInfo READONLY)
AS
BEGIN
DECLARE @currentDate datetime = CURRENT_TIMESTAMP
INSERT INTO MyTable (UserName, Address,
AccountType, AccountNumber, AccountDisplay,
CreatedDate)
SELECT
UserName, Address,
AccountType, AccountNumber,
CASE
WHEN AccountNumber IS NULL
THEN ''
ELSE 'Anonymous'
END,
@currentDate
FROM
@TVP_UserInfo
END
Here is the UDT creation (the column data types are not shown in the post):
CREATE TYPE [dbo].[TVP_UserInfo]
AS TABLE
(
UserName,
Address,
AccountType,
AccountNumber
)
I get duplicates occasionally and I have no idea where they are coming from, as each message should be unique because I am using a HashSet. I was thinking the multithreading was causing it; however, even if I run just one single task I still get the duplicates sometimes. Notice that the created date is exactly the same, all the way down to the millisecond. The ID (primary key) is different, but the remaining row data is an actual duplicate.
The results look like this:
ID | UserName | Address | AccountNumber | AccountDisplay | CreatedDate |
---|---|---|---|---|---|
1 | Joe | JoesAddress1 | 123456 | Anonymous | 2022-08-01 01:45:52.352 |
2 | Joe | JoesAddress1 | 123456 | Anonymous | 2022-08-01 01:45:52.352 |
CodePudding user response:
Is UserName allowed to have duplicates in your database? If it cannot contain duplicates, I would suggest adding a unique index on that column (at least in development). That may help you catch the code that is causing the duplicates.
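As a sketch, a unique index over the columns that should never repeat would make the database reject the duplicate insert with a constraint violation, pointing you at the offending code path. The table and column names below are taken from the question and may need adjusting:

```sql
-- Reject exact duplicates at the database level; the failing
-- insert's stack trace will identify the code that produced it.
CREATE UNIQUE INDEX UX_MyTable_NoDuplicates
ON MyTable (UserName, Address, AccountType, AccountNumber);
```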
CodePudding user response:
There are a few things I can see. First, you need to wait for all the tasks, not just the last one:
var tasks = new List<Task>();
foreach (var job in jobList)
{
    tasks.Add(Task.Run(() => ProcessJobs(job)));
}
Task.WaitAll(tasks.ToArray());
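Since ProcessJobs already returns a Task, an alternative sketch is to collect those tasks directly and await them with Task.WhenAll rather than blocking on WaitAll (jobList and ProcessJobs are the names from the question):

```csharp
// Collect the Task returned by each call and await them all;
// this frees the calling thread instead of blocking it.
var tasks = jobList.Select(job => ProcessJobs(job)).ToList();
await Task.WhenAll(tasks);
```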
Second, I cannot see how the ProcessJobs block of code will work:
- The message variable is out of scope.
- There is no await of the InsertIntoDB call.
- The method is declared as Task<string> but never returns a string.
However, I think the problem you are having is that multiple threads are accessing getMessageFromQueue. Are it and its dependencies re-entrant and thread-safe? If all the work is synchronous, you can use a lock object to throttle it down. If you have other async work going on, it is best to use SemaphoreSlim rather than lock, but the lock will give you the idea.
Example of a lock:
private readonly object lockObj = new object();

private async Task<string> ProcessJobs(Job job)
{
    lock (lockObj)
    {
        var messageData = getMessageFromQueue(message);
    }
    // rest of your code ... and return value
}
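Since await is not allowed inside a lock block, a SemaphoreSlim is the usual substitute for async code. A minimal sketch, reusing the names from the question (getMessageFromQueue is assumed to exist as posted; job is passed instead of the out-of-scope message variable):

```csharp
// One permit means only one thread reads the queue at a time.
private static readonly SemaphoreSlim queueGate = new SemaphoreSlim(1, 1);

private async Task<string> ProcessJobs(Job job)
{
    await queueGate.WaitAsync();
    try
    {
        var messageData = getMessageFromQueue(job);
        // build the messages, await InsertIntoDB, etc.
    }
    finally
    {
        queueGate.Release(); // always release, even if an exception is thrown
    }
    return string.Empty;
}
```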