Assume I have a bunch of files to deal with (say 1000 or more). First each file should be processed by function A(); A() will generate a new file, and then that file should be processed by B().
Doing them one by one is too slow, so I'm thinking of processing 5 files at a time using goroutines (we can't process too many at once because the CPU can't bear it).
I'm a newbie in Go and I'm not sure if my idea is correct. I think function A() is a producer and function B() is a consumer: B() will deal with the files produced by A(). I wrote some code below; forgive me, I really don't know how to write it properly. Can anyone help? Thank you in advance!
package main

import "fmt"

var Box = make(chan string, 1024)

func A(file string) {
    fmt.Println(file, "is processing in func A()...")
    fileGenByA := "/path/to/fileGenByA1"
    Box <- fileGenByA
}

func B(file string) {
    fmt.Println(file, "is processing in func B()...")
}

func main() {
    // assuming that this is the file list read from a directory
    fileList := []string{
        "/path/to/file1",
        "/path/to/file2",
        "/path/to/file3",
    }
    // it seems I can't do this, because fileList may have 1000 or more files
    for _, v := range fileList {
        go A(v)
    }
    // can I do this?
    for file := range Box {
        go B(file)
    }
}
CodePudding user response:
you're halfway there. There are a few things you need to fix:
- Your program deadlocks: nothing closes Box, so the main function can never finish ranging over it.
- You aren't waiting for your goroutines to finish, and you're starting more than 5 goroutines at a time. (The solutions to these are too intertwined to describe them separately.)
1. Deadlock
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan receive]:
main.main()
When you range over a channel, you read each value from it until the channel is both closed and empty. Since you never close the channel, the range over it can never complete, and the program can never finish.
This is a fairly easy problem to solve in your case: we just need to close the channel when we know there will be no more writes to the channel.
for _, v := range fileList {
    go A(v)
}
close(Box)
Keep in mind that closing a channel doesn't stop it from being read, only from being written. Now consumers can distinguish between an empty channel that may receive more data in the future and an empty channel that will never receive more data.
Once you add the close(Box), the program no longer deadlocks, but it still doesn't work.
2. Too Many Goroutines and not waiting for them to complete
To run a certain maximum number of concurrent executions, instead of creating a goroutine for each input, create the goroutines in a "worker pool":
- Create a channel to pass the workers their work
- Create a channel for the goroutines to return their results, if any
- Start the number of goroutines you want
- Start at least one additional goroutine to either dispatch work or collect the result, so you don't have to try doing both from the main goroutine
- Use a sync.WaitGroup to wait for all data to be processed
- close the channels to signal to the workers and the results collector that their channels are done being filled
Before we get into the implementation, let's talk about how A and B interact.
first they should be processed by function A(), function A() will generate a file, then this file will be processed by B().
A() and B() must, then, execute serially. They could still pass their data through a channel, but since their execution must be serial, that gains you nothing. It's simpler to run them sequentially in the workers. For that, we'll need to change A() to either call B, or to return the path so the worker can pass it to B. I chose the latter.
func A(file string) string {
    fmt.Println(file, "is processing in func A()...")
    fileGenByA := "/path/to/fileGenByA1"
    return fileGenByA
}
Before we write our worker function, we must also consider the result of B. Currently, B returns nothing. In the real world, unless B() cannot fail, you would want to at least return the error, or panic. I'll skip over collecting results for now.
Now we can write our worker function.
func worker(wg *sync.WaitGroup, incoming <-chan string) {
    defer wg.Done()
    for file := range incoming {
        B(A(file))
    }
}
Now all we have to do is start 5 such workers, write the incoming files to the channel, close it, and wg.Wait() for the workers to complete.
incoming_work := make(chan string)
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
    wg.Add(1)
    go worker(&wg, incoming_work)
}
for _, v := range fileList {
    incoming_work <- v
}
close(incoming_work)
wg.Wait()
Full example at https://go.dev/play/p/A1H4ArD2LD8
Returning Results.
It's all well and good to be able to kick off goroutines and wait for them to complete. But what if you need results back from your goroutines? In all but the simplest of cases, you would at least want to know if files failed to process so you could investigate the errors.
We have only 5 workers, but we have many files, so we have many results. Each worker will have to return several results. So: another channel. It's usually worth defining a struct for your return:
type result struct {
    file string
    err  error
}
This tells us not just whether there was an error, but also which file the error came from.
How will we test an error case in our current code? In your example, B always gets the same value from A. If we add A's incoming file name to the path it passes to B, we can mock an error based on a substring. My mocked error will be that file3 fails.
func A(file string) string {
    fmt.Println(file, "is processing in func A()...")
    fileGenByA := "/path/to/fileGenByA1/" + file
    return fileGenByA
}

func B(file string) (r result) {
    r.file = file
    fmt.Println(file, "is processing in func B()...")
    if strings.Contains(file, "file3") {
        r.err = fmt.Errorf("Test error")
    }
    return
}
Our workers will be sending results, but we need to collect them somewhere. main() is busy dispatching work to the workers, blocking on its write to incoming_work whenever the workers are all busy. So the simplest place to collect the results is another goroutine. Our results collector goroutine has to read from a results channel, print out errors for debugging, and then return the total number of failures so our program can exit with a status indicating overall success or failure.
failures_chan := make(chan int)
go func() {
    var failures int
    for result := range results {
        if result.err != nil {
            failures++
            fmt.Printf("File %s failed: %s\n", result.file, result.err.Error())
        }
    }
    failures_chan <- failures
}()
Now we have another channel to close, and it's important that we close it only after all workers are done. So we close(results) after we wg.Wait() for the workers.
close(incoming_work)
wg.Wait()
close(results)
if failures := <-failures_chan; failures > 0 {
    os.Exit(1)
}
Putting all that together, we end up with this code:
package main

