Calculating cosine similarity in a PySpark DataFrame


My dataset looks like this:

rows = [("user1",1,2,10), 
        ("user2",2,6,27), 
        ("user3",3,3,21), 
        ("user4",4,7,44) 
      ]
columns = ["id","val1","val2","val3"]

df = spark.createDataFrame(data=rows, schema = columns)

+-----+----+----+----+
|   id|val1|val2|val3|
+-----+----+----+----+
|user1|   1|   2|  10|
|user2|   2|   6|  27|
|user3|   3|   3|  21|
|user4|   4|   7|  44|
+-----+----+----+----+

And I want to calculate cosine similarity scores for each pair of users, i.e. a matrix with one row and one column per user.

Kindly help in solving this.

PS: I do not want to use the sklearn library, as I am dealing with big data.

CodePudding user response:

You can use a UDF and a pivot:

import numpy as np

from pyspark.sql import SparkSession, functions as F, types as T

@F.udf(T.DoubleType())
def cos_sim(a, b):
    # Cosine similarity of two equal-length numeric arrays
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

# Columns representing the values of the vectors
vector_columns = [c for c in df.columns if c.startswith('val')]

df2 = (
    df.alias('a')
    .crossJoin(df.alias('b'))
    .withColumn(
        'cs',
        cos_sim(
            F.array(*[F.col(f'a.{c}') for c in vector_columns]),
            F.array(*[F.col(f'b.{c}') for c in vector_columns]),
        )
    )
    .groupby('a.id')
    .pivot('b.id')
    .sum('cs')
)

The result is:

+-----+------------------+------------------+------------------+------------------+
|   id|             user1|             user2|             user3|             user4|
+-----+------------------+------------------+------------------+------------------+
|user1|1.0000000000000002|0.9994487303346109|0.9975694083904585|0.9991881714548081|
|user2|0.9994487303346109|               1.0|0.9947592087399117|0.9980077882931742|
|user3|0.9975694083904585|0.9947592087399117|               1.0|0.9985781309458447|
|user4|0.9991881714548081|0.9980077882931742|0.9985781309458447|               1.0|
+-----+------------------+------------------+------------------+------------------+
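
If the Python UDF becomes a bottleneck, a vectorized pandas UDF is a common middle ground. The following is a minimal sketch, assuming Spark 3.x with PyArrow available; cos_sim_vec is a hypothetical drop-in replacement for cos_sim above:

import numpy as np
import pandas as pd
from pyspark.sql import functions as F, types as T

@F.pandas_udf(T.DoubleType())
def cos_sim_vec(a: pd.Series, b: pd.Series) -> pd.Series:
    # Each series element holds one row's array of values
    def one(x, y):
        x, y = np.asarray(x, dtype=float), np.asarray(y, dtype=float)
        return float(np.dot(x, y) / (np.linalg.norm(x) * np.linalg.norm(y)))
    return pd.Series([one(x, y) for x, y in zip(a, b)])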

Alternatively, you can use a plain PySpark implementation; which is preferable depends on the quantity of data you have to process. UDFs are usually slower, but simpler and faster to play with, especially when you want to try something quickly.

Here is a possible plain PySpark implementation:

from functools import reduce
from operator import add


def mynorm(*cols):
    # Euclidean (L2) norm of a vector given as individual columns
    return F.sqrt(reduce(add, [F.pow(c, 2) for c in cols]))

def mydot(a_cols, b_cols):
    # Dot product of two vectors given as lists of columns
    return reduce(add, [a * b for a, b in zip(a_cols, b_cols)])

def cos_sim_ps(a_cols, b_cols):
    # Cosine similarity built from native Spark column expressions
    return mydot(a_cols, b_cols) / (mynorm(*a_cols) * mynorm(*b_cols))
    

df3 = (
    df.alias('a')
    .crossJoin(df.alias('b'))
    .withColumn(
        'cs',
        cos_sim_ps(
            [F.col(f'a.{c}') for c in vector_columns],
            [F.col(f'b.{c}') for c in vector_columns],
        )
    )
    .groupby('a.id')
    .pivot('b.id')
    .sum('cs')
)
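
As a quick sanity check on this small example, both implementations should agree up to floating-point rounding. This sketch collects the results to the driver, so it is only suitable for toy data:

import math

# Compare df2 (UDF version) and df3 (plain PySpark version) cell by cell
rows2 = {r["id"]: r.asDict() for r in df2.collect()}
rows3 = {r["id"]: r.asDict() for r in df3.collect()}
assert all(
    math.isclose(rows2[u][v], rows3[u][v], rel_tol=1e-9)
    for u in rows2 for v in rows2[u] if v != "id"
)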

CodePudding user response:

There is another way to do it for large amounts of data. We had a requirement to compute cosine similarity for a large number of entities, and we compared multiple solutions. The UDF-based solution was very slow, as a lot of data has to be broadcast for each pairwise computation.

The following is another version of this implementation, which scaled very well for us:

data_cols = [c for c in df.columns if c != "id"]

# Pack the value columns into a single array column per user
df_cos_sim = df.withColumn("others", F.array(*data_cols)).drop(*data_cols)
# Pair every user's vector ("this") with every other user's vector ("others")
df_cos_sim = df_cos_sim.withColumnRenamed("others", "this").crossJoin(df_cos_sim)
df_cos_sim = df_cos_sim.withColumn("dot_prod", sum([F.col("this")[i] * F.col("others")[i] for i in range(len(data_cols))]))
df_cos_sim = df_cos_sim.withColumn("norm_1", F.sqrt(sum([F.col("this")[i] * F.col("this")[i] for i in range(len(data_cols))])))
df_cos_sim = df_cos_sim.withColumn("norm_2", F.sqrt(sum([F.col("others")[i] * F.col("others")[i] for i in range(len(data_cols))])))
df_cos_sim = df_cos_sim.withColumn("cosine_similarity", F.col("dot_prod") / (F.col("norm_1") * F.col("norm_2")))
df_cos_sim = df_cos_sim.drop("this", "others", "dot_prod", "norm_1", "norm_2")

The output is the pairwise cosine similarity:

+-----+-----+------------------+
|   id|   id| cosine_similarity|
+-----+-----+------------------+
|user1|user1|1.0000000000000002|
|user1|user2|0.9994487303346109|
|user1|user3|0.9975694083904585|
|user1|user4|0.9991881714548081|
|user2|user1|0.9994487303346109|
|user2|user2|               1.0|
|user2|user3|0.9947592087399117|
|user2|user4|0.9980077882931742|
|user3|user1|0.9975694083904585|
|user3|user2|0.9947592087399117|
|user3|user3|               1.0|
|user3|user4|0.9985781309458447|
|user4|user1|0.9991881714548081|
|user4|user2|0.9980077882931742|
|user4|user3|0.9985781309458447|
|user4|user4|               1.0|
+-----+-----+------------------+
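
If you prefer the matrix layout from the first answer, one option is to rename the two duplicated id columns and pivot. A minimal sketch, where id_a and id_b are names chosen here for illustration:

# toDF renames the three remaining columns positionally
result = (
    df_cos_sim.toDF("id_a", "id_b", "cosine_similarity")
    .groupBy("id_a")
    .pivot("id_b")
    .sum("cosine_similarity")
)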