I have a table with 2 columns: an ID (uuid) and a value (int). I would like to add a 3rd column that assigns each row to a group. I want to divide the rows into 3 groups that contain equal numbers of unique id values.
For example, say I have 99 unique ids spread across a total of 200 rows. After assigning groups, one group may end up covering 33 rows, another 100, and the third 67; but all 3 groups should contain an equal number (33) of unique ids.
Example dataset:
id -> assigned group
---------------------
abc -> group a
def -> group b
ghi -> group c
jkl -> group a
mno -> group b
pqr -> group c
...
Original table:              Updated table:

id(uuid) | val               id(uuid) | val | group
---------+-----      -->     ---------+-----+------
abc      |   1               abc      |   1 |  a
pqr      |   1               pqr      |   1 |  c
abc      |   2               abc      |   2 |  a
mno      |   5               mno      |   5 |  b
def      |   1               def      |   1 |  b
mno      |   3               mno      |   3 |  b
def      |   4               def      |   4 |  b
pqr      |   3               pqr      |   3 |  c
ghi      |   5               ghi      |   5 |  c
jkl      |   1               jkl      |   1 |  a
mno      |   4               mno      |   4 |  b
jkl      |   6               jkl      |   6 |  a
def      |   3               def      |   3 |  b
mno      |   2               mno      |   2 |  b
...
Rows: 14
Num buckets: 3 [a, b, c]
Bucket a --> ids: 2, rows: 4
Bucket b --> ids: 2, rows: 7
Bucket c --> ids: 2, rows: 3
CodePudding user response:
You could do this in two steps.
- First, extract the distinct ids and associate each of them with an index K ranging from 0 to the number of unique ids minus 1 (or 1 to that number). The assigned group is then K modulo 3, which round-robins the ids evenly across the 3 groups.
- Then, join that result back to the original dataframe to get your result.
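Before the Spark version, here is a minimal plain-Python sketch of the index-then-modulo idea; the ids are made up for illustration:

```python
# Pair each distinct id with its position (zipWithIndex equivalent),
# then take the position modulo 3 as that id's group.
ids = ["abc", "def", "ghi", "jkl", "mno", "pqr"]

groups = {uid: k % 3 for k, uid in enumerate(ids)}
print(groups)  # {'abc': 0, 'def': 1, 'ghi': 2, 'jkl': 0, 'mno': 1, 'pqr': 2}
```

Because consecutive indices cycle through 0, 1, 2, the id counts per group can differ by at most one.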
# Step 1: assign each distinct id an index, then bucket by index % 3
groups = (
    df.select("id(uuid)")
      .distinct()
      .rdd.map(lambda row: row[0])   # unwrap each Row into the bare id value
      .zipWithIndex()                # -> (id, index) pairs
      .mapValues(lambda k: k % 3)    # -> (id, group) with group in {0, 1, 2}
      .toDF(["id(uuid)", "group"])
)
groups.show()
Which yields:
+--------+-----+
|id(uuid)|group|
+--------+-----+
|     pqr|    0|
|     jkl|    1|
|     ghi|    2|
|     mno|    0|
|     abc|    1|
|     def|    2|
+--------+-----+
And then:
# Step 2
result = df.join(groups, 'id(uuid)')
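The join in step 2 is just a lookup of each row's id in the mapping built in step 1; a plain-Python sketch with made-up data:

```python
# Mapping from step 1 (id -> group) and a few original (id, val) rows.
groups = {"abc": 0, "def": 1, "ghi": 2, "jkl": 0, "mno": 1, "pqr": 2}
rows = [("abc", 1), ("pqr", 1), ("mno", 5)]

# Attach each row's group by looking up its id.
result = [(uid, val, groups[uid]) for uid, val in rows]
print(result)  # [('abc', 1, 0), ('pqr', 1, 2), ('mno', 5, 1)]
```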
NB: if you want your groups named something other than integers, you can simply create a group mapping like this:
group_map = [ (0, 'a'), (1, 'b'), (2, 'c') ]
group_map_df = spark.createDataFrame(group_map, ['group', 'new_group'])
new_result = result.join(group_map_df, ['group'])
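As a sanity check on the equal-id invariant, you can recount ids and rows per group. Here is a plain-Python sketch of that check on the question's 14-row example; it uses a deterministic sorted id order, so the concrete id-to-group mapping may differ from the Spark output above:

```python
from collections import Counter

# The 14 (id, val) rows from the question's example table.
rows = [("abc", 1), ("pqr", 1), ("abc", 2), ("mno", 5), ("def", 1),
        ("mno", 3), ("def", 4), ("pqr", 3), ("ghi", 5), ("jkl", 1),
        ("mno", 4), ("jkl", 6), ("def", 3), ("mno", 2)]

# Same scheme: index each distinct id, bucket by index % 3, rename 0/1/2 to a/b/c.
names = ["a", "b", "c"]
ids = sorted({uid for uid, _ in rows})
group_of = {uid: names[k % 3] for k, uid in enumerate(ids)}

ids_per_group = Counter(group_of.values())              # distinct ids per group
rows_per_group = Counter(group_of[uid] for uid, _ in rows)  # rows per group

print(ids_per_group)   # 2 ids in each of a, b, c
print(rows_per_group)  # a: 4 rows, b: 7 rows, c: 3 rows
```

This reproduces the bucket counts from the question: equal ids per group (2 each), unequal rows per group (4, 7, 3).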