I need to group the row values for each index in the data frame below:
+-----+------+-----+------+------+------+------+
|index|amount| dept|  date|amount|  dept|  date|
+-----+------+-----+------+------+------+------+
|    1|  1000| acnt|2-4-21|  2000| acnt2|2-4-21|
|    2|  1500|sales|2-3-21|  1600|sales2|2-3-21|
+-----+------+-----+------+------+------+------+
Since the index is unique to each row and the dates are the same, I need to group the row values as below:
+-----+---------+------------+------+
|index|   amount|        dept|  date|
+-----+---------+------------+------+
|    1|1000,2000|  acnt,acnt2|2-4-21|
|    2|1500,1600|sales,sales2|2-3-21|
+-----+---------+------------+------+
I see many options for grouping columns, but none specifically for combining values within a row in PySpark. Is there a way to produce the result above?
CodePudding user response:
Ideally this should be fixed upstream: check whether your upstream code contains joins, and select only the appropriate aliases so that the column names stay unique.
That said, you can build a list of unique column names with a helper dictionary and then apply a helper Spark function:
from pyspark.sql import functions as F
from itertools import groupby
Create a fresh list with a counter:
l = []
s = {}
for i in df.columns:
    # Suffix repeated names with the number of earlier occurrences.
    l.append(f"{i}_{s.get(i)}" if i in s else i)
    s[i] = s.get(i, 0) + 1
# l is now ['index', 'amount', 'dept', 'date', 'amount_1', 'dept_1', 'date_1']
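The renaming loop can be checked on a plain Python list, without a Spark session. Below is a minimal sketch of the same logic wrapped in a function; the name `dedupe_column_names` is hypothetical and not part of PySpark.

```python
def dedupe_column_names(columns):
    """Append _1, _2, ... to repeated names; the first occurrence is kept as-is."""
    seen = {}      # name -> number of times it has appeared so far
    renamed = []
    for name in columns:
        count = seen.get(name, 0)
        renamed.append(f"{name}_{count}" if count else name)
        seen[name] = count + 1
    return renamed


columns = ["index", "amount", "dept", "date", "amount", "dept", "date"]
print(dedupe_column_names(columns))
# ['index', 'amount', 'dept', 'date', 'amount_1', 'dept_1', 'date_1']
```

The returned list can then be passed to `df.toDF(*names)`, which renames columns by position.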
Then rename the columns of the existing dataframe with this new list, and use a helper function that concatenates the columns that were duplicates of each other:
def mysparkfunc(cols):
    # Group the renamed columns by their base name (the text before "_").
    cols = [list(v) for k, v in groupby(sorted(cols), lambda x: x.split("_")[0])]
    # Concatenate each group of duplicates, except 'date', which keeps its first column.
    return [F.concat_ws(",", *col).alias(col[0])
            if len(col) > 1 and col[0] != 'date'
            else F.col(col[0]) for col in cols]
df.toDF(*l).select(*mysparkfunc(l)).show()
+---------+------+------------+-----+
|   amount|  date|        dept|index|
+---------+------+------------+-----+
|1000,2000|2-4-21|  acnt,acnt2|    1|
|1500,1600|2-3-21|sales,sales2|    2|
+---------+------+------------+-----+
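The grouping step is what decides which columns get concatenated, so it is worth inspecting on its own. The sketch below reproduces only that step in plain Python. `group_by_base_name` is a hypothetical helper name, and unlike the answer's code it sorts by the base name rather than the full name, which is an assumption made here to keep each group contiguous.

```python
from itertools import groupby


def base_name(name):
    """Return the text before the first underscore."""
    return name.split("_")[0]


def group_by_base_name(columns):
    """Map each base name to the list of columns that share it."""
    # groupby only merges adjacent items, so sort by the same key first.
    ordered = sorted(columns, key=base_name)
    return {key: list(group) for key, group in groupby(ordered, key=base_name)}


renamed = ["index", "amount", "dept", "date", "amount_1", "dept_1", "date_1"]
print(group_by_base_name(renamed))
# {'amount': ['amount', 'amount_1'], 'date': ['date', 'date_1'],
#  'dept': ['dept', 'dept_1'], 'index': ['index']}
```

One caveat of splitting on the first underscore: columns whose original names already contain an underscore, such as `order_id` and `order_date`, would land in the same group, so this approach suits data where base names have no underscores.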
Full Code:
from pyspark.sql import functions as F
from itertools import groupby
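l = []
s = {}
for i in df.columns:
    # Suffix repeated names with the number of earlier occurrences.
    l.append(f"{i}_{s.get(i)}" if i in s else i)
    s[i] = s.get(i, 0) + 1
def mysparkfunc(cols):
    # Group the renamed columns by their base name (the text before "_").
    cols = [list(v) for k, v in groupby(sorted(cols), lambda x: x.split("_")[0])]
    # Concatenate each group of duplicates, except 'date', which keeps its first column.
    return [F.concat_ws(",", *col).alias(col[0])
            if len(col) > 1 and col[0] != 'date'
            else F.col(col[0]) for col in cols]
df.toDF(*l).select(*mysparkfunc(l)).show()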
CodePudding user response:
Let's say you have an initial data frame as shown below.
INPUT:
+------+------+------+------+
|  dept|  dept|amount|amount|
+------+------+------+------+
|sales1|sales2|     1|     1|
|sales1|sales2|     2|     2|
|sales1|sales2|     3|     3|
|sales1|sales2|     4|     4|
|sales1|sales2|     5|     5|
+------+------+------+------+
- Rename the columns:
newColumns = ["dept1","dept2","amount1","amount2"]
new_clms_df = df.toDF(*newColumns)
new_clms_df.show()
+------+------+-------+-------+
| dept1| dept2|amount1|amount2|
+------+------+-------+-------+
|sales1|sales2|      1|      1|
|sales1|sales2|      2|      2|
|sales1|sales2|      3|      3|
|sales1|sales2|      4|      4|
|sales1|sales2|      5|      5|
+------+------+-------+-------+
- Derive the final output columns:
from pyspark.sql.functions import concat_ws
# Join each pair of renamed columns into one comma-separated column.
final_df = new_clms_df.\
    withColumn('dept', concat_ws(',', new_clms_df['dept1'], new_clms_df['dept2'])).\
    withColumn('amount', concat_ws(',', new_clms_df['amount1'], new_clms_df['amount2']))
final_df.show()
+------+------+-------+-------+-------------+------+
| dept1| dept2|amount1|amount2|         dept|amount|
+------+------+-------+-------+-------------+------+
|sales1|sales2|      1|      1|sales1,sales2|   1,1|
|sales1|sales2|      2|      2|sales1,sales2|   2,2|
|sales1|sales2|      3|      3|sales1,sales2|   3,3|
|sales1|sales2|      4|      4|sales1,sales2|   4,4|
|sales1|sales2|      5|      5|sales1,sales2|   5,5|
+------+------+-------+-------+-------------+------+
CodePudding user response:
There are two ways, depending on what you want:
from pyspark.sql.functions import struct, array, col
# Option 1: a struct column with the fields amount1 and amount2
df = df.withColumn('amount', struct(col('amount1'), col('amount2')))
# Option 2: an array column
df = df.withColumn('amount', array(col('amount1'), col('amount2')))
If two columns have the same name (as in your example), first recreate your df with unique names, giving one name per column in positional order.
(If the duplicates come from a join, there is no need; just use aliases.)
cols = ['index', 'amount1', 'dept1', 'date1', 'amount2', 'dept2', 'date2']
df = df.toDF(*cols)