What is wrong with my batching when using @sync @spawn loop on multiple threads, odd behavior when u

Time:01-28

QUESTION TITLE UPDATED from original. The real problem wasn't a multithreading issue. It was how I was batching the work: some indices were being processed twice.


I am getting the wrong result when using the += operator in a loop.

I have created a simple script that reproduces the problem. The array should end up filled with array[i,j] = i + j, and each pass contributes (i + j)/2. The first pass uses plain assignment (=), and the second pass uses the += operator. There are also array_2, array_3, and array_4, which show the correct result by filling two separate arrays and adding them together at the end.

Can anyone explain why this is happening, or what the correct way to do this is?

using Base.Threads

function function_1(cartesian_index)
    i = cartesian_index[1]
    j = cartesian_index[2]

    return (i + j)/2
end

function main()
    array_1 = zeros(Float64, 15, 15)
    array_2 = zeros(Float64, 15, 15)
    array_3 = zeros(Float64, 15, 15)

    cartesian_indecies = CartesianIndices(array_1)
    number_of_indecies = length(cartesian_indecies)
    n_threads = Threads.nthreads()
    batch_size = ceil(Int, number_of_indecies / n_threads)

    cartesian_indicies = CartesianIndices(array_1)

    @sync for batch_index in 1:batch_size:number_of_indecies
        Threads.@spawn begin
          for view_index in batch_index:min(number_of_indecies, batch_index + batch_size)
            cartesian_index = cartesian_indecies[view_index]
            array_1[cartesian_index] = function_1(cartesian_index)
            array_2[cartesian_index] = function_1(cartesian_index)
          end
        end
    end

    for index in CartesianIndices(array_1)
        if array_1[index] != (index[1] + index[2])/2
            println("Error part 1 at index $index: $(array_1[index]) != $((index[1] + index[2])/2)")
        end
    end

    @sync for batch_index in 1:batch_size:number_of_indecies
        Threads.@spawn begin
          for view_index in batch_index:min(number_of_indecies, batch_index + batch_size)
            cartesian_index = cartesian_indecies[view_index]
            array_1[cartesian_index] += function_1(cartesian_index)
            array_3[cartesian_index] = function_1(cartesian_index)
          end
        end
    end

    for index in CartesianIndices(array_1)
        if array_1[index] != index[1] + index[2]
            println("Error at index $index: $(array_1[index]) != $(index[1] + index[2])")
        end
    end

    array_4 = array_2 + array_3
    for index in CartesianIndices(array_4)
        if array_4[index] != index[1] + index[2]
            println("Array 4 Error at index $index: $(array_4[index]) != $(index[1] + index[2])")
        end
    end

end

main()

This works fine when running with one thread.

But when running as:

 julia --threads 3 ./parallel_issues_test.jl

The output is:

Error at index CartesianIndex(1, 6): 10.5 != 7
Error at index CartesianIndex(1, 11): 18.0 != 12

Answer:

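Because your partitioning has a bug: the batches overlap. Each inner range batch_index:min(number_of_indecies, batch_index + batch_size) contains batch_size + 1 indices, but the outer loop only advances by batch_size, so the last index of each batch is also the first index of the next batch. You can see this by collecting the indices without any threads: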

    test = []
    for batch_index in 1:batch_size:number_of_indecies
          for view_index in batch_index:min(number_of_indecies, batch_index + batch_size)
            cartesian_index = cartesian_indecies[view_index]
            push!(test, cartesian_index)
          end
    end
    @show length(test), length(unique(test))

# shows
(length(test), length(unique(test))) = (227, 225)
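To see exactly which indices collide, here is a minimal sketch that recomputes the question's batch ranges without spawning any tasks. The helper names question_batches and duplicated_indices are hypothetical, and the 15 x 15 size is taken from the question. With three threads it should report linear indices 76 and 151, i.e. CartesianIndex(1, 6) and CartesianIndex(1, 11), which are the two locations in the error output. With one thread there is a single batch 1:225 and nothing is duplicated, which is why the single-threaded run looked correct.

```julia
# Recompute the question's batch ranges (no threads needed).
# The upper bound batch_index + batch_size makes each range hold
# batch_size + 1 indices, while the outer loop steps by only batch_size.
function question_batches(n::Int, n_threads::Int)
    batch_size = ceil(Int, n / n_threads)
    return [batch_index:min(n, batch_index + batch_size)
            for batch_index in 1:batch_size:n]
end

# Return the sorted linear indices that appear in more than one range.
function duplicated_indices(ranges)
    counts = Dict{Int,Int}()
    for r in ranges, i in r
        counts[i] = get(counts, i, 0) + 1
    end
    return sort!(Int[i for (i, c) in counts if c > 1])
end

cart = CartesianIndices((15, 15))
for t in (1, 3)
    ranges = question_batches(length(cart), t)
    dups = duplicated_indices(ranges)
    println("threads = $t: ranges = $ranges, duplicated = $(cart[dups])")
end
```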

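There are a few clues:

  1. Plain assignment (=) works because it doesn't rely on the previous value: writing the same value to an index twice is harmless, while += adds the contribution twice.
  2. You have 2 errors, and the check above shows exactly 2 extra iterations (227 instead of 225).
  3. If it were a multithreading problem (a race condition), the error locations or values would probably change from run to run.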
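Clue 1 can be illustrated on a toy 1-D array, assuming one index is visited twice the way a shared batch boundary is. The name contribution is a hypothetical stand-in for function_1.

```julia
# Index 3 is visited twice, mimicking an index shared by two overlapping batches.
visits = [1, 2, 3, 3, 4]
contribution(i) = i / 2   # hypothetical stand-in for function_1

assigned = zeros(4)
accumulated = zeros(4)
for i in visits
    assigned[i] = contribution(i)      # writing the same value twice changes nothing
    accumulated[i] += contribution(i)  # adding twice doubles the contribution
end
println(assigned)     # [0.5, 1.0, 1.5, 2.0]
println(accumulated)  # [0.5, 1.0, 3.0, 2.0]
```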

Answer:

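A fixed version based on @jling's answer.

The key lines are the top-level loops. They now step by batch_size + 1, so consecutive inner ranges (each of which holds batch_size + 1 indices) no longer overlap:

@sync for batch_index in 1:batch_size+1:number_of_indecies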
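As a sanity check of this fix, here is a sketch, not part of the original answer, that tests the ranges it produces for exact coverage without using threads. fixed_batches and covers_exactly_once are hypothetical helper names, and the inner range is left exactly as in the question. Note that the number of batches can be smaller than the thread count (for example 4 batches for n = 7 and 8 threads), which is harmless.

```julia
# Ranges produced by the fixed outer loop (step batch_size + 1) combined with
# the question's unchanged inner range batch_index:min(n, batch_index + batch_size).
function fixed_batches(n::Int, n_threads::Int)
    batch_size = ceil(Int, n / n_threads)
    return [batch_index:min(n, batch_index + batch_size)
            for batch_index in 1:batch_size+1:n]
end

# True when the ranges visit every index of 1:n exactly once, in order.
covers_exactly_once(ranges, n) = collect(Iterators.flatten(ranges)) == 1:n

for (n, t) in ((225, 3), (225, 1), (10, 4), (7, 8))
    r = fixed_batches(n, t)
    println("n = $n, threads = $t: $(length(r)) batches, exact cover = $(covers_exactly_once(r, n))")
end
```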

using Base.Threads

function function_1(cartesian_index)
    i = cartesian_index[1]
    j = cartesian_index[2]

    return (i + j)/2
end

function main()
    array_1 = zeros(Float64, 15, 15)
    array_2 = zeros(Float64, 15, 15)
    array_3 = zeros(Float64, 15, 15)

    cartesian_indecies = CartesianIndices(array_1)
    number_of_indecies = length(cartesian_indecies)
    n_threads = Threads.nthreads()
    batch_size = ceil(Int, number_of_indecies / n_threads)

    @sync for batch_index in 1:batch_size+1:number_of_indecies
        Threads.@spawn begin
          for view_index in batch_index:min(number_of_indecies, batch_index + batch_size)
            cartesian_index = cartesian_indecies[view_index]
            array_1[cartesian_index] = function_1(cartesian_index)
            array_2[cartesian_index] = function_1(cartesian_index)
          end
        end
    end

    for index in CartesianIndices(array_1)
        if array_1[index] != (index[1] + index[2])/2
            println("Error part 1 at index $index: $(array_1[index]) != $((index[1] + index[2])/2)")
        end
    end

    @sync for batch_index in 1:batch_size+1:number_of_indecies
        Threads.@spawn begin
          for view_index in batch_index:min(number_of_indecies, batch_index + batch_size)
            cartesian_index = cartesian_indecies[view_index]
            array_1[cartesian_index] += function_1(cartesian_index)
            array_3[cartesian_index] = function_1(cartesian_index)
          end
        end
    end

    for index in CartesianIndices(array_1)
        if array_1[index] != index[1] + index[2]
            println("Error at index $index: $(array_1[index]) != $(index[1] + index[2])")
        end
    end

    array_4 = array_2 + array_3
    for index in CartesianIndices(array_4)
        if array_4[index] != index[1] + index[2]
            println("Array 4 Error at index $index: $(array_4[index]) != $(index[1] + index[2])")
        end
    end

    println("number_of_indecies = $number_of_indecies")
    println("n_threads = $n_threads")
    println("batch_size = $batch_size")
    test = CartesianIndex{2}[]
    test_lock = ReentrantLock()   # push! on a shared Vector from several tasks is not thread-safe
    @sync for batch_index in 1:batch_size+1:number_of_indecies
        Threads.@spawn begin
          # print which thread handles which range
          end_index = min(number_of_indecies, batch_index + batch_size)
          println("Thread $(Threads.threadid()) $batch_index:$end_index")

          for view_index in batch_index:end_index
            cartesian_index = cartesian_indecies[view_index]
            lock(test_lock) do
                push!(test, cartesian_index)
            end
          end
        end
    end
    @show length(test), length(unique(test))

end

main()
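Another way to avoid the off-by-one entirely is to let Base's Iterators.partition split 1:n into non-overlapping chunks and to have each task return its own results instead of pushing into a shared vector. This is a swapped-in technique rather than what either answer used. fill_partitioned! and collect_indices_partitioned are hypothetical names, and the sketch assumes a Float64 matrix like the one in the question.

```julia
using Base.Threads

# Each task returns its own vector of indices (no shared mutable state);
# Iterators.partition yields non-overlapping chunks of at most batch_size indices.
function collect_indices_partitioned(A::AbstractArray)
    cart = CartesianIndices(A)
    n = length(cart)
    batch_size = cld(n, Threads.nthreads())
    tasks = map(Iterators.partition(1:n, batch_size)) do chunk
        Threads.@spawn [cart[i] for i in chunk]
    end
    return reduce(vcat, fetch.(tasks))
end

# One "+=" pass of (i + j)/2 over the matrix, batched the same way.
function fill_partitioned!(A::AbstractMatrix{Float64})
    cart = CartesianIndices(A)
    batch_size = cld(length(cart), Threads.nthreads())
    @sync for chunk in Iterators.partition(1:length(cart), batch_size)
        Threads.@spawn for i in chunk
            I = cart[i]
            A[I] += (I[1] + I[2]) / 2   # each index belongs to exactly one chunk
        end
    end
    return A
end

A = zeros(15, 15)
fill_partitioned!(A); fill_partitioned!(A)   # two passes, as in the question
idx = collect_indices_partitioned(A)
println((length(idx), length(unique(idx))))
println(all(A[I] == I[1] + I[2] for I in CartesianIndices(A)))
```

Whatever the thread count, this should print (225, 225) and true, because the chunks from Iterators.partition never share an index.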