I want to write a PySpark snippet that first reads a text file from a Cloud Storage bucket. The file contains paragraphs of text separated by newline characters, and words within a line are separated by spaces.
I need to calculate the 10 highest-frequency words in that text file.
import sys

from pyspark import SparkConf, SparkContext
from google.cloud import storage

conf = SparkConf().setMaster("local").setAppName("some paragraph")
sc = SparkContext(conf=conf)

bucket_name = sys.argv[1]
destination_blob_name = sys.argv[2]

# Download the text file from the Cloud Storage bucket as a string
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)
downloaded_blob = blob.download_as_string().decode("utf-8")
print(downloaded_blob)

def words():
    # Split the downloaded text into lines, then each line into words
    all_words = []
    for line in downloaded_blob.splitlines():
        fields = line.split(" ")
        all_words.extend(fields)
    return all_words

myline = words()
myRDD = sc.parallelize(myline)
print(myRDD.collect())
CodePudding user response:
If you had a file like this:
s = """a b c d e f g h i j
a a b c d e f g h i j k l m n o p"""
with open('file.txt', 'w') as f:
    f.write(s)
You could get the top 10 words with their counts into a Python dictionary like this:
from pyspark.sql import functions as F

# Split each line on spaces, explode into one word per row,
# then count occurrences and sort by count descending
split_on_spaces = F.split('value', ' ')

df = (
    spark.read.text('file.txt')
    .withColumn('value', F.explode(split_on_spaces))
    .groupBy('value').count()
    .orderBy(F.desc('count'))
)
top_val_dict = {r['value']:r['count'] for r in df.head(10)}
print(top_val_dict)
# {'a': 3, 'g': 2, 'e': 2, 'f': 2, 'i': 2, 'h': 2, 'd': 2, 'c': 2, 'j': 2, 'b': 2}
This only assumes the case you described, where words are separated by spaces. In a real-world scenario you will probably also need to deal with punctuation, removal of tokens that are no longer words after stripping punctuation, etc. That is up to you to tweak further.
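For example, a minimal sketch along the same lines (assuming the same file.txt and an active spark session; the regex here is just an illustration, adjust it to your data) that lower-cases the text and strips punctuation before splitting:

from pyspark.sql import functions as F

# Sketch only: lower-case, replace non-letter characters with spaces,
# then split, explode, and count as before
cleaned = F.regexp_replace(F.lower(F.col('value')), r'[^a-z\s]', ' ')

df_clean = (
    spark.read.text('file.txt')
    .withColumn('value', F.explode(F.split(cleaned, r'\s+')))
    .filter(F.col('value') != '')    # drop empty tokens left by the split
    .groupBy('value').count()
    .orderBy(F.desc('count'))
)

print({r['value']: r['count'] for r in df_clean.head(10)})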
CodePudding user response:
If you want to use RDD transformations, you can use collections.Counter()
to create the frequencies and then sort them by count.
Here's an example that gets the top 3:
from collections import Counter
data_rdd = spark.sparkContext.textFile('file.txt')
data_rdd.collect()
# ['a b c b a d h t t a w b c c c']
data_rdd. \
    map(lambda x: x.split(' ')). \
    map(lambda e: sorted(Counter(e).items(), key=lambda k: k[1], reverse=True)). \
    flatMap(lambda e: e[:3]). \
    collect()
# [('c', 4), ('a', 3), ('b', 3)]
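Note that this sorts the Counter results within each line, so with multiple lines/paragraphs you would get the top words per line rather than overall. A minimal sketch (still plain RDD transformations, reusing the same data_rdd) for the overall top 10 across the whole file could aggregate first with reduceByKey and then use takeOrdered:

# Sketch: overall top 10 words across all lines of the file
top10 = (
    data_rdd
    .flatMap(lambda line: line.split(' '))   # one record per word
    .map(lambda word: (word, 1))
    .reduceByKey(lambda a, b: a + b)          # total count per word
    .takeOrdered(10, key=lambda kv: -kv[1])   # highest counts first
)
print(top10)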