I'm using Redis Streams to stream data. I have multiple producer instances producing the same data, aiming for eventual consistency.
Right now the producers generate trades with random trade ids between 1 and 2. I want a deduplication service (or something similar) that filters out duplicates based on the trade id. How do I do that?
Consumer
using System.Text.Json;
using Shared;
using StackExchange.Redis;

var tokenSource = new CancellationTokenSource();
var token = tokenSource.Token;
var muxer = ConnectionMultiplexer.Connect("localhost:6379");
var db = muxer.GetDatabase();
const string streamName = "positions";
const string groupName = "avg";

if (!await db.KeyExistsAsync(streamName) ||
    (await db.StreamGroupInfoAsync(streamName)).All(x => x.Name != groupName))
{
    await db.StreamCreateConsumerGroupAsync(streamName, groupName, "0-0");
}

var consumerGroupReadTask = Task.Run(async () =>
{
    var id = string.Empty;
    while (!token.IsCancellationRequested)
    {
        // Acknowledge the previous entry before reading the next one.
        if (!string.IsNullOrEmpty(id))
        {
            await db.StreamAcknowledgeAsync(streamName, groupName, id);
            id = string.Empty;
        }

        var result = await db.StreamReadGroupAsync(streamName, groupName, "avg-1", ">", 1);
        if (result.Any())
        {
            id = result.First().Id;
            var dict = ParseResult(result.First());
            var trade = JsonSerializer.Deserialize<Trade>(dict["trade"]);
            Console.WriteLine($"Group read result: trade: {dict["trade"]}, time: {dict["time"]}");
        }

        await Task.Delay(1000);
    }
});

Console.ReadLine();

static Dictionary<string, string> ParseResult(StreamEntry entry)
{
    return entry.Values.ToDictionary(x => x.Name.ToString(), x => x.Value.ToString());
}
Producer
using System.Text.Json;
using Shared;
using StackExchange.Redis;

var tokenSource = new CancellationTokenSource();
var token = tokenSource.Token;
var muxer = ConnectionMultiplexer.Connect("localhost:6379");
var db = muxer.GetDatabase();
const string streamName = "positions";

var producerTask = Task.Run(async () =>
{
    var random = new Random();
    while (!token.IsCancellationRequested)
    {
        var trade = new Trade(random.Next(1, 3), "btcusdt", 25000, 2);
        var entry = new List<NameValueEntry>
        {
            new("trade", JsonSerializer.Serialize(trade)),
            new("time", DateTimeOffset.Now.ToUnixTimeSeconds())
        };
        await db.StreamAddAsync(streamName, entry.ToArray());
        await Task.Delay(2000);
    }
});

Console.ReadLine();
CodePudding user response:
You can use a couple of tactics here, depending on the level of distribution required and the degree to which you can tolerate missing events from your stream. Here are two workable solutions using Redis:
Use a Bloom Filter when you can tolerate a small percentage of missed events
You can use a Bloom filter in Redis, which is a very compact, very fast way to determine whether a particular record has been seen before. If you run:
var hasBeenAdded = ((int)await db.ExecuteAsync("BF.ADD", "bf:trades", dict["trade"])) == 1;
and hasBeenAdded
is true, you can definitively say the record is not a duplicate. If it is false, the record is probably a duplicate, but there's a small chance it's actually new and is being reported as seen (a false positive); that probability depends on how you set up the Bloom filter with BF.RESERVE
If you want to use a Bloom filter, you'll need to either side-load the RedisBloom module into your instance of Redis, or just use Redis Stack, which bundles it.
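Wired into your consumer loop, the check might look like the sketch below. It assumes RedisBloom is loaded, reuses the db and trade variables from your consumer, and assumes Trade exposes the id as an Id property; the key name bf:trades and the error-rate/capacity values are illustrative, not prescribed.
// Sketch: Bloom-filter dedup in the consumer. Assumes RedisBloom (or
// Redis Stack); "bf:trades", the 0.1% error rate, and the 1,000,000
// capacity are placeholder choices.

// One-time setup. BF.RESERVE errors if the key already exists, so
// guard it with an existence check.
if (!await db.KeyExistsAsync("bf:trades"))
{
    await db.ExecuteAsync("BF.RESERVE", "bf:trades", "0.001", "1000000");
}

// Inside the read loop, after deserializing the trade
// (assumes Trade has an Id property -- adjust to your Shared type):
var isNew = (int)await db.ExecuteAsync("BF.ADD", "bf:trades", trade.Id.ToString()) == 1;
if (isNew)
{
    // Definitely not seen before -- fold it into the average.
}
else
{
    // Either a real duplicate or, rarely, a false positive -- skip it.
}
Note that a false positive here means a genuinely new trade gets skipped, which is exactly the "miss" this approach tolerates in exchange for constant memory.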
Use a Sorted Set when misses aren't acceptable
If your app cannot tolerate a miss, you are probably wiser to use a Set or a Sorted Set; in general I'd advise the Sorted Set, because it is much easier to clean up.
Basically, with a sorted set you check whether a record has already been counted in your average by running ZSCORE zset:trades trade-id
. If a score comes back, you know the record has been used already; otherwise you add it to the sorted set. Importantly, because the sorted set grows linearly, you are going to want to clean it up periodically: if you set the timestamp from the message as the score, you can likely determine some workable interval to go back in and do a ZREMRANGEBYSCORE
to clear out older records.
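Putting that together with your consumer, a minimal sketch might look like this (the key name zset:trades is arbitrary, it assumes Trade exposes an Id property, and it reuses db, trade, and dict from the consumer loop):
// Sketch: exact dedup with a sorted set keyed by trade id, scored by
// the event timestamp so old entries can be trimmed by score.
// "zset:trades" and Trade.Id are assumptions, not fixed names.

var tradeId = trade.Id.ToString();

// Has this trade id been recorded already?
var score = await db.SortedSetScoreAsync("zset:trades", tradeId);
if (score is not null)
{
    // Duplicate -- already folded into the average, skip it.
}
else
{
    // First sighting: record it with the event time as the score,
    // then process the trade.
    var ts = long.Parse(dict["time"]);
    await db.SortedSetAddAsync("zset:trades", tradeId, ts);
    // ... process the trade ...
}

// Periodically (e.g. on a timer), trim entries older than some window:
var cutoff = DateTimeOffset.UtcNow.AddHours(-1).ToUnixTimeSeconds();
await db.SortedSetRemoveRangeByScoreAsync("zset:trades", 0, cutoff);
One caveat: the check-then-add above is not atomic across multiple consumers. If that matters, SortedSetAddAsync accepts When.NotExists (ZADD NX) and returns false when the member already exists, which gives you an atomic test-and-set in a single call.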