Effective record linkage


I asked a somewhat similar question earlier today. Here it is. In short: I need to do record linkage between two large datasets (1.6M and 6M records). I was going to use Spark, thinking that the Cartesian product I had been warned about would not be such a big problem. But it is: at 1.6M × 6M, that is roughly 9.6 trillion row comparisons, and it hit performance so hard that the linkage process didn't finish within 7 hours.

Is there another library/framework/tool for doing this more efficiently? Or could the performance of the solution below be improved?

The code I ended up with:

    import org.apache.spark.sql.functions.{col, concat_ws, substring, to_date}
    import org.apache.spark.sql.{Column, SparkSession}
    
    import java.time.Duration
    
    object App {
    
      // Helper that returns the leftmost n characters of a string column.
      def left(col: Column, n: Int) = {
        assert(n > 0)
        substring(col, 1, n)
      }
    
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[4]")
          .appName("MatchingApp")
          .getOrCreate()
    
        import spark.implicits._
    
        val a = spark.read
          .format("csv")
          .option("header", true)
          .option("delimiter", ";")
          .load("/home/helveticau/workstuff/a.csv")
          .withColumn("FULL_NAME", concat_ws(" ", col("FIRST_NAME"), col("LAST_NAME")))
          .withColumn("BIRTH_DATE", to_date(col("BIRTH_DATE"), "yyyy-MM-dd"))
    
        val b = spark.read
          .format("csv")
          .option("header", true)
          .option("delimiter", ";")
          .load("/home/helveticau/workstuff/b.txt")
          .withColumn("FULL_NAME", concat_ws(" ", col("FIRST_NAME"), col("LAST_NAME")))
          .withColumn("BIRTH_DATE", to_date(col("BIRTH_DATE"), "dd.MM.yyyy"))
    
        // Match if b's first and last name both occur in a's full name, and
        // either the birth dates are equal or the first three characters of
        // the street names agree.
        // @formatter:off
        val condition = a
          .col("FULL_NAME").contains(b.col("FIRST_NAME"))
          .and(a.col("FULL_NAME").contains(b.col("LAST_NAME")))
          .and(a.col("BIRTH_DATE").equalTo(b.col("BIRTH_DATE"))
            .or(a.col("STREET").startsWith(left(b.col("STR"), 3))))
        // @formatter:on
        val startMillis = System.currentTimeMillis()
        val res = a.join(b, condition, "left_outer")
        val count = res
          .filter(col("B_ID").isNotNull)
          .count()
        println(s"Count: $count")
        val executionTime = Duration.ofMillis(System.currentTimeMillis() - startMillis)
        println(s"Execution time: ${executionTime.toMinutes}m")
      }
    }

The condition is probably too complicated, but it has to be that way.

CodePudding user response:

You can improve the performance of your current solution by slightly changing the logic of how you perform your linkage:

  • First, perform an inner join of the a and b dataframes on columns that you know match. In your case, these seem to be the LAST_NAME and FIRST_NAME columns.
  • Then filter the resulting dataframe with your specific complex condition; in your case, birth dates are equal or the street matches.
  • Finally, if you also need to keep the unlinked records, perform a right join with the a dataframe.

Your code could be rewritten as follows:

import org.apache.spark.sql.functions.{col, substring, to_date}
import org.apache.spark.sql.SparkSession

import java.time.Duration

object App {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .master("local[4]")
      .appName("MatchingApp")
      .getOrCreate()

    val a = spark.read
      .format("csv")
      .option("header", true)
      .option("delimiter", ";")
      .load("/home/helveticau/workstuff/a.csv")
      .withColumn("BIRTH_DATE", to_date(col("BIRTH_DATE"), "yyyy-MM-dd"))

    val b = spark.read
      .format("csv")
      .option("header", true)
      .option("delimiter", ";")
      .load("/home/helveticau/workstuff/b.txt")
      .withColumn("BIRTH_DATE", to_date(col("BIRTH_DATE"), "dd.MM.yyyy"))

    // Complex part of the linkage, applied after the equi-join: birth dates
    // are equal or the first three characters of the street names match.
    val condition = a.col("BIRTH_DATE").equalTo(b.col("BIRTH_DATE"))
      .or(a.col("STREET").startsWith(substring(b.col("STR"), 1, 3)))

    val startMillis = System.currentTimeMillis()
    // Inner equi-join on the exact-match columns, then filter on the
    // complex condition
    val res = a.join(b, Seq("LAST_NAME", "FIRST_NAME"))
      .filter(condition)
      // the two following lines are optional if you only want to keep
      // records with a non-null B_ID
      .select("B_ID", "A_ID")
      .join(a, Seq("A_ID"), "right_outer")

    val count = res
      .filter(col("B_ID").isNotNull)
      .count()
    println(s"Count: $count")
    val executionTime = Duration.ofMillis(System.currentTimeMillis() - startMillis)
    println(s"Execution time: ${executionTime.toMinutes}m")
  }
}

This way you avoid the Cartesian product, at the price of two joins instead of one.
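
To verify that the Cartesian product is really gone, you can inspect the physical plan of res (a quick sanity check, not part of the original answer; explain is a standard Dataset method):

// Print the physical plan Spark will use for res. With the equi-join on
// LAST_NAME/FIRST_NAME you should see a SortMergeJoin or BroadcastHashJoin
// instead of the CartesianProduct / BroadcastNestedLoopJoin that the
// original condition-only join produced.
res.explain()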

Example

With file a.csv containing the following data:

"A_ID";"FIRST_NAME";"LAST_NAME";"BIRTH_DATE";"STREET"
10;John;Doe;1965-10-21;Johnson Road
11;Rebecca;Davis;1977-02-27;Lincoln Road
12;Samantha;Johns;1954-03-31;Main Street
13;Roger;Penrose;1987-12-25;Oxford Street
14;Robert;Smith;1981-08-26;Canergie Road
15;Britney;Stark;1983-09-27;Alshire Road

And b.txt having the following data:

"B_ID";"FIRST_NAME";"LAST_NAME";"BIRTH_DATE";"STR"
29;John;Doe;21.10.1965;Johnson Road
28;Rebecca;Davis;28.03.1986;Lincoln Road
27;Shirley;Iron;30.01.1956;Oak Street
26;Roger;Penrose;25.12.1987;York Street
25;Robert;Dayton;26.08.1956;Canergie Road
24;Britney;Stark;22.06.1962;Algon Road

The res dataframe will be:

+----+----+----------+---------+----------+-------------+
|A_ID|B_ID|FIRST_NAME|LAST_NAME|BIRTH_DATE|STREET       |
+----+----+----------+---------+----------+-------------+
|10  |29  |John      |Doe      |1965-10-21|Johnson Road |
|11  |28  |Rebecca   |Davis    |1977-02-27|Lincoln Road |
|12  |null|Samantha  |Johns    |1954-03-31|Main Street  |
|13  |26  |Roger     |Penrose  |1987-12-25|Oxford Street|
|14  |null|Robert    |Smith    |1981-08-26|Canergie Road|
|15  |null|Britney   |Stark    |1983-09-27|Alshire Road |
+----+----+----------+---------+----------+-------------+
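
For reference, assuming the res dataframe built earlier, this output can be printed with:

res.select("A_ID", "B_ID", "FIRST_NAME", "LAST_NAME", "BIRTH_DATE", "STREET")
  .orderBy("A_ID")
  .show(false) // truncate = false prints full column values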

Note: if your FIRST_NAME and LAST_NAME columns are not exactly the same, you can try to make them match with Spark's built-in functions, for instance:

  • trim to remove spaces at the start and end of a string
  • lower to transform the column to lower case (and thus ignore case in comparisons)

What is really important is to have the maximum number of columns that match exactly (see the sketch below).
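
A minimal sketch of such a normalization, assuming the a and b dataframes from the code above (normalizeNames is a hypothetical helper, not part of the original answer):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lower, trim}

// Normalize the join keys so that case and surrounding whitespace
// no longer prevent exact matches.
def normalizeNames(df: DataFrame): DataFrame = df
  .withColumn("FIRST_NAME", lower(trim(col("FIRST_NAME"))))
  .withColumn("LAST_NAME", lower(trim(col("LAST_NAME"))))

// The equi-join is then insensitive to case and stray spaces.
val joined = normalizeNames(a).join(normalizeNames(b), Seq("LAST_NAME", "FIRST_NAME"))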
