Add a column from a function of 2 other columns in PySpark


I have two columns in a data frame df in PySpark:

| features | center    |
|----------|-----------|
| [0,1,0]  | [1.5,2,1] |
| [5,7,6]  | [10,7,7]  |

I want to write a function that calculates the Euclidean distance between df['features'] and df['center'] and maps the result to a new column in df, distance.

Let's say our function looks like the following:

import numpy as np
from pyspark.sql.functions import udf, col

@udf
def dist(feat, cent):
    return np.linalg.norm(feat-cent)

How would I actually apply this to do what I want it to do? I was trying things like

df.withColumn("distance", dist(col("features"), col("center"))).show()

but that gives me the following error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 869.0 failed 4 times, most recent failure: Lost task 0.3 in stage 869.0 (TID 26423) (10.50.91.134 executor 35): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.dtype)

I am really struggling to understand how to do basic Python mappings in a Spark context, so I'd really appreciate any help.

CodePudding user response:

You have truly chosen a difficult topic. In Spark, 95% of things can be done without Python UDFs, and you should always try to find a way to avoid creating one.

I tried your UDF and got the same error, and I can't say for certain why. It looks like a data-type problem: Spark arrays arrive in the UDF as plain Python lists, which don't support elementwise subtraction, and np.linalg.norm returns a NumPy scalar rather than a native Python float, which Spark's pickler cannot serialize.
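If you do want to keep the UDF, one plausible fix (a sketch, assuming the problem really is the list inputs and the NumPy return type) is to convert the inputs explicitly and cast the result back to a plain Python float:

import numpy as np
from pyspark.sql.functions import udf, col

@udf('double')  # declare the return type so Spark gets a native double
def dist(feat, cent):
    # convert the incoming Python lists to NumPy arrays, and cast the
    # numpy.float64 result back to a plain Python float for the pickler
    return float(np.linalg.norm(np.array(feat) - np.array(cent)))

df.withColumn('distance', dist(col('features'), col('center'))).show()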

Euclidean distance can be calculated natively in Spark, though the expression is not a trivial one.

from pyspark.sql import functions as F
df = spark.createDataFrame(
    [([0, 1, 0], [1.5, 2., 1.]),
     ([5, 7, 6], [10., 7., 7.])],
    ['features', 'center'])

distance = F.aggregate(
    F.transform(
        F.arrays_zip('features', 'center'),
        lambda x: (x['features'] - x['center'])**2  # squared difference per element
    ),
    F.lit(0.0),
    lambda acc, x: acc + x,  # sum of squared differences
    lambda x: x**.5          # square root of the sum
)
df = df.withColumn('distance', distance)

df.show()
# +---------+----------------+------------------+
# | features|          center|          distance|
# +---------+----------------+------------------+
# |[0, 1, 0]| [1.5, 2.0, 1.0]|2.0615528128088303|
# |[5, 7, 6]|[10.0, 7.0, 7.0]|5.0990195135927845|
# +---------+----------------+------------------+
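As a side note, the same computation can be written a bit more compactly with zip_with instead of arrays_zip plus transform. A sketch, assuming Spark 3.1+, where zip_with, aggregate, and Python lambdas for higher-order functions are all available:

from pyspark.sql import functions as F

# squared differences per element, summed, then square-rooted
distance = F.sqrt(
    F.aggregate(
        F.zip_with('features', 'center', lambda a, b: (a - b)**2),
        F.lit(0.0),
        lambda acc, x: acc + x
    )
)
df.withColumn('distance', distance).show()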

CodePudding user response:

from typing import Iterator

import pandas as pd
from pyspark.sql.functions import lit
from sklearn.metrics.pairwise import paired_distances

Alter df's schema to accommodate the dist column (the literal is just a dummy double used to infer the type):

sch = df.withColumn('dist', lit(90.087654623)).schema

Create a mapInPandas function that calculates the distance:

def euclidean_dist(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    for pdf in iterator:
        # paired_distances computes the row-wise Euclidean distance
        yield pdf.assign(dist=paired_distances(pdf['features'].to_list(),
                                               pdf['center'].to_list()))

df.mapInPandas(euclidean_dist, schema=sch).show()

Result:

+---------+----------------+------------------+
| features|          center|              dist|
+---------+----------------+------------------+
|[0, 1, 0]| [1.5, 2.0, 1.0]|2.0615528128088303|
|[5, 7, 6]|[10.0, 7.0, 7.0]|5.0990195135927845|
+---------+----------------+------------------+
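If you only need the one extra column, a scalar pandas_udf is a lighter-weight alternative to mapInPandas, since it avoids the dummy-schema step. A sketch, assuming the same df (the function name euclidean is mine):

import numpy as np
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf('double')
def euclidean(features: pd.Series, center: pd.Series) -> pd.Series:
    # row-wise Euclidean distance over the two array columns
    return pd.Series(
        np.linalg.norm(np.array(f) - np.array(c))
        for f, c in zip(features, center)
    )

df.withColumn('dist', euclidean('features', 'center')).show()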