I have a dataframe like:
data = [('valorant','web', 'start'),
('counter-strike','android', 'start'),
('sims','web', 'finished'),
]
columns = ["game","platform", "type"]
df = spark.createDataFrame(data=data, schema=columns)
df.show()
+--------------+--------+--------+
|          game|platform|    type|
+--------------+--------+--------+
|      valorant|     web|   start|
|counter-strike| android|   start|
|          sims|     web|finished|
+--------------+--------+--------+
Which I want to turn into:
+--------------+-----+
|          game|count|
+--------------+-----+
|      valorant|    1|
|counter-strike|    1|
|          sims|    1|
|          sims|    1|
+--------------+-----+
So if type == 'finished', the result should have 2 rows with value 1 instead of just one row with value 1.
Is there any way I can do this without having to map the dataframe 2 times and then merge those RDDs?
If I do:
def func1(x):
    if x.type == "start":
        return (x.game, 1)
    elif x.type == "finished":
        return ((x.game, 1), (x.game, 1))

rdd2 = df.rdd.map(lambda x: func1(x))
df2 = rdd2.toDF(['game', 'value'])
df2.show(truncate=False)
+---------------------------+-----+
|game                       |value|
+---------------------------+-----+
|valorant                   |1    |
|counter                    |1    |
|[Ljava.lang.Object;@4b01785|null |
+---------------------------+-----+
This obviously does not work, since func1 is expected to return a single value per row. Any ideas?
CodePudding user response:
Use a `when` expression to build a literal array, then `explode` it:
from pyspark.sql import functions as F

df1 = df.withColumn(
    "count",
    F.explode(
        F.when(F.col("type") == "start", F.array(F.lit(1)))
         .when(F.col("type") == "finished", F.array(F.lit(1), F.lit(1)))
    )
).drop("platform", "type")

df1.show()
# +--------------+-----+
# |          game|count|
# +--------------+-----+
# |      valorant|    1|
# |counter-strike|    1|
# |          sims|    1|
# |          sims|    1|
# +--------------+-----+
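For the record, the original RDD approach can also be made to work by using `flatMap` instead of `map`: `flatMap` lets one input row expand into zero or more output rows, which is exactly what the `finished` case needs. A minimal sketch (the helper name `expand_row` is mine, and the local check below avoids needing a running SparkSession):

```python
from types import SimpleNamespace

def expand_row(row):
    # Return a list of (game, 1) tuples; flatMap flattens the list,
    # so a "finished" row yields two output rows.
    if row.type == "finished":
        return [(row.game, 1), (row.game, 1)]
    return [(row.game, 1)]

# With a SparkSession, this would be applied as:
# df2 = df.rdd.flatMap(expand_row).toDF(["game", "count"])

# Quick local check without Spark, using a stand-in for a Row:
print(expand_row(SimpleNamespace(game="sims", type="finished")))
# [('sims', 1), ('sims', 1)]
```

That said, the `when`/`explode` version above stays in the DataFrame API, which generally optimizes better than dropping down to the RDD level.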