Converting SQL to the DataFrame API

Time:09-17

How can the SQL below be converted to the Spark DataFrame API? I attempted the code below but got this error -

Error evaluating method : '$eq$eq$eq': Method threw 'java.lang.RuntimeException' exception.

I am also not sure how to represent the correlated condition `where sp1.cart_id = sp.cart_id` in a Spark query.

```sql
select distinct
    o.order_id,
    'PENDING'
from shopping sp
inner join order o
    on o.cart_id = sp.cart_id
where o.order_date = (select max(sp1.order_date)
                      from shopping sp1
                      where sp1.cart_id = sp.cart_id)
```
```scala
SHOPPING_DF
  .select(
    "ORDER_ID",
    "PENDING")
  .join(ORDER_DF, Seq("CART_ID"), "inner")
  .filter(col(ORDER_DATE) === SHOPPING_DF.groupBy("CART_ID").agg(max("ORDER_DATE")))
```
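(Editor's note on the error: `SHOPPING_DF.groupBy("CART_ID").agg(max("ORDER_DATE"))` returns a whole DataFrame, not a value, so it cannot appear on the right-hand side of `===`; that is what triggers the RuntimeException. A correlated subquery such as `where sp1.cart_id = sp.cart_id` is usually expressed by aggregating into a separate DataFrame and joining back on the correlation key. A minimal sketch, assuming lower-case columns `cart_id`, `order_date`, `order_id` exist on `SHOPPING_DF` and `ORDER_DF`:)

```scala
import org.apache.spark.sql.functions.{col, lit, max}

// Per-cart latest order date, replacing the correlated subquery
// (SELECT MAX(sp1.order_date) FROM shopping sp1 WHERE sp1.cart_id = sp.cart_id)
val latestPerCart = SHOPPING_DF
  .groupBy("cart_id")
  .agg(max("order_date").as("order_date"))

// Joining on both cart_id and order_date keeps only the rows whose
// order_date equals the per-cart maximum, mirroring the WHERE clause
val result = ORDER_DF
  .join(latestPerCart, Seq("cart_id", "order_date"), "inner")
  .select(col("order_id"), lit("PENDING").as("PENDING"))
  .distinct()
```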

CodePudding user response:

The correlated subquery can be eliminated by joining `order` against a derived table over `shopping` that uses the window function `MAX(...) OVER (PARTITION BY cart_id)` to compute the latest order date for each `cart_id`. Rewritten as SQL:

```sql
SELECT DISTINCT
    o.order_id,
    'PENDING'
FROM
    order o
INNER JOIN (
    SELECT
        cart_id,
        MAX(order_date) OVER (
            PARTITION BY cart_id
        ) AS order_date
    FROM
        shopping
) sp ON sp.cart_id = o.cart_id AND
        sp.order_date = o.order_date
```

This SQL can be run directly on your Spark session to get the same result.
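(Editor's note: for example, by registering the DataFrames as temporary views first — the view and DataFrame names below are assumptions. The table name `order` is backtick-quoted here as a precaution, since ORDER is a SQL keyword:)

```scala
// Expose the DataFrames to Spark SQL under the table names used above
SHOPPING_DF.createOrReplaceTempView("shopping")
ORDER_DF.createOrReplaceTempView("order")

val result = spark.sql("""
  SELECT DISTINCT
      o.order_id,
      'PENDING'
  FROM `order` o
  INNER JOIN (
      SELECT cart_id,
             MAX(order_date) OVER (PARTITION BY cart_id) AS order_date
      FROM shopping
  ) sp ON sp.cart_id = o.cart_id
      AND sp.order_date = o.order_date
""")
```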

Converted to the DataFrame API, it could be written as:

```scala
ORDER_DF.alias("o")
  .join(
    SHOPPING_DF.selectExpr(
      "cart_id",
      "MAX(order_date) OVER (PARTITION BY cart_id) AS order_date"
    ).alias("sp"),
    Seq("cart_id", "order_date"),
    "inner"
  )
  .selectExpr(
    "o.order_id",
    "'PENDING' AS PENDING"
  ).distinct()
```
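(Editor's note: equivalently, the windowed expression can be built with the DataFrame `Window` API instead of a `selectExpr` SQL string — a sketch under the same assumed column names:)

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lit, max}

// Latest order date per cart, computed with a window instead of SQL text
val w = Window.partitionBy("cart_id")
val sp = SHOPPING_DF.select(
  col("cart_id"),
  max("order_date").over(w).as("order_date")
)

val result = ORDER_DF
  .join(sp, Seq("cart_id", "order_date"), "inner")
  .select(col("order_id"), lit("PENDING").as("PENDING"))
  .distinct()
```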

Let me know if this works for you.
