I'm trying to count word pairs in a text file. First, I've done some pre-processing on the text, and then I counted word pairs as shown below:
((Aspire, to), 1) ; ((to, inspire), 4) ; ((inspire, before), 38)...
Now, I want to report the 1000 most frequent pairs, sorted by :
- Word (second word of the pair)
- Relative frequency (pair occurences / 2nd word total occurences)
Here's what I've done so far
from pyspark.sql import SparkSession
import re
spark = SparkSession.builder.appName("Bigram occurences and relative frequencies").master("local[*]").getOrCreate()
sc = spark.sparkContext
text = sc.textFile("big.txt")
tokens = text.map(lambda x: x.lower()).map(lambda x: re.split("[\s,.;:!?] ", x))
pairs = tokens.flatMap(lambda xs: (tuple(x) for x in zip(xs, xs[1:]))).map(lambda x: (x, 1)).reduceByKey(lambda x, y: x y)
frame = pairs.toDF(['pair', 'count'])
# Dataframe ordered by the most frequent pair to the least
most_frequent = frame.sort(frame['count'].desc())
# For each row, trying to add a column with the relative frequency, but I'm getting an error
with_rf = frame.withColumn("rf", frame['count'] / (frame.pair._2.sum()))
I think I'm relatively close to the result I want but I can't figure it out. I'm new to Spark and DataFrames in general. I also tried
import pyspark.sql.functions as F
frame.groupBy(frame['pair._2']).agg((F.col('count') / F.sum('count')).alias('rf')).show()
Any help would be appreciated.
EDIT: here's a sample of the frame
dataframe
-------------------- -----
| pair|count|
-------------------- -----
|{project, gutenberg}| 69|
| {gutenberg, ebook}| 14|
| {ebook, of}| 5|
| {adventures, of}| 6|
| {by, sir}| 12|
| {conan, doyle)}| 1|
| {changing, all}| 2|
| {all, over}| 24|
-------------------- -----
root
|-- pair: struct (nullable = true)
| |-- _1: string (nullable = true)
| |-- _2: string (nullable = true)
|-- count: long (nullable = true)
CodePudding user response:
The relative frequency
can be computed by using window
function, that partitions by the second word in the pair
and applies a sum
operation.
Then, we limit the entries in the df to the top x, based on count
and finally order by the second word in pair and the relative frequency.
from pyspark.sql import functions as F
from pyspark.sql import Window as W
data = [(("project", "gutenberg"), 69,),
(("gutenberg", "ebook"), 14,),
(("ebook", "of"), 5,),
(("adventures", "of"), 6,),
(("by", "sir"), 12,),
(("conan", "doyle"), 1,),
(("changing", "all"), 2,),
(("all", "over"), 24,), ]
df = spark.createDataFrame(data, ("pair", "count", ))
ws = W.partitionBy(F.col("pair")._2).rowsBetween(W.unboundedPreceding, W.unboundedFollowing)
(df.withColumn("relative_freq", F.col("count") / F.sum("count").over(ws))
.orderBy(F.col("count").desc())
.limit(3) # change here to select top 1000
.orderBy(F.desc(F.col("pair")._2), F.col("relative_freq").desc())
).show()
"""
-------------------- ----- -------------
| pair|count|relative_freq|
-------------------- ----- -------------
| {all, over}| 24| 1.0|
|{project, gutenberg}| 69| 1.0|
| {gutenberg, ebook}| 14| 1.0|
-------------------- ----- -------------
"""