Problem synchronizing between Goroutines, one blocks the other

I'm trying to process a CSV file I'm reading from AWS S3. For every line of text I would like to run a worker function that does some work and returns a result.

Ideally I would want the results to be in the same order as the original CSV, but it's not a requirement. For some reason, when I run this code I get weird data races, and this line:

for result := range output {
   results = append(results, result)
}

blocks forever

I tried using a WaitGroup, which also didn't work. Closing the output channel also leads to a "send on closed channel" panic.

func main() {
    resp, err := ReadCSV(bucket, key)
    if err != nil {
        log.Fatal(err)
    }
    defer resp.Body.Close()
    reader := csv.NewReader(resp.Body)

    detector := NewDetector(languages)
    var results []DetectionResult

    numWorkers := 4
    input := make(chan string, numWorkers)
    output := make(chan DetectionResult, numWorkers)

    start := time.Now()

    for w := 1; w < numWorkers+1; w++ {
        go worker(w, detector, input, output)
    }

    go func() {
        for {
            record, err := reader.Read()
            if err == io.EOF {
                close(input)
                break
            }

            if err != nil {
                log.Fatal(err)
            }

            text := record[0]
            input <- text
        }
    }()

    for result := range output {
        results = append(results, result)
    }

    elapsed := time.Since(start)

    log.Printf("Decoded %d lines of text in %s", len(results), elapsed)
}

func worker(id int, detector lingua.LanguageDetector, input chan string, output chan DetectionResult) {
    log.Printf("worker %d started\n", id)
    for t := range input {
        result := DetectText(detector, t)
        output <- result
    }
    log.Printf("worker %d finished\n", id)
}

I'm trying to process a CSV (ideally in order) and enrich it with the results of calling worker on each line.

I tried setting up a WaitGroup, and tried closing the output channel when finished reading (at EOF), which results in an error.

CodePudding user response:

The for-range loop reads from output until the channel is closed. You have to close the output channel when you're done processing all the input, not when you're done reading the input.

You can use a wait group for this:

func worker(detector lingua.LanguageDetector, wg *sync.WaitGroup) func(id int, input chan string, output chan DetectionResult) {
   wg.Add(1)
   return func(id int, input chan string, output chan DetectionResult) {
      defer wg.Done() // Notify wg when processing is finished
      log.Printf("worker %d started\n", id)
      for t := range input {
         result := DetectText(detector, t)
         output <- result
      }
      log.Printf("worker %d finished\n", id)
   }
}
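
The launch itself isn't shown above; wiring the factory up might look like this (a sketch, reusing numWorkers, detector, input, and output from the question):

var wg sync.WaitGroup
for w := 1; w <= numWorkers; w++ {
   // worker(detector, &wg) is evaluated here in the main goroutine, so
   // wg.Add(1) runs before the new goroutine starts; only the returned
   // closure executes concurrently.
   go worker(detector, &wg)(w, input, output)
}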

Then:

go func() {
    wg.Wait()
    close(output)
}()
for result := range output {
    results = append(results, result)
}

CodePudding user response:

I see that you're missing a way to signal the workers that there are no more jobs and that they should stop working. You also need a way for the workers to signal back that they are indeed done. When all those signals have been sent and received, main should be in control with the accumulated results of all the workers.

We can signal the workers by closing input once all CSV records have been iterated over and every job has been sent on input:

nWorkers := 4

input := make(chan Tx, nWorkers*2) // buffer so input (the "jobs queue") is always full; see rationale at bottom of answer
output := make(chan Ty)
done := make(chan bool)

for i := 1; i < nWorkers+1; i++ {
    go worker(input, output, done)
}

go func() {
    for {
        record, err := reader.Read()
        if err == io.EOF { // other error handling elided
            break
        }
        input <- record[0]
    }
    close(input)
}()

The goroutine sending jobs on input can safely close input when there are no more jobs. A worker will still be able to receive any jobs still in input, even after it's closed.

When input is closed and finally drained, a worker's range loop exits. The worker then signals back by sending on the done channel:

func worker(input <-chan Tx, output chan<- Ty, done <-chan bool) {
    for x := range input { // loop until input is closed
        output <- doWork(x)
    }
    done <- true // finally send done
}

When we have received nWorkers done messages, we know all work is complete and that no worker will send on output again, so it's safe to close output:

go func() {
    log.Println("counting done workers")
    var doneCtr int
    for {
        select {
        case <-done:
            log.Println("got done")
            doneCtr++
        }

        if doneCtr == nWorkers {
            close(output) // signal the results appender to stop
            log.Println("closed output")
            return
        }
    }
}()
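
Since done carries no data beyond the signal itself, the same goroutine can be written (arguably more simply) as a plain counting loop:

go func() {
    for i := 0; i < nWorkers; i++ {
        <-done // one signal per worker
    }
    close(output) // all workers have finished; safe to close
}()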

Closing output is the signal to main that it can stop trying to receive and accumulate the results:

results := make([]result, 0)
for result := range output {
    results = append(results, result)
}

Finally: all other goroutines have terminated, and main can carry on with the accumulated results.

As for getting the results in the original order, that's just a matter of sending the original order with each job, sending that order back with the result, then sorting on the order:

type row struct {
    num  int
    text string
}

type result struct {
    lang language
    row  row
}

...

input <- row{rowNum, record[0]}
rowNum++

...

output <- result{detect(row.text), row}

...

results = append(results, result)

...

sort.Slice(results, func(i, j int) bool { return results[i].row.num < results[j].row.num })

I made a complete, working mock-up in The Go Playground.
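
In case the link doesn't come through, here is a minimal self-contained sketch of the same pipeline (detect is a stub and the hard-coded rows stand in for the CSV; neither is from the original mock-up):

package main

import (
    "fmt"
    "sort"
)

type row struct {
    num  int
    text string
}

type result struct {
    lang string // stand-in for the real language type
    row  row
}

// detect is a stub for the real language detection.
func detect(text string) string { return "en" }

func worker(input <-chan row, output chan<- result, done chan<- bool) {
    for r := range input { // loop until input is closed
        output <- result{detect(r.text), r}
    }
    done <- true
}

func main() {
    const nWorkers = 4
    input := make(chan row, nWorkers*2)
    output := make(chan result)
    done := make(chan bool)

    for i := 0; i < nWorkers; i++ {
        go worker(input, output, done)
    }

    // Send jobs, then close input to signal "no more jobs".
    go func() {
        for i, text := range []string{"uno", "dos", "tres"} {
            input <- row{i, text}
        }
        close(input)
    }()

    // Close output once every worker has signalled done.
    go func() {
        for i := 0; i < nWorkers; i++ {
            <-done
        }
        close(output)
    }()

    var results []result
    for r := range output {
        results = append(results, r)
    }

    sort.Slice(results, func(i, j int) bool { return results[i].row.num < results[j].row.num })
    fmt.Printf("%+v\n", results)
}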

My reasoning may be off on buffering but, as I see it, the only real letdown would be to discover that a worker was stalled waiting for input. Buffering input by 2x the number of workers ensures that each worker has, on average, two jobs waiting at any instant.
