I have a problem with countDistinct in PySpark. I have two joined tables and I want to show the number of distinct key values from the two different tables.
input_table = spark.read.parquet(path + table_name1)
input_table2 = spark.read.parquet(path + table_name2)
impacted_columns = ctrl \
    .where((F.col("tableName1") == table_name1) & (F.col("tableName2") == table_name2)) \
    .join(key, on=['tableName1', 'tableName2']) \
    .join(table, ctrl["tableName1"] == table['tableName']) \
    .groupby("tableName1", 'tableName2', 'key1', 'key2', "keycolumn") \
    .agg(F.collect_list(F.struct("columnName1", "columnName2")).alias('imp_columns')) \
    .first()
input_table = input_table.select(
    [F.col(c.columnName1).alias("T1_" + c.columnName1) for c in impacted_columns.imp_columns] +
    [F.col(c).alias("T1_" + c) for c in
     list(set().union(impacted_columns.key1.split("-"), impacted_columns.keycolumn.split("-")))]
)
input_table2 = input_table2.select(
    [F.col(c.columnName2).alias("T2_" + c.columnName2) for c in impacted_columns.imp_columns] +
    [F.col(c).alias("T2_" + c) for c in
     list(set().union(impacted_columns.key2.split("-"), impacted_columns.keycolumn.split("-")))]
)
ppp = input_table \
    .join(input_table2, [input_table["T1_" + f] == input_table2["T2_" + s] for (f, s) in
                         zip(impacted_columns.key1.split("-"), impacted_columns.key2.split("-"))],
          "left_anti")
plu = input_table2 \
    .join(input_table, [input_table["T1_" + f] == input_table2["T2_" + s] for (f, s) in
                        zip(impacted_columns.key1.split("-"), impacted_columns.key2.split("-"))],
          "left_anti")
pppc = F.countDistinct(ppp.select(["T1_" + c for c in impacted_columns.key1.split("-")]))
pluc = F.countDistinct(plu.select(["T2_" + c for c in impacted_columns.key2.split("-")]))
But it doesn't work. Does anybody know what is wrong with my code? It works up to the countDistinct:
pppc = F.countDistinct(ppp.select(["T1_" + c for c in impacted_columns.key1.split("-")]))
TypeError: Invalid argument, not a string or column: DataFrame[T1_c: string, T1_p: string] of type <class 'pyspark.sql.dataframe.DataFrame'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.
CodePudding user response:
You get an error because you pass a DataFrame to F.countDistinct. Each argument to F.countDistinct should be a column or a column name.
Try replacing the last two lines with this:
ppc = ppp.agg(F.countDistinct(*["T1_" + c for c in impacted_columns.key1.split("-")]))
pluc = plu.agg(F.countDistinct(*["T2_" + c for c in impacted_columns.key2.split("-")]))
You can then collect your distinct count at the driver as follows:
print(ppc.collect()[0][0])
print(pluc.collect()[0][0])