I've searched and can't find a suitable answer for my PySpark issue. I'm looking for an alternative approach which is more efficient and doesn't use a UDF.
I have a simple equation in a UDF with inputs from (a) a literal constant, (b) column values, and (c) values from a list (or dict). The output must be created multiple times and stored in an array. Is it possible to do this outside of a UDF?
I've knocked up this simple example, although my actual issue is slightly more complex, with more rows, a bigger equation, and a loop that runs over 40 times:
NOTE: V3 example question:
from pyspark.sql.functions import *
from pyspark.sql.types import *
test_data = [("A1",10.5), ("A2",40.5), ("A3",60.5)]
schema = StructType([
    StructField("ID", StringType(), True),
    StructField("num1", DoubleType(), True)])
df = spark.createDataFrame(data=test_data,schema=schema)
const1 = 10
const2 = 20
num_lst1 = [2.1,4.2,6.3,8.4,10.5]
num_lst2 = [20,40,60,80,100]
num_lst3 = [100.1,200.2,300.3,400.4,500.5]
def udf_whatever(num_lst1, num_lst2, num_lst3):
    def whatever(const1, const2, val1):
        # combine the constants, the column value, and the per-index list values
        DH = [None for t in range(5)]
        for i in range(5):
            DH[i] = const1 + val1 + const2 + (num_lst1[i] * num_lst2[i]) + num_lst3[i]
        return DH
    return udf(whatever, ArrayType(DoubleType()))
df2 = df.withColumn("UDF_OUT",udf_whatever(num_lst1,num_lst2,num_lst3)(lit(const1),lit(const2),col("num1")))
df2.show(truncate=False)
+---+----+-------------------------------------+
|ID |num1|UDF_OUT                              |
+---+----+-------------------------------------+
|A1 |10.5|[182.6, 408.7, 718.8, 1112.9, 1591.0]|
|A2 |40.5|[212.6, 438.7, 748.8, 1142.9, 1621.0]|
|A3 |60.5|[232.6, 458.7, 768.8, 1162.9, 1641.0]|
+---+----+-------------------------------------+
With Emma's help (in the comments) I've got this to work, but it seems a little expensive to create new columns per list, especially with millions of rows. Is there a better way?
df3 = df.withColumn('MAP_LIST1', array(*map(lit, num_lst1)))\
.withColumn('MAP_LIST2', array(*map(lit, num_lst2)))\
.withColumn('MAP_LIST3', array(*map(lit, num_lst3)))\
.withColumn('EQUATION_OUT', expr(f"""transform(MAP_LIST1, (x, i) -> {const1} + num1 + {const2} + (x * MAP_LIST2[i]) + MAP_LIST3[i])"""))
df3.show()
Any help much appreciated! Rick
CodePudding user response:
One way to do this is to use array_repeat and transform.
First, use array_repeat to create the base array with just the num3 values. Then, use transform to calculate the value for each num3 value in the array.
For Spark 3.1+
repeat = 5
const = 10
df = (df.withColumn('arr', array_repeat('num3', repeat))
        .withColumn('arr', transform(col('arr'), lambda x, i: lit(const) + col('num1') + col('num2') + i * x)))
For Spark 2.4+ (below 3.1)
df = (df.withColumn('arr', array_repeat('num3', repeat))
        .withColumn('arr', expr('transform(arr, (x, i) -> 10 + num1 + num2 + i * x)')))
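(Note: these two snippets reference num2 and num3 columns from an earlier revision of the question; the V3 example above only has ID and num1. A minimal stand-in DataFrame, with made-up num2/num3 values purely so the snippets can be run, might look like this:)
# Hypothetical data: num2/num3 values are invented just to make the snippets above runnable
from pyspark.sql.functions import array_repeat, expr
toy = spark.createDataFrame(
    [("A1", 10.5, 1.0, 2.0), ("A2", 40.5, 3.0, 4.0)],
    ["ID", "num1", "num2", "num3"])
toy2 = (toy.withColumn('arr', array_repeat('num3', 5))
           .withColumn('arr', expr('transform(arr, (x, i) -> 10 + num1 + num2 + i * x)')))
toy2.show(truncate=False)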
============================================================
Update with the new equation (const + col + list element)
If there is only 1 array (num_lst), you can initialize UDF_OUT with the array and do transform to add the other variables to UDF_OUT.
df = (df.withColumn('UDF_OUT', array(*map(lit, num_lst)))
.withColumn('UDF_OUT', expr(f"""
    transform(UDF_OUT, (x, i) -> {const} + num1 + x)
""")))