I'm trying to write a function that unpivots a PySpark DataFrame, using lists as arguments.
For example, here is the code with two lists:
1 - ignored_columns_list - columns that are ignored (not used)
2 - non_pivot_column_list - columns that I don't want to unpivot
All the remaining columns are unpivoted.
The function:
import pyspark.sql.functions as F

ignored_columns_list = ['column_name1']   # columns that I don't need
non_pivot_column_list = ['column_name2']  # columns I don't want to unpivot

def unpivot_columns_final(kpi_rf_df, ignored_columns_list, non_pivot_column_list):
    ignored_columns_df = kpi_rf_df.drop(*ignored_columns_list)  # columns that I don't need
    non_pivot_column_df = kpi_rf_df.select(*non_pivot_column_list)  # columns I don't want to unpivot
    unpivot_columns_df = kpi_rf_df.drop(*ignored_columns_list, *non_pivot_column_list)  # columns that I want to unpivot
    unpivot_columns_df_count = len(unpivot_columns_df.columns)  # number of columns to use inside the stack() expression
    unpivot_df = kpi_rf_df.select(
        *ignored_columns_df.columns,
        F.expr(f"stack({unpivot_columns_df_count}, {', '.join(unpivot_columns_df.columns)}) as (value)"),
    )
    unpivot_df = unpivot_df.select(
        *non_pivot_column_df.columns,
        F.expr(f"stack({unpivot_columns_df_count}, {str(unpivot_columns_df.columns)[1:-1]}) as (kpi)"),
        'value',
    )
    return unpivot_df
The problem is that sometimes, when I change the columns in the lists, I get this error:
AnalysisException: cannot resolve 'stack(6, column_name1, column_name2, column_name3, column_name4, column_name5, column_name6)' due to data type mismatch: Argument 1 (double) != Argument 6 (date); line 1 pos 0;
I tried sorting the list of columns passed to expr, but it doesn't help.
CodePudding user response:
Out of curiosity, you may want to take a look at this answer: it shows a streamlined approach to a problem very similar to yours. You would only need to add dropping a few columns to match your case exactly.
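Roughly, the idea is a generic melt-style helper that builds the stack() expression from a list of value columns. The sketch below is only an illustration of that shape, not the exact code from that answer; the name melt and its parameters are placeholders:

import pyspark.sql.functions as F

def melt(df, id_cols, value_cols, var_name='kpi', value_name='value'):
    # Build one "'name', `name`" pair per value column, so stack() emits
    # the column name and its value as a (var_name, value_name) row.
    pairs = ', '.join(f"'{c}', `{c}`" for c in value_cols)
    return df.select(
        *id_cols,
        F.expr(f"stack({len(value_cols)}, {pairs}) as ({var_name}, {value_name})"),
    )

# Drop the ignored columns first, then unpivot everything that is not an id column.
value_cols = [c for c in kpi_rf_df.columns
              if c not in ignored_columns_list + non_pivot_column_list]
melted_df = melt(kpi_rf_df.drop(*ignored_columns_list), non_pivot_column_list, value_cols)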
Your function works! But you need to know that you cannot stack columns containing different data types. In your case you are trying to stack a column of type double together with a column of type date. Spark doesn't know which data type to assign to the resulting column, so you get this error.
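One common way around the mismatch is to cast every unpivoted column to a common type inside the stack() expression. This is only a sketch, assuming that casting everything to string is acceptable for your downstream use; the variable names are taken from your function:

# Cast every value column to string so that double and date columns
# can share the single resulting 'value' column.
stack_pairs = ', '.join(
    f"'{c}', cast(`{c}` as string)" for c in unpivot_columns_df.columns
)
unpivot_df = kpi_rf_df.select(
    *non_pivot_column_df.columns,
    F.expr(f"stack({unpivot_columns_df_count}, {stack_pairs}) as (kpi, value)"),
)

This also produces the kpi and value columns in a single select, instead of the two separate stack() calls in your original function.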