Spark record linkage in Java


I need to do record linkage of two datasets based on equivalence or similarity of certain fields. For example, let's say the datasets look like this (with some random data):

A:

A_ID FirstName LastName BirthDate Address
0 Vera Williams 12.03.1999 Colorado, Greeley, 3774 Stark Hollow Road
1 Joseph Peters 11.10.1988 Florida, Deltona, 4577 Willis Avenue

B:

B_ID FullName BirthDate Street
37 Joseph Peters 11.10.1988 4577 Willis Avenue
49 Valerie J Porter 17.01.2000 2114 Center Street

I want a link to be established (an exchange of IDs) when a record from set B 'matches' a record from set A.

Let's say:

  • B record must contain both FirstName & LastName from A in its FullName
  • B record's BirthDate must match A's BirthDate
  • A's Address must contain B's Street

Joseph Peters from both sets meets all these rules.

How would I do it with Spark? A small example would be really appreciated, because there is not much information about this online, and what exists is either in Scala or Python.

UPD: An example in Scala is also fine - I can understand Scala too and probably translate it to Java afterwards.

CodePudding user response:

I didn't test this code with your data, but I hope it works. In Kotlin:

val datasetA: Dataset<Row> = ...
val datasetB: Dataset<Row> = ...

// Matching rules from the question, expressed as a join condition
val condition = datasetA.col("BirthDate").equalTo(datasetB.col("BirthDate"))
        .and(datasetB.col("FullName").contains(datasetA.col("FirstName")))
        .and(datasetB.col("FullName").contains(datasetA.col("LastName")))
        .and(datasetA.col("Address").contains(datasetB.col("Street")))
val result = datasetA.join(datasetB, condition)
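
Since the question asks for Java, roughly the same condition-based join could be written as below (a sketch along the same lines, also untested; the helper name linkRecords is only illustrative):

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Links records of A and B using the three matching rules from the question
public static Dataset<Row> linkRecords(Dataset<Row> datasetA, Dataset<Row> datasetB) {
    Column condition = datasetA.col("BirthDate").equalTo(datasetB.col("BirthDate"))
            .and(datasetB.col("FullName").contains(datasetA.col("FirstName")))
            .and(datasetB.col("FullName").contains(datasetA.col("LastName")))
            .and(datasetA.col("Address").contains(datasetB.col("Street")));
    // Non-equi join: Spark has to compare every pair of rows against the condition
    return datasetA.join(datasetB, condition);
}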

CodePudding user response:

You can join your two dataframes.

The most efficient way is to create some extra columns in dataframe A so that the join condition uses only column equality; that prevents Spark from falling back to a very inefficient cartesian product when joining the two dataframes. You can do it as follows:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import java.util.Arrays;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.concat_ws;
import static org.apache.spark.sql.functions.element_at;
import static org.apache.spark.sql.functions.split;

import scala.collection.JavaConverters;

...

Dataset<Row> outputDataframe = dataframeA
  // build a FullName column in the same format as dataframe B's
  .withColumn("FullName", concat_ws(" ", col("FirstName"), col("LastName")))
  // extract the street as the last comma-separated part of Address
  .withColumn("Street", element_at(split(col("Address"), ", "), -1))
  // plain equality join on the derived columns plus BirthDate
  .join(dataframeB, JavaConverters.asScalaBuffer(Arrays.asList("Street", "FullName", "BirthDate")), "left_outer")
  // drop the helper columns from the result
  .drop("Street", "FullName");

With your example dataframe A:

+----+---------+--------+----------+-----------------------------------------+
|A_ID|FirstName|LastName|BirthDate |Address                                  |
+----+---------+--------+----------+-----------------------------------------+
|0   |Vera     |Williams|12.03.1999|Colorado, Greeley, 3774 Stark Hollow Road|
|1   |Joseph   |Peters  |11.10.1988|Florida, Deltona, 4577 Willis Avenue     |
+----+---------+--------+----------+-----------------------------------------+

And dataframe B:

+----+----------------+----------+------------------+
|B_ID|FullName        |BirthDate |Street            |
+----+----------------+----------+------------------+
|37  |Joseph Peters   |11.10.1988|4577 Willis Avenue|
|49  |Valerie J Porter|17.01.2000|2114 Center Street|
+----+----------------+----------+------------------+

You will get the following output dataframe:

+----------+----+---------+--------+-----------------------------------------+----+
|BirthDate |A_ID|FirstName|LastName|Address                                  |B_ID|
+----------+----+---------+--------+-----------------------------------------+----+
|12.03.1999|0   |Vera     |Williams|Colorado, Greeley, 3774 Stark Hollow Road|null|
|11.10.1988|1   |Joseph   |Peters  |Florida, Deltona, 4577 Willis Avenue     |37  |
+----------+----+---------+--------+-----------------------------------------+----+

Note: If you can't easily extract exact matching data from dataframe A, you can go with the condition-based join from the first answer. However, you may hit performance issues, as Spark will perform a cartesian product.
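
For reference, the example dataframes from the question could be built like this in Java to try either approach locally (a sketch; the SparkSession setup and schemas are assumptions, the data is copied from the question):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import java.util.Arrays;

...

// Local session just for testing
SparkSession spark = SparkSession.builder().master("local[*]").appName("record-linkage").getOrCreate();

// Schema and sample rows for dataframe A
StructType schemaA = new StructType()
        .add("A_ID", DataTypes.IntegerType)
        .add("FirstName", DataTypes.StringType)
        .add("LastName", DataTypes.StringType)
        .add("BirthDate", DataTypes.StringType)
        .add("Address", DataTypes.StringType);

Dataset<Row> dataframeA = spark.createDataFrame(Arrays.asList(
        RowFactory.create(0, "Vera", "Williams", "12.03.1999", "Colorado, Greeley, 3774 Stark Hollow Road"),
        RowFactory.create(1, "Joseph", "Peters", "11.10.1988", "Florida, Deltona, 4577 Willis Avenue")), schemaA);

// Schema and sample rows for dataframe B
StructType schemaB = new StructType()
        .add("B_ID", DataTypes.IntegerType)
        .add("FullName", DataTypes.StringType)
        .add("BirthDate", DataTypes.StringType)
        .add("Street", DataTypes.StringType);

Dataset<Row> dataframeB = spark.createDataFrame(Arrays.asList(
        RowFactory.create(37, "Joseph Peters", "11.10.1988", "4577 Willis Avenue"),
        RowFactory.create(49, "Valerie J Porter", "17.01.2000", "2114 Center Street")), schemaB);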
