countDistinct in a DataFrame from two different tables in PySpark


I have a problem with countDistinct in PySpark. I have two joined tables and I want to show the number of distinct key values from each of the two tables.

input_table = spark.read.parquet(path + table_name1)
input_table2 = spark.read.parquet(path + table_name2)

impacted_columns = ctrl \
    .where((F.col("tableName1") == table_name1) & (F.col("tableName2") == table_name2)) \
    .join(key, on=['tableName1', 'tableName2']) \
    .join(table, ctrl["tableName1"] == table['tableName']) \
    .groupby("tableName1", 'tableName2', 'key1', 'key2', "keycolumn") \
    .agg(F.collect_list(F.struct("columnName1", "columnName2")).alias('imp_columns')) \
    .first()

input_table = input_table.select(
    [F.col(c.columnName1).alias("T1_" + c.columnName1) for c in impacted_columns.imp_columns] +
    [F.col(c).alias("T1_" + c) for c in
     list(set().union(impacted_columns.key1.split("-"), impacted_columns.keycolumn.split("-")))]
)

input_table2 = input_table2.select(
    [F.col(c.columnName2).alias("T2_" + c.columnName2) for c in impacted_columns.imp_columns] +
    [F.col(c).alias("T2_" + c) for c in
     list(set().union(impacted_columns.key2.split("-"), impacted_columns.keycolumn.split("-")))]
)

ppp = input_table \
        .join(input_table2, [input_table["T1_" + f] == input_table2["T2_" + s] for (f, s) in
                             zip(impacted_columns.key1.split("-"), impacted_columns.key2.split("-"))],
              "left_anti")
plu = input_table2 \
        .join(input_table, [input_table["T1_" + f] == input_table2["T2_" + s] for (f, s) in
                             zip(impacted_columns.key1.split("-"), impacted_columns.key2.split("-"))],
              "left_anti")
pppc = F.countDistinct(ppp.select(["T1_" + c for c in impacted_columns.key1.split("-")]))
pluc = F.countDistinct(plu.select(["T2_" + c for c in impacted_columns.key2.split("-")]))

But it doesn't work. Does anybody know what is wrong with my code? (It works up to the countDistinct call.)

pppc = F.countDistinct(ppp.select(["T1_" + c for c in impacted_columns.key1.split("-")]))
TypeError: Invalid argument, not a string or column: DataFrame[T1_c: string, T1_p: string] of type <class 'pyspark.sql.dataframe.DataFrame'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.

CodePudding user response:

You get an error because you pass a DataFrame to F.countDistinct. Each argument to F.countDistinct should be a column name (or a Column).
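For instance, here is a minimal toy example (the DataFrame and column names are made up for illustration) showing the wrong and the right way to call it:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("a", 1), ("a", 1), ("b", 2)], ["k", "v"])

# Wrong: passing a DataFrame raises the same TypeError as above
# F.countDistinct(df.select("k", "v"))

# Right: pass column names and aggregate on the DataFrame itself
df.agg(F.countDistinct("k", "v")).show()  # counts 2 distinct (k, v) pairs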

Try replacing the last two lines with this:

pppc = ppp.agg(F.countDistinct(*["T1_" + c for c in impacted_columns.key1.split("-")]))
pluc = plu.agg(F.countDistinct(*["T2_" + c for c in impacted_columns.key2.split("-")]))

You can then collect your distinct count at the driver as follows:

print(pppc.collect()[0][0])
print(pluc.collect()[0][0])
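
If you only need the numbers on the driver, a near-equivalent sketch is to deduplicate the key columns and count the rows directly. Note this is only equivalent when the key columns contain no nulls, since COUNT DISTINCT skips rows with null keys:

# Alternative: select the key columns, deduplicate, and count the rows.
# Unlike countDistinct, this also counts key combinations containing nulls.
n1 = ppp.select(["T1_" + c for c in impacted_columns.key1.split("-")]).distinct().count()
n2 = plu.select(["T2_" + c for c in impacted_columns.key2.split("-")]).distinct().count()
print(n1, n2)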