I am new to golang and have a use case where operations on a value of one type have to run sequentially, whereas operations on values of other types can run concurrently.
- Imagine data is coming from a streaming connection, in order:

key_name_1, value_1
key_name_2, value_2
key_name_1, value_1

- Now, key_name_1 and key_name_2 can be operated on by goroutines concurrently.
- But since the next streamed value (3rd row) is key_name_1 again, that operation should only be processed by a goroutine once the earlier operation (1st row) has finished; otherwise it should wait for the 1st operation to finish before the operation can be applied. For the sake of discussion we can assume that the operation is simply adding the new value to the previous value.
What would be the right way to achieve this in golang with the highest possible performance?
The exact use case: database changes are streamed onto a queue, and if a value is being changed, it is important that the operation is applied to another database in the same sequence, otherwise consistency will be impacted. Conflicts are rare, but they can happen.
CodePudding user response:
As a simple solution for mutual exclusivity for a given key you can just use a locked map of ref-counted locks. It's not optimal for high loads, but it might just suffice in your case.
// processLock is the per-key lock; refcount tracks how many goroutines
// currently hold or wait for it.
type processLock struct {
	mtx      sync.Mutex
	refcount int
}

// locker guards a map of per-key locks. The locks map must be initialized
// before use.
type locker struct {
	mtx   sync.Mutex
	locks map[string]*processLock
}

// acquire blocks until the caller exclusively owns the lock for key.
func (l *locker) acquire(key string) {
	l.mtx.Lock()
	lk, found := l.locks[key]
	if !found {
		lk = &processLock{}
		l.locks[key] = lk
	}
	lk.refcount++
	l.mtx.Unlock()
	lk.mtx.Lock()
}

// release gives up the lock for key and removes it from the map once no
// goroutine holds or waits for it.
func (l *locker) release(key string) {
	l.mtx.Lock()
	lk := l.locks[key]
	lk.refcount--
	if lk.refcount == 0 {
		delete(l.locks, key)
	}
	l.mtx.Unlock()
	lk.mtx.Unlock()
}
Just call acquire(key) before processing a key and release(key) when done with it.
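For example, a consumer of the stream could spawn one goroutine per incoming pair and let the locker serialize work per key. This is a minimal, hypothetical sketch: the message type, consume, and applyOperation below are illustrative names, not part of the code above, and it assumes the fmt and sync imports plus the locker/processLock types.

// Illustrative message type for a streamed key/value pair.
type message struct {
	key   string
	value int
}

// applyOperation is a hypothetical stand-in for the real per-key work,
// e.g. adding the new value to the previously stored one.
func applyOperation(key string, value int) {
	fmt.Println("applied", key, value)
}

// consume launches one goroutine per streamed message; acquire/release make
// sure operations on the same key never overlap.
func consume(stream <-chan message) {
	l := &locker{locks: map[string]*processLock{}}
	var wg sync.WaitGroup
	for msg := range stream {
		msg := msg
		wg.Add(1)
		go func() {
			defer wg.Done()
			l.acquire(msg.key)
			defer l.release(msg.key)
			applyOperation(msg.key, msg.value)
		}()
	}
	wg.Wait()
}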
Warning! The code above guarantees exclusivity, but not sequence. To sequentialize the unlocking you need a FIFO mutex.
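For reference, here is a minimal sketch of such a FIFO mutex, assuming only the standard sync package; the fifoMutex name is illustrative, not a standard library type.

// fifoMutex hands the lock to waiters in the order they arrived: each
// waiter enqueues its own wakeup channel, and Unlock wakes the oldest one.
type fifoMutex struct {
	mtx    sync.Mutex
	locked bool
	queue  []chan struct{}
}

func (m *fifoMutex) Lock() {
	m.mtx.Lock()
	if !m.locked {
		m.locked = true
		m.mtx.Unlock()
		return
	}
	ch := make(chan struct{})
	m.queue = append(m.queue, ch)
	m.mtx.Unlock()
	<-ch // wait for our turn, in arrival order
}

func (m *fifoMutex) Unlock() {
	m.mtx.Lock()
	defer m.mtx.Unlock()
	if len(m.queue) == 0 {
		m.locked = false
		return
	}
	next := m.queue[0]
	m.queue = m.queue[1:]
	close(next) // hand ownership to the oldest waiter
}

Each waiter gets its own channel, so Unlock can wake exactly the oldest waiter instead of letting the scheduler pick an arbitrary one.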
CodePudding user response:
Just a suggestion about message routing with a fixed number of workers.
IDK if it is the fastest (I guess not), but it is simple and reconfigurable (with some effort one can make it resizable on the fly).
I do believe the hashing part can be optimized and that some clever techniques exist. Otherwise you can also pile up some items in memory by increasing the channel buffers, which is simple again.
anyways, https://go.dev/play/p/fnxNJ9VZK8q
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

func main() {
	// The incoming stream, simulated as two batches of key/value pairs.
	events := []map[string]interface{}{
		{
			"key1": "v",
			"key2": "v",
			"key3": "v",
		},
		{
			"key1": "v",
			"key2": "v",
			"key3": "v",
		},
	}

	type workerWork struct {
		work     interface{}
		workerID int
	}

	workers := 4
	inputs := []chan interface{}{}
	output := make(chan workerWork)

	var wg sync.WaitGroup

	// One goroutine per worker, each draining its own input channel, so all
	// items routed to the same worker are processed sequentially.
	for i := 0; i < workers; i++ {
		i := i
		input := make(chan interface{})
		inputs = append(inputs, input)
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range input {
				output <- workerWork{work: v, workerID: i}
			}
		}()
	}

	// Close the output channel once every worker has finished.
	go func() {
		wg.Wait()
		close(output)
	}()

	// Router: hash the key and send the item to the worker that owns that
	// hash slot, so a given key always lands on the same worker.
	go func() {
		h := fnv.New32a()
		for _, batch := range events {
			for k, v := range batch {
				h.Write([]byte(k))
				ui := int(h.Sum32())
				d := ui % workers
				inputs[d] <- map[string]interface{}{k: v}
				h.Reset()
			}
		}
		for _, i := range inputs {
			close(i)
		}
	}()

	for work := range output {
		fmt.Printf("%#v\n", work)
	}
}