Transpose the output of a SQL query using PySpark


I have a Spark SQL select query as below:

select max(age),min(age),avg(age),max(sal),min(sal),avg(sal) from Emp;

The output DataFrame is created as below:

+--------+--------+--------+--------+--------+--------+
|max(age)|min(age)|avg(age)|max(sal)|min(sal)|avg(sal)|
+--------+--------+--------+--------+--------+--------+
|      46|      23|      31|   10000|    2000|    5000|
+--------+--------+--------+--------+--------+--------+

My requirement is that the DataFrame should be transposed as below using PySpark:

+-------+-----+----+----+
|columns|  max| min| avg|
+-------+-----+----+----+
|    age|   46|  23|  31|
|    sal|10000|2000|5000|
+-------+-----+----+----+

Thanks for the help in advance.

CodePudding user response:

The easiest way would be to run two queries (one for sal and one for age) and union them:

select 'age' as column, max(age) as max, min(age) as min, avg(age) as avg from Emp;
select 'sal' as column, max(sal) as max, min(sal) as min, avg(sal) as avg from Emp;

Load those into two DataFrames, df_sal and df_age, and union them:

final = df_sal.union(df_age)
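
A minimal end-to-end sketch of this approach (assuming an active SparkSession named spark with the Emp table registered as in the question):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# One labelled aggregate row per source column
df_age = spark.sql(
    "select 'age' as column, max(age) as max, min(age) as min, avg(age) as avg from Emp"
)
df_sal = spark.sql(
    "select 'sal' as column, max(sal) as max, min(sal) as min, avg(sal) as avg from Emp"
)

# union matches columns by position, which is safe here because
# both queries produce the same column order
final = df_sal.union(df_age)
final.show()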

Update: in case only a single query can be run (as commented by the OP), the stack function can help you.

from pyspark.sql import Row
import pyspark.sql.functions as F

df = spark.createDataFrame([
    Row(avg_sal=1, max_sal=2, min_sal=3, avg_age=4, max_age=5, min_age=6)
])

df.show()
+-------+-------+-------+-------+-------+-------+
|avg_sal|max_sal|min_sal|avg_age|max_age|min_age|
+-------+-------+-------+-------+-------+-------+
|      1|      2|      3|      4|      5|      6|
+-------+-------+-------+-------+-------+-------+

(
    df
    .select(F.expr("stack(2, 'sal', avg_sal, max_sal, min_sal, 'age', avg_age, max_age, min_age) as (column, avg, max, min)"))
    .show()
)
+------+---+---+---+
|column|avg|max|min|
+------+---+---+---+
|   sal|  1|  2|  3|
|   age|  4|  5|  6|
+------+---+---+---+

In the example I renamed the input columns to avoid problems with the parentheses in the generated column names; this renaming can be done directly in the SQL query.
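
For example, a sketch of the original query with the aggregates aliased up front (same Emp table assumed), which feeds directly into the stack expression above:

df = spark.sql("""
    select max(age) as max_age, min(age) as min_age, avg(age) as avg_age,
           max(sal) as max_sal, min(sal) as min_sal, avg(sal) as avg_sal
    from Emp
""")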

CodePudding user response:

If your original DataFrame df looks like below,

+-----+------+
|  age|   sal|
+-----+------+
|   46|  2000|
|   23| 10000|
|  ...|   ...|
+-----+------+

then

org_cols = df.columns
# ['age', 'sal']

sql = spark.sql('select max(age),min(age),avg(age),max(sal),min(sal),avg(sal) from Emp')

# Each source column contributes a (max, min, avg) triple to the single
# result row, so walk the row in steps of three.
df2 = sql.rdd.flatMap(
    lambda row: [
        [org_cols[i], row[3 * i], row[3 * i + 1], row[3 * i + 2]]
        for i in range(len(row) // 3)
    ]
).toDF(['columns', 'max', 'min', 'avg'])
df2.show()

+-------+-----+----+----+
|columns|  max| min| avg|
+-------+-----+----+----+
|    age|   46|  23|  31|
|    sal|10000|2000|5000|
+-------+-----+----+----+
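
If you would rather stay in the DataFrame API instead of dropping to the RDD, the same reshape can be expressed with the stack function from the first answer. A sketch that builds the expression from org_cols (assuming each column contributes a max/min/avg triple in that order; backticks are needed because the generated names contain parentheses):

import pyspark.sql.functions as F

# One "'age', `max(age)`, `min(age)`, `avg(age)`" group per source column
groups = ", ".join(
    f"'{c}', `max({c})`, `min({c})`, `avg({c})`" for c in org_cols
)
df2 = sql.select(
    F.expr(f"stack({len(org_cols)}, {groups}) as (columns, max, min, avg)")
)
df2.show()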