import (
    "fmt"
    "os"
    "strings"
    "sync"
)

func A(file string) string {
    fmt.Println(file, "is processing in func A()...")
    fileGenByA := "/path/to/fileGenByA1/" + file
    return fileGenByA
}

func B(file string) (r result) {
    r.file = file
    fmt.Println(file, "is processing in func B()...")
    if strings.Contains(file, "file3") {
        r.err = fmt.Errorf("Test error")
    }
    return
}

func worker(wg *sync.WaitGroup, incoming <-chan string, results chan<- result) {
    defer wg.Done()
    for file := range incoming {
        results <- B(A(file))
    }
}

type result struct {
    file string
    err  error
}

func main() {
    // assuming that this is the file list read from a directory
    fileList := []string{
        "/path/to/file1",
        "/path/to/file2",
        "/path/to/file3",
    }
    incoming_work := make(chan string)
    results := make(chan result)
    var wg sync.WaitGroup
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go worker(&wg, incoming_work, results)
    }
    failures_chan := make(chan int)
    go func() {
        var failures int
        for result := range results {
            if result.err != nil {
                failures++
                fmt.Printf("File %s failed: %s\n", result.file, result.err.Error())
            }
        }
        failures_chan <- failures
    }()
    for _, v := range fileList {
        incoming_work <- v
    }
    close(incoming_work)
    wg.Wait()
    close(results)
    if failures := <-failures_chan; failures > 0 {
        os.Exit(1)
    }
}
And when we run it, we get:
/path/to/file1 is processing in func A()...
/path/to/fileGenByA1//path/to/file1 is processing in func B()...
/path/to/file2 is processing in func A()...
/path/to/fileGenByA1//path/to/file2 is processing in func B()...
/path/to/file3 is processing in func A()...
/path/to/fileGenByA1//path/to/file3 is processing in func B()...
File /path/to/fileGenByA1//path/to/file3 failed: Test error
Program exited.
A final thought: buffered channels.
There is nothing wrong with buffered channels. Especially if you know the overall size of incoming work and results, buffered channels can obviate the results collector goroutine because you can allocate a buffered channel big enough to hold all results. However, I think it's more straightforward to understand this pattern if the channels are unbuffered. The key takeaway is that you don't need to know the number of incoming or outgoing results, which could indeed be different numbers or based on something that can't be predetermined.
CodePudding user response:
You could spawn 5 goroutines that read from a work channel. That way you always have 5 goroutines running, and you don't need to batch the files, which would force you to wait until 5 are finished before starting the next 5.
func main() {
    stack := []string{
        "foo",
        "bar",
        "baz",
        "qux",
        "quux",
        "corge",
    }
    work := make(chan string)
    results := make(chan string)
    // create 5 go routines
    wg := sync.WaitGroup{}
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for s := range work {
                // assumes A and B each return a string, as in the linked playground
                results <- B(A(s))
            }
        }()
    }
    // collect the results
    go func() {
        for result := range results {
            fmt.Println(result)
        }
    }()
    // send the work to the workers
    for _, s := range stack {
        work <- s
    }
    close(work)
    // wait for the workers to finish
    // then close the results channel
    wg.Wait()
    close(results)
}
https://play.golang.com/p/IgoMfAR-Tya
CodePudding user response:
Please check this.
package main

import (
    "fmt"
    "sync"
    "time"
)

var batchSize = 5

func A(file string, releaseReq chan struct{}, box chan string, done *sync.WaitGroup) {
    defer func() {
        <-releaseReq
        done.Done()
    }()
    time.Sleep(2 * time.Second)
    fmt.Println(file, "is processing in func A()...")
    fileGenByA := "/path/to/fileGenByA1"
    box <- fileGenByA
}

func B(file string, done *sync.WaitGroup) {
    defer func() {
        done.Done()
    }()
    time.Sleep(1 * time.Second)
    fmt.Println(file, "is processing in func B()...")
}

func main() {
    fileList := []string{
        "/path/to/file1",
        "/path/to/file2",
        "/path/to/file3",
        "/path/to/file4",
        "/path/to/file5",
        "/path/to/file6",
        "/path/to/file7",
        "/path/to/file8",
        "/path/to/file9",
        "/path/to/file10",
    }
    box := make(chan string, 5)
    var doneProcessA sync.WaitGroup
    doneProcessA.Add(1)
    go func() {
        rateLimiter := make(chan struct{}, 5)
        var processA sync.WaitGroup
        for _, v := range fileList {
            rateLimiter <- struct{}{}
            processA.Add(1)
            go A(v, rateLimiter, box, &processA)
        }
        processA.Wait()
        doneProcessA.Done()
        close(box)
    }()
    var doneProcessB sync.WaitGroup
    doneProcessB.Add(1)
    go func() {
        var processB sync.WaitGroup
        for file := range box {
            processB.Add(1)
            go B(file, &processB)
        }
        processB.Wait()
        doneProcessB.Done()
    }()
    doneProcessA.Wait()
    doneProcessB.Wait()
}