Assume a Kafka system with the following two topics:
- created
- deleted
They are used to advertise the creation and deletion of items.
The events in Kafka are JSON, with the same structure for both topics:
{
  "id": "a1cf621a-2a96-4b70-9dd6-3c54a2819eef",
  "timestamp": "2022-01-05T07:31:04.913000"
}
Now, how is it possible with Spark (Scala) to accumulate the created and deleted amounts so that we get the number of current items by timestamp?
Assume the following events in Kafka:
topic: created
{"id":"1","timestamp":"2022-01-01T00:00:00.000000"}
{"id":"2","timestamp":"2022-01-02T00:00:00.000000"}
topic: deleted
{"id":"2","timestamp":"2022-01-03T00:00:00.000000"}
{"id":"1","timestamp":"2022-01-04T00:00:00.000000"}
So this basically means:
- 2022-01-01: 1 item got created, total count of items is 1
- 2022-01-02: 1 item got created, total count of items is 2
- 2022-01-03: 1 item got deleted, total count of items is 1
- 2022-01-04: 1 item got deleted, total count of items is 0
The resulting output of the program should be the count of items per timestamp, for example:
--------------------------------------
| timestamp                  | count |
--------------------------------------
| 2022-01-01T00:00:00.000000 | 1     |
| 2022-01-02T00:00:00.000000 | 2     |
| 2022-01-03T00:00:00.000000 | 1     |
| 2022-01-04T00:00:00.000000 | 0     |
--------------------------------------
How can two topics be merged and the result ordered by timestamp?
CodePudding user response:
You can read from both topics at once with .option("subscribe", "created,deleted").
Then you have one dataframe covering both topics. You can parse the JSON value to extract the timestamp, use the topic column that Spark attaches to each record to decide whether it is a creation (+1) or a deletion (-1), and then sort and aggregate the dataframe to get the output, as in the sketch below.
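Here is a minimal sketch of that approach as a batch job (the broker address kafka:9092 and all variable names are assumptions; swap read for readStream if you need streaming, but note that arbitrary sorting is only allowed in batch mode):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val spark = SparkSession.builder().appName("item-counts").getOrCreate()
import spark.implicits._

// JSON payload shared by both topics
val schema = StructType(Seq(
  StructField("id", StringType),
  StructField("timestamp", StringType)
))

// Batch read of both topics into one dataframe
val raw = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka:9092")
  .option("subscribe", "created,deleted")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()

// Parse the value and turn every event into a +1/-1 delta
val deltas = raw
  .select($"topic", from_json($"value".cast("string"), schema).as("event"))
  .select(
    $"event.timestamp".as("timestamp"),
    when($"topic" === "created", 1).otherwise(-1).as("delta")
  )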
Alternatively, Kafka records already carry a timestamp, which Spark returns as a column. So you could change your producer design to use a single topic, say events: remove the timestamp from the value, move the id to the record key, give create events a non-null value, and give delete events a null value. The delta then falls out of whether the value is null, as sketched below.
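A sketch of the Spark side under that redesign (again, the topic and variable names are assumptions; the timestamp column here is the Kafka record timestamp rather than one parsed from the payload):

// Single-topic design: a null value means the item was deleted
val deltasFromEvents = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka:9092")
  .option("subscribe", "events")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
  .select(
    $"timestamp",  // Kafka record timestamp, returned by Spark as a column
    when($"value".isNull, -1).otherwise(1).as("delta")
  )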
Either way, you still need a reducer step: sum the deltas per timestamp, then turn the per-timestamp deltas into a running total, as shown below.
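Continuing from the deltas dataframe above, one way to express that reducer (the unpartitioned window is an assumption that trades scalability for simplicity, since it pulls all rows onto a single partition):

import org.apache.spark.sql.expressions.Window

// Net change per timestamp, then a cumulative sum in timestamp order
val counts = deltas
  .groupBy($"timestamp")
  .agg(sum($"delta").as("net"))
  .withColumn("count", sum($"net").over(Window.orderBy($"timestamp")))
  .select($"timestamp", $"count")
  .orderBy($"timestamp")

counts.show(truncate = false)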
Or you can skip Spark entirely: with the proposed topic design, creating a table in Kafka Streams / ksqlDB already gives you the data you want, because null values act as tombstones that remove deleted ids from the table. Not necessarily with the timestamp information, but at least an aggregate count of currently existing items.
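For illustration, a Kafka Streams sketch in Scala (the events and current-item-count topic names, the application id, and the broker address are all assumptions):

import java.util.Properties
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.serialization.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder

val builder = new StreamsBuilder()

// Key = item id, null value = tombstone: builder.table drops deleted ids
val items = builder.table[String, String]("events")

// Re-key all rows to one constant key and count them; the KTable
// aggregation subtracts automatically when an id is tombstoned
val currentCount = items
  .groupBy((_, v) => ("all", v))
  .count()

// Publish every change of the count to an output topic
currentCount.toStream.to("current-item-count")

val props = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "item-counter")
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092")
new KafkaStreams(builder.build(), props).start()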