Correctly converting for-loops to parallel loops


I have the following dataset (students wrote an exam many times over a period of years and either passed or failed; I am interested in studying the effect of the previous test on the next test):

id = sample.int(10000, 100000, replace = TRUE)
res = c(1,0)
results = sample(res, 100000, replace = TRUE)
date_exam_taken = sample(seq(as.Date('1999/01/01'), as.Date('2020/01/01'), by="day"), 100000, replace = TRUE)


my_data = data.frame(id, results, date_exam_taken)
my_data <- my_data[order(my_data$id, my_data$date_exam_taken),]

my_data$general_id = 1:nrow(my_data)
my_data$exam_number = ave(my_data$general_id, my_data$id, FUN = seq_along)
my_data$general_id = NULL

     id results date_exam_taken exam_number
7992   1       1      2004-04-23           1
24837  1       0      2004-12-10           2
12331  1       1      2007-01-19           3
34396  1       0      2007-02-21           4
85250  1       0      2007-09-26           5
11254  1       1      2009-12-20           6

I wrote this standard for loop, and everything seems to work fine:

my_list = list()

for (i in 1:length(unique(my_data$id))) {
    tryCatch({
        start_i = my_data[my_data$id == i, ]
        pairs_i = data.frame(first = head(start_i$results, -1), second = tail(start_i$results, -1))
        frame_i = as.data.frame(table(pairs_i))
        frame_i$id = i
        print(frame_i)
        my_list[[i]] = frame_i
    }, error = function(e) {})
}


final_a = do.call(rbind.data.frame, my_list)

Now, I am trying to "optimize" this loop by using the "doParallel" library in R.

Using this post (How to transform a "for loop" in a "foreach" loop in R?) as a tutorial, I tried to convert my loop as follows:

# does this mean I should set makeCluster() to makeCluster(8)???
 > detectCores()
[1] 8

my_list = list()
max = length(unique(my_data$id))

library(doParallel)
registerDoParallel(cl <- makeCluster(3))

# note: for some reason, this loop isn't printing?

test = foreach(i = 1:max, .combine = "rbind") %dopar% {
    tryCatch({
        start_i = my_data[my_data$id == i, ]
        pairs_i = data.frame(first = head(start_i$results, -1), second = tail(start_i$results, -1))
        frame_i = as.data.frame(table(pairs_i))
        frame_i$id = i
        print(frame_i)
        my_list[[i]] = frame_i
    }, error = function(e) {})
}


final_b = do.call(rbind.data.frame, test)

Based on this - I have the following questions:

  • Have I correctly used the "doParallel" functionalities as they are intended to be used?

  • Is there a better way to do this?

  • Note: I am looking to run this code on a dataset with around 10 million unique IDs.

CodePudding user response:

Here is a way, with the parallel code wrapped in a function.
I split the data by id beforehand instead of filtering on the current index i at every iteration, which saves some time. Extracting the results vector only once per group also saves time. (As a side note, print() inside %dopar% will not show anything in your console because the work runs in separate worker processes.)

I haven't found any errors in my parallel code, yet the final data.frame is not equal to the sequential output final_a: it has more rows. Most likely this is because the sequential loop only visits i = 1:length(unique(my_data$id)) and filters with my_data$id == i, so id values above that count are silently skipped, whereas split() covers every id present (a quick check is sketched after the reprex output below).

This is system dependent, but as you can see in the timings, the 6-core run is the fastest.

library(parallel)
library(doParallel)
#> Loading required package: foreach
#> Loading required package: iterators

parFun <- function(my_data, ncores) {
  # split once by id, instead of filtering the whole data frame at each iteration
  split_data <- split(my_data, my_data$id)
  
  registerDoParallel(cl <- makeCluster(ncores))
  on.exit(stopCluster(cl))
  
  test <- foreach(i = seq_along(split_data)) %dopar% {
    # extract the results vector only once per group
    start_i_results <- split_data[[i]]$results
    n <- length(start_i_results)
    if (n > 1L) {
      tryCatch({
        # consecutive (previous exam, next exam) result pairs for this id
        pairs_i <- data.frame(first = start_i_results[-n], 
                              second = start_i_results[-1L])
        frame_i <- as.data.frame(table(pairs_i))
        frame_i$id <- i
        frame_i
      }, error = function(e) {e})
    } else NULL
  }
  final_b <- do.call(rbind.data.frame, test)
  final_b
}

set.seed(2022)
id <- sample.int(10000, 100000, replace = TRUE)
res <- c(1,0)
results <- sample(res, 100000, replace = TRUE)
date_exam_taken <- sample(seq(as.Date('1999/01/01'), as.Date('2020/01/01'), by="day"), 100000, replace = TRUE)
my_data <- data.frame(id, results, date_exam_taken)

my_data <- my_data[order(my_data$id, my_data$date_exam_taken),]

my_data$general_id = 1:nrow(my_data)
my_data$exam_number = ave(my_data$general_id, my_data$id, FUN = seq_along)
my_data$general_id = NULL

t0 <- system.time({
  my_list = list()
  
  for (i in 1:length(unique(my_data$id))) {
    tryCatch({
      start_i = my_data[my_data$id == i, ]
      pairs_i = data.frame(first = head(start_i$results, -1), second = tail(start_i$results, -1))
      frame_i = as.data.frame(table(pairs_i))
      frame_i$id = i
      # print(frame_i)
      my_list[[i]] = frame_i
    }, error = function(e) {})
  }
  final_a = do.call(rbind.data.frame, my_list)
})

ncores <- detectCores()

# run with 3 cores
t3 <- system.time(parFun(my_data, 3L))
# run with 6 cores and save the result in `res6`
t6 <- system.time(res6 <- parFun(my_data, ncores - 2L))
rbind(t0, t3, t6)[,1:3]
#>    user.self sys.self elapsed
#> t0     12.86     1.00   15.37
#> t3      3.50     0.22    8.37
#> t6      3.61     0.46    7.65

head(final_a, 10)
#>    first second Freq id
#> 1      0      0    2  1
#> 2      1      0    3  1
#> 3      0      1    4  1
#> 4      1      1    0  1
#> 5      0      0    5  2
#> 6      1      0    3  2
#> 7      0      1    2  2
#> 8      1      1    0  2
#> 9      0      0    0  3
#> 10     1      0    1  3
head(res6, 10)
#>    first second Freq id
#> 1      0      0    2  1
#> 2      1      0    3  1
#> 3      0      1    4  1
#> 4      1      1    0  1
#> 5      0      0    5  2
#> 6      1      0    3  2
#> 7      0      1    2  2
#> 8      1      1    0  2
#> 9      0      0    0  3
#> 10     1      0    1  3

str(final_a)
#> 'data.frame':    38945 obs. of  4 variables:
#>  $ first : Factor w/ 2 levels "0","1": 1 2 1 2 1 2 1 2 1 2 ...
#>  $ second: Factor w/ 2 levels "0","1": 1 1 2 2 1 1 2 2 1 1 ...
#>  $ Freq  : int  2 3 4 0 5 3 2 0 0 1 ...
#>  $ id    : int  1 1 1 1 2 2 2 2 3 3 ...
str(res6)
#> 'data.frame':    38949 obs. of  4 variables:
#>  $ first : Factor w/ 2 levels "0","1": 1 2 1 2 1 2 1 2 1 2 ...
#>  $ second: Factor w/ 2 levels "0","1": 1 1 2 2 1 1 2 2 1 1 ...
#>  $ Freq  : int  2 3 4 0 5 3 2 0 0 1 ...
#>  $ id    : int  1 1 1 1 2 2 2 2 3 3 ...

Created on 2022-12-11 with reprex v2.0.2
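
If the row difference does come from the indexing assumption in the sequential loop (my guess above, not verified for this exact seed), a quick check along the following lines should confirm it. The names n_groups, skipped_ids and missing_ids are only illustrative:

# Diagnostic sketch, assuming my_data from the reprex above is in scope.
# The sequential loop only visits i = 1:length(unique(my_data$id)).
n_groups <- length(unique(my_data$id))

# id values present in the data but never visited by the sequential loop
skipped_ids <- setdiff(unique(my_data$id), seq_len(n_groups))

# loop indices matching no id at all; these produce empty 0-row frames
missing_ids <- setdiff(seq_len(n_groups), unique(my_data$id))

skipped_ids
missing_ids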


Edit

The following version seems much faster.

parFun2 <- function(my_data, ncores) {
  registerDoParallel(cl <- makeCluster(ncores))
  on.exit(stopCluster(cl))
  
  # split only the results vector by id, not the whole data frame
  results_list <- split(my_data$results, my_data$id)
  
  test <- foreach(i = seq_along(results_list)) %dopar% {
    start_i_results <- results_list[[i]]
    n <- length(start_i_results)
    if (n > 1L) {
      # tabulate consecutive (previous, next) result pairs directly
      tbl <- table(first = start_i_results[-n], 
                   second = start_i_results[-1L])
      frame_i <- as.data.frame(tbl)
      frame_i$id <- i
      frame_i
    } else NULL
  }
  # requires the data.table package to be installed
  data.table::rbindlist(test)
}
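
A usage sketch follows; the names t6b and res6b are mine, not from the code above, and data.table must be installed for rbindlist():

# Usage sketch, assuming my_data from the reprex above
t6b <- system.time(res6b <- parFun2(my_data, 6L))
t6b

# rbindlist() returns a data.table; coerce if a plain data.frame is preferred
res6b <- as.data.frame(res6b)
head(res6b)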