I need to do record linkage of two datasets based on equivalence or similarity of certain fields. For example, let's say the datasets look like this (with some random data):
A:
A_ID | FirstName | LastName | BirthDate | Address |
---|---|---|---|---|
0 | Vera | Williams | 12.03.1999 | Colorado, Greeley, 3774 Stark Hollow Road |
1 | Joseph | Peters | 11.10.1988 | Florida, Deltona, 4577 Willis Avenue |
B:
B_ID | FullName | BirthDate | Street |
---|---|---|---|
37 | Joseph Peters | 11.10.1988 | 4577 Willis Avenue |
49 | Valerie J Porter | 17.01.2000 | 2114 Center Street |
I want a link to be established (an exchange of IDs) when a record from set B 'matches' a record from set A.
Let's say:
- B record must contain both FirstName & LastName from A in its FullName
- B record's BirthDate must match A's BirthDate
- A's Address must contain B's Street in it
Joseph Peters from both sets meets all these rules.
How would I do it with Spark? A small example would be really appreciated, because there is not much information on the internet, and what there is uses either Scala or Python.
UPD: If someone can show it in Scala, that's OK too; I can understand Scala and will probably translate it to Java.
CodePudding user response:
I didn't test this code with your data, but I hope it works. In Kotlin:
val datasetA: Dataset<Row> = ...
val datasetB: Dataset<Row> = ...

// One join condition encoding all three matching rules:
// same birth date, B's FullName contains A's first and last name,
// and A's Address contains B's Street
val condition = datasetA.col("BirthDate").equalTo(datasetB.col("BirthDate"))
    .and(datasetB.col("FullName").contains(datasetA.col("FirstName")))
    .and(datasetB.col("FullName").contains(datasetA.col("LastName")))
    .and(datasetA.col("Address").contains(datasetB.col("Street")))

val result = datasetA.join(datasetB, condition)
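Since the question asks for Java, a direct (untested) translation of the same idea might look like this, using the column names from the question:

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
...
Dataset<Row> datasetA = ...; // A_ID, FirstName, LastName, BirthDate, Address
Dataset<Row> datasetB = ...; // B_ID, FullName, BirthDate, Street

// Same matching rules expressed as one boolean join condition
Column condition = datasetA.col("BirthDate").equalTo(datasetB.col("BirthDate"))
        .and(datasetB.col("FullName").contains(datasetA.col("FirstName")))
        .and(datasetB.col("FullName").contains(datasetA.col("LastName")))
        .and(datasetA.col("Address").contains(datasetB.col("Street")));

Dataset<Row> result = datasetA.join(datasetB, condition);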
CodePudding user response:
You can join your two dataframes. The most efficient way is to create extra columns in dataframe A so that the join uses only column-equality conditions; this prevents Spark from falling back to a very inefficient cartesian product when joining the two dataframes. You can do it as follows:
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import java.util.Arrays;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.concat_ws;
import static org.apache.spark.sql.functions.element_at;
import static org.apache.spark.sql.functions.split;
import scala.collection.JavaConverters;
...
Dataset<Row> outputDataframe = dataframeA
    // Rebuild B's FullName as "FirstName LastName"
    .withColumn("FullName", concat_ws(" ", col("FirstName"), col("LastName")))
    // The street is the last comma-separated part of the address
    .withColumn("Street", element_at(split(col("Address"), ", "), -1))
    // Equality-only join on the three shared columns
    .join(dataframeB, JavaConverters.asScalaBuffer(Arrays.asList("Street", "FullName", "BirthDate")), "left_outer")
    .drop("Street", "FullName");
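For reference, here is a minimal, self-contained way to build the two example dataframes and run the snippet above (the SparkSession setup, schemas, and RowFactory usage are assumptions of mine, not part of the original answer):

import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
...
SparkSession spark = SparkSession.builder()
        .appName("record-linkage")
        .master("local[*]")
        .getOrCreate();

// Dataframe A with the columns from the question
Dataset<Row> dataframeA = spark.createDataFrame(Arrays.asList(
        RowFactory.create(0, "Vera", "Williams", "12.03.1999",
                "Colorado, Greeley, 3774 Stark Hollow Road"),
        RowFactory.create(1, "Joseph", "Peters", "11.10.1988",
                "Florida, Deltona, 4577 Willis Avenue")),
        new StructType()
                .add("A_ID", DataTypes.IntegerType)
                .add("FirstName", DataTypes.StringType)
                .add("LastName", DataTypes.StringType)
                .add("BirthDate", DataTypes.StringType)
                .add("Address", DataTypes.StringType));

// Dataframe B
Dataset<Row> dataframeB = spark.createDataFrame(Arrays.asList(
        RowFactory.create(37, "Joseph Peters", "11.10.1988", "4577 Willis Avenue"),
        RowFactory.create(49, "Valerie J Porter", "17.01.2000", "2114 Center Street")),
        new StructType()
                .add("B_ID", DataTypes.IntegerType)
                .add("FullName", DataTypes.StringType)
                .add("BirthDate", DataTypes.StringType)
                .add("Street", DataTypes.StringType));

// ...apply the join from the snippet above, then:
outputDataframe.show(false);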
With your example dataframe A:
+----+---------+--------+----------+-----------------------------------------+
|A_ID|FirstName|LastName|BirthDate |Address                                  |
+----+---------+--------+----------+-----------------------------------------+
|0   |Vera     |Williams|12.03.1999|Colorado, Greeley, 3774 Stark Hollow Road|
|1   |Joseph   |Peters  |11.10.1988|Florida, Deltona, 4577 Willis Avenue     |
+----+---------+--------+----------+-----------------------------------------+
And dataframe B:
+----+----------------+----------+------------------+
|B_ID|FullName        |BirthDate |Street            |
+----+----------------+----------+------------------+
|37  |Joseph Peters   |11.10.1988|4577 Willis Avenue|
|49  |Valerie J Porter|17.01.2000|2114 Center Street|
+----+----------------+----------+------------------+
You will get the following output dataframe:
+----------+----+---------+--------+-----------------------------------------+----+
|BirthDate |A_ID|FirstName|LastName|Address                                  |B_ID|
+----------+----+---------+--------+-----------------------------------------+----+
|12.03.1999|0   |Vera     |Williams|Colorado, Greeley, 3774 Stark Hollow Road|null|
|11.10.1988|1   |Joseph   |Peters  |Florida, Deltona, 4577 Willis Avenue     |37  |
+----------+----+---------+--------+-----------------------------------------+----+
Note: if you can't easily extract exact matching data from dataframe A, you can go with Egor's solution. However, you may hit performance issues if Spark cannot turn the condition into an equi-join and falls back to a cartesian product.
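If you do end up with the condition-based join and one side is small enough, a broadcast hint is a common way to soften that cost; a sketch, assuming dataframeB fits in executor memory and condition is the predicate from Egor's answer:

import static org.apache.spark.sql.functions.broadcast;
...
// Broadcast the smaller side so Spark evaluates the predicate with a
// broadcast nested-loop join instead of shuffling for a cartesian product.
Dataset<Row> linked = dataframeA.join(broadcast(dataframeB), condition);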