PySpark MapReduce - how to get the number of occurrences in a list of tuples

I have a list like:

A 2022-08-13
B 2022-08-14
B 2022-08-13
A 2022-05-04
B 2022-05-04
C 2022-08-14
...

and I applied the following map function to pair each row with a count of 1:

map(lambda x: ((x.split(',')[0], x.split(',')[1]), 1))

To get this:

[
    (('A', '2022-08-13'), 1), 
    (('B', '2022-08-14'), 1), 
    (('B', '2022-08-13'), 1), 
    (('A', '2022-05-04'), 1),
    (('B', '2022-05-04'), 1),  
    (('C', '2022-08-14'), 1),
    ...
]

My end goal is to count, for each pair of persons (denoted by the letters), the number of dates they have in common, and output something like this for the example above:

[
    ('A', 'B', 2),
    ('B', 'C', 1),
    ...
]

This is my code so far, but the reduceByKey is not working as expected:

shifts_mapped = worker_shifts.map(lambda x: (x.split(',')[1], 1))
shifts_mapped = worker_shifts.map(lambda x: ((x.split(',')[0], x.split(',')[1]), 1))
count = shifts_mapped.reduceByKey(lambda x, y: x[0][1] + y[0][1])

CodePudding user response:

Group multiple times: first by "person" and "date" to count occurrences, then by "date" and "count", collecting the persons that share the same date and count.

Then generate the pair combinations, explode them, and split each pair into two person columns.

I extended your sample dataset to include persons "D" & "E" with the same dates as "A" & "B" to generate more combinations.

from pyspark.sql import functions as F

df = spark.createDataFrame(
    data=[["A", "2022-08-13"], ["E", "2022-08-13"], ["D", "2022-08-13"],
          ["B", "2022-08-14"], ["B", "2022-08-13"], ["D", "2022-05-04"],
          ["E", "2022-05-04"], ["A", "2022-05-04"], ["B", "2022-05-04"],
          ["C", "2022-08-14"]],
    schema=["person", "date"])

df = df.groupBy("person", "date").count()

df = df.groupBy("date", "count") \
       .agg(F.collect_list("person").alias("persons"))
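
For the extended sample, the intermediate DataFrame at this point should look roughly like this (every (person, date) pair occurs only once, so count is always 1; the order of names inside persons may vary):

+----------+-----+------------+
|date      |count|persons     |
+----------+-----+------------+
|2022-08-13|1    |[A, E, D, B]|
|2022-08-14|1    |[B, C]      |
|2022-05-04|1    |[D, E, A, B]|
+----------+-----+------------+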

@F.udf(returnType="array<struct<col1:string, col2:string>>")
def combinations(arr): 
  import itertools
  return list(itertools.combinations(sorted(arr), 2))
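
As a quick illustration (plain Python, outside Spark), the helper above relies on itertools.combinations, which yields each unordered pair exactly once when the input is sorted:

import itertools

print(list(itertools.combinations(sorted(["B", "A", "C"]), 2)))
# [('A', 'B'), ('A', 'C'), ('B', 'C')]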

df = df.withColumn("persons", combinations("persons"))

df = df.withColumn("persons", F.explode("persons"))

df = df.withColumn("person_1", F.col("persons").getField("col1")) \
       .withColumn("person_2", F.col("persons").getField("col2"))

df = df.groupBy("person_1", "person_2").count()
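
To display the result as shown below, something like this should do (row order is not guaranteed):

df.show(truncate=False)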

Output:

+--------+--------+-----+
|person_1|person_2|count|
+--------+--------+-----+
|B       |C       |1    |
|D       |E       |2    |
|A       |E       |2    |
|A       |D       |2    |
|B       |D       |2    |
|A       |B       |2    |
|B       |E       |2    |
+--------+--------+-----+

CodePudding user response:

Here is another attempt using the RDD API and canonical map/reduce operations, as requested in the edit. The logic is documented in the comments before each transformation.

# create sample dataframe
data = [["A","2022-08-13"],["E","2022-08-13"],["D","2022-08-13"],["B","2022-08-14"],["B","2022-08-13"],["D","2022-05-04"],["E","2022-05-04"],["A","2022-05-04"],["B","2022-05-04"],["C","2022-08-14"]]
rdd = spark.sparkContext.parallelize(data)

# group by ("person", "date") and count
rdd = rdd.map(lambda x: ((x[0], x[1]), 1)).groupByKey().mapValues(len).map(lambda x: (x[0][0], x[0][1], x[1]))

# group by ("date", "count") and collect "person" as list
rdd = rdd.map(lambda x: ((x[1], x[2]), x[0])).groupByKey().mapValues(list).map(lambda x: (x[0][0], x[0][1], x[1]))

# create pair combinations
import itertools
rdd = rdd.map(lambda x: ((x[0], x[1]), list(itertools.combinations(sorted(x[2]), 2)))).flatMapValues(lambda x: x)
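
flatMapValues keeps the (date, count) key and emits one record per pair in the value list; a tiny toy example (hypothetical data, not the full pipeline):

toy = spark.sparkContext.parallelize([(("2022-08-14", 1), [("B", "C")])])
print(toy.flatMapValues(lambda pairs: pairs).collect())
# [(('2022-08-14', 1), ('B', 'C'))]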

# group by pair, count, and split pair to individual person columns
rdd = rdd.map(lambda x: ((x[1][0], x[1][1]), 1)).groupByKey().mapValues(len).map(lambda x: (x[0][0], x[0][1], x[1]))

print(rdd.collect())

Output:

[
    ('A', 'B', 2), 
    ('A', 'D', 2), 
    ('A', 'E', 2), 
    ('B', 'D', 2), 
    ('B', 'E', 2), 
    ('D', 'E', 2), 
    ('B', 'C', 1)
]
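
As a side note, the groupByKey().mapValues(len) counting used above can also be written with reduceByKey, which is closer to the original attempt. Since the mapped values are plain 1s, the reducer should simply add them; indexing into the value as in the question is what breaks it, because the reducer receives the counts, not the keys. A minimal sketch of the first counting step, assuming the same data list and spark session as above:

# count occurrences per (person, date) key by summing the 1s
pairs = spark.sparkContext.parallelize(data).map(lambda x: ((x[0], x[1]), 1))
counts = pairs.reduceByKey(lambda a, b: a + b)   # ((person, date), n)
print(counts.collect())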