I have a structure along these lines: an invoice table and an invoice lines table. I want to output the lines as an ordered JSON array in a mandated schema, ordered by line number, but the line number isn't in the schema (it is assumed to be implicit in the array position). As I understand it, both PySpark and JSON will preserve the array order once it is created. Please see the rough example below. How can I make sure the invoice lines preserve the line number order? I could do it using a list comprehension, but this means dropping out of Spark, which I think would be inefficient.
from pyspark.sql.functions import collect_list, struct
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
invColumns = StructType([
    StructField("invoiceNo", StringType(), True),
    StructField("invoiceStuff", StringType(), True)
])
invData = [("1", "stuff"), ("2", "other stuff"), ("3", "more stuff")]
invLines = StructType([
    StructField("lineNo", IntegerType(), True),
    StructField("invoiceNo", StringType(), True),
    StructField("detail", StringType(), True),
    StructField("quantity", IntegerType(), True)
])
lineData = [(1,"1","item stuff",3),(2,"1","new item stuff",2),(3,"1","old item stuff",5),(1,"2","item stuff",3),(1,"3","item stuff",3),(2,"3","more item stuff",7)]
invoice_df = spark.createDataFrame(data=invData,schema=invColumns)
#in reality read from a spark table
invLine_df = spark.createDataFrame(data=lineData,schema=invLines)
#in reality read from a spark table
invoicesTemp_df = (invoice_df.select('invoiceNo',
                                     'invoiceStuff')
                   .join(invLine_df.select('lineNo',
                                           'invoiceNo',
                                           'detail',
                                           'quantity'
                                           ),
                         on='invoiceNo'))
invoicesOut_df = (invoicesTemp_df.withColumn('invoiceLines', struct('detail', 'quantity'))
                  .groupBy('invoiceNo', 'invoiceStuff').agg(collect_list('invoiceLines').alias('invoiceLines'))
                  .select('invoiceNo',
                          'invoiceStuff',
                          'invoiceLines'
                          ))
display(invoicesOut_df)
3 -- more stuff -- array -- 0: -- {"detail": "item stuff", "quantity": 3}
-- 1: -- {"detail": "more item stuff", "quantity": 7}
1 -- stuff -- array -- 0: -- {"detail": "new item stuff", "quantity": 2}
-- 1: -- {"detail": "old item stuff", "quantity": 5}
-- 2: -- {"detail": "item stuff", "quantity": 3}
2 -- other stuff -- array -- 0: -- {"detail": "item stuff", "quantity": 3}
The following, as requested, is the input data:
Invoice Table
"InvoiceNo", "InvoiceStuff",
"1","stuff",
"2","other stuff",
"3","more stuff"
Invoice Lines Table
"LineNo","InvoiceNo","Detail","Quantity",
1,"1","item stuff",3,
2,"1","new item stuff",2,
3,"1","old item stuff",5,
1,"2","item stuff",3,
1,"3","item stuff",3,
2,"3","more item stuff",7
The output should look like this, but the arrays should be ordered by the line number from the invoice lines table, even though it isn't in the output.
Output
"1","stuff","[{"detail": "item stuff", "quantity": 3},{"detail": "new item stuff", "quantity": 2},{"detail": "old item stuff", "quantity": 5}]",
"2","other stuff","[{"detail": "item stuff", "quantity": 3}]"
"3","more stuff","[{"detail": "item stuff", "quantity": 3},{"detail": "more item stuff", "quantity": 7}]"
CodePudding user response:
collect_list does not respect the data's order. From its documentation:
Note: The function is non-deterministic because the order of collected results depends on the order of the rows which may be non-deterministic after a shuffle.
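One possible way to do that is to apply collect_list with a window function, where you can control the order. With an ordered window, each row holds the running list of lines up to that row, so taking the max of that column per invoice keeps the complete, ordered list.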
from pyspark.sql import functions as F
from pyspark.sql import Window as W
(invoice_df
    .join(invLine_df, on='invoiceNo')
    .withColumn('invoiceLines', F.struct('lineNo', 'detail', 'quantity'))
    .withColumn('a', F.collect_list('invoiceLines').over(W.partitionBy('invoiceNo').orderBy('lineNo')))
    .groupBy('invoiceNo')
    .agg(F.max('a').alias('invoiceLines'))
    .show(10, False)
)
+---------+--------------------------------------------------------------------+
|invoiceNo|invoiceLines                                                        |
+---------+--------------------------------------------------------------------+
|1        |[{1, item stuff, 3}, {2, new item stuff, 2}, {3, old item stuff, 5}]|
|2        |[{1, item stuff, 3}]                                                |
|3        |[{1, item stuff, 3}, {2, more item stuff, 7}]                       |
+---------+--------------------------------------------------------------------+