I have two columns in a data frame df
in PySpark:
| features | center    |
|----------|-----------|
| [0,1,0]  | [1.5,2,1] |
| [5,7,6]  | [10,7,7]  |
I want to create a function which calculates the Euclidean distance between df['features'] and df['center'], and map it to a new column in df, distance.
Let's say our function looks like the following:
@udf
def dist(feat, cent):
    return np.linalg.norm(feat - cent)
How would I actually apply this to do what I want it to do? I was trying things like
df.withColumn("distance", dist(col("features"), col("center"))).show()
but that gives me the following error:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 869.0 failed 4 times, most recent failure: Lost task 0.3 in stage 869.0 (TID 26423) (10.50.91.134 executor 35): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.dtype)
I am really struggling with understanding how to do basic Python mappings in a Spark context, so I really appreciate any help.
CodePudding user response:
You have truly chosen a difficult topic. In Spark, 95% of things can be done without Python UDFs, and you should always try to find a way to avoid creating one.
I tried your UDF and got the same error. It looks like a data-type mismatch: you pass a Spark array (a plain Python list on the worker side) into a function that expects numpy types, and the numpy result can't be pickled back to Spark.
For Euclidean distance, it's possible to calculate it natively in Spark. Not an easy one, though.
from pyspark.sql import functions as F

df = spark.createDataFrame(
    [([0, 1, 0], [1.5, 2., 1.]),
     ([5, 7, 6], [10., 7., 7.])],
    ['features', 'center'])

distance = F.aggregate(
    F.transform(
        F.arrays_zip('features', 'center'),
        lambda x: (x['features'] - x['center'])**2
    ),
    F.lit(0.0),
    lambda acc, x: acc + x,
    lambda x: x**.5
)
df = df.withColumn('distance', distance)
df.show()
# +---------+----------------+------------------+
# | features|          center|          distance|
# +---------+----------------+------------------+
# |[0, 1, 0]| [1.5, 2.0, 1.0]|2.0615528128088303|
# |[5, 7, 6]|[10.0, 7.0, 7.0]|5.0990195135927845|
# +---------+----------------+------------------+
CodePudding user response:
from typing import Iterator

import pandas as pd
from pyspark.sql.functions import lit
from sklearn.metrics.pairwise import paired_distances

Alter df's schema to accommodate the dist column:

sch = df.withColumn('dist', lit(90.087654623)).schema

Create a mapInPandas function that calculates the distance:

def euclidean_dist(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    for pdf in iterator:
        yield pdf.assign(dist=paired_distances(pdf['features'].to_list(),
                                               pdf['center'].to_list()))

df.mapInPandas(euclidean_dist, schema=sch).show()
Output:

+---------+----------------+------------------+
| features|          center|              dist|
+---------+----------------+------------------+
|[0, 1, 0]| [1.5, 2.0, 1.0]|2.0615528128088303|
|[5, 7, 6]|[10.0, 7.0, 7.0]|5.0990195135927845|
+---------+----------------+------------------+