Convert SQL query to PySpark dataframe for case when on joined dataframes

Time:11-17

SQL query:

SELECT dt,
       id,
       a.val1,
       CASE
           WHEN b.val1 = FALSE THEN TRUE
           ELSE FALSE
       END AS inf,
       CASE
           WHEN b.val1 = FALSE THEN coalesce(b.val2, a.val2)
           ELSE a.val2
       END mob,
       CASE
           WHEN b.val1 = FALSE THEN coalesce(b.val3, a.val3)
           ELSE a.val3
       END cli,
       a.val3 fam,
       count(*) unique_val
FROM c
JOIN a ON (c.e_id = a.e_id)
LEFT OUTER JOIN b ON (c.m_id = b.m_id)
GROUP BY 1,
         2,
         3,
         4,
         5,
         6,
         7

I am trying to convert this query to its PySpark DataFrame equivalent. I have tried several approaches, but none of them work when joining multiple DataFrames.

My PySpark version, which ends in errors:

joined = c.join(a, c.e_id == a.e_id).join(b, c.m_id == b.m_id, "left_outer")

df = (
    joined.select(["dt", "id", "a.val1"])
    .withColumn("inf", when(F.col("b.val1") == False, True).otherwise(False))
    .withColumn(
        "mob",
        when(F.col("b.val1") == False, coalesce(b.val2, a.val2)).otherwise(a.val2),
    )
    .withColumn(
        "cli",
        when(F.col("b.val1") == False, coalesce(b.val3, a.val3)).otherwise(a.val3),
    )
    .withColumnRenamed("a.val3", "fam")
    .groupby(["dt", "id", "a.val1", "inf", "mob", "cli", "fam"])
    .count()
    .withColumnRenamed("count", "unique_val")
)

Any help would be much appreciated. Thank you.

CodePudding user response:

  1. Replace df.column_name with df["column_name"]; the string inside the brackets is not case sensitive.
  2. Do the select in a single statement, as in your SQL query, instead of chaining withColumn calls. Your select keeps only dt, id and a.val1, so the b columns that the later withColumn calls reference are no longer available.
  3. b["val1"] == False should be replaced with ~b["val1"] (in SQL: NOT b.val1).
  4. Without the actual error messages, and with no sample data to reproduce your code, it is almost impossible to tell what is going wrong.
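from pyspark.sql.functions import coalesce, when

# Group directly on the computed columns (the GROUP BY 1..7 of the SQL query),
# then count the rows in each group.
df = (
    joined.groupBy(
        "dt",
        "id",
        a["val1"],
        when(~b["val1"], True).otherwise(False).alias("inf"),
        when(~b["val1"], coalesce(b["val2"], a["val2"])).otherwise(a["val2"]).alias("mob"),
        when(~b["val1"], coalesce(b["val3"], a["val3"])).otherwise(a["val3"]).alias("cli"),
        a["val3"].alias("fam"),
    )
    .count()
    .withColumnRenamed("count", "unique_val")  # count(*) AS unique_val
)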

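Have you tried running the SQL query directly with spark.sql? This requires a, b and c to be registered as tables or temporary views:
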
df = spark.sql("""
SELECT dt,
       id,
       a.val1,
       CASE
           WHEN b.val1 = FALSE THEN TRUE
           ELSE FALSE
       END AS inf,
       CASE
           WHEN b.val1 = FALSE THEN coalesce(b.val2, a.val2)
           ELSE a.val2
       END mob,
       CASE
           WHEN b.val1 = FALSE THEN coalesce(b.val3, a.val3)
           ELSE a.val3
       END cli,
       a.val3 fam,
       count(*) unique_val
FROM c
JOIN a ON (c.e_id = a.e_id)
LEFT OUTER JOIN b ON (c.m_id = b.m_id)
GROUP BY 1,
         2,
         3,
         4,
         5,
         6,
         7
""")