Home > Back-end >  Pyspark For Loop Not Creating Dataframes
Pyspark For Loop Not Creating Dataframes

Time:11-17

I have an initial dataframe df that looks like this:

 ------- --- ----- ------------------ ---- ------------------- 
|gender| pro|share|        prediction|week|     forecast_units|
 ------ ---- ----- ------------------ ---- ------------------- 
|  Male|Polo| 0.01| 258.4054260253906|  37|             1809.0|
|  Male|Polo|  0.1| 332.4026794433594|  38|             2327.0|
|  Male|Polo| 0.15|425.97430419921875|  39|             2982.0|
|  Male|Polo|  0.2| 508.3385314941406|  40|             3558.0|
....

I have the following code that attempts to create multiple dataframes from the original dataframe by applying some calculus. Initial I create four empty dataframes and then I want to loop through four different weeks, c_weeks, and save the result from the calculus to each dataframe on the list_dfs:

schema = StructType([\
    StructField("gender", StringType(),True), \
    StructField("pro",StringType(),True), \
    StructField("units_1_tpr",DoubleType(),True), \
    StructField("units_1'_tpr",DoubleType(),True), \
    StructField("units_15_tpr",DoubleType(),True), \
    StructField("units_20_tpr",DoubleType(),True)])

df_wk1 = spark.createDataFrame([],schema=schema)
df_wk2 = spark.createDataFrame([],schema=schema)
df_wk3 = spark.createDataFrame([],schema=schema)
df_wk4 = spark.createDataFrame([],schema=schema)

list_dfs = [df_wk1, df_wk2, df_wk3, df_wk4]
c_weeks = [37, 38, 39, 40]

for data,weeknum in zip(list_dfs, campaign_weeks):
    data = df.filter(df.week == weeknum).groupBy(['gender', 'pro']).pivot("share").agg(first('forecast_units'))

In the end, the dataframes continue empty. How do fix this? If this way is not possible how can I implement what I want?

CodePudding user response:

If you assign the result of df.filter(... to data it will be lost (actually, that line has no effect). Try this way:

df_wk1, df_wk2, df_wk3, df_wk4 = [
    df.filter(df.week == weeknum).groupBy(['gender', 'pro']).pivot("share").agg(first('forecast_units'))
    for weeknum in [37, 38, 39, 40]
]

However, df.filter(df.week == weeknum).groupBy(['gender', 'pro']).pivot("share").agg(first('forecast_units')) create a DataFrame with a different schema from the one you probably want (looking at your question).

This is an example of the DataFrame you get:

 ------ ---- ------ 
|gender| pro|   0.0|
 ------ ---- ------ 
|  Male|Polo|3558.0|
 ------ ---- ------ 

and this is its schema:

root
 |-- gender: string (nullable = true)
 |-- pro: string (nullable = true)
 |-- 0.0: double (nullable = true)
  • Related