I want the equivalent of this pandas code in PySpark. The following pandas code prints each atable name together with the row indexes where that atable name is found:
import pandas as pd

df1 = pd.DataFrame({
    'atable': ['Users', 'Users', 'Domains', 'Domains', 'Locks'],
    'column': ['col_1', 'col_2', 'col_a', 'col_b', 'col'],
    'column_type': ['varchar', 'varchar', 'int', 'varchar', 'varchar'],
    'is_null': ['No', 'No', 'Yes', 'No', 'Yes'],
})

df1_grouped = df1.groupby('atable')

# iterate over each group
for group_name, df_group in df1_grouped.groups.items():
    print(group_name, df_group)
Output:
Domains Int64Index([2, 3], dtype='int64')
Locks Int64Index([4], dtype='int64')
Users Int64Index([0, 1], dtype='int64')
The spark output should look like this:
# +-------+--------+
# |atable |sources |
# +-------+--------+
# |Domains|[2, 3]  |
# |Locks  |[4]     |
# |Users  |[0, 1]  |
# +-------+--------+
CodePudding user response:
As already pointed out by samkart, there is no intrinsic row order in a Spark DataFrame. If you want to retain the information about which row of the original DataFrame ended up in which group, you can use monotonically_increasing_id to assign a unique ID to each row of the original DataFrame and then use collect_list as the aggregation function:
from pyspark.sql import functions as F

# df1 is the pandas DataFrame from the question
df = spark.createDataFrame(df1)

# tag each row with a unique id, then collect the ids of every group into a list
df.withColumn("id", F.monotonically_increasing_id()) \
    .groupBy('atable') \
    .agg(F.collect_list('id')) \
    .show(truncate=False)
Output:
+-------+--------------------------+
|atable |collect_list(id)          |
+-------+--------------------------+
|Domains|[17179869184, 25769803776]|
|Users  |[0, 8589934592]           |
|Locks  |[25769803777]             |
+-------+--------------------------+
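If you also want the aggregated column to be called sources, as in the desired output, just alias the aggregation, e.g.:
.agg(F.collect_list('id').alias('sources'))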
The IDs created by monotonically_increasing_id are unique but not sequential. There are hacks to create sequential IDs (one is sketched below), but usually these approaches are not a good idea, as a globally sequential ID does not fit well with Spark's distributed nature.
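For illustration, here is a minimal sketch of one such hack: a row_number over a global window ordered by monotonically_increasing_id. Because the window has no partitioning, Spark moves all rows to a single partition to establish a total order, which is exactly why this approach does not scale; on the small example data it should reproduce the 0-based indexes from the pandas output:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# unpartitioned window: all rows are moved to one partition to get a total order
w = Window.orderBy(F.monotonically_increasing_id())

df.withColumn("id", F.row_number().over(w) - 1) \
    .groupBy('atable') \
    .agg(F.collect_list('id').alias('sources')) \
    .show(truncate=False)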