Home > database >  how to batch dealing with files using Goroutine?
how to batch dealing with files using Goroutine?

Time:03-14

Assuming I have a bunch of files to deal with(say 1000 or more), first they should be processed by function A(), function A() will generate a file, then this file will be processed by B().

If we do it one by one, that's too slow, so I'm thinking process 5 files at a time using goroutine(we can not process too much at a time cause the CPU cannot bear).

I'm a newbie in golang, I'm not sure if my thought is correct, I think the function A() is a producer and the function B() is a consumer, function B() will deal with the file that produced by function A(), and I wrote some code below, forgive me, I really don't know how to write the code, can anyone give me a help? Thank you in advance!

package main

import "fmt"

var Box = make(chan string, 1024)

func A(file string) {
    fmt.Println(file, "is processing in func A()...")
    fileGenByA := "/path/to/fileGenByA1"
    Box <- fileGenByA
}

func B(file string) {
    fmt.Println(file, "is processing in func B()...")
}

func main() {
    // assuming that this is the file list read from a directory
    fileList := []string{
        "/path/to/file1",
        "/path/to/file2",
        "/path/to/file3",
    }

    // it seems I can't do this, because fileList may have 1000 or more file
    for _, v := range fileList {
        go A(v)
    }

    // can I do this?
    for file := range Box {
        go B(file)
    }
}

CodePudding user response:

you're halfway there. There's a few things you need to fix:

  1. your program deadlocks because nothing closes Box, so the main function can never get done rangeing over it.
  2. You aren't waiting for your goroutines to finish, and there than 5 goroutines. (The solutions to these are too intertwined to describe them separately)

1. Deadlock

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan receive]:
main.main()

When you range over a channel, you read each value from the channel until it is both closed and empty. Since you never close the channel, the range over that channel can never complete, and the program can never finish.

This is a fairly easy problem to solve in your case: we just need to close the channel when we know there will be no more writes to the channel.

    for _, v := range fileList {
        go A(v)
    }
    close(Box)

Keep in mind that closeing a channel doesn't stop it from being read, only written. Now consumers can distinguish between an empty channel that may receive more data in the future, and an empty channel that will never receive more data.

Once you add the close(Box), the program doesn't deadlock anymore, but it still doesn't work.

2. Too Many Goroutines and not waiting for them to complete

To run a certain maximum number of concurrent executions, instead of creating a goroutine for each input, create the goroutines in a "worker pool":

  • Create a channel to pass the workers their work
  • Create a channel for the goroutines to return their results, if any
  • Start the number of goroutines you want
  • Start at least one additional goroutine to either dispatch work or collect the result, so you don't have to try doing both from the main goroutine
  • use a sync.WaitGroup to wait for all data to be processed
  • close the channels to signal to the workers and the results collector that their channels are done being filled.

Before we get into the implementation, let's talk aobut how A and B interact.

first they should be processed by function A(), function A() will generate a file, then this file will be processed by B().

A() and B() must, then, execute serially. They can still pass their data through a channel, but since their execution must be serial, it does nothing for you. Simpler is to run them sequentially in the workers. For that, we'll need to change A() to either call B, or to return the path for B and the worker can call. I choose the latter.

func A(file string) string {
    fmt.Println(file, "is processing in func A()...")
    fileGenByA := "/path/to/fileGenByA1"
    return fileGenByA
}

Before we write our worker function, we also must consider the result of B. Currently, B returns nothing. In the real world, unless B() cannot fail, you would at least want to either return the error, or at least panic. I'll skip over collecting results for now.

Now we can write our worker function.

func worker(wg *sync.WaitGroup, incoming <-chan string) {
    defer wg.Done()
    for file := range incoming {
        B(A(file))
    }
}

Now all we have to do is start 5 such workers, write the incoming files to the channel, close it, and wg.Wait() for the workers to complete.

    incoming_work := make(chan string)
    var wg sync.WaitGroup
    for i := 0; i < 5; i   {
        wg.Add(1)
        go worker(&wg, incoming_work)
    }
    for _, v := range fileList {
        incoming_work <- v
    }
    close(incoming_work)
    wg.Wait()

Full example at https://go.dev/play/p/A1H4ArD2LD8

Returning Results.

It's all well and good to be able to kick off goroutines and wait for them to complete. But what if you need results back from your goroutines? In all but the simplest of cases, you would at least want to know if files failed to process so you could investigate the errors.

We have only 5 workers, but we have many files, so we have many results. Each worker will have to return several results. So, another channel. It's usually worth defining a struct for your return:

type result struct {
  file string
  err error
}

This tells us not just whether there was an error but also clearly defines which file from which the error resulted.

How will we test an error case in our current code? In your example, B always gets the same value from A. If we add A's incoming file name to the path it passes to B, we can mock an error based on a substring. My mocked error will be that file3 fails.

func A(file string) string {
    fmt.Println(file, "is processing in func A()...")
    fileGenByA := "/path/to/fileGenByA1/"   file
    return fileGenByA
}

func B(file string) (r result) {
    r.file = file
    fmt.Println(file, "is processing in func B()...")
    if strings.Contains(file, "file3") {
        r.err = fmt.Errorf("Test error")
    }
    return
}

Our workers will be sending results, but we need to collect them somewhere. main() is busy dispatching work to the workers, blocking on its write to incoming_work when the workers are all busy. So the simplest place to collect the results is another goroutine. Our results collector goroutine has to read from a results channel, print out errors for debugging, and the return the total number of failures so our program can return a final exit status indicating overall success or failure.

    failures_chan := make(chan int)
    go func() {
        var failures int
        for result := range results {
            if result.err != nil {
                failures  
                fmt.Printf("File %s failed: %s", result.file, result.err.Error())
            }
        }
        failures_chan <- failures

    }()

