I want to join three different tables using an array of aliases as the join condition:
# Sample input: each row is (id, aliases), where aliases is a list of strings.
table_1 = spark.createDataFrame(
    [
        ("T1", ['a','b','c']),
        ("T2", ['d','e','f']),
    ],
    ["id", "aliases"],
)
table_2 = spark.createDataFrame(
    [
        ("P1", ['a','h','e']),
        ("P2", ['j','k','l']),
    ],
    ["id", "aliases"],
)
Table 3:
# Sample input: each row is (id, aliases), where aliases is a list of strings.
table_3 = spark.createDataFrame(
    [
        ("G1", ['a','n','o']),
        ("G2", ['p','q','l']),
        ("G3", ['c','z']),
    ],
    ["id", "aliases"],
)
And I want to get a table like this:
Aliases | table1_ids | table2_id | table3_id |
---|---|---|---|
[n, b, h, o, a, e, d, c, f, z] | [T1, T2] | [P1] | [G1,G3] |
[k, q, j, p, l] | [] | [P2] | [G2] |
Where all related aliases are in the same row and there is no repeated ID of the three initial tables. In other words, I am trying to group by a common alias and to collect all different IDs in which these aliases can be found.
I have used Spark for the code examples, but feel free to use PySpark or Pandas.
Thanks in advance.
CodePudding user response:
Well, I have thought about it and I think that what I described is a graph problem. More precisely, I was trying to find all the connected components of the graph.
Since we want the IDs of each table in their own column, we have to extract them from the 'aliases' column and create three new columns. To do so, we will use regular expressions and a UDF:
import re
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.types import StructType,StructField, StringType, ArrayType, DoubleType
# Struct schema with four nullable array-of-string fields, in this order:
# table_1_ids, table_2_ids, table_3_ids, aliases.
df_schema = StructType(
    [
        StructField(field_name, ArrayType(StringType()), True)
        for field_name in ("table_1_ids", "table_2_ids", "table_3_ids", "aliases")
    ]
)
@udf(returnType=df_schema)
def get_correct_results(aliases):
    """Split a mixed list of table IDs and aliases into separate fields.

    Each element of ``aliases`` is searched for a table ID of the form
    ``T<digits>``, ``P<digits>`` or ``G<digits>``. Matching elements
    contribute the matched ID to the corresponding ``table_N_ids`` list and
    are left out of the returned ``aliases``; all other elements are kept as
    aliases. Every returned list is de-duplicated (order is not guaranteed).

    Args:
        aliases: List of strings mixing table IDs and plain aliases. ``None``
            is treated as an empty list.

    Returns:
        A dict with the keys ``table_1_ids``, ``table_2_ids``,
        ``table_3_ids`` and ``aliases``, each mapping to a list of strings.
    """
    # Raw strings avoid the invalid "\d" escape of a plain literal, and
    # "\d+" keeps multi-digit IDs whole ("T10" is no longer cut to "T1").
    # Order matters: the first pattern that matches an element wins.
    id_patterns = (
        ("table_1_ids", r"(T\d+)"),
        ("table_2_ids", r"(P\d+)"),
        ("table_3_ids", r"(G\d+)"),
    )
    found_ids = {key: set() for key, _ in id_patterns}
    plain_aliases = set()

    # Guard against a None input instead of failing on iteration.
    for elem in aliases or ():
        for key, pattern in id_patterns:
            match = re.search(pattern, elem)
            if match:
                found_ids[key].add(match.group(1))
                break
        else:
            # No ID pattern matched: the element is a genuine alias.
            plain_aliases.add(elem)

    return {
        'table_1_ids': list(found_ids['table_1_ids']),
        'table_2_ids': list(found_ids['table_2_ids']),
        'table_3_ids': list(found_ids['table_3_ids']),
        'aliases': list(plain_aliases),
    }
Finally, we apply the previous UDF to build 'master_df':
# Run the UDF on the "aliases" column, then flatten the returned struct
# into top-level columns and expose "component" as "row_id".
master_df = (
    components_grouped_df
    .withColumn("return", get_correct_results(f.col("aliases")))
    .selectExpr(
        "component as row_id",
        "return.aliases as aliases",
        "return.table_1_ids as table_1_ids",
        "return.table_2_ids as table_2_ids",
        "return.table_3_ids as table_3_ids",
    )
)