I have a list like:
A 2022-08-13
B 2022-08-14
B 2022-08-13
A 2022-05-04
B 2022-05-04
C 2022-08-14
...
and I applied the following map function to pair each row with its number of occurrences:
map(lambda x: ((x.split(',')[0], x.split(',')[1]), 1))
To get this:
[
(('A', '2022-08-13'), 1),
(('B', '2022-08-14'), 1),
(('B', '2022-08-13'), 1),
(('A', '2022-05-04'), 1),
(('B', '2022-05-04'), 1),
(('C', '2022-08-14'), 1),
...
]
My end goal is to count how many times each pair of persons (denoted by the letters) shares the same date, and to output something like this for the example above:
[
('A', 'B', 2),
('B', 'C', 1),
...
]
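For illustration, the pairing logic can be sketched in plain Python on the sample rows above (only to make the expected output concrete, not a Spark solution; rows, by_date and pair_counts are ad hoc names):
from collections import Counter
from itertools import combinations

rows = [("A", "2022-08-13"), ("B", "2022-08-14"), ("B", "2022-08-13"),
        ("A", "2022-05-04"), ("B", "2022-05-04"), ("C", "2022-08-14")]

# group persons by date
by_date = {}
for person, date in rows:
    by_date.setdefault(date, set()).add(person)

# count every pair of persons that shares a date
pair_counts = Counter()
for persons in by_date.values():
    for a, b in combinations(sorted(persons), 2):
        pair_counts[(a, b)] += 1

print([(a, b, n) for (a, b), n in pair_counts.items()])
# [('A', 'B', 2), ('B', 'C', 1)]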
This is my code so far, but the reduceByKey is not working as expected:
shifts_mapped = worker_shifts.map(lambda x: (x.split(',')[1], 1))
shifts_mapped = worker_shifts.map(lambda x: ((x.split(',')[0], x.split(',')[1]), 1))
count = shifts_mapped.reduceByKey(lambda x, y: x[0][1] + y[0][1])
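A note on the reducer: after the map above, the values that reduceByKey combines are the 1 counts, so indexing into them with x[0][1] cannot work, since they are plain integers. Summing them, as in the sketch below, would give a count per (person, date) key, which is still only an intermediate step toward the pair counts (counts_per_shift is an ad hoc name):
# Sketch: sum the 1s to get a count per (person, date) key
counts_per_shift = shifts_mapped.reduceByKey(lambda x, y: x + y)
# e.g. (('A', '2022-08-13'), 1), (('B', '2022-08-14'), 1), ...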
CodePudding user response:
Group multiple times: first by "person", "date", then by "date", "count", collecting the persons that share the same date and count.
Then generate the pair combinations, explode them, and split each pair into separate columns.
I extended your sample dataset with persons "D" and "E" (same dates as "A" and "B") to generate more combinations.
from pyspark.sql import functions as F

df = spark.createDataFrame(data=[["A","2022-08-13"],["E","2022-08-13"],["D","2022-08-13"],["B","2022-08-14"],["B","2022-08-13"],["D","2022-05-04"],["E","2022-05-04"],["A","2022-05-04"],["B","2022-05-04"],["C","2022-08-14"]], schema=["person", "date"])

# count occurrences per (person, date)
df = df.groupBy("person", "date").count()
# collect the persons that share the same (date, count)
df = df.groupBy("date", "count") \
       .agg(F.collect_list("person").alias("persons"))

# UDF returning every 2-person combination, sorted so each pair has a stable order
@F.udf(returnType="array<struct<col1:string, col2:string>>")
def combinations(arr):
    import itertools
    return list(itertools.combinations(sorted(arr), 2))

# expand each group into individual pairs
df = df.withColumn("persons", combinations("persons"))
df = df.withColumn("persons", F.explode("persons"))
# split each pair into its own columns and count how often each pair occurs
df = df.withColumn("person_1", F.col("persons").getField("col1")) \
       .withColumn("person_2", F.col("persons").getField("col2"))
df = df.groupBy("person_1", "person_2").count()
df.show(truncate=False)
Output:
+--------+--------+-----+
|person_1|person_2|count|
+--------+--------+-----+
|B       |C       |1    |
|D       |E       |2    |
|A       |E       |2    |
|A       |D       |2    |
|B       |D       |2    |
|A       |B       |2    |
|B       |E       |2    |
+--------+--------+-----+
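If the result is needed in the list-of-tuples form shown in the question, collecting the final DataFrame should be enough (a small sketch; result is an ad hoc name):
# collect the pair counts as [(person_1, person_2, count), ...]
result = [(row["person_1"], row["person_2"], row["count"]) for row in df.collect()]
print(result)
# e.g. [('B', 'C', 1), ('D', 'E', 2), ('A', 'E', 2), ...]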
CodePudding user response:
Here is another attempt using RDD and canonical APIs as requested in the edit. The logic is documented in the comments before each transformation.
# create sample RDD
data = [["A","2022-08-13"],["E","2022-08-13"],["D","2022-08-13"],["B","2022-08-14"],["B","2022-08-13"],["D","2022-05-04"],["E","2022-05-04"],["A","2022-05-04"],["B","2022-05-04"],["C","2022-08-14"]]
rdd = spark.sparkContext.parallelize(data)
# group by ("person", "date") and count
rdd = rdd.map(lambda x: ((x[0], x[1]), 1)).groupByKey().mapValues(len).map(lambda x: (x[0][0], x[0][1], x[1]))
# group by ("date", "count") and collect "person" as list
rdd = rdd.map(lambda x: ((x[1], x[2]), x[0])).groupByKey().mapValues(list).map(lambda x: (x[0][0], x[0][1], x[1]))
# create pair combinations
import itertools
rdd = rdd.map(lambda x: ((x[0], x[1]), list(itertools.combinations(sorted(x[2]), 2)))).flatMapValues(lambda x: x)
# group by pair, count, and split pair to individual person columns
rdd = rdd.map(lambda x: ((x[1][0], x[1][1]), 1)).groupByKey().mapValues(len).map(lambda x: (x[0][0], x[0][1], x[1]))
print(rdd.collect())
Output:
[
('A', 'B', 2),
('A', 'D', 2),
('A', 'E', 2),
('B', 'D', 2),
('B', 'E', 2),
('D', 'E', 2),
('B', 'C', 1)
]
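For what it's worth, the two counting steps could also use reduceByKey instead of groupByKey().mapValues(len), summing the 1s per key without materializing each group first. A sketch reusing spark and data from above (the intermediate names per_shift, pairs and pair_counts are ad hoc):
import itertools

rdd = spark.sparkContext.parallelize(data)
# count per (person, date) with reduceByKey
per_shift = rdd.map(lambda x: ((x[0], x[1]), 1)).reduceByKey(lambda a, b: a + b)
# regroup by (date, count), collect persons, expand to sorted pairs
pairs = per_shift.map(lambda x: ((x[0][1], x[1]), x[0][0])) \
                 .groupByKey().mapValues(list) \
                 .flatMap(lambda x: itertools.combinations(sorted(x[1]), 2))
# count each (person_1, person_2) pair with reduceByKey
pair_counts = pairs.map(lambda p: (p, 1)) \
                   .reduceByKey(lambda a, b: a + b) \
                   .map(lambda x: (x[0][0], x[0][1], x[1]))
print(pair_counts.collect())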