Home > Back-end >  Optimal way of multiprocessing a rowwise matching operation between two data frames
Optimal way of multiprocessing a rowwise matching operation between two data frames

Time:06-04

I'm working on an entity resolution task with large databases (df1 ~0.5 mil. rows, df2 up to 18 mil. rows).

In df1 I have first and last names, with first names being in regex form to allow for multiple variations of the same name - I didn't bother including it in the attached example, but the string values look something like: ^(^robert$|^rob$|^robb$)$).

In df2 I have regular first and last names.

My approach is to go through df1 row by row, note the last name and first name regex, then filter df2 first for an exact last name match, then for the first name regex match.

This is simulated in the below code.

library(dplyr)
library(data.table)

set.seed(1)

df1 <- data.table(id1=sprintf("A%s",1:10000),
                  fnreg1=stringi::stri_rand_strings(n=10000,length=2,pattern="[a-z]"),
                  lname1=stringi::stri_rand_strings(n=10000,length=2,pattern="[a-z]")) %>%
  dplyr::mutate(fnreg1 = paste0("^(",fnreg1,")$"))

df2 <- data.table(id2=sprintf("B%s",1:100000),
                  fname2=stringi::stri_rand_strings(n=100000,length=2,pattern="[a-z]"),
                  lname2=stringi::stri_rand_strings(n=100000,length=2,pattern="[a-z]"))


process_row <- function(i){
  
  rw <- df1[i,]
  
  fnreg <- rw$fnreg1
  ln <- rw$lname1
  
  ln.match <- df2[lname2==ln, ]
  out.match <- ln.match[grepl(fnreg, fname2), ]
  
  return(cbind(rw,out.match))
  
}

## 16 seconds
tictoc::tic()
out <- lapply(1:nrow(df1), process_row) %>% do.call(rbind,.) %>% na.omit()
tictoc::toc()

The lapply format I want to keep for parallelizing. I use the following code, note I'm on Windows so I need to prepare the clusters to get it working:

library(parallel)
prep_cluster <- function(export_vars){
  
  cl <- makeCluster(detectCores()-1)
  clusterEvalQ(cl, library(dplyr))
  clusterEvalQ(cl, library(data.table))
  clusterExport(cl, export_vars)
  return(cl)
  
}

cl <- prep_cluster(list("df1","df2","process_row"))

## 2 seconds
tictoc::tic()
out.p <- parLapply(cl, 1:nrow(df1), process_row) %>% do.call(rbind,.) %>% na.omit()
tictoc::toc()

stopCluster(cl)

For my large datasets, my code works pretty slowly. I'm almost certain that the way I defined process_row is very poorly optimized. But I'm not sure how to change the function to be faster and still conform to the parLapply format.

Any tips appreciated.

EDIT: I'm pretty short on memory, working with only 32GB - so I need to optimize it that way too.

For the largest data files (18 mil rows) I am splitting them into chunks and matching each chunk separately.

CodePudding user response:

My apologies if this strays from your row-by-row processing approach too much, but have you tried simply joining on last name (allowing cartesian), and then just doing the regex match by fnreg1?

df1[df2, on=.(lname1=lname2), allow.cartesian=T][, .SD[grepl(.BY,fname2)], fnreg1]

Gives the same output as out much more quickly (on my machine about 15 times faster)

      fnreg1   id1 lname1    id2 fname2
   1: ^(zz)$  A922     oh B99195     zz
   2: ^(gc)$ A9092     tw  B8522     gc
   3: ^(gc)$ A9092     tw B31522     gc
   4: ^(qr)$ A3146     eo B57772     qr
   5: ^(qr)$ A8466     fo B62764     qr
  ---                                  
2119: ^(da)$ A8238     nl  B2678     da
2120: ^(da)$ A3858     bd B14722     da
2121: ^(da)$ A9325     cr B86598     da
2122: ^(da)$ A9325     cr B98444     da
2123: ^(mf)$ A1109     aq B43220     mf

If the allow.cartesian approach is too much here, we could potentially parallelize on unique first name regex, or on the unique lastnames

library(foreach)
library(doParallel)
registerDoParallel()
  1. on regex:
