QUESTION TITLE UPDATED from the original. The real problem wasn't a multithreading issue; it was how I was batching the work, which caused some elements to be processed twice.
I am getting the wrong result when using the += operator in a loop.
I have created a simple script that reproduces the problem I have been having. The array should be filled with array[i,j] = i + j. The first pass fills it using plain assignment (=); the second pass then uses the += operator. There are also array_2, array_3, and array_4, which show the correct result obtained by filling two separate arrays and adding them together at the end.
Can anyone explain why this is happening / what the correct way to do this is?
using Base.Threads
"""
    function_1(cartesian_index)

Return half of the sum of the first two components of `cartesian_index`.

`cartesian_index` only needs to support integer indexing with `[1]` and `[2]`
(for example a `CartesianIndex{2}` or a 2-tuple).
"""
function function_1(cartesian_index)
    i = cartesian_index[1]
    j = cartesian_index[2]
    return (i + j) / 2
end
"""
    main()

Reproduce the double-counting problem described in the question.

A 15x15 array is filled in two passes so that each element should end up equal
to `i + j`: the first pass assigns `function_1(index)` and the second pass adds
`function_1(index)` again with `+=`. The work is split into batches, one spawned
task per batch. `array_2` and `array_3` receive the same values by plain
assignment and are summed into `array_4` as a reference.

Any element that does not hold the expected value is reported with `println`.
With a single batch nothing is reported; with several batches the elements on
batch boundaries are reported (see the note on the batch bounds below).
"""
function main()
    array_1 = zeros(Float64, 15, 15)
    array_2 = zeros(Float64, 15, 15)
    array_3 = zeros(Float64, 15, 15)
    cartesian_indecies = CartesianIndices(array_1)
    number_of_indecies = length(cartesian_indecies)
    n_threads = Threads.nthreads()
    batch_size = ceil(Int, number_of_indecies / n_threads)

    # Pass 1: plain assignment of (i + j) / 2 into array_1 and array_2.
    @sync for batch_index in 1:batch_size:number_of_indecies
        Threads.@spawn begin
            # NOTE: batches start every `batch_size` indices, but the inclusive
            # upper bound `batch_index + batch_size` is also the first index of
            # the next batch, so neighbouring batches share one element. This
            # overlap is deliberately left in place: it is the behaviour this
            # script exists to reproduce. Assignment hides it, `+=` does not.
            for view_index in batch_index:min(number_of_indecies, batch_index + batch_size)
                cartesian_index = cartesian_indecies[view_index]
                array_1[cartesian_index] = function_1(cartesian_index)
                array_2[cartesian_index] = function_1(cartesian_index)
            end
        end
    end

    # After pass 1 every element of array_1 should hold (i + j) / 2.
    for index in CartesianIndices(array_1)
        expected = (index[1] + index[2]) / 2
        if array_1[index] != expected
            println("Error part 1 at index $index: $(array_1[index]) != $(expected)")
        end
    end

    # Pass 2: accumulate the second half into array_1; array_3 is assigned.
    @sync for batch_index in 1:batch_size:number_of_indecies
        Threads.@spawn begin
            # Same overlapping bounds as in pass 1: a shared boundary element
            # is accumulated by two batches.
            for view_index in batch_index:min(number_of_indecies, batch_index + batch_size)
                cartesian_index = cartesian_indecies[view_index]
                array_1[cartesian_index] += function_1(cartesian_index)
                array_3[cartesian_index] = function_1(cartesian_index)
            end
        end
    end

    # After pass 2 every element of array_1 should hold i + j.
    for index in CartesianIndices(array_1)
        if array_1[index] != index[1] + index[2]
            println("Error at index $index: $(array_1[index]) != $(index[1] + index[2])")
        end
    end

    # Reference result built only from assigned arrays.
    array_4 = array_2 + array_3
    for index in CartesianIndices(array_4)
        if array_4[index] != index[1] + index[2]
            println("Array 4 Error at index $index: $(array_4[index]) != $(index[1] + index[2])")
        end
    end
end
main()
This works fine when running with one thread.
But when running as:
julia --threads 3 ./parallel_issues_test.jl
The output of this:
Error at index CartesianIndex(1, 6): 10.5 != 7
Error at index CartesianIndex(1, 11): 18.0 != 12
CodePudding user response:
Because your partitioning has a bug:
# Replay the question's batching serially and record every index it visits,
# then compare the number of visits with the number of distinct indices.
test = []
for batch_index in 1:batch_size:number_of_indecies
    # The inclusive upper bound `batch_index + batch_size` is also the first
    # index of the next batch, so that index is pushed twice.
    for view_index in batch_index:min(number_of_indecies, batch_index + batch_size)
        cartesian_index = cartesian_indecies[view_index]
        push!(test, cartesian_index)
    end
