Unpivot PySpark dataframe using arguments


I'm trying to write a function that unpivots a PySpark DataFrame using lists as arguments.

For example, the function takes two lists:

1 - ignored_columns_list - columns that are ignored (not used at all)
2 - non_pivot_column_list - columns that I don't want to unpivot

All remaining columns are unpivoted.

The function:

import pyspark.sql.functions as F

ignored_columns_list = ['column_name1']    # columns that I don't need
non_pivot_column_list = ['column_name2']   # columns I don't want to unpivot

def unpivot_columns_final(kpi_rf_df, ignored_columns_list, non_pivot_column_list):
    # columns that I don't need
    ignored_columns_df = kpi_rf_df.drop(*ignored_columns_list)
    # columns I don't want to unpivot
    non_pivot_column_df = kpi_rf_df.select(*non_pivot_column_list)
    # columns that I want to unpivot
    unpivot_columns_df = kpi_rf_df.drop(*ignored_columns_list, *non_pivot_column_list)

    # number of columns to stack, used inside the expr() call
    unpivot_columns_df_count = len(unpivot_columns_df.columns)

    # first stack: the column values go into 'value'
    unpivot_df = kpi_rf_df.select(
        *ignored_columns_df.columns,
        F.expr(f"stack({unpivot_columns_df_count}, {', '.join(unpivot_columns_df.columns)}) as (value)")
    )
    # second stack: the column names (as string literals) go into 'kpi'
    unpivot_df = unpivot_df.select(
        *non_pivot_column_df.columns,
        F.expr(f"stack({unpivot_columns_df_count}, {str(unpivot_columns_df.columns)[1:-1]}) as (kpi)"),
        'value'
    )

    return unpivot_df
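
For illustration, the function is called like this (kpi_rf_df is the DataFrame from the snippet above):

unpivot_df = unpivot_columns_final(kpi_rf_df, ignored_columns_list, non_pivot_column_list)
unpivot_df.show()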

The problem is that sometimes, when I change the columns in the lists, I get this error:

AnalysisException: cannot resolve 'stack(6, column_name1, column_name2, column_name3, column_name4, column_name5, column_name6)' due to data type mismatch: Argument 1 (double) != Argument 6 (date); line 1 pos 0;

I tried sorting the list of columns in the expr, but it didn't help.

CodePudding user response:

Out of curiosity, you may want to take a look at this answer. It shows a streamlined approach to a problem very similar to yours; only dropping some columns would need to be added to match your case exactly.

Your function works! But note that you cannot stack together columns of different data types. From the error message, you are trying to stack a column of type double together with a column of type date. Spark doesn't know which data type to assign to the resulting value column, so it raises this error.
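
One common workaround (a minimal sketch, not part of the original answer; the helper name unpivot_with_cast is hypothetical): cast every column you want to unpivot to a common type such as string before stacking, and emit name/value pairs in a single stack call so the kpi and value columns stay aligned:

import pyspark.sql.functions as F

def unpivot_with_cast(df, ignored_columns_list, non_pivot_column_list):
    # columns to unpivot: everything except the ignored and non-pivot ones
    unpivot_cols = [c for c in df.columns
                    if c not in ignored_columns_list + non_pivot_column_list]
    # stack(n, 'name1', value1, 'name2', value2, ...) as (kpi, value);
    # casting each value to string avoids the data type mismatch error
    stack_args = ", ".join(f"'{c}', cast(`{c}` as string)" for c in unpivot_cols)
    stack_expr = f"stack({len(unpivot_cols)}, {stack_args}) as (kpi, value)"
    return df.select(*non_pivot_column_list, F.expr(stack_expr))

It is called the same way as your function: unpivot_with_cast(kpi_rf_df, ignored_columns_list, non_pivot_column_list). The trade-off is that every value comes out as a string, so you may need to cast back afterwards.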
