Calculate relative frequency of bigrams in PySpark


I'm trying to count word pairs in a text file. First, I've done some pre-processing on the text, and then I counted word pairs as shown below:

((Aspire, to), 1) ; ((to, inspire), 4) ; ((inspire, before), 38)...

Now, I want to report the 1000 most frequent pairs, sorted by:

  1. Word (the second word of the pair)
  2. Relative frequency (pair occurrences / total occurrences of the second word), as illustrated in the sketch after this list
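
To make the formula concrete, here is a minimal, self-contained Python sketch of the computation on plain dictionaries. The bigram counts are made up for illustration, and the denominator is taken to be the total count of all pairs sharing the same second word (the interpretation used in the answer below).

# Hypothetical bigram counts, just to illustrate the formula
bigram_counts = {
    ("aspire", "to"): 1,
    ("to", "inspire"): 4,
    ("inspire", "before"): 38,
    ("born", "to"): 3,
}

# Total occurrences of each second word across all pairs
second_word_totals = {}
for (first, second), n in bigram_counts.items():
    second_word_totals[second] = second_word_totals.get(second, 0) + n

# Relative frequency = pair count / total for its second word
# e.g. ("aspire", "to"): 1 / (1 + 3) = 0.25
relative_freq = {pair: n / second_word_totals[pair[1]] for pair, n in bigram_counts.items()}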

Here's what I've done so far:

from pyspark.sql import SparkSession
import re

spark = SparkSession.builder.appName("Bigram occurences and relative frequencies").master("local[*]").getOrCreate()
sc = spark.sparkContext
text = sc.textFile("big.txt")

# lowercase each line and split it on whitespace and punctuation
tokens = text.map(lambda x: x.lower()).map(lambda x: re.split(r"[\s,.;:!?]+", x))
# build adjacent word pairs and count each pair's occurrences
pairs = tokens.flatMap(lambda xs: (tuple(x) for x in zip(xs, xs[1:]))).map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
frame = pairs.toDF(['pair', 'count'])

# DataFrame ordered from the most frequent pair to the least
most_frequent = frame.sort(frame['count'].desc())
# For each row, trying to add a column with the relative frequency, but I'm getting an error
with_rf = frame.withColumn("rf", frame['count'] / (frame.pair._2.sum()))

I think I'm relatively close to the result I want, but I can't figure it out. I'm new to Spark and DataFrames in general. I also tried:

import pyspark.sql.functions as F
frame.groupBy(frame['pair._2']).agg((F.col('count') / F.sum('count')).alias('rf')).show()

Any help would be appreciated.

EDIT: here's a sample of the frame DataFrame, along with its schema:

+--------------------+-----+
|                pair|count|
+--------------------+-----+
|{project, gutenberg}|   69|
|  {gutenberg, ebook}|   14|
|         {ebook, of}|    5|
|    {adventures, of}|    6|
|           {by, sir}|   12|
|     {conan, doyle)}|    1|
|     {changing, all}|    2|
|         {all, over}|   24|
+--------------------+-----+

root
 |-- pair: struct (nullable = true)
 |    |-- _1: string (nullable = true)
 |    |-- _2: string (nullable = true)
 |-- count: long (nullable = true)

CodePudding user response:

The relative frequency can be computed with a window function that partitions by the second word of the pair and sums the counts within each partition.

Then we limit the entries in the DataFrame to the top x by count, and finally order by the second word of the pair and the relative frequency.

from pyspark.sql import functions as F
from pyspark.sql import Window as W

data = [(("project", "gutenberg"), 69,),
        (("gutenberg", "ebook"), 14,),
        (("ebook", "of"), 5,),
        (("adventures", "of"), 6,),
        (("by", "sir"), 12,),
        (("conan", "doyle"), 1,),
        (("changing", "all"), 2,),
        (("all", "over"), 24,), ]

df = spark.createDataFrame(data, ("pair", "count", ))

ws = W.partitionBy(F.col("pair")._2).rowsBetween(W.unboundedPreceding, W.unboundedFollowing)

(df.withColumn("relative_freq", F.col("count") / F.sum("count").over(ws))
   .orderBy(F.col("count").desc())
   .limit(3) # change here to select top 1000
   .orderBy(F.desc(F.col("pair")._2), F.col("relative_freq").desc())
).show()

"""
+--------------------+-----+-------------+
|                pair|count|relative_freq|
+--------------------+-----+-------------+
|         {all, over}|   24|          1.0|
|{project, gutenberg}|   69|          1.0|
|  {gutenberg, ebook}|   14|          1.0|
+--------------------+-----+-------------+
"""