Home > database >  Average of array column of two dataframes and find the maximum index in pyspark
Average of array column of two dataframes and find the maximum index in pyspark

Time:11-20

I want to Combine the column values of two dataframe after performing some operations to create a new dataframe in pyspark. The columns of each dataframe are vectors with integer values. The operations done are taking the average of each values in the vectors of the dataframe and finding the index of the maximum element of the new vectors created.

Dataframe1:

       |id| |value1 |
       |:.| |:......|
       | 0| |[0,1,2]|
       | 1| |[3,4,5]|

Dataframe2:

        |id| |value2 |
        |:.| |:......|
        | 0| |[1,2,3]|
        | 1| |[4,5,6]| 
         
         
         

Dataframe3:

         |value3       |
         |:............|
         |[0.5,1.5,2.5]|
         |[3.5,4.5,5.5]|

Dataframe4:

         |value4|
         |:.....|
         |2     |
         |2     |

Dataframe3 is obtained by taking the average of each elements of each vectors of dataframe 1 and 2 i.e.: first vector of dataframe3 [0.5,1.5,2.5] is obtained by [0 1/2,1 2/2,2 3/2]. Dataframe4 is obtained by taking the index of maximum value of each vector.i.e; Take first vector of dataframe3[0.5,1.5,2.5] maximum value is 2.5 and it occurs at index 2 so first element in Dataframe4 is 2. How we can implement this in pyspark .

CodePudding user response:

First, I recreate your test data :

a = [
    [0, [0,1,2]],
    [1, [3,4,5]],
]
b = ["id", "value1"]
df1 = spark.createDataFrame(a,b)

c = [
    [0, [1,2,3]],
    [1, [4,5,6]],
]
d = ["id", "value2"]
df2 = spark.createDataFrame(c,d)

then, I process the data :

  1. join
df3 = df1.join(df2, on="id")

df3.show()
 --- --------- ---------                                                        
| id|   value1|   value2|
 --- --------- --------- 
|  0|[0, 1, 2]|[1, 2, 3]|
|  1|[3, 4, 5]|[4, 5, 6]|
 --- --------- --------- 
  1. create the average array
from pyspark.sql import functions as F, types as T

@F.udf(T.ArrayType(T.FloatType()))
def avg_array(array1, array2):
    return list(map(lambda x: (x[0]   x[1]) / 2, zip(array1, array2)))

df3 = df3.withColumn("value3", avg_array(F.col("value1"), F.col("value2")))

# OR without UDF 

df3 = df3.withColumn(
    "value3",
    F.expr("transform(arrays_zip(value1, value2), x -> (x.value1   x.value2) / 2)"),
)

df3.show()
 --- --------- --------- ---------------                                        
| id|   value1|   value2|         value3|
 --- --------- --------- --------------- 
|  0|[0, 1, 2]|[1, 2, 3]|[0.5, 1.5, 2.5]|
|  1|[3, 4, 5]|[4, 5, 6]|[3.5, 4.5, 5.5]|
 --- --------- --------- --------------- 
  1. get the index (the array_position start at 1, you can do a -1 if necessary)
df4 = df3.withColumn("value4",F.expr("array_position(value3, array_max(value3))"))

df4.show()
 --- --------- --------- --------------- ------                                 
| id|   value1|   value2|         value3|value4|
 --- --------- --------- --------------- ------ 
|  0|[0, 1, 2]|[1, 2, 3]|[0.5, 1.5, 2.5]|     3|
|  1|[3, 4, 5]|[4, 5, 6]|[3.5, 4.5, 5.5]|     3|
 --- --------- --------- --------------- ------ 
  • Related