Home > Software engineering >  Is there a way to parallelize running R script by input files?
Is there a way to parallelize running R script by input files?

Time:10-21

I'm reading in a bunch of files (over seven thousand) as data frames. All the files are in the same parent folder, with organized and consistent sub-directories. The files are currently organized by timestamp. I want to read the files in, and then export the files to a different folder, where each file is a player ID. There may be multiple timestamps for a single player in the same input data frame. And sometimes a player ID will not be in an input data frame at all. I've figured out the data wrangling (it's quite simple), but since each file is ~1.5 million rows it takes about 5 hours for a single file. So I can't simply loop through all seven thousand files. I want to parallelize by input file instead (though parallelizing by output file might be better?). I'll run this on an HPC with enough CPUs and I don't need to specify my CPU requirement before I use the HPC. I know the doParallel package exists, but the tutorial introduction vignette("gettingstartedParallel") didn't work and I didn't understand the other doParallel posts. (Please don't just refer me to the doParallel package without the relevant code.) I'm also concerned about the code crashing as it tries to write to the same csv multiple times. CSVs can't be written to in parallel, even if I set append = TRUE. Here's the code with how I read in the files and how I wrangle and write out the files to a new folder.

# Example input data frames (in the real code I create a vector of Alltimes using list.files() )
times1 <- data.frame(
  ID = c('PL1', 'PL2', 'PL3', 'PL2','PL1'),
  times = c(42.6, 41.5, 42.9, 47.0, 44.3),
  speed = c(64, 66, 43, 39, 55) 
)

times2 <- data.frame(
  ID = c('PL3', 'PL3', 'PL3', 'PL1','PL1'),
  times   = c(62.1, 51.7, 65.9, 62.1, 55.3),
  speed = c(71, 73, 45, 64, 66) 
)

# Create vector of all parquets filepaths
Alltimes <- list.files(path = 'Input_Folder_Path)',
                       pattern = '*.snappy.parquet$',
                       recursive = TRUE,
                       full.names = TRUE)

# Iterate through timestamp input files (I want this part parallelized instead of a loop)
# for( i in 1:length(Alltimes)){
  
  # Read in the individual file
  # when the Alltimes vector is the file path I use read_parquet( Alltimes[i] ), but
  # times1 is a substitute for this example.
  df = times1 
  # df = times2
  # df = read_parquet( Alltimes[i] )
  
  # get vector of all player ids in this data frame
  all_ids_vec <- unique(x = df$ID)
  
  # write out individual csv for each player ID
  for(j in 1:length(all_ids_vec)){
    
    # Subset the df by that specific player ID
    one_player <- df %>% filter(ID == all_ids_vec[j])
    
    write.table(x = one_player, 
                file = "C:/Users/Juliet/Desktop/", all_ids_vec[j],".csv", 
                append = TRUE, 
                quote = FALSE, 
                sep = ",", 
                row.names = FALSE, 
                col.names = FALSE)
  }
# }

CodePudding user response:

To write the files to disk in parallel, the following code works on Windows. Note that package doParallel is not meant for Windows.

library(parallel)

dirname <- "C:/Users/611913/Desktop"
#dirname <- path.expand("~/tmp/so")

sp <- split(df, df$ID)
id_vec <- names(sp)

f <- function(X, filename, path){
  filename <- file.path(path, filename)
  write.table(x = X, 
              file = filename, 
              append = TRUE, 
              quote = FALSE, 
              sep = ",", 
              row.names = FALSE, 
              col.names = FALSE)  
}

# Windows
ncores <- detectCores()

cl <- makeCluster(ncores - 1L)
clusterExport(cl, "sp")
clusterExport(cl, "id_vec")
clusterExport(cl, "dirname")
clusterExport(cl, "f")

clusterEvalQ(cl, "sp")
clusterEvalQ(cl, "id_vec")
clusterEvalQ(cl, "dirname")
clusterEvalQ(cl, "f")

res <- parLapply(cl, seq_along(sp), function(j){
  f(sp[[j]], id_vec[j], dirname)
})

stopCluster(cl)

Test data

This code creates the test data.

n <- 1000L
ID <- sprintf("idd", seq_len(n))
ID <- rep(ID, 3)
df <- data.frame(ID, x = rnorm(3*n), y = sample(10, 3*n, TRUE))
  • Related