I am collecting data using the Spark RDD API and have created a paired RDD, as shown below:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local').appName('app').getOrCreate()
sc = spark.sparkContext
raw_rdd = sc.textFile("data.csv")
paired_rdd = raw_rdd\
    .map(lambda x: x.split(","))\
    .map(lambda x: (x[2], [x[1], x[3], x[5]]))
Here is a sample excerpt of the paired RDD:
[('VXIO456XLBB630221', ['I', 'Nissan', '2003']),
('VXIO456XLBB630221', ['A', '', '']),
('VXIO456XLBB630221', ['R', '', '']),
('VXIO456XLBB630221', ['R', '', ''])]
As you can see, all the elements of this paired RDD share the same key, but only one of them has all of its fields filled in.
What do we want to accomplish? We want to replace the empty fields with the values from the element that has complete fields, so the expected output would look like this:
[('VXIO456XLBB630221', ['I', 'Nissan', '2003']),
('VXIO456XLBB630221', ['A', 'Nissan', '2003']),
('VXIO456XLBB630221', ['R', 'Nissan', '2003']),
('VXIO456XLBB630221', ['R', 'Nissan', '2003'])]
I know the first step would be to do a groupByKey, i.e.,
paired_rdd.groupByKey().map(lambda kv: ____)
I am just not sure how to iterate through the values and how this would fit into one lambda function.
CodePudding user response:
The best way would probably be to go with DataFrames and window functions. With RDDs, you could work something out as well with an aggregation (reduceByKey) that fills in the blanks while keeping in memory the list of first elements of each value. Then we can re-flatten based on that memory to create the same number of rows as before, but with the empty values filled in.
# let's define a function that selects the non-empty value between two strings
def select_value(a, b):
    if a is None or len(a) == 0:
        return b
    else:
        return a

# let's use mapValues to separate the first element of the list from the rest.
# Then we use reduceByKey to aggregate the list of all first elements (first
# element of the tuple). For the other elements, we only keep non-empty values
# (second element of the tuple).
# Finally, we use flatMapValues to recreate the rows based on the memorized
# first elements of the lists.
paired_rdd\
    .mapValues(lambda x: ([x[0]], x[1:]))\
    .reduceByKey(lambda a, b: (
        a[0] + b[0],
        [select_value(a[1][i], b[1][i]) for i in range(len(a[1]))]
    ))\
    .flatMapValues(lambda x: [[k] + x[1] for k in x[0]])\
    .collect()
Which yields:
[('VXIO456XLBB630221', ['I', 'Nissan', '2003']),
('VXIO456XLBB630221', ['A', 'Nissan', '2003']),
('VXIO456XLBB630221', ['R', 'Nissan', '2003']),
('VXIO456XLBB630221', ['R', 'Nissan', '2003'])
]
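To complete the groupByKey idea from the question: after grouping, each key maps to an iterable of value lists, so a helper can fill in the blanks per group. Here is a minimal sketch (fill_group is a made-up helper name), shown on plain Python data shaped like one grouped value:

```python
def fill_group(values):
    """Fill empty trailing fields of each row from the first
    non-empty value found anywhere in the group."""
    vals = [list(v) for v in values]
    n_fields = len(vals[0])
    # for each field after the first, pick the first non-empty value in the group
    filled = [next((v[i] for v in vals if v[i]), "") for i in range(1, n_fields)]
    return [[v[0]] + filled for v in vals]

# one group's values, as they would appear after groupByKey
group = [['I', 'Nissan', '2003'], ['A', '', ''], ['R', '', ''], ['R', '', '']]
print(fill_group(group))
# → [['I', 'Nissan', '2003'], ['A', 'Nissan', '2003'],
#    ['R', 'Nissan', '2003'], ['R', 'Nissan', '2003']]
```

With this helper, the step from the question becomes paired_rdd.groupByKey().flatMapValues(fill_group) — flatMapValues rather than map, so each group expands back into one row per original element. Keep in mind that groupByKey materializes each group in memory, which is why the reduceByKey version above is generally preferable for large groups.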