Home > database >  Spark: How can I split a column's unique values into equal size buckets and map them back to a
Spark: How can I split a column's unique values into equal size buckets and map them back to a

Time:04-27

I have a table with 2 columns: an ID (uuid) and a value (int). I would like to add a 3rd column that is a group. I want to divide the rows into 3 groups that are equally sized by unique id values.

For example, let's say I have 99 unique ids with a total of 200 rows. After assigning groups, there may end up being one group assigned to 33 rows, another assigned to 100, and the third assigned to 67. But, all 3 groups should have an equal number (33) unique ids.

Example dataset:

id  -> assigned group
---------------------
abc -> group a
def -> group b
ghi -> group c
jkl -> group a
mno -> group b
pqr -> group c
...

Original Table:                   Updated table:

id(uuid)   | val                  id(uuid) | val | group
----------- ---------     -->     --------- ----- ---------
abc        | 1                    abc      | 1   | a
pqr        | 1                    pqr      | 1   | c
abc        | 2                    abc      | 3   | a
mno        | 5                    mno      | 5   | b
def        | 1                    def      | 1   | b
mno        | 3                    mno      | 3   | b
def        | 4                    def      | 4   | b
pqr        | 3                    pqr      | 3   | c
ghi        | 5                    ghi      | 5   | c
jkl        | 1                    jkl      | 1   | a
mno        | 4                    mno      | 4   | b
jkl        | 6                    jkl      | 6   | a
def        | 3                    def      | 3   | b
mno        | 2                    mno      | 2   | b
...

Rows: 14
Num buckets: 3 [a, b, c]
Bucket a --> ids: 2, rows: 4
Bucket b --> ids: 2, rows: 7
Bucket c --> ids: 2, rows: 3

CodePudding user response:

You could do that in two steps.

  1. First, extract all the unique ids and associate each of them to an index K ranging from 1 to the number of unique ids (or 0 to that number minus 1). Then, the assigned group is that index modulo 3.
  2. You join that result to the original dataframe and you have your result.
# Step 1
groups = df\
    .select("id(uuid)")
    .distinct()
    .rdd.map(lambda x: x[0])
    .zipWithIndex()
    .mapValues(lambda x : x % 3)
    .toDF(["id(uuid)", "group"])
groups.show()

Which yields:

 -------- ----- 
|id(uuid)|group|
 -------- ----- 
|     pqr|    0|
|     jkl|    1|
|     ghi|    2|
|     mno|    0|
|     abc|    1|
|     def|    2|
 -------- ----- 

And then:

# Step 2
result = df.join(groups, 'id(uuid)')

NB, if you want different names for your groups than integers, you can simply create a group mapping like this:

group_map = [ (0, 'a'), (1, 'b'), (2, 'c') ]
group_map_df = spark.createDataFrame(group_map, ['group', 'new_group'])
new_result = result.join(group_map_df, ['group'])
  • Related