Hi, I am new to Scala and I am using IntelliJ IDEA. I am trying to stream a text file with a Scala job running on a Hadoop/Spark cluster. My main goal is to count only words (without any special characters) in key-value format.
I found the article "apache-spark regex extract words from rdd", where they use the findAllIn() function with a regex, but I am not sure I am using it correctly in my code.
When I build my project, generate the jar file, and run it in Spark, I manually provide the text file. It seems to run and count the words, but it also seems to enter a loop, because it processes the file over and over. I thought it should process it only once.
Can someone tell me why this may be happening? Or is there a better way to achieve my goal?
Part of my text is:
It wlll only take a day,' he said. The others disagreed.
It's too fragile," 289 they said disapprovingly 23 age, but he refused to listen. Not
quite so lazy, the second little pig went in search of planks of seasoned 12 hola, 1256 23.
My code is:
package streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object NetworkWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length < 1) {
      System.err.println("Usage: HdfsWordCount <directory>")
      System.exit(1)
    }
    //StreamingExamples.setStreamingLogLevels()
    val sparkConf = new SparkConf().setAppName("HdfsWordCount").setMaster("local")
    // Create the context
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    // Create the FileInputDStream on the directory and use the
    // stream to count words in new files created
    val lines = ssc.textFileStream(args(0))
    val sep_words = lines.flatMap("[a-zA-Z]+".r.findAllIn(_))
    val wordCounts = sep_words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.saveAsTextFiles("hdfs:///user/test/task1.txt")
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
My expected output is something like the one below, but as mentioned before, it keeps repeating:
22/10/24 05:11:40 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
22/10/24 05:11:40 INFO Executor: Finished task 0.0 in stage 27.0 (TID 20). 1529 bytes result sent to driver
22/10/24 05:11:40 INFO TaskSetManager: Finished task 0.0 in stage 27.0 (TID 20) in 18 ms on localhost (executor driver) (1/1)
22/10/24 05:11:40 INFO TaskSchedulerImpl: Removed TaskSet 27.0, whose tasks have all completed, from pool
22/10/24 05:11:40 INFO DAGScheduler: ResultStage 27 (print at NetworkWordCount.scala:28) finished in 0.031 s
22/10/24 05:11:40 INFO DAGScheduler: Job 13 finished: print at NetworkWordCount.scala:28, took 0.036092 s
-------------------------------------------
Time: 1666588300000 ms
-------------------------------------------
(heaped,1)
(safe,1)
(became,1)
(For,1)
(ll,1)
(it,1)
(Let,1)
(Open,1)
(others,1)
(pack,1)
...
22/10/24 05:11:40 INFO JobScheduler: Finished job streaming job 1666588300000 ms.1 from job set of time 1666588300000 ms
22/10/24 05:11:40 INFO JobScheduler: Total delay: 0.289 s for time 1666588300000 ms (execution: 0.222 s)
22/10/24 05:11:40 INFO ShuffledRDD: Removing RDD 40 from persistence list
22/10/24 05:11:40 INFO BlockManager: Removing RDD 40
22/10/24 05:11:40 INFO MapPartitionsRDD: Removing RDD 39 from persistence list
22/10/24 05:11:40 INFO BlockManager: Removing RDD 39
Additionally, I noticed that when I use only the split() function, with neither a regex nor findAllIn(), it processes the file only once, as expected, but of course it only splits the text on spaces.
Something like this:
val lines = ssc.textFileStream(args(0))
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
I almost forgot: if you could also explain what the underscores in the code mean, that would help me a lot. I am having some trouble understanding that part too.
Thanks in advance.
CodePudding user response:
If your file doesn't change while you're parsing it, you don't need a StreamingContext.
Just create a SparkContext:
val sc = new SparkContext(new SparkConf().setAppName("HdfsWordCount").setMaster("local"))
and then process your file in the way you need. You need a StreamingContext only if you have data sources that keep changing and you need to process the changed delta continuously.
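A minimal batch sketch of this approach, assuming Spark is on the classpath and that the input path and an output directory are passed as two arguments. The object name BatchWordCount, the helper extractWords, and the second argument are illustrative, not from the question:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical batch version of the word count: reads the input once and exits.
object BatchWordCount {
  // One or more ASCII letters; digits and punctuation are skipped.
  private val wordPattern = "[a-zA-Z]+".r

  // Returns every letters-only run found in a line.
  def extractWords(line: String): Iterator[String] = wordPattern.findAllIn(line)

  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: BatchWordCount <input-path> <output-dir>")
      System.exit(1)
    }
    val sc = new SparkContext(
      new SparkConf().setAppName("BatchWordCount").setMaster("local"))
    try {
      sc.textFile(args(0))          // read the file(s) once
        .flatMap(extractWords)      // split each line into words
        .map(word => (word, 1))     // key-value pairs
        .reduceByKey(_ + _)         // sum the counts per word
        .saveAsTextFile(args(1))    // fails if the output directory already exists
    } finally {
      sc.stop()
    }
  }
}
```

Because nothing is monitored here, the job ends once the counts are written.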
There are many different ways to use the underscore in Scala, but in your case it is just syntactic sugar that shortens a lambda. Each underscore is a placeholder for one argument of the function, which here is the element of the collection being processed.
You can rewrite the code with underscores like this:
// both lines do the same
.flatMap("[a-zA-Z]+".r.findAllIn(_))
.flatMap((s: String) => "[a-zA-Z]+".r.findAllIn(s))
And
// both lines do the same
.map(x => (x, 1)).reduceByKey(_ + _)
.map(x => (x, 1)).reduceByKey((l, r) => l + r)
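The two equivalences above can be checked without Spark on ordinary Scala collections. This is only a sketch: the object and method names are made up for illustration, and List.reduce stands in for reduceByKey.

```scala
// Hypothetical demo object; plain Scala collections, no Spark needed.
object UnderscoreDemo {
  private val wordPattern = "[a-zA-Z]+".r

  // Placeholder form: _ stands for the single argument of the lambda.
  def wordsShort(lines: List[String]): List[String] =
    lines.flatMap(wordPattern.findAllIn(_))

  // The same function written with an explicit, named parameter.
  def wordsLong(lines: List[String]): List[String] =
    lines.flatMap((s: String) => wordPattern.findAllIn(s))

  // Two placeholders: the first _ is the first argument, the second _ the second.
  def sumShort(xs: List[Int]): Int = xs.reduce(_ + _)

  // The same reduction with both parameters named.
  def sumLong(xs: List[Int]): Int = xs.reduce((l, r) => l + r)
}
```

For any input, wordsShort and wordsLong return the same list, and so do sumShort and sumLong.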
To understand this case and other usages better, read some articles, like this or this.
CodePudding user response:
Well, I think I found my main error. I was telling Spark to monitor the same directory where I was creating my output file. So my guess is that it was triggering a new event itself every time it saved the processed output. Once I created separate input and output directories, the repeating output stopped and it worked as expected. It now generates new output only when I manually provide a new file to the server.
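A minimal sketch of that fix, assuming the monitored input directory and the output prefix are passed as two separate arguments. The object name, the second argument, and the writesIntoInput helper are illustrative and not part of the original code; the helper is only a crude string-prefix check, not a real filesystem comparison:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical variant of the streaming job with separate input and output locations.
object SeparateDirsWordCount {
  // Crude guard: true if the output prefix lies under the monitored directory.
  def writesIntoInput(inputDir: String, outputPrefix: String): Boolean = {
    val dir = if (inputDir.endsWith("/")) inputDir else inputDir + "/"
    outputPrefix.startsWith(dir)
  }

  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: SeparateDirsWordCount <input-dir> <output-prefix>")
      System.exit(1)
    }
    val inputDir = args(0)
    val outputPrefix = args(1)
    require(!writesIntoInput(inputDir, outputPrefix),
      "output prefix must be outside the monitored input directory")

    val sparkConf = new SparkConf().setAppName("HdfsWordCount").setMaster("local")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Only files that appear in inputDir are picked up.
    val lines = ssc.textFileStream(inputDir)
    val wordCounts = lines
      .flatMap("[a-zA-Z]+".r.findAllIn(_))
      .map(x => (x, 1))
      .reduceByKey(_ + _)

    // Each batch is written to its own location named from the prefix and the batch time.
    wordCounts.saveAsTextFiles(outputPrefix)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

The job still runs until it is stopped, but a batch only produces counts when a new file lands in the input directory.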