foreach(fnreg= unique(df1$fnreg1), .packages = c("data.table"),.combine="rbind") %dopar% {
  df1[fnreg1==fnreg][df2[grepl(fnreg,fname2)], on=.(lname1=lname2), nomatch=0]
}
  1. on lastname
foreach(ln= unique(df1$lname1), .packages = c("data.table"),.combine="rbind") %dopar% {
  df1[lname1==ln][df2[lname2==ln], on=.(lname1=lname2), allow.cartesian=T, nomatch=0][, .SD[grepl(.BY,fname2)], fnreg1]
}

Both provide the same output

CodePudding user response:

Parallelizing it is a little problematic: in order to do a true match, each process needs all rows, otherwise your join will invariably be incomplete. With large data, you're going to run into problems with passing the data back and forth. This type of join is what the fuzzyjoin package was written to solve:

fuzzyjoin::fuzzy_inner_join(
  df1, df2, by = c("lname1"="lname2", "fnreg1"="fname2"),
  match_fun = list(`==`, Vectorize(grepl)))

This produces effectively the same output but takes 2-3x as long, most likely because it is more general than your function.

Here's a suggestion, though, that allows parallelization of it in a safer manner: pre-split on the last name, parallelize for each last name (or batch of last names), and then join them in the end. Effectively:

df1spl <- split(df1, df1$lname1)
df2spl <- split(df2, df2$lname2)
allnms <- sort(unique(c(names(df1spl), names(df2spl))))
head(allnms)
# [1] "aa" "ab" "ac" "ad" "ae" "af"

At this point, each of the *spl is a named list with frames, where each frame has a homogenous lname* column (intentional). I use allnms here to ensure that the names all match and in the same order, so for instance names(df1spl) may not be the same as names(df2spl), but names(df1spl[allnms]) will have the same length and order of names as names(df2spl[allnms]). From here, I'll demo with Map but you should be able to employ the parallel version with clusterMap:

system.time(
  out3 <- Map(function(a, b) fuzzyjoin::regex_inner_join(a, b, by = c(fnreg1="fname2")),
              df1spl[allnms], df2spl[allnms])
)
# df1spl[[1]]
#    user  system elapsed 
#   30.64    1.27   32.04 

And the results should be the same:

out3 <- rbindlist(out3)
out3
#          id1 fnreg1 lname1    id2 fname2 lname2
#       <char> <char> <char> <char> <char> <char>
#    1:  A4196 ^(gb)$     aa B52781     gb     aa
#    2:  A7253 ^(sg)$     aa B91012     sg     aa
#    3:  A4675 ^(pe)$     ab B22248     pe     ab
#    4:  A7179 ^(is)$     ac B33418     is     ac
#    5:  A7158 ^(fn)$     ae B77991     fn     ae
#    6:  A6220 ^(kd)$     af B66989     kd     af
#    7:  A5950 ^(wv)$     ag B58928     wv     ag
#    8:  A6502 ^(jm)$     ag  B2949     jm     ag
#    9:   A515 ^(is)$     ai B36747     is     ai
#   10:  A4129 ^(np)$     ai B34729     np     ai
#   ---                                          
# 2114:  A8396 ^(pm)$     zv B26980     pm     zv
# 2115:  A1039 ^(ym)$     zw B60065     ym     zw
# 2116:  A6119 ^(hl)$     zw B71474     hl     zw
# 2117:  A9173 ^(ke)$     zw  B9806     ke     zw
# 2118:  A9847 ^(zn)$     zw  B9835     zn     zw
# 2119:  A5850 ^(nd)$     zx B92629     nd     zx
# 2120:  A5736 ^(ty)$     zy B89244     ty     zy
# 2121:  A7197 ^(yx)$     zz   B657     yx     zz
# 2122:  A9115 ^(fv)$     zz B83779     fv     zz
# 2123:  A9121 ^(ss)$     zz B23468     ss     zz

identical(out[order(id1,lname1,fname2),], out3[order(id1,lname1,fname2),])
# [1] TRUE

Having gone through all of that, it is feasible that you can take your bespoke function and use that instead of fuzzyjoin, with no more need to pre-match on lname*. Since your function is faster here than fuzzyjoin, you may benefit a bit more.

I should note that the use of split(.) will, by definition, duplicate your data in memory. If you are short on RAM, then you might need to be careful in how you do this.

  • Related