I have data for processing in PySpark SQL that looks like this:
+---------+----------------+
|user_id  |user_ids        |
+---------+----------------+
|null     |[479534, 1234]  |
|null     |[1234]          |
|null     |[479535]        |
|null     |[479535, 479536]|
|null     |[1234]          |
|null     |[479535]        |
|1234567  |null            |
|1234567  |null            |
|777      |null            |
|888      |null            |
|null     |null            |
+---------+----------------+
I need just a single user_id column, with additional rows exploded from user_ids, so something like this:
+---------+
|user_id  |
+---------+
|479534   |
|1234     |
|1234     |
|479535   |
|479535   |
|479536   |
|1234     |
|479535   |
|1234567  |
|1234567  |
|777      |
|888      |
|null     |
+---------+
How can I achieve that?
I've tried:
.withColumn("user_ids", F.explode_outer("user_ids"))
.withColumn("user_id", F.coalesce(df["user_id"], df["user_ids"]))
But with that I get:
cannot resolve 'coalesce(user_id, user_ids)' due to data type mismatch: input to function coalesce should all be the same type, but it's [bigint, array<bigint>];
So I think that withColumn cannot use another newly created column in this case.
CodePudding user response:
You are not saving the DataFrame after the explode, so df['user_ids'] still refers to the original array column rather than the exploded one. Refer to the columns with F.col('col') instead, which is resolved against the current step of the chain. For example,
df.withColumn('user_ids', F.explode_outer('user_ids')) \
  .withColumn('user_id', F.coalesce(F.col('user_id'), F.col('user_ids')))
Here is a complete example:
from pyspark.sql import functions as f
df = spark.createDataFrame([[None, [479534, 1234]], [1234567, None]], ['user_id', 'user_ids'])
df.show()
+-------+--------------+
|user_id|      user_ids|
+-------+--------------+
|   null|[479534, 1234]|
|1234567|          null|
+-------+--------------+
df.withColumn('user_ids', f.explode_outer('user_ids')) \
.withColumn('user_id', f.coalesce(f.col('user_id'), f.col('user_ids'))) \
.drop('user_ids') \
.show()
+-------+
|user_id|
+-------+
| 479534|
|   1234|
|1234567|
+-------+
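If you prefer to do it in a single select, another option (just a sketch, using the same df and column names as above) is to make the types match before coalescing: wrap the scalar user_id in a one-element array so both arguments to coalesce are array&lt;bigint&gt;, then explode the result. Rows where both columns are null still come out as a null row, because coalesce(null, array(null)) yields [null].
df.select(
    f.explode_outer(
        # both arguments are array<bigint>: user_ids as-is, user_id wrapped in a one-element array
        f.coalesce(f.col('user_ids'), f.array(f.col('user_id')))
    ).alias('user_id')
).show()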