Home > Blockchain >  How to declare a shared DataFrame in Julia for parallel computing
How to declare a shared DataFrame in Julia for parallel computing

Time:07-07

I have a large simulation on a DataFrame df which I am trying to parallelize and save the results of the simulations in a DataFrame called simulation_results.

The parallelization loop is working just fine. The problem is that if I were to store the results in an array I would declare it as a SharedArray before the loop. I don't know how to declare simulation_results as a "shared DataFrame" which is available everywhere to all processors and can be modified.

A code snippet is as follows:

addprocs(length(Sys.cpu_info()))

@everywhere begin
  using <required packages>

  df = CSV.read("/path/data.csv", DataFrame)

  simulation_results = similar(df, 0) #I need to declare this as shared and modifiable by all processors 
  
  nsims = 100000

end


@sync @distributed for sim in 1:nsims
    nsim_result = similar(df, 0)
    <the code which for one simulation stores the results in nsim_result >
    append!(simulation_results, nsim_result)
end

The problem is that since simulation_results is not declared to be shared and modifiable by processors, after the loop runs, it produces basically an empty DataFrame as was coded in @everywhere simulation_results = similar(df, 0).

Would really appreciate any help on this! Thanks!

CodePudding user response:

The pattern for distributed computing in Julia is much simpler than what you are trying to do.

Your code should look more or less like this:

df = CSV.read("/path/data.csv", DataFrame)

@everywhere using <required packages>


simulation_results = @distributed (append!) for sim in 1:nsims
    <the code which for one simulation stores the results in nsim_result >
    nsim_result
end

Note you do not need to load df at every process within the Julia cluster since @distributed will make sure it is readable. You do not need to @sync neither because in my code you would use the aggregator function (append!).

A minimal working example (run with addprocs(4)):

@everywhere using Distributed, DataFrames
df = DataFrame(a=1:5,b=rand())

and now the result:

julia> @distributed (append!) for i in 2:5
           DataFrame(bsum=sum(df.b[1:myid()]),c=myid())
       end
4×2 DataFrame
 Row │ bsum      c
     │ Float64   Int64
─────┼─────────────────
   1 │ 0.518127      2
   2 │ 0.777191      3
   3 │ 1.03625       4
   4 │ 1.29532       5

CodePudding user response:

As long as your dataframe df is numeric in the entries you process, you can pass it back and forth as a matrix:

mynames = names(df)
matrix = Matrix(df)

Then convert matrix to to SharedArray and compute. Then back to matrix.

dfprocessed = DataFrame(matrix, mynames)

Note this method may fail if your dataframe's data are not all of uniform type. It would work best if all are integer or all floating point. You might have to first drop non-numeric columns or set those to numeric levels.

  • Related