end
@show length(test), length(unique(test))
# shows
(length(test), length(unique(test))) = (227, 225)
There are a few clues:
- `=` works because you don't rely on the previous value
- you have 2 errors, and this shows you have exactly 2 extra iterations
- if it were something to do with multi-threading (a race condition), the error location or value would probably change from run to run
CodePudding user response:
A fixed version based on @jling's answer.
The key lines are the top-level loops, which now step by batch_size + 1:
@sync for batch_index in 1:batch_size+1:number_of_indecies
using Base.Threads
"""
    function_1(cartesian_index)

Return half of the sum of the first two components of `cartesian_index`.

`cartesian_index` only needs to support integer indexing with `[1]` and `[2]`
(for example a `CartesianIndex{2}` or a 2-tuple).
"""
function function_1(cartesian_index)
    i = cartesian_index[1]
    j = cartesian_index[2]
    return (i + j) / 2
end
"""
    main()

Fill a 15x15 array with `i + j` in two batched, multi-task passes and verify it.

The first pass assigns `function_1(index)`, the second pass adds it again with
`+=`. `array_2` and `array_3` receive the same values by plain assignment and
are summed into `array_4` as a reference. Batches start every `batch_size + 1`
indices and end at `batch_index + batch_size` (inclusive), so no index belongs
to two batches.

Mismatches are reported with `println`. Afterwards the batching parameters and
each batch's range are printed, and the visited indices are collected to compare
the number of visits with the number of distinct indices.

Returns the tuple `(length(test), length(unique(test)))` shown by the final
`@show`.
"""
function main()
    array_1 = zeros(Float64, 15, 15)
    array_2 = zeros(Float64, 15, 15)
    array_3 = zeros(Float64, 15, 15)
    cartesian_indecies = CartesianIndices(array_1)
    number_of_indecies = length(cartesian_indecies)
    n_threads = Threads.nthreads()
    batch_size = ceil(Int, number_of_indecies / n_threads)

    # Pass 1: plain assignment of (i + j) / 2 into array_1 and array_2.
    # The stride is batch_size + 1 because each batch's range is inclusive of
    # `batch_index + batch_size`.
    @sync for batch_index in 1:(batch_size + 1):number_of_indecies
        Threads.@spawn begin
            for view_index in batch_index:min(number_of_indecies, batch_index + batch_size)
                cartesian_index = cartesian_indecies[view_index]
                array_1[cartesian_index] = function_1(cartesian_index)
                array_2[cartesian_index] = function_1(cartesian_index)
            end
        end
    end

    # After pass 1 every element of array_1 should hold (i + j) / 2.
    for index in CartesianIndices(array_1)
        expected = (index[1] + index[2]) / 2
        if array_1[index] != expected
            println("Error part 1 at index $index: $(array_1[index]) != $(expected)")
        end
    end

    # Pass 2: accumulate the second half into array_1; array_3 is assigned.
    @sync for batch_index in 1:(batch_size + 1):number_of_indecies
        Threads.@spawn begin
            for view_index in batch_index:min(number_of_indecies, batch_index + batch_size)
                cartesian_index = cartesian_indecies[view_index]
                array_1[cartesian_index] += function_1(cartesian_index)
                array_3[cartesian_index] = function_1(cartesian_index)
            end
        end
    end

    # After pass 2 every element of array_1 should hold i + j.
    for index in CartesianIndices(array_1)
        if array_1[index] != index[1] + index[2]
            println("Error at index $index: $(array_1[index]) != $(index[1] + index[2])")
        end
    end

    # Reference result built only from assigned arrays.
    array_4 = array_2 + array_3
    for index in CartesianIndices(array_4)
        if array_4[index] != index[1] + index[2]
            println("Array 4 Error at index $index: $(array_4[index]) != $(index[1] + index[2])")
        end
    end

    println("number_of_indecies = $number_of_indecies")
    println("n_threads = $n_threads")
    println("batch_size = $batch_size")

    # Diagnostic: record every index the batching visits.
    test = eltype(cartesian_indecies)[]
    # All tasks append to the same vector; `push!` on a shared Vector is not
    # safe from concurrent tasks, so appends are serialized with a lock.
    test_lock = ReentrantLock()
    @sync for batch_index in 1:(batch_size + 1):number_of_indecies
        Threads.@spawn begin
            end_index = min(number_of_indecies, batch_index + batch_size)
            # Report which thread the task is on at this moment and its range.
            println("Thread $(Threads.threadid()) $batch_index:$end_index")
            lock(test_lock) do
                for view_index in batch_index:end_index
                    push!(test, cartesian_indecies[view_index])
                end
            end
        end
    end
    # Equal counts mean no index was visited more than once.
    @show length(test), length(unique(test))
end
main()