I have a DataFrame created with the following code:
df = sc.parallelize([
    (123, 2345, 25, ""), (123, 2345, 29, "NY"),
    (123, 5422, 67, "NY"), (123, 9422, 67, "NY"),
    (123, 3581, 98, "NY"), (231, 4322, 77, ""),
    (231, 4322, 99, "Paris"), (231, 8342, 45, "Paris"),
]).toDF(["userid", "transactiontime", "zip", "location"])
+------+---------------+---+--------+
|userid|transactiontime|zip|location|
+------+---------------+---+--------+
|   123|           2345| 25|        |
|   123|           2345| 29|      NY|
|   123|           5422| 67|      NY|
|   123|           9422| 67|      NY|
|   123|           3581| 98|      NY|
|   231|           4322| 77|        |
|   231|           4322| 99|   Paris|
|   231|           8342| 45|   Paris|
+------+---------------+---+--------+
I want the output to look like this:
+------+---------------+---+--------+
|userid|transactiontime|zip|location|
+------+---------------+---+--------+
|   123|           2345| 25|      NY|
|   123|           2345| 29|      NY|
|   123|           5422| 67|      NY|
|   123|           9422| 67|      NY|
|   123|           3581| 98|      NY|
|   231|           4322| 77|   Paris|
|   231|           4322| 99|   Paris|
|   231|           8342| 45|   Paris|
+------+---------------+---+--------+
I want to match rows on userid and transactiontime and fill the empty values in the location column with the non-empty value from the same group.
I have tried a window function like this:
from pyspark.sql import Window, functions as F

w1 = Window.partitionBy('userid', 'transactiontime').orderBy(F.col('zip'))
df_new = df.withColumn("newlocation", F.last('location').over(w1))
df_new.show()  # show() prints the table itself and returns None
But this didn't work. I also tried a self join but couldn't get that to work either. Any help?
CodePudding user response:
The first and last window functions accept an optional ignorenulls parameter, which may be helpful in this case.
However, in your example you don't actually have null values but empty strings, which is different: ignorenulls skips nulls only, so the empty strings have to be converted to nulls first.
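from pyspark.sql import Window, functions as F

w = Window.partitionBy('userid', 'transactiontime')
# fixedLoc: location with empty strings turned into nulls
# newLoc: first non-null fixedLoc within each (userid, transactiontime) group
df_new = df \
    .withColumn("fixedLoc", F.when(F.col("location") == "", None).otherwise(F.col("location"))) \
    .withColumn("newLoc", F.first('fixedLoc', ignorenulls=True).over(w))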
In the above solution, a temporary column (fixedLoc) is used to replace empty strings with null values, then first with ignorenulls=True is applied to that new column.
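To make the two steps concrete, here is a minimal plain-Python sketch of what they compute per (userid, transactiontime) group. It does not use Spark at all; the helper name fill_location and the tuple layout are assumptions made purely for illustration.

```python
# Same rows as in the question: (userid, transactiontime, zip, location)
rows = [
    (123, 2345, 25, ""), (123, 2345, 29, "NY"), (123, 5422, 67, "NY"),
    (123, 9422, 67, "NY"), (123, 3581, 98, "NY"), (231, 4322, 77, ""),
    (231, 4322, 99, "Paris"), (231, 8342, 45, "Paris"),
]

def fill_location(rows):
    """Hypothetical helper: give every row the first non-empty location of its group."""
    first_loc = {}
    for userid, ttime, _zip, loc in rows:
        fixed = loc if loc != "" else None  # step 1: empty string -> None
        if fixed is not None:
            # step 2: remember only the first non-null value seen per group
            first_loc.setdefault((userid, ttime), fixed)
    # Groups with no non-empty location at all end up with None
    return [(u, t, z, first_loc.get((u, t))) for u, t, z, _ in rows]

filled = fill_location(rows)
for row in filled:
    print(row)
```

In this data each group holds at most one distinct non-empty location, so it doesn't matter which non-null value counts as "first". If a group could contain several different locations, the window would need an explicit ordering to make the choice deterministic.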
As an alternative solution, you can use the max function, which ignores null values and, because an empty string sorts before any non-empty string, picks the non-empty location:
df_new = df \
.withColumn("newLoc", F.max('location').over(w))
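The reason max works here can be checked in plain Python, where the empty string also compares lower than any non-empty string. This is only a rough model of the aggregate, not Spark code; max_ignoring_nulls is a hypothetical stand-in name.

```python
def max_ignoring_nulls(values):
    """Hypothetical stand-in for an aggregate max that skips None values."""
    non_null = [v for v in values if v is not None]
    return max(non_null) if non_null else None

only_choice = max_ignoring_nulls(["", "NY"])            # "" sorts lowest, so "NY" wins
with_null = max_ignoring_nulls([None, "", "Paris"])     # None is skipped entirely
two_cities = max_ignoring_nulls(["NY", "Paris"])        # lexicographically greatest
all_empty = max_ignoring_nulls(["", ""])                # nothing to fill from

print(only_choice, with_null, two_cities, repr(all_empty))
```

Two caveats follow from this: a group that contains two different non-empty locations gets the lexicographically greatest one, and a group that contains only empty strings stays empty rather than becoming null.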