I have a text file containing many product names separated by commas (','). I want to save the total number of products in this file to another file. How can I do this with PySpark?
Here is the code I've tried so far, but the count it returns does not seem to be correct.
import pyspark

# Reuse an existing SparkContext if one is already defined (e.g. in a notebook)
if 'sc' not in globals():
    sc = pyspark.SparkContext()

text_file = sc.textFile("anytext.txt")
# Split every line on commas so that each product becomes one element
counts = text_file.flatMap(lambda line: line.split(","))
counts.count()
Can anyone help me with this? Thanks in advance.
CodePudding user response:
Try the DataFrame API, as shown below.
Initial data:
df = spark.read.text("/FileStore/word_count/citrus_fruit.txt")
df.show(5, False)
+-------------------------------------------------------------------+
|value                                                              |
+-------------------------------------------------------------------+
|citrus fruit,semi-finished bread,margarine,ready soups             |
|tropical fruit,yogurt,coffee                                       |
|whole milk                                                         |
|pip fruit,yogurt,cream cheese ,meat spreads                        |
|other vegetables,whole milk,condensed milk,long life bakery product|
+-------------------------------------------------------------------+
Now count the comma-separated items in each line:
from pyspark.sql import functions as F
df2 = df.withColumn('count', F.size(F.split('value', ',')))
df2.show(5, False)
+-------------------------------------------------------------------+-----+
|value                                                              |count|
+-------------------------------------------------------------------+-----+
|citrus fruit,semi-finished bread,margarine,ready soups             |4    |
|tropical fruit,yogurt,coffee                                       |3    |
|whole milk                                                         |1    |
|pip fruit,yogurt,cream cheese ,meat spreads                        |4    |
|other vegetables,whole milk,condensed milk,long life bakery product|4    |
+-------------------------------------------------------------------+-----+
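To make it clearer what the per-line count measures, here is a small plain-Python model of the same idea (no Spark involved). The names `sample_lines` and `items_per_line` are hypothetical and used only for illustration; the sample rows are copied from the output above.

```python
# Plain-Python model of the per-line count; Spark is not needed to run this.
# `sample_lines` is a hypothetical stand-in for the first rows of the file.
sample_lines = [
    "citrus fruit,semi-finished bread,margarine,ready soups",
    "tropical fruit,yogurt,coffee",
    "whole milk",
]


def items_per_line(line):
    # Split on commas and count the pieces, like size(split(value, ',')).
    return len(line.split(","))


per_line = [items_per_line(line) for line in sample_lines]
print(per_line)  # [4, 3, 1]

# A trailing comma leaves an empty last piece, and that piece is counted too.
print(items_per_line("whole milk,"))  # 2
```

In this model, stray commas or empty fields inflate the count, which may be one reason a total looks wrong; whether that applies to your file depends on its contents.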
Once you have the per-line counts, sum them to get the total. Keep `.show()` as a separate call, because it returns None and would otherwise leave `df3` empty:
df3 = df2.groupBy().agg(F.sum('count').alias('word_count_sum'))
df3.show(5, False)
+--------------+
|word_count_sum|
+--------------+
|43368         |
+--------------+
Now you can write this DataFrame to a file as shown below (I've used CSV here):
df3.write.format('csv').mode('overwrite').save('/FileStore/tmp/')
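If you would rather stay with the RDD API from the question, a minimal sketch might look like the following. The helper names `clean_products` and `count_products` and the path arguments are hypothetical. Trimming whitespace and dropping empty pieces is an assumption about what should count as a product, so the result can differ from the DataFrame total above.

```python
def clean_products(line):
    # Split one line on commas, trim whitespace, and drop empty pieces.
    # (Assumption: blank fields should not be counted as products.)
    return [piece.strip() for piece in line.split(",") if piece.strip()]


def count_products(sc, in_path, out_dir):
    # `sc` is an existing SparkContext; `in_path` and `out_dir` are
    # hypothetical paths supplied by the caller.
    total = sc.textFile(in_path).flatMap(clean_products).count()
    # Store the single number as a one-partition text output.
    # saveAsTextFile raises an error if `out_dir` already exists.
    sc.parallelize([total], 1).saveAsTextFile(out_dir)
    return total
```

With a SparkContext available, a call such as `count_products(sc, "anytext.txt", "product_count")` would return the total and write it under the (hypothetical) `product_count` directory.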