Home > Back-end >  Join and combine dataframe by array intersection
Join and combine dataframe by array intersection

Time:12-02

I want to join three different tables using an array of aliases as the join condition:

Table 1: table_1

table_1 = spark.createDataFrame([("T1", ['a','b','c']), ("T2", ['d','e','f'])], ["id", "aliases"])

Table 2: table_2

table_2 =spark.createDataFrame([("P1", ['a','h','e']), ("P2", ['j','k','l'])], ["id", "aliases"])

Table 3:

table_3

table_3= spark.createDataFrame([("G1", ['a','n','o']), ("G2", ['p','q','l']), ("G3", ['c','z'])], ["id", "aliases"])

And I want to get a table like this:

Aliases table1_ids table2_id table3_id
[n, b, h, o, a, e, d, c, f, z] [T1, T2] [P1] [G1,G3]
[k, q, j, p, l] [] [P2] [G2]

Where all related aliases are in the same row and there is no repeated ID of the three initial tables. In other words, I am trying to group by a common alias and to collect all different IDs in which these aliases can be found.

I have used Spark SQL for the code examples, but feel free of using Pyspark or Pandas.

Thanks in advance.

CodePudding user response:

Well, I have thought about it and I think that what I described it's a Graph problem. More precisely, I was trying to find all components_grouped_df

And as we want to have each ID in a different column, we have to extract them from the 'aliases' column and to create three new ones. For doing so, we will use regex and a UDF:

import re
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.types import StructType,StructField, StringType, ArrayType, DoubleType

df_schema = StructType([ 
    StructField("table_1_ids",ArrayType(StringType()),True),    
    StructField("table_2_ids",ArrayType(StringType()),True),
    StructField("table_3_ids",ArrayType(StringType()),True),
    StructField("aliases",ArrayType(StringType()),True)
  ])

@udf(returnType=df_schema)
def get_correct_results(aliases):
    regex_table_1 = "(T\d)"
    regex_table_2 = "(P\d)"
    regex_table_3 = "(G\d)"
    table_1_ids = []
    table_2_ids = []
    table_3_ids = []
    elems_to_remove = []
    for elem in aliases:
        result_table_1 = re.search(regex_table_1, elem)
        result_table_2 = re.search(regex_table_2, elem)
        result_table_3 = re.search(regex_table_3, elem)
        if result_table_1:
            elems_to_remove.append(elem)
            table_1_ids.append(result_table_1.group(1))
        elif result_table_2:
            elems_to_remove.append(elem)
            table_2_ids.append(result_table_2.group(1))
        elif result_table_3:
            elems_to_remove.append(elem)
            table_3_ids.append(result_table_3.group(1))
    return {'table_1_ids':list(set(table_1_ids)), 'table_2_ids':list(set(table_2_ids)),'table_3_ids':list(set(table_3_ids)), 'aliases':list(set(aliases) - set(elems_to_remove))}

So, finally, we use the previous UDF in the 'final_df':

master_df = components_grouped_df.withColumn("return",get_correct_results(f.col("aliases")))\
.selectExpr("component as row_id","return.aliases as aliases","return.table_1_ids as table_1_ids","return.table_2_ids as table_2_ids", "return.table_3_ids as table_3_ids")

And the final DF will look like this: master_df

  • Related