Deduplicate stream records


I'm using Redis to stream data. I have multiple producer instances producing the same data, aiming for eventual consistency.

Right now the producers generate trades with random trade ids of 1 or 2. I want a deduplication service, or something like it, that filters out the duplicates based on the trade id. How do I do that?

Consumer

using System.Text.Json;
using Shared;
using StackExchange.Redis;

var tokenSource = new CancellationTokenSource();
var token = tokenSource.Token;

var muxer = ConnectionMultiplexer.Connect("localhost:6379");
var db = muxer.GetDatabase();

const string streamName = "positions";
const string groupName = "avg";

// Create the consumer group if the stream or the group doesn't exist yet.
if (!await db.KeyExistsAsync(streamName) ||
    (await db.StreamGroupInfoAsync(streamName)).All(x => x.Name != groupName))
{
    await db.StreamCreateConsumerGroupAsync(streamName, groupName, "0-0");
}

var consumerGroupReadTask = Task.Run(async () =>
{
    var id = string.Empty;
    while (!token.IsCancellationRequested)
    {
        // Acknowledge the entry handled in the previous iteration.
        if (!string.IsNullOrEmpty(id))
        {
            await db.StreamAcknowledgeAsync(streamName, groupName, id);
            id = string.Empty;
        }

        // Read one new entry (">") for this consumer ("avg-1") in the group.
        var result = await db.StreamReadGroupAsync(streamName, groupName, "avg-1", ">", 1);
        if (result.Any())
        {
            id = result.First().Id;
            var dict = ParseResult(result.First());

            var trade = JsonSerializer.Deserialize<Trade>(dict["trade"]);

            Console.WriteLine($"Group read result: trade: {dict["trade"]}, time: {dict["time"]}");
        }

        await Task.Delay(1000);
    }
});

Console.ReadLine();

static Dictionary<string, string> ParseResult(StreamEntry entry)
{
    return entry.Values.ToDictionary(x => x.Name.ToString(), x => x.Value.ToString());
}

Producer

using System.Text.Json;
using Shared;
using StackExchange.Redis;

var tokenSource = new CancellationTokenSource();
var token = tokenSource.Token;

var muxer = ConnectionMultiplexer.Connect("localhost:6379");
var db = muxer.GetDatabase();

const string streamName = "positions";

var producerTask = Task.Run(async () =>
{
    var random = new Random();
    while (!token.IsCancellationRequested)
    {
        // Trade ids are randomly 1 or 2, so duplicate trades are produced on purpose.
        var trade = new Trade(random.Next(1, 3), "btcusdt", 25000, 2);

        var entry = new List<NameValueEntry>
        {
            new("trade", JsonSerializer.Serialize(trade)),
            new("time", DateTimeOffset.Now.ToUnixTimeSeconds())
        };

        await db.StreamAddAsync(streamName, entry.ToArray());

        await Task.Delay(2000);
    }
});

Console.ReadLine();

CodePudding user response:

You can use a couple of tactics here, depending on the level of distribution required and the degree to which you can tolerate missed messages coming from your stream. Here are a couple of workable solutions using Redis:

Use a Bloom filter when you can tolerate a ~1% miss in events

You can use a Bloom filter in Redis, which is a very compact, very fast way to determine whether a particular record has been seen yet. If you run:

var hasBeenAdded = ((int)await db.ExecuteAsync("BF.ADD", "bf:trades", dict["trade"])) == 1;

If hasBeenAdded is true, you can definitively say that the record is not a duplicate; if it is false, there's a small chance it's a false positive, with the exact probability depending on how you set up the Bloom filter with BF.RESERVE.
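
For example, a minimal sketch of reserving the filter up front; the 0.1% error rate and 1,000,000 capacity here are illustrative assumptions, not recommendations:

// BF.RESERVE key error_rate capacity: create the filter with an explicit
// false-positive rate and expected size (both numbers assumed for this example).
await db.ExecuteAsync("BF.RESERVE", "bf:trades", 0.001, 1_000_000);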

If you want to use a Bloom filter, you'll need to either side-load RedisBloom into your instance of Redis, or just use Redis Stack.
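
Wired into your consumer loop, a sketch of the read path could look like the following; I'm keying the filter on the trade id (per your question) and assuming your Shared.Trade type exposes an Id property:

var result = await db.StreamReadGroupAsync(streamName, groupName, "avg-1", ">", 1);
if (result.Any())
{
    id = result.First().Id;
    var dict = ParseResult(result.First());
    var trade = JsonSerializer.Deserialize<Trade>(dict["trade"])!;

    // BF.ADD returns 1 only the first time this trade id enters the filter.
    var isNew = ((int)await db.ExecuteAsync("BF.ADD", "bf:trades", trade.Id.ToString())) == 1;
    if (isNew)
    {
        Console.WriteLine($"Group read result: trade: {dict["trade"]}, time: {dict["time"]}");
    }
    // Duplicates fall through and are still acknowledged on the next pass,
    // so they don't pile up in the pending entries list.
}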

Use a Sorted Set when misses aren't acceptable

If your app cannot tolerate a miss, you are probably wiser to use a set or a sorted set; in general I'd advise the sorted set, because it's much easier to clean up.

Basically, if you are using a sorted set, you check whether a record has already been counted in your average with ZSCORE zset:trades trade-id; if a score comes back, you know the record has been used already, otherwise you can add it to the sorted set. Importantly, because your sorted set grows linearly, you are going to want to clean it up periodically: if you use the timestamp from the message as the score, you can determine some workable interval to go back in and run a ZREMRANGEBYSCORE to clear out older records.
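
Here's a sketch of that approach, reusing the trade object you already deserialize in the consumer. I'm using ZADD NX (When.NotExists) so the check-and-add happens in one atomic call instead of a separate ZSCORE; the zset:trades key name and the one-hour retention window are assumptions for the example:

// Atomically add the trade id unless it's already a member (ZADD NX).
// Returns true only when the member is new, i.e. not a duplicate.
var isNew = await db.SortedSetAddAsync(
    "zset:trades",
    trade.Id.ToString(),
    DateTimeOffset.Now.ToUnixTimeSeconds(),
    When.NotExists);

if (isNew)
{
    // First sighting of this trade id: include it in the running average.
}

// Periodically clear out entries older than one hour.
var cutoff = DateTimeOffset.Now.AddHours(-1).ToUnixTimeSeconds();
await db.SortedSetRemoveRangeByScoreAsync("zset:trades", double.NegativeInfinity, cutoff);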
