I have this dataset over here (e.g. students wrote an exam many times over a period of years and either passed or failed — I am interested in studying the effect of the previous test result on the next test result):
# Simulate 100,000 exam attempts: random student ids, pass/fail results
# and exam dates, then order each student's attempts chronologically and
# number them.
id <- sample.int(10000, 100000, replace = TRUE)
res <- c(1, 0)
results <- sample(res, 100000, replace = TRUE)
exam_days <- seq(as.Date("1999/01/01"), as.Date("2020/01/01"), by = "day")
date_exam_taken <- sample(exam_days, 100000, replace = TRUE)
my_data <- data.frame(id, results, date_exam_taken)
# Sort by student, then chronologically within student.
my_data <- my_data[order(my_data$id, my_data$date_exam_taken), ]
# Attempt counter per student: 1, 2, ... in date order.
my_data$general_id <- seq_len(nrow(my_data))
my_data$exam_number <- ave(my_data$general_id, my_data$id, FUN = seq_along)
my_data$general_id <- NULL
id results date_exam_taken exam_number
7992 1 1 2004-04-23 1
24837 1 0 2004-12-10 2
12331 1 1 2007-01-19 3
34396 1 0 2007-02-21 4
85250 1 0 2007-09-26 5
11254 1 1 2009-12-20 6
I wrote this standard FOR LOOP and everything seems to work fine:
# Build, for every student, the 2x2 transition table between consecutive
# exam results (previous result -> next result), collected into final_a.
#
# Bug fixed: the original looped over i in 1:length(unique(my_data$id)) and
# then matched my_data$id == i, i.e. it used the loop *index* as an id
# value. That is only correct when the observed ids are exactly
# 1..n_distinct; otherwise some real ids are skipped and non-existent ones
# are queried (the empty tryCatch handler hid this). Iterate over the
# actual distinct id values instead.
my_list <- list()
all_ids <- sort(unique(my_data$id))
for (k in seq_along(all_ids)) {
  start_i <- my_data[my_data$id == all_ids[k], ]
  # Need at least two attempts to form a (previous, next) pair.
  if (nrow(start_i) > 1L) {
    pairs_i <- data.frame(first = head(start_i$results, -1),
                          second = tail(start_i$results, -1))
    frame_i <- as.data.frame(table(pairs_i))
    frame_i$id <- all_ids[k]  # label with the real id, not the index
    my_list[[k]] <- frame_i
  }
}
final_a <- do.call(rbind.data.frame, my_list)
Now, I am trying to "optimize" this loop by using "doParallel" libraries in R.
Using this post (How to transform a "for loop" in a "foreach" loop in R?) as a tutorial, I tried to convert my loop as follows:
# does this mean I should set makeCluster() to makeCluster(8)???
> detectCores()
[1] 8
# Parallel version of the transition-table loop, result in final_b.
#
# Fixes relative to the original attempt:
# * foreach() returns the combined value of each iteration's *last
#   expression*. Assigning my_list[[i]] <- frame_i happens on the worker
#   processes and is lost in the master session; return frame_i instead.
# * With .combine = "rbind" the foreach() result is already one combined
#   data.frame, so a second do.call(rbind, ...) pass is unnecessary.
# * print() inside %dopar% runs on the workers, whose stdout is not shown
#   in the master session -- that is why "the loop isn't printing".
# * Loop over the actual distinct id values, not 1:n_distinct (ids are
#   values, not loop indices).
# * Stop the cluster afterwards so worker processes are not leaked.
library(doParallel)
registerDoParallel(cl <- makeCluster(3))
all_ids <- sort(unique(my_data$id))
final_b <- foreach(k = seq_along(all_ids), .combine = "rbind") %dopar% {
  tryCatch({
    start_i <- my_data[my_data$id == all_ids[k], ]
    pairs_i <- data.frame(first = head(start_i$results, -1),
                          second = tail(start_i$results, -1))
    frame_i <- as.data.frame(table(pairs_i))
    frame_i$id <- all_ids[k]
    frame_i                     # last value = this iteration's result
  }, error = function(e) NULL)  # drop iterations that error instead of failing
}
stopCluster(cl)
Based on this - I have the following questions:
Have I correctly used the "doParallel" functionalities as they are intended to be used?
Is there yet a better way to do this?
Note: I am looking to run this code on a dataset with around 10 million unique ID's
CodePudding user response:
Here is a way with the parallel code written as a function.
I split the data by id beforehand, instead of comparing each id with the current index i
. This saves some time. It also saves time to extract the results
vector only once.
My parallel data.frame is not equal to the sequential output final_a
— it has a few more rows. The cause is in the sequential loop, not in the parallel code: that loop runs i over 1:length(unique(id)) and matches my_data$id == i, so any distinct id whose value is larger than the number of distinct ids is never processed. split() handles every distinct id, hence the extra rows.
This is system dependent but as you can see in the timings, the 6 cores run is the fastest.
library(parallel)
library(doParallel)
#> Loading required package: foreach
#> Loading required package: iterators
parFun <- function(my_data, ncores) {
  # Compute per-student transition tables in parallel on `ncores` workers
  # and return them row-bound as one data.frame.
  #
  # split()-ing once by id avoids rescanning the whole data.frame for each
  # student, and extracting the results vector once per group avoids
  # repeated column lookups.
  #
  # Bug fixed: the original set frame_i$id <- i, the *position* of the
  # group in the split list. When some id values in the original range are
  # absent from the data, positions and id values diverge and the labels
  # are wrong. Use the group names (the actual ids) instead.
  split_data <- split(my_data, my_data$id)
  ids <- as.integer(names(split_data))
  registerDoParallel(cl <- makeCluster(ncores))
  on.exit(stopCluster(cl))  # always release the workers, even on error
  test <- foreach(i = seq_along(split_data)) %dopar% {
    start_i_results <- split_data[[i]]$results
    n <- length(start_i_results)
    if (n > 1L) {
      tryCatch({
        pairs_i <- data.frame(first = start_i_results[-n],
                              second = start_i_results[-1L])
        frame_i <- as.data.frame(table(pairs_i))
        frame_i$id <- ids[i]
        frame_i
      }, error = function(e) {e})
    } else NULL  # a single attempt yields no transition pair
  }
  final_b <- do.call(rbind.data.frame, test)
  final_b
}
# Simulate the exam data again, this time with a fixed seed so that the
# sequential and parallel results are reproducible and comparable.
set.seed(2022)
id <- sample.int(10000, 100000, replace = TRUE)
res <- c(1, 0)
results <- sample(res, 100000, replace = TRUE)
exam_days <- seq(as.Date("1999/01/01"), as.Date("2020/01/01"), by = "day")
date_exam_taken <- sample(exam_days, 100000, replace = TRUE)
my_data <- data.frame(id, results, date_exam_taken)
# Sort attempts by student, then chronologically within student.
my_data <- my_data[order(my_data$id, my_data$date_exam_taken), ]
# Number each student's attempts 1, 2, ... in date order.
my_data$general_id <- seq_len(nrow(my_data))
my_data$exam_number <- ave(my_data$general_id, my_data$id, FUN = seq_along)
my_data$general_id <- NULL
# Time the original sequential loop as the baseline (result in final_a).
# NOTE(review): this loop uses i both as a loop counter and as an id value
# (my_data$id == i). That is only correct when the observed ids are exactly
# 1..length(unique(id)); any distinct id larger than that count is silently
# skipped, which is why final_a can end up with fewer rows than the
# split()-based parallel result.
t0 <- system.time({
my_list = list()
for (i in 1:length(unique(my_data$id)))
{
{tryCatch({
start_i = my_data[my_data$id == i,]
pairs_i = data.frame(first = head(start_i$results, -1), second = tail(start_i$results, -1))
frame_i = as.data.frame(table(pairs_i))
frame_i$id = i
# print(frame_i)
my_list[[i]] = frame_i
}, error = function(e){})  # empty handler: any per-id error is discarded silently
}}
final_a = do.call(rbind.data.frame, my_list)
})
# Compare run times: sequential baseline (t0) vs 3 and 6 worker processes.
ncores <- detectCores()
# run with 3 cores
t3 <- system.time(parFun(my_data, 3L))
# run with 6 cores and save the result in `res6`
# NOTE(review): ncores - 2L equals 6 only on the 8-core machine used here.
t6 <- system.time(res6 <- parFun(my_data, ncores - 2L))
rbind(t0, t3, t6)[,1:3]
#> user.self sys.self elapsed
#> t0 12.86 1.00 15.37
#> t3 3.50 0.22 8.37
#> t6 3.61 0.46 7.65
# Inspect and compare the sequential (final_a) and parallel (res6) outputs.
# The first rows agree, but str() below shows res6 has 4 more rows
# (38949 vs 38945): the sequential loop indexed ids by position and so
# skipped the few distinct ids larger than length(unique(id)), while
# split() covers every distinct id.
head(final_a, 10)
#> first second Freq id
#> 1 0 0 2 1
#> 2 1 0 3 1
#> 3 0 1 4 1
#> 4 1 1 0 1
#> 5 0 0 5 2
#> 6 1 0 3 2
#> 7 0 1 2 2
#> 8 1 1 0 2
#> 9 0 0 0 3
#> 10 1 0 1 3
head(res6, 10)
#> first second Freq id
#> 1 0 0 2 1
#> 2 1 0 3 1
#> 3 0 1 4 1
#> 4 1 1 0 1
#> 5 0 0 5 2
#> 6 1 0 3 2
#> 7 0 1 2 2
#> 8 1 1 0 2
#> 9 0 0 0 3
#> 10 1 0 1 3
str(final_a)
#> 'data.frame': 38945 obs. of 4 variables:
#> $ first : Factor w/ 2 levels "0","1": 1 2 1 2 1 2 1 2 1 2 ...
#> $ second: Factor w/ 2 levels "0","1": 1 1 2 2 1 1 2 2 1 1 ...
#> $ Freq : int 2 3 4 0 5 3 2 0 0 1 ...
#> $ id : int 1 1 1 1 2 2 2 2 3 3 ...
str(res6)
#> 'data.frame': 38949 obs. of 4 variables:
#> $ first : Factor w/ 2 levels "0","1": 1 2 1 2 1 2 1 2 1 2 ...
#> $ second: Factor w/ 2 levels "0","1": 1 1 2 2 1 1 2 2 1 1 ...
#> $ Freq : int 2 3 4 0 5 3 2 0 0 1 ...
#> $ id : int 1 1 1 1 2 2 2 2 3 3 ...
Created on 2022-12-11 with reprex v2.0.2
Edit
The following version seems much faster.
parFun2 <- function(my_data, ncores) {
  # Faster variant of parFun(): split only the results column (one atomic
  # vector per student) instead of the whole data.frame, and build the
  # contingency table directly, skipping the intermediate pairs data.frame.
  # Returns the per-student transition tables row-bound into one table.
  #
  # Bug fixed: frame_i$id was the split-list *position* i, not the actual
  # id value; when ids are not exactly 1..n_distinct the labels diverge.
  # Use the names of the split (the real ids) instead.
  registerDoParallel(cl <- makeCluster(ncores))
  on.exit(stopCluster(cl))  # always release the workers, even on error
  results_list <- split(my_data$results, my_data$id)
  ids <- as.integer(names(results_list))
  test <- foreach(i = seq_along(results_list)) %dopar% {
    start_i_results <- results_list[[i]]
    n <- length(start_i_results)
    if (n > 1L) {
      tbl <- table(first = start_i_results[-n],
                   second = start_i_results[-1L])
      frame_i <- as.data.frame(tbl)
      frame_i$id <- ids[i]
      frame_i
    } else NULL  # a single attempt yields no transition pair
  }
  # rbindlist() is much faster than do.call(rbind, ...) for many pieces.
  data.table::rbindlist(test)
}