I have tens of thousands of files in the directory demotxt, like:
demotxt/
  aa.txt
    this is aaa1
    this is aaa2
    this is aaa3
  bb.txt
    this is bbb1
    this is bbb2
    this is bbb3
    this is bbb4
  cc.txt
    this is ccc1
    this is ccc2
I would like to efficiently compute a WordCount for each .txt file in this directory with Spark 2.4 (Scala or Python).
# target result:
aa.txt: (this,3), (is,3), (aaa1,1), (aaa2,1), (aaa3,1)
bb.txt: (this,4), (is,4), (bbb1,1), (bbb2,1), (bbb3,1), (bbb4,1)
cc.txt: (this,2), (is,2), (ccc1,1), (ccc2,1)
Maybe code like this?

def dealWithOneFile(path2File):
    res = wordcountFor(path2File)
    saveResultToDB(res)

sc.wholeTextFiles(rootDir).foreach(dealWithOneFile)
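For concreteness, a minimal Scala sketch of this per-file idea (rootDir is a placeholder path, and the counts are just gathered into a Map per file instead of being saved to a database) might look like:

// Each element of wholeTextFiles is (filePath, fileContent), i.e. one record per file.
val perFileCounts = sc.wholeTextFiles(rootDir).map { case (path, content) =>
  // Split on whitespace and count occurrences of each word within this one file.
  val counts = content.split("\\s+").filter(_.nonEmpty).groupBy(identity).mapValues(_.length).toMap
  (path, counts)
}
perFileCounts.take(3).foreach(println)

Note that wholeTextFiles reads each whole file as a single record, which is fine for many small files but not for very large ones.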
It seems that with sc.textFile(".../demotxt/"), Spark loads all the files at once, which may cause memory issues; it also treats all the files as one dataset, which is not what I expect. So how should I do this? Many thanks!
CodePudding user response:
Here is an approach. It can be done with DataFrames or RDDs; since you do not state a preference, I show an RDD version in Scala, running on Databricks. It's hard to explain step by step, but it works, so try it on some input.
%scala
import org.apache.spark.sql.functions.input_file_name
import spark.implicits._

val paths = Seq("/FileStore/tables/fff_1.txt", "/FileStore/tables/fff_2.txt")
// Read the files as lines of text, tagging every line with the file it came from.
val rdd = spark.read.format("text").load(paths: _*).select(input_file_name(), $"value").as[(String, String)].rdd
// Split each line into words, keyed by (file, word).
val rdd2 = rdd.flatMap(x => x._2.split("\\s+").map(y => ((x._1, y), 1)))
// Sum the counts per (file, word), then re-key by file name.
val rdd3 = rdd2.reduceByKey(_ + _).map { case ((file, word), count) => (file, (word, count)) }
rdd3.collect
// Group all (word, count) pairs belonging to the same file.
val rdd4 = rdd3.groupByKey()
rdd4.collect
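To get output shaped like the target in the question (one line per file with its (word, count) pairs), a small follow-up sketch, going beyond the code above, could strip the full path returned by input_file_name down to the bare file name and format each group:

// Illustrative only: render each file's counts as "aa.txt: (this,3), (is,3), ..."
val formatted = rdd4.map { case (path, wordCounts) =>
  val fileName = path.split("/").last
  fileName + ": " + wordCounts.map { case (w, c) => s"($w,$c)" }.mkString(", ")
}
formatted.collect.foreach(println)

Also, if all the files live in a single directory as in the question, load accepts a directory path or a glob (for example one ending in *.txt), so you do not have to list tens of thousands of paths explicitly.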