Pyspark Structured Streaming GroupBy and Reduce with condition


I want to read data from a Kafka topic, group it by a column, and aggregate/reduce the sum of a given column. If the timestamp of message A is greater than the timestamp of message B, I want to keep A's timestamp; otherwise B's.

So if I group my DataFrame by a given year, I want to get the maximum message timestamp for that year and the sum of all sales for that year.
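For example (hypothetical rows, just to illustrate the expected result):

(year, global_sales, timestamp)
("2010", 1.5, "2023-01-01 10:00:00")
("2010", 2.0, "2023-01-01 11:00:00")
("2011", 0.5, "2023-01-01 09:30:00")

should be reduced to:

("2010", 3.5, "2023-01-01 11:00:00")   <- sum of sales, latest timestamp for 2010
("2011", 0.5, "2023-01-01 09:30:00")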

This is what I tried:

df = spark.readStream \
               .format("kafka") \
               .option("kafka.bootstrap.servers", "kafka:9092") \
               .option("subscribe", "test") \
               .option("startingOffsets", "earliest") \
               .load()



from pyspark.sql.functions import col, from_json

games_schema = """
    year string,
    global_sales double,
    name string,
    platform string,
    time_send string
"""
castDF = df.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS String) as timestamp") \
                  .select(from_json(col("value"), games_schema).alias("data"), col('timestamp')) \
                  .select(col("data.year"), 
                          col("data.global_sales").cast('double').alias('global_sales'),
                          col("data.name"),
                          col("data.platform"),
                          col("data.time_send").cast('string').alias('time_send'),
                          col('timestamp')
                         )

def max_ts(a, b):
    # keep the later timestamp (index 5) and add up the sales (index 1)
    if a[5] > b[5]:
        return (a[0], a[1] + b[1], a[2], a[3], a[4], a[5])
    else:
        return (b[0], b[1] + a[1], b[2], b[3], b[4], b[5])

castDF.groupby('year').reduce(max_ts)

But I get an AttributeError: 'GroupedData' object has no attribute 'reduce'. I also tried to accomplish it with a join:

max_ts_df = castDF.groupby('year').agg(max('timestamp'))
sum_df = castDF.groupby('year').agg(sum('global_sales'))
joined_df = max_ts_df.join(sum_df, max_ts_df.year == sum_df.year)

But then I get another error: Multiple streaming aggregations are not supported with streaming DataFrames/Datasets;

How can I group a DataFrame by a column and aggregate/reduce another column based on a condition?

CodePudding user response:

.reduce() is a low-level API that is only available on RDDs, not on a DataFrame or a GroupedData object.
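For reference, this is what .reduce() looks like on an RDD (a minimal sketch, unrelated to the streaming query above):

# reduce() exists on RDDs, not on DataFrames or GroupedData
rdd = spark.sparkContext.parallelize([1.5, 2.0, 0.5])
total = rdd.reduce(lambda a, b: a + b)  # 4.0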

To avoid the "Multiple streaming aggregations" error, you can use .foreachBatch(). Inside the batch function each micro-batch arrives as a regular (static) DataFrame, so multiple aggregations and joins are allowed.

An example:

from pyspark.sql.functions import col, from_json, max, sum

def foreach_batch_function(data_frame, epoch_id):
    games_schema = """
        year string,
        global_sales double,
        name string,
        platform string,
        time_send string
    """
    castDF = data_frame.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS String) as timestamp") \
        .select(from_json(col("value"), games_schema).alias("data"), col('timestamp')) \
        .select(col("data.year"),
                col("data.global_sales").cast('double').alias('global_sales'),
                col("data.name"),
                col("data.platform"),
                col("data.time_send").cast('string').alias('time_send'),
                col('timestamp'))

    max_ts_df = castDF.groupby('year').agg(max('timestamp'))
    sum_df = castDF.groupby('year').agg(sum('global_sales'))
    joined_df = max_ts_df.join(sum_df, max_ts_df.year == sum_df.year)

    # "complete" is a streaming output mode, not a DataFrameWriter mode; use "append" (or "overwrite") here
    joined_df.write.mode("append").parquet('/temp')

(
    spark
    .readStream
    .format('kafka')
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "test")
    .option("startingOffsets", "earliest")
    .load()
    .writeStream
    .trigger(processingTime='x seconds')  # replace 'x seconds' with a real interval
    .foreachBatch(foreach_batch_function)
    .start()
)
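
Since data_frame inside foreach_batch_function is a static batch DataFrame, an alternative (just a sketch, not part of the original answer) is to compute both values in a single .agg() call, which avoids the join and the duplicated year column:

from pyspark.sql.functions import max, sum

# inside foreach_batch_function, after building castDF:
result = castDF.groupby('year').agg(
    max('timestamp').alias('max_timestamp'),
    sum('global_sales').alias('total_sales')
)
result.write.mode("append").parquet('/temp')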