I am currently working on a Python function. The process is supposed to loop over a pandas dataframe containing my data structure (that is where I get the information about which table holds the value of the field I am looking for), and then loop over a spark dataframe that loads the right table found in the previous loop; whenever the value of the field is encountered, it is added to a record list and to a dataframe that will itself be returned at the end of the process to be turned into a CSV.
import pandas as pd  # the `spark` session is assumed to be available (Databricks notebook)

df_meta = pd.read_csv("/dbfs/mnt/resources/path/file_meta.csv", sep=';')
liste_t = []
def recursive_process(field, id_p, list_drop):
    # loop over the metadata to find which table contains the field
    for row in df_meta.index:
        if df_meta['SOURCE_COLUMN_NAME'][row] == field:
            # load the corresponding table from the "source1" database
            df_table = spark.read.table("source1." + df_meta['SOURCE_TABLE_NAME'][row])
            data_collect = df_table.collect()
            # loop over the collected rows and keep the matching ids
            for row2 in data_collect:
                if row2[field] == id_p and row2[field] not in list_drop:
                    list_drop.append(id_p)
                    #add field value to final dataframe
    return list_drop
As parameters, I gave the field I am targeting, the value id_p of this field, and a list to record the fields I have already processed.
The problem is: I don't really know how to iterate over the spark dataframe containing my data. I read about the collect() method that I tried to use, but I am not sure it works here.
So far, I wanted my code to edit my empty list and return it with the values that would be added to my final dataframe. But when I call my function:
recursive_process("Col_ID","1003729193",liste_t)
The list just returns nothing, which should not be normal... So I would like to know how to process the spark dataframe, and how to return a list/a dataframe edited inside my loop? (I'm afraid the processing on these just happens inside my loops but stays unchanged outside these loops.)
Thanks for helping!
CodePudding user response:
You can filter the dataframe with something like this:
from pyspark.sql import functions as F
df_table.filter(F.col(field) == id_p).filter(~F.col(field).isin(list_drop))
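A minimal sketch of how that filter could replace the collect() loop in the question's function, reusing df_meta, spark, the "source1" database and the list_drop list from the question (the count() check is just one way to test for a match):
from pyspark.sql import functions as F

def recursive_process(field, id_p, list_drop):
    for row in df_meta.index:
        if df_meta['SOURCE_COLUMN_NAME'][row] == field:
            df_table = spark.read.table("source1." + df_meta['SOURCE_TABLE_NAME'][row])
            # let Spark do the filtering instead of collecting every row
            matches = df_table.filter(F.col(field) == id_p)
            if list_drop:  # only exclude already-processed values when there are any
                matches = matches.filter(~F.col(field).isin(list_drop))
            if matches.count() > 0:
                list_drop.append(id_p)
    return list_drop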
Then it depends on the size of this filtered result:
- (Big) you could save the results to disk for each dataframe (df.write methods), and read them back with Spark.
- (Small) Or you can create a temporary Spark df and append results to it (df.union() if they have the same schema), then write to disk the final state of this temporary df, as in the sketch below.
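For the small case, a rough sketch of that accumulation, under the assumption that only the matching column is kept (so the unioned pieces share one schema) and with /mnt/resources/out as a purely hypothetical output path:
from pyspark.sql import functions as F

field, id_p = "Col_ID", "1003729193"  # values from the question's call
result_df = None
for row in df_meta.index:
    if df_meta['SOURCE_COLUMN_NAME'][row] == field:
        df_table = spark.read.table("source1." + df_meta['SOURCE_TABLE_NAME'][row])
        filtered = df_table.filter(F.col(field) == id_p).select(field)
        # append the matches of each table to the temporary dataframe
        result_df = filtered if result_df is None else result_df.union(filtered)

# write the final state of the temporary df once, so it can become a csv
if result_df is not None:
    result_df.write.mode("overwrite").csv("/mnt/resources/out", header=True)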
If you go with Spark, you should go with Spark all the way (not collecting and then iterating over the rows). If you do not know the Spark APIs well, you could use the pandas API on Spark with this import:
import pyspark.pandas as ps
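A minimal sketch of that pandas-style route, assuming the same "source1" database ("source1.some_table" and the output path are placeholders for names looked up in df_meta):
import pyspark.pandas as ps

# read one of the source tables through the pandas API on Spark
psdf = ps.read_table("source1.some_table")

# the usual pandas boolean indexing, but executed by Spark
matches = psdf[psdf["Col_ID"] == "1003729193"]
matches.to_csv("/mnt/resources/out_pandas_api")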