Now we have another channel to close, and it's important we close it after all workers are done. So we close(results) after we wg.Wait() for the workers.

    close(incoming_work)
    wg.Wait()
    close(results)
    if failures := <-failures_chan; failures > 0 {
        os.Exit(1)
    }

Putting all that together, we end up with this code:

package main

import (
    "fmt"
    "os"
    "strings"
    "sync"
)

func A(file string) string {
    fmt.Println(file, "is processing in func A()...")
    fileGenByA := "/path/to/fileGenByA1/"   file
    return fileGenByA
}

func B(file string) (r result) {
    r.file = file
    fmt.Println(file, "is processing in func B()...")
    if strings.Contains(file, "file3") {
        r.err = fmt.Errorf("Test error")
    }
    return
}

func worker(wg *sync.WaitGroup, incoming <-chan string, results chan<- result) {
    defer wg.Done()
    for file := range incoming {
        results <- B(A(file))
    }
}

type result struct {
    file string
    err  error
}

func main() {
    // assuming that this is the file list read from a directory
    fileList := []string{
        "/path/to/file1",
        "/path/to/file2",
        "/path/to/file3",
    }
    incoming_work := make(chan string)
    results := make(chan result)
    var wg sync.WaitGroup
    for i := 0; i < 5; i   {
        wg.Add(1)
        go worker(&wg, incoming_work, results)
    }
    failures_chan := make(chan int)
    go func() {
        var failures int
        for result := range results {
            if result.err != nil {
                failures  
                fmt.Printf("File %s failed: %s", result.file, result.err.Error())
            }
        }
        failures_chan <- failures

    }()
    for _, v := range fileList {
        incoming_work <- v
    }
    close(incoming_work)
    wg.Wait()
    close(results)
    if failures := <-failures_chan; failures > 0 {
        os.Exit(1)
    }
}

And when we run it, we get:

/path/to/file1 is processing in func A()...
/path/to/fileGenByA1//path/to/file1 is processing in func B()...
/path/to/file2 is processing in func A()...
/path/to/fileGenByA1//path/to/file2 is processing in func B()...
/path/to/file3 is processing in func A()...
/path/to/fileGenByA1//path/to/file3 is processing in func B()...
File /path/to/fileGenByA1//path/to/file3 failed: Test error
Program exited.

A final thought: buffered channels.

There is nothing wrong with buffered channels. Especially if you know the overall size of incoming work and results, buffered channels can obviate the results collector goroutine because you can allocate a buffered channel big enough to hold all results. However, I think it's more straightforward to understand this pattern if the channels are unbuffered. The key takeaway is that you don't need to know the number of incoming or outgoing results, which could indeed be different numbers or based on something that can't be predetermined.

CodePudding user response:

You could spawn 5 goroutines that read from a work channel. That way you have at all times 5 goroutines running and don't need to batch them so that you have to wait until 5 are finished to start the next 5.

func main() {
    stack := []string{
        "foo",
        "bar",
        "baz",
        "qux",
        "quux",
        "corge",
    }

    work := make(chan string)
    results := make(chan string)

    // create 5 go routines
    wg := sync.WaitGroup{}
    for i := 0; i < 5; i   {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for s := range work {
                results <- B(A(s))
            }
        }()
    }

    // collect the results
    go func() {
        for result := range results {
            fmt.Println(result)
        }
    }()

    // send the work to the workers
    for _, s := range stack {
        work <- s
    }
    close(work)

    // wait for the workers to finish
    // then close the results channel
    wg.Wait()
    close(results)
}

https://play.golang.com/p/IgoMfAR-Tya

CodePudding user response:

Please check this.

package main

import (
    "fmt"
    "sync"
    "time"
)

var batchSize = 5

func A(file string, releaseReq chan struct{}, box chan string, done *sync.WaitGroup) {
    defer func() {
        <-releaseReq
        done.Done()
    }()
    time.Sleep(2 * time.Second)
    fmt.Println(file, "is processing in func A()...")
    fileGenByA := "/path/to/fileGenByA1"
    box <- fileGenByA
}

func B(file string, done *sync.WaitGroup) {
    defer func() {
        done.Done()
    }()
    time.Sleep(1 * time.Second)
    fmt.Println(file, "is processing in func B()...")
}

func main() {
    fileList := []string{
        "/path/to/file1",
        "/path/to/file2",
        "/path/to/file3",
        "/path/to/file4",
        "/path/to/file5",
        "/path/to/file6",
        "/path/to/file7",
        "/path/to/file8",
        "/path/to/file9",
        "/path/to/file10",
    }
    box := make(chan string, 5)

    var doneProcessA sync.WaitGroup
    doneProcessA.Add(1)
    go func() {
        rateLimitter := make(chan struct{}, 5)
        var processA sync.WaitGroup
        for _, v := range fileList {
            rateLimitter <- struct{}{}
            processA.Add(1)
            go A(v, rateLimitter, box, &processA)
        }
        processA.Wait()
        doneProcessA.Done()
        close(box)
    }()

    var doneProcessB sync.WaitGroup
    doneProcessB.Add(1)
    go func() {
        var processB sync.WaitGroup
        for file := range box {
            processB.Add(1)
            go B(file, &processB)
        }
        processB.Wait()
        doneProcessB.Done()
    }()
    doneProcessA.Wait()
    doneProcessB.Wait()
}
  • Related