Home > other >  Spark aggregate events from two different kafka topics orderd by timestamp
Spark aggregate events from two different kafka topics orderd by timestamp

Time:06-28

Assume a kafka system with the following two topics:

  • created
  • deleted

They are used to advertise the creation and deletion of items.

The structure of the events in kafka is JSON and the same for both topics:

{
  "id" : "a1cf621a-2a96-4b70-9dd6-3c54a2819eef"
  "timestamp": "2022-01-05T07:31:04.913000"
}

Now how it it possible with spark (scala) to accumulate the the deleted and created amounts such that we get a number of current items by timestamp.

Assume the following events in kafka

topic: created

{"id":"1","timestamp":"2022-01-01T00:00:00.000000"}
{"id":"2","timestamp":"2022-01-02T00:00:00.000000"}

topic: deleted

{"id":"2","timestamp":"2022-01-03T00:00:00.000000"}
{"id":"1","timestamp":"2022-01-04T00:00:00.000000"}

So this basically means:

  • 2022-01-01: 1 item got created, total count of items is 1
  • 2022-01-02: 1 item got created, total count of items is 2
  • 2022-01-03: 1 item got deleted, total count of items is 1
  • 2022-01-04: 1 item got deleted, total count of items is 0

The resulting output of the program should be the count of items per timestamp, as for example here:

----------------------------------------
| timestamp                    | count |
----------------------------------------
| 2022-01-01T00:00:00.000000   | 1     |
| 2022-01-02T00:00:00.000000   | 2     |
| 2022-01-03T00:00:00.000000   | 1     |
| 2022-01-04T00:00:00.000000   | 1     |
----------------------------------------

How can two topics be merged and the result ordered by timestamp?

CodePudding user response:

You can read from both topics with .option("subscribe", "created,deleted")

Then you have a dataframe of both topics, and you can then parse the value for the timestamp, and should be able to sort by the timestamp, and aggregate/reduce the dataframe to get the output.

Alternatively, Kafka records already have a timestamp within them, which Spark returns as a column. So, you can change your producer design to have one topic, say events, remove the timestamp from the value, move the id to the record key, create events will have a non-null value, and delete events will have a null value.

Either way, you still need a reducer function.


Or you can not use Spark, and with the proposed topic design, then creating a table in Kafka Streams / ksqlDB will already have the data you want. Not necessarily with timestamp information, but at least aggregate counts by ID or other value.

  • Related