Pandas UDF: AttributeError: 'NoneType' object has no attribute '_jvm' (Coding wo

Time:01-16

I know there are similar threads but I wasn't able to solve my error with those solutions.

Here is my schema:

root
 |-- embeddings: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- id: long (nullable = true)

I am trying to group by id and compute the pairwise cosine similarity across all of the embeddings within that id.

Here is my code end to end:

import pyspark.sql.types as T
import pyspark.sql.functions as F

embeddings=spark.read.parquet('directory')

schema = T.StructType([T.StructField("shop_id", T.LongType(), True),
                       T.StructField("cosine_similarities", T.ArrayType(T.ArrayType(T.DoubleType(), True), True))
                      ])


@F.pandas_udf(schema, F.PandasUDFType.GROUPED_MAP)
def cosine_sim_udf(df):
    
    single_col = df.select(F.col('embeddings'))

    single_col_flatmap = single_col.rdd.flatMap(lambda x: x).collect()
    
    cosine_sim = cosine_similarity(single_col_flatmap)

    return cosine_sim

embeddings.groupBy("shop_id").apply(cosine_sim_udf).show(1)

This raises the following error:

AttributeError: 'NoneType' object has no attribute '_jvm'

Now, to debug this, I ran the code within the function on a single id and didn't run into issues.

single_col = embeddings.filter("id =1").select(F.col('embeddings'))

single_col_flatmap = single_col.rdd.flatMap(lambda x: x).collect()

cosine_sim = cosine_similarity(single_col_flatmap)

Any help would be much appreciated.

Answer:

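You get that error because you're using PySpark DataFrame operations (select, F.col, .rdd) inside the pandas_udf. The parameter df that is passed to cosine_sim_udf is a pandas DataFrame, not a PySpark one. The function also runs on the executors, where there is no active SparkContext, so a call such as F.col ends up looking up '_jvm' on None. Your debugging snippet worked because it ran on the driver against a real PySpark DataFrame. Please see the docs: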

Grouped map operations with Pandas instances are supported by DataFrame.groupby().applyInPandas() which requires a Python function that takes a pandas.DataFrame and return another pandas.DataFrame

You need to change your code to something like this:

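import pandas as pd
from sklearn.metrics.pairwise import cosine_similarity
import pyspark.sql.functions as F

# schema is the StructType from the question (shop_id, cosine_similarities)
@F.pandas_udf(schema, F.PandasUDFType.GROUPED_MAP)
def cosine_sim_udf(pdf):
    # pdf is a pandas DataFrame holding every row of one shop_id group
    em = pdf["embeddings"].values.tolist()
    # one output row per group, containing the full pairwise similarity matrix
    cosine_sim = pd.DataFrame({'cosine_similarities': [cosine_similarity(em).tolist()]})

    # every row in the group shares the same shop_id, so take the first one
    cosine_sim["shop_id"] = pdf["shop_id"].iloc[0]

    return cosine_sim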

Example:

embeddings = spark.createDataFrame([
    (1, [1., 0., 3., 5.]), (1, [6., 7., 8., 5.]),
    (2, [1.3, 4.4, 2.1, 3.9]), (2, [9., 5., 0., 3.]),
    (3, [1.3, 4.4, 2.1, 3.9]), (3, [9., 5., 0., 3.])
], ["shop_id", "embeddings"])

embeddings.groupBy("shop_id").apply(cosine_sim_udf).show(truncate=False)

#+-------+---------------------------------------------------------------------+
#|shop_id|cosine_similarities                                                  |
#+-------+---------------------------------------------------------------------+
#|1      |[[1.0, 0.704780765735282], [0.704780765735282, 0.9999999999999999]]  |
#|2      |[[1.0, 0.6638498270978581], [0.6638498270978581, 1.0000000000000002]]|
#|3      |[[1.0, 0.6638498270978581], [0.6638498270978581, 1.0000000000000002]]|
#+-------+---------------------------------------------------------------------+
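
Since the grouped function only ever sees a pandas DataFrame, you can check its logic locally on a hand-built group without starting Spark. Below is a minimal sketch of that idea. The name cosine_sim_group is hypothetical, and it computes the cosine similarity with plain NumPy instead of sklearn (a swapped-in but equivalent calculation) so the snippet has fewer dependencies. It assumes no embedding is all zeros.

```python
import numpy as np
import pandas as pd


def cosine_sim_group(pdf: pd.DataFrame) -> pd.DataFrame:
    """Return one row holding the pairwise cosine-similarity matrix of one group."""
    # Stack the per-row embedding lists into an (n_rows, dim) float matrix.
    em = np.array(pdf["embeddings"].tolist(), dtype=float)
    # Scale each row to unit length; dot products of unit vectors are cosines.
    # Assumption: no all-zero embedding (that would divide by zero here).
    unit = em / np.linalg.norm(em, axis=1, keepdims=True)
    sims = unit @ unit.T
    return pd.DataFrame({
        "shop_id": [pdf["shop_id"].iloc[0]],
        "cosine_similarities": [sims.tolist()],
    })


# Local check using the two rows of shop_id 1 from the example data above.
group = pd.DataFrame({
    "shop_id": [1, 1],
    "embeddings": [[1.0, 0.0, 3.0, 5.0], [6.0, 7.0, 8.0, 5.0]],
})
result = cosine_sim_group(group)
print(result["cosine_similarities"].iloc[0])
```

The off-diagonal value should agree with the 0.7047... shown for shop_id 1 in the Spark output. On Spark 3.0 or later, a function like this can be passed to the API named in the docs quote, without the decorator: embeddings.groupBy("shop_id").applyInPandas(cosine_sim_group, schema=schema), with schema being the StructType from the question.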