I want to read data from a Kafka topic, group it by a column, and aggregate/reduce the sum of a given column. If the timestamp of message A is greater than the timestamp of message B, I want to keep the timestamp from A, otherwise the timestamp from B.
So if I group my DataFrame by a given year, I want to get the maximum message timestamp for that year and the sum of all sales for that year.
This is what I tried:
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "test") \
    .option("startingOffsets", "earliest") \
    .load()
games_schema="""
year string,
global_sales double,
name string,
platform string,
time_send string
"""
castDF = df.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS STRING) as timestamp") \
    .select(from_json(col("value"), games_schema).alias("data"), col('timestamp')) \
    .select(col("data.year"),
            col("data.global_sales").cast('double').alias('global_sales'),
            col("data.name"),
            col("data.platform"),
            col("data.time_send").cast('string').alias('time_send'),
            col('timestamp'))
def max_ts(a, b):
    # sum the sales and keep the fields of the record with the later timestamp
    if a[5] > b[5]:
        return (a[0], a[1] + b[1], a[2], a[3], a[4], a[5])
    else:
        return (b[0], b[1] + a[1], b[2], b[3], b[4], b[5])
castDF.groupby('year').reduce(max_ts)
I get an AttributeError: 'GroupedData' object has no attribute 'reduce'
I even tried to accomplish it with a join:
max_ts_df = castDF.groupby('year').agg(max('timestamp'))
sum_df = castDF.groupby('year').agg(sum('global_sales'))
joined_df = max_ts_df.join(sum_df, max_ts_df.year == sum_df.year)
But then I get another error: Multiple streaming aggregations are not supported with streaming DataFrames/Datasets;
How can I group a DataFrame by a column and aggregate/reduce a column based on a condition?
CodePudding user response:
.reduce() is a low-level API that is only available on RDDs; a GroupedData object has no reduce method.
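For illustration only, this is roughly what a reduce would look like at the RDD level. It assumes a static (batch) DataFrame with the same columns; batch_df, pairs, combine and reduced are just placeholder names, and this cannot be called on a streaming DataFrame:
# Sketch: reduce-style logic on an RDD, assuming batch_df is a *static*
# DataFrame with the columns year, global_sales and timestamp.
pairs = batch_df.rdd.map(lambda r: (r['year'], (r['global_sales'], r['timestamp'])))

def combine(a, b):
    # sum the sales and keep the later timestamp
    return (a[0] + b[0], a[1] if a[1] > b[1] else b[1])

reduced = pairs.reduceByKey(combine)  # (year, (total_sales, max_timestamp))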
To avoid the "Multiple streaming aggregations are not supported" error, use the foreachBatch() sink: the function it calls receives each micro-batch as a regular (static) DataFrame, so multiple aggregations and joins are allowed there.
An example:
from pyspark.sql.functions import from_json, col, max, sum

def foreach_batch_function(data_frame, epoch_id):
    games_schema = """
        year string,
        global_sales double,
        name string,
        platform string,
        time_send string
    """
    castDF = data_frame.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS STRING) as timestamp") \
        .select(from_json(col("value"), games_schema).alias("data"), col('timestamp')) \
        .select(col("data.year"),
                col("data.global_sales").cast('double').alias('global_sales'),
                col("data.name"),
                col("data.platform"),
                col("data.time_send").cast('string').alias('time_send'),
                col('timestamp'))
    # Inside foreachBatch the micro-batch is a static DataFrame,
    # so several aggregations and a join are allowed.
    max_ts_df = castDF.groupby('year').agg(max('timestamp'))
    sum_df = castDF.groupby('year').agg(sum('global_sales'))
    # join on the column name to avoid duplicate 'year' columns in the output
    joined_df = max_ts_df.join(sum_df, 'year')
    # "complete" is not a valid batch save mode; append each micro-batch's result
    joined_df.write.mode("append").parquet('/temp')
(
    spark
    .readStream
    .format('kafka')
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "test")
    .option("startingOffsets", "earliest")
    .load()
    .writeStream
    .trigger(processingTime='x seconds')
    .foreachBatch(foreach_batch_function)
    .start()
)
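As a side note, because each micro-batch inside foreachBatch is a static DataFrame, both values can also be computed in a single .agg call, which makes the join unnecessary. A minimal sketch (summary_df and the column aliases are just illustrative names):
# Sketch: one aggregation per micro-batch instead of two aggregations plus a join
summary_df = castDF.groupby('year').agg(
    max('timestamp').alias('max_timestamp'),
    sum('global_sales').alias('total_sales')
)
summary_df.write.mode("append").parquet('/temp')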