I have the below code in a method on the server side, which sends a streaming response back to the client.
var cfg = "..."      // comes from request parameters
var clientId = "...." // comes from request parameters
var data = allCids.([]int64)
// can we run this in parallel with X number of workers?
for _, cid := range data {
    pd := repo.GetCustomerData(strconv.FormatInt(cid, 10))
    if !pd.IsCorrect {
        continue
    }
    resources := us.helperCom.GenerateResourceString(pd)
    val, _ := us.GenerateInfo(clientId, resources, cfg)
    if err := stream.Send(val); err != nil {
        log.Printf("send error %v", err)
    }
}
My confusion is: can we make this run concurrently, or does streaming always work in a single-threaded way?
CodePudding user response:
You can make the work concurrent, but you can't make sending on the stream concurrent. From the grpc-go docs:

    When using streams, one must take care to avoid calling either SendMsg or RecvMsg multiple times against the same Stream from different goroutines. [...]
So you can run your concurrent code in separate goroutines and send the output values on a common channel. The main stream handler then ranges over this channel and calls stream.Send sequentially. Keep in mind that all of this is only worth it if fetching the data takes longer than sending it over the network.
The code looks like this:
// buffered, so the workers can send and exit immediately
out := make(chan <type of val>, len(data))

// close the out chan once all work is done
wg := &sync.WaitGroup{}
wg.Add(len(data))
go func() {
    wg.Wait()
    close(out)
}()

for _, cid := range data {
    // pass the loop variable as an argument so the goroutine
    // doesn't close over it
    go func(id int64) {
        defer wg.Done()
        val, err := // obtain output value somehow
        if err != nil {
            return
        }
        out <- val
    }(cid)
}

for val := range out {
    if err := stream.Send(val); err != nil {
        log.Printf("send error %v", err)
    }
}
The number of goroutines is the number of elements in data. If you want to control the number of goroutines, batch data. If you do this, adjust the channel buffer accordingly.
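If you'd rather cap concurrency at a fixed X instead of batching, the usual alternative is a small worker pool: feed the IDs through a channel and start X goroutines that each drain it. Here is a minimal sketch of that pattern, reusing the identifiers from the question (data, repo, us, clientId, cfg, stream) and using a hypothetical *pb.Info as a stand-in for whatever type val actually has:

const numWorkers = 8 // the X from the question; tune to your workload

ids := make(chan int64)    // work queue
out := make(chan *pb.Info) // *pb.Info is a placeholder for val's real type

// start a fixed pool of workers; each one drains the ids channel
var wg sync.WaitGroup
wg.Add(numWorkers)
for i := 0; i < numWorkers; i++ {
    go func() {
        defer wg.Done()
        for id := range ids {
            pd := repo.GetCustomerData(strconv.FormatInt(id, 10))
            if !pd.IsCorrect {
                continue
            }
            resources := us.helperCom.GenerateResourceString(pd)
            val, err := us.GenerateInfo(clientId, resources, cfg)
            if err != nil {
                continue
            }
            out <- val
        }
    }()
}

// close out once every worker has exited
go func() {
    wg.Wait()
    close(out)
}()

// feed the work, then close ids so the workers can finish
go func() {
    for _, cid := range data {
        ids <- cid
    }
    close(ids)
}()

// sending stays sequential, as grpc-go requires
for val := range out {
    if err := stream.Send(val); err != nil {
        log.Printf("send error %v", err)
    }
}

Because out is unbuffered here, workers block until the handler is ready to Send the next value, so memory use stays bounded no matter how large data is.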