How to efficiently WordCount for each file?


I have tens of thousands of files in a directory demotxt, like:

demotxt/
    aa.txt
            this is aaa1
            this is aaa2
            this is aaa3
    bb.txt
            this is bbb1
            this is bbb2
            this is bbb3
            this is bbb4
    cc.txt
            this is ccc1
            this is ccc2

I would like to efficiently do a WordCount for each .txt file in this directory with Spark 2.4 (Scala or Python).

# target result is:
aa.txt:  (this,3), (is,3), (aaa1,1), (aaa2,1), (aaa3,1) 
bb.txt:  (this,4), (is,4), (bbb1,1), (bbb2,1), (bbb3,1), (bbb4,1)
cc.txt:  (this,2), (is,2), (ccc1,1), (ccc2,1)

Maybe the code could look something like this?

def dealWithOneFile(fileAndContent):
    # fileAndContent is a (path, content) pair produced by wholeTextFiles
    res = wordcountFor(fileAndContent)
    saveResultToDB(res)

sc.wholeTextFiles(rootDir).foreach(dealWithOneFile)

It seems that with sc.textFile(".../demotxt/") Spark loads all the files into a single RDD, which may cause memory issues; it also treats all the files as one, which is not what I want.
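To make the idea concrete, a per-file version built on sc.wholeTextFiles might look roughly like the following Scala sketch (saveResultToDB is just a placeholder for whatever storage ends up being used):

sc.wholeTextFiles(rootDir).foreach { case (path, content) =>
  // count the words of a single file in memory on the executor
  val counts = content.split("\\s+").groupBy(identity).mapValues(_.size)
  saveResultToDB(path, counts)  // placeholder: persist one file's result
}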

So how should I do this efficiently? Many thanks!

CodePudding user response:

Here is an approach. It can be done with either DataFrames or RDDs; since you do not state a preference, I show an RDD version in Scala, run on Databricks.

It is hard to explain step by step, but it works; try it on some input.

%scala
import org.apache.spark.sql.functions.input_file_name
import spark.implicits._

// Read the files as lines of text, keeping the source file name alongside each line.
val paths = Seq("/FileStore/tables/fff_1.txt", "/FileStore/tables/fff_2.txt")
val rdd = spark.read.format("text").load(paths: _*).select(input_file_name(), $"value").as[(String, String)].rdd
// Tokenize each line and emit ((file, word), 1) pairs.
val rdd2 = rdd.flatMap { case (file, line) => line.split("\\s+").map(word => ((file, word), 1)) }
// Sum the counts per (file, word), then re-key by file name.
val rdd3 = rdd2.reduceByKey(_ + _).map { case ((file, word), count) => (file, (word, count)) }
rdd3.collect
// Group all (word, count) pairs belonging to the same file.
val rdd4 = rdd3.groupByKey()
rdd4.collect
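rdd4 ends up keyed by file name, with all of that file's (word, count) pairs grouped together, which matches the target layout in the question.

For reference, if you would rather stay in the DataFrame API instead of dropping to an RDD, a rough, untested sketch of the same idea (reusing the paths value from above) could be:

%scala
import org.apache.spark.sql.functions.{col, collect_list, explode, input_file_name, split, struct}
// Count words per (file, word), then gather each file's (word, count) pairs into one row.
val counts = spark.read.format("text").load(paths: _*)
  .select(input_file_name().as("file"), explode(split(col("value"), "\\s+")).as("word"))
  .groupBy("file", "word").count()
val perFile = counts.groupBy("file").agg(collect_list(struct(col("word"), col("count"))).as("wordCounts"))
perFile.show(false)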