I know there are similar threads but I wasn't able to solve my error with those solutions.
Here is my schema:
root
 |-- embeddings: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- id: long (nullable = true)
I am trying to group by id and compute pairwise cosine similarity across all of the embeddings
within each id.
Here is my code end to end:
import pyspark.sql.types as T
import pyspark.sql.functions as F
from sklearn.metrics.pairwise import cosine_similarity

embeddings = spark.read.parquet('directory')

schema = T.StructType([
    T.StructField("shop_id", T.LongType(), True),
    T.StructField("cosine_similarities", T.ArrayType(T.ArrayType(T.DoubleType(), True), True))
])

@F.pandas_udf(schema, F.PandasUDFType.GROUPED_MAP)
def cosine_sim_udf(df):
    single_col = df.select(F.col('embeddings'))
    single_col_flatmap = single_col.rdd.flatMap(lambda x: x).collect()
    cosine_sim = cosine_similarity(single_col_flatmap)
    return cosine_sim

embeddings.groupBy("shop_id").apply(cosine_sim_udf).show(1)
This raises the following error:
AttributeError: 'NoneType' object has no attribute '_jvm'
Now, to debug this, I ran the code from inside the function on a single id and didn't run into any issues:

single_col = embeddings.filter("id = 1").select(F.col('embeddings'))
single_col_flatmap = single_col.rdd.flatMap(lambda x: x).collect()
cosine_sim = cosine_similarity(single_col_flatmap)
Any help would be much appreciated.
CodePudding user response:
You get that error because you're trying to use a PySpark DataFrame inside the pandas_udf. However, the parameter df that is passed to cosine_sim_udf is a pandas DataFrame. Please see the docs:

Grouped map operations with Pandas instances are supported by DataFrame.groupby().applyInPandas(), which requires a Python function that takes a pandas.DataFrame and returns another pandas.DataFrame.
You need to change your code to something like this:
import pandas as pd
from sklearn.metrics.pairwise import cosine_similarity
import pyspark.sql.functions as F

@F.pandas_udf(schema, F.PandasUDFType.GROUPED_MAP)
def cosine_sim_udf(pdf):
    em = pdf["embeddings"].values.tolist()
    cosine_sim = pd.DataFrame({'cosine_similarities': [cosine_similarity(em).tolist()]})
    # every row of the group shares the same shop_id, so take the first one
    cosine_sim["shop_id"] = pdf["shop_id"].iloc[0]
    return cosine_sim
Example:
embeddings = spark.createDataFrame([
(1, [1., 0., 3., 5.]), (1, [6., 7., 8., 5.]),
(2, [1.3, 4.4, 2.1, 3.9]), (2, [9., 5., 0., 3.]),
(3, [1.3, 4.4, 2.1, 3.9]), (3, [9., 5., 0., 3.])
], ["shop_id", "embeddings"])
embeddings.groupBy("shop_id").apply(cosine_sim_udf).show(truncate=False)
# +-------+---------------------------------------------------------------------+
# |shop_id|cosine_similarities                                                  |
# +-------+---------------------------------------------------------------------+
# |1      |[[1.0, 0.704780765735282], [0.704780765735282, 0.9999999999999999]]  |
# |2      |[[1.0, 0.6638498270978581], [0.6638498270978581, 1.0000000000000002]]|
# |3      |[[1.0, 0.6638498270978581], [0.6638498270978581, 1.0000000000000002]]|
# +-------+---------------------------------------------------------------------+
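As a side note, the quoted docs mention DataFrame.groupby().applyInPandas(): in Spark 3, the PandasUDFType.GROUPED_MAP form of pandas_udf is deprecated in favor of applyInPandas, which takes a plain Python function plus a schema. A sketch of that style is below (the function name cosine_sim_fn and the NumPy-based similarity are my own; since the grouped function is plain pandas, it can be tested without a Spark session):

```python
import numpy as np
import pandas as pd

def cosine_sim_fn(pdf: pd.DataFrame) -> pd.DataFrame:
    # Stack this group's embeddings into an (n, d) matrix.
    m = np.array(pdf["embeddings"].tolist())
    # Pairwise cosine similarity: L2-normalize the rows, then take dot products.
    normed = m / np.linalg.norm(m, axis=1, keepdims=True)
    sims = normed @ normed.T
    # One output row per group, matching the answer's schema.
    return pd.DataFrame({
        "shop_id": [pdf["shop_id"].iloc[0]],
        "cosine_similarities": [sims.tolist()],
    })

# With a live Spark session, the same function would be applied as:
# embeddings.groupBy("shop_id").applyInPandas(
#     cosine_sim_fn,
#     schema="shop_id long, cosine_similarities array<array<double>>")
```

This also drops the sklearn dependency from the executors, since the similarity is computed with NumPy alone.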