I have a log file that is fed data about every visit for some websites ( it is actually just a python programm that simaulates this ) , i want to count the number of visits per websites .
So i m trying to use FileTailSource
but it doesnt print anything to the file
Can u help?
Tahnks
`
import akka.actor._
import akka._
import scala.concurrent._
import java.nio.file._
import akka.stream._
import akka.stream.scaladsl._
import akka.util.ByteString
import java.nio.file.Paths
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{FileIO,Flow, Sink, Source}
import akka.stream.alpakka.file.scaladsl.FileTailSource
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import java.lang
object LogFileAnalyzer {
def main(args: Array[String]): Unit = {
// Create actor system and materializer
implicit val system = ActorSystem("LogFileAnalyzer")
implicit val materializer = ActorMaterializer()
// Read the log file as a source of lines
val logFile = Paths.get("./src/main/scala/log-generator.log")
val source = FileTailSource(logFile,maxChunkSize = 4096,startingPosition=0L,pollingInterval = 250.millis).via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 4096, allowTruncation = false)).map { line =>
val fields = line.utf8String.split(" ")
val website = fields(2)
(website,1)
}.groupBy(8,_._1).mapConcat()
.reduce { (entry1, entry2) =>
val (website1, visits1) = entry1
val (website2, visits2) = entry2
(website1, visits1 visits2)}
.map { case (website, count) => s"($website, $count)" }
.map(s=>ByteString(s))
// Parse each line of the log file to extract the website name
//val websiteFlow = Flow[ByteString]
val sink = FileIO.toPath(Paths.get("./f.txt"))
// Group the website names and count the number of visits for each website
//case (website, visits) => (website, visits 1)
source.to(sink).run()
}
}
`
when i worked with FileIO.fromPath
the code worked fine but it runs one time and doesnt process new streamed data
i located the problem and i think it has something to do with the groupby or reduce functions
CodePudding user response:
The main problem (besides the fact that your main method does not wait for your stream to complete and thus your program will terminate too early) is:
reduce
only emits a single value when the stream completes (emits when upstream completes), but the stream will never complete because FileTailSource
will keep polling for new lines of the input file forever.
You might want to use scan instead of reduce
(because scan
will emit a new item whenever a new item is received from upstream).