I have a folder that contains n files.
I am creating an RDD that contains all the filenames of the above folder with the code below:
fnameRDD = spark.read.text(filepath).select(input_file_name()).distinct().rdd
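For completeness, the same snippet with the import it relies on (a sketch, assuming a SparkSession named spark already exists):

from pyspark.sql.functions import input_file_name

# collect the distinct source filenames of every file under filepath
fnameRDD = spark.read.text(filepath).select(input_file_name()).distinct().rdd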
I want to iterate through these RDD elements and perform the following steps:
- Read the content of each element (each element is a filepath, so I need to read the content through SparkContext)
- The above content should become another RDD, which I want to pass as an argument to a function
- Perform certain steps on the RDD passed as an argument inside the called function
I already have a function written with steps that I've tested for a single file, and it works fine. But I've tried various things syntactically to do the first 2 steps, and I just get an invalid syntax error every time.
I know I am not supposed to use map(), since reading a file in each iteration requires sc, but map() is executed on the worker nodes, where sc can't be referenced.
Also, I know I can use wholeTextFiles() as an alternative, but that means I'd have the text of all the files in memory for the whole process, which doesn't seem efficient to me.
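For context, the wholeTextFiles() alternative I mean would look roughly like this (a sketch, assuming sc is the SparkContext):

# wholeTextFiles returns an RDD of (filepath, entire file content) pairs,
# so the full text of every file is carried around inside the RDD
filesRDD = sc.wholeTextFiles(filepath)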
I am open to suggestions for different approaches as well.
CodePudding user response:
There are possibly other, more efficient ways to do it, but assuming you already have a function SomeFunction(df: DataFrame[value: string]), the easiest would be to use toLocalIterator() on your fnameRDD to process one file at a time. For example:
for x in fnameRDD.toLocalIterator():
    # read the single file behind this filename into its own DataFrame
    fileContent = spark.read.text(x[0])
    # fileContent.show(truncate=False)
    SomeFunction(fileContent)
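If you want to try the loop before wiring in your real logic, a hypothetical stand-in for SomeFunction could be as simple as this (the body is only an illustration):

def SomeFunction(df):
    # placeholder: print the row count and show one sample row of the file
    print(df.count())
    df.show(1, truncate=False)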
CodePudding user response:
I believe you're looking for recursive file lookup:
spark.read.option("recursiveFileLookup", "true").text(filepathroot)
If you point this at the root directory of your files, Spark will traverse the directory and pick up all the files that sit under the root and its child folders. This reads the files into a single dataframe.
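For example, a minimal sketch (the option needs a reasonably recent Spark version, and filepathroot is assumed to be the top-level directory):

from pyspark.sql.functions import input_file_name

# read every text file under filepathroot (including subfolders) into one dataframe,
# keeping the originating filename in a separate column
df = (spark.read
      .option("recursiveFileLookup", "true")
      .text(filepathroot)
      .withColumn("source_file", input_file_name()))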