Aggregate function with Expr in PySpark 3.0.3


The following code works well with PySpark 3.2.1:

df.withColumn(
    "total_amount",
    f.aggregate(f.col("taxes"), f.lit(0.00), lambda acc, x: acc + x["amount"]),
)

I've downgraded to PySpark 3.0.3. How can I change the above code to something like this:

df.withColumn(
    "total_amount",
     # f.aggregate(f.col("taxes"), f.lit(0.00), lambda acc, x: acc + x["amount"]),
     f.lit(expr("aggregate(taxes,0,(acc,x)->acc+x['amount'])"))
)

x['amount'] does not work in our case! Is there something wrong with the expression, or must I change taxes to a list of numbers?

2nd case

df.withColumn(
    "total_amount_2",
    f.aggregate(
        f.filter(
            "lines",
            lambda x: (
                x["id"].isNotNull()
                & (x["code"].isin(["CODE1", "CODE2"]) == False)
            ),
        ),
        f.lit(0.00),
        lambda acc, x: acc + x["amount"],
    ),
)

How can I refactor these cases using the Spark SQL expr function?

CodePudding user response:

Try the following. You can access struct fields using dot notation too. I'm just not sure about the data type you used (0.00); the initial value should have the same data type as the values being accumulated, so I have added the D suffix, which marks the literal as a double. Note also that expr already returns a Column, so there is no need to wrap it in f.lit.

df.withColumn(
    "total_amount",
    F.expr("aggregate(taxes, 0D, (acc, x) -> acc   x.amount)")
)
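
For completeness, here is a minimal, self-contained check of the first case. The taxes schema (an array of structs with an amount field) is assumed from the question:

from pyspark.sql import functions as F

# Hypothetical input: one row whose taxes column holds two tax structs
df = spark.createDataFrame(
    [([(1.5,), (2.5,)],)],
    'taxes: array<struct<amount:double>>'
)
df.withColumn(
    "total_amount",
    F.expr("aggregate(taxes, 0D, (acc, x) -> acc + x.amount)")
).show(truncate=False)
# total_amount should come out as 4.0 (1.5 + 2.5)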

Regarding the 2nd case, see the following test, which I have verified on Spark 3.0.3.

Input df:

from pyspark.sql import functions as F
df = spark.createDataFrame(
    [([('1', 'CODE3', 3.0), ('2', 'CODE3', 3.0)],)],
    'lines: array<struct<id:string,code:string,amount:double>>'
)

Script:

df = df.withColumn(
    "total_amount_2",
    F.expr("""
        aggregate(
            filter(
                lines,
                x -> x.id is not null and !x.code in ('CODE1', 'CODE2')
            ),
            0D,
            (acc, x) -> acc + x.amount
        )
    """)
)
df.show(truncate=0)
# +----------------------------------+--------------+
# |lines                             |total_amount_2|
# +----------------------------------+--------------+
# |[[1, CODE3, 3.0], [2, CODE3, 3.0]]|6.0           |
# +----------------------------------+--------------+
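
If you would rather go through spark.sql than F.expr, the same higher-order functions work there too. Here is a sketch using the test df above (the view name orders is an arbitrary choice):

# Register the test df so it can be queried with plain SQL
df.createOrReplaceTempView("orders")
spark.sql("""
    SELECT
        lines,
        aggregate(
            filter(lines, x -> x.id is not null and x.code not in ('CODE1', 'CODE2')),
            0D,
            (acc, x) -> acc + x.amount
        ) AS total_amount_2
    FROM orders
""").show(truncate=False)
# Same result as above: total_amount_2 is 6.0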