I'm trying to process a CSV file I'm reading from AWS S3. For every line of text I would like to run the worker function to do some work and return a result.
Ideally I'd want the results ordered as in the original CSV, but that's not a requirement. For some reason, when I run this code I get weird data races, and this loop:
for result := range output {
	results = append(results, result)
}
blocks forever.
I tried using a WaitGroup, which also didn't work. Closing the output channel instead leads to a "send on closed channel" panic.
func main() {
	resp, err := ReadCSV(bucket, key)
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	reader := csv.NewReader(resp.Body)
	detector := NewDetector(languages)

	var results []DetectionResult
	numWorkers := 4
	input := make(chan string, numWorkers)
	output := make(chan DetectionResult, numWorkers)

	start := time.Now()
	for w := 1; w < numWorkers+1; w++ {
		go worker(w, detector, input, output)
	}

	go func() {
		for {
			record, err := reader.Read()
			if err == io.EOF {
				close(input)
				break
			}
			if err != nil {
				log.Fatal(err)
			}
			text := record[0]
			input <- text
		}
	}()

	for result := range output {
		results = append(results, result)
	}

	elapsed := time.Since(start)
	log.Printf("Decoded %d lines of text in %s", len(results), elapsed)
}

func worker(id int, detector lingua.LanguageDetector, input chan string, output chan DetectionResult) {
	log.Printf("worker %d started\n", id)
	for t := range input {
		result := DetectText(detector, t)
		output <- result
	}
	log.Printf("worker %d finished\n", id)
}
Trying to process a CSV (ideally in order) and enrich it with the results of a call to worker for each line.
I tried setting up a WaitGroup, and I tried closing the output channel when finished reading (EOF), which results in the error above.
CodePudding user response:
The for-loop will read until the output channel closes. You have to close the output channel when you're done processing all the input (not when you're done reading the input).
You can use a wait group for this:
func worker(detector lingua.LanguageDetector, wg *sync.WaitGroup) func(id int, input chan string, output chan DetectionResult) {
	wg.Add(1)
	return func(id int, input chan string, output chan DetectionResult) {
		defer wg.Done() // Notify wg when processing is finished

		log.Printf("worker %d started\n", id)
		for t := range input {
			result := DetectText(detector, t)
			output <- result
		}
		log.Printf("worker %d finished\n", id)
	}
}
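Hooking this up in main could look like the sketch below; it reuses reader, detector, numWorkers, and DetectionResult from the question, and the feeder goroutine is my assumption about the surrounding code:

var wg sync.WaitGroup
input := make(chan string, numWorkers)
output := make(chan DetectionResult, numWorkers)

// worker(detector, &wg) is evaluated synchronously and calls wg.Add(1);
// only the returned closure runs as the actual worker goroutine, so the
// Add is guaranteed to happen before wg.Wait() can return.
for w := 1; w < numWorkers+1; w++ {
	go worker(detector, &wg)(w, input, output)
}

// Feed the CSV rows, then close input so the workers' range loops exit.
go func() {
	defer close(input)
	for {
		record, err := reader.Read()
		if err == io.EOF {
			return
		}
		if err != nil {
			log.Fatal(err)
		}
		input <- record[0]
	}
}()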
Then:
go func() {
	wg.Wait()
	close(output)
}()

for result := range output {
	results = append(results, result)
}
CodePudding user response:
I see that you're missing a way to signal the workers that there are no more jobs and that they should stop working. You also need a way for the workers to signal back that they are indeed done. When all those signals have been sent and received, main should be in control with the accumulated results of all the workers.
We can signal the workers by closing input after all CSV records have been read and all jobs have been sent through input:
nWorkers := 4
input := make(chan Tx, nWorkers*2) // buffer so input (the "jobs queue") is always full; see rationale at bottom of answer
output := make(chan Ty)
done := make(chan bool)

for i := 1; i < nWorkers+1; i++ {
	go worker(input, output, done)
}

go func() {
	for {
		record, err := reader.Read()
		if err == io.EOF {
			break
		}
		input <- record[0]
	}
	close(input)
}()
The goroutine sending jobs on input can safely close input when there are no more jobs. A worker will still be able to receive any jobs still in input, even after it's closed.
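A minimal, runnable illustration of that property: a closed channel still delivers any buffered values before the range loop exits.

package main

import "fmt"

func main() {
	ch := make(chan int, 2)
	ch <- 1
	ch <- 2
	close(ch)

	// Both buffered values are still received after close;
	// the range loop exits once the buffer is drained.
	for v := range ch {
		fmt.Println(v) // prints 1, then 2
	}
}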
When input is closed and is finally empty, a worker's range loop exits. A worker then signals back by sending on the done channel:
func worker(input <-chan Tx, output chan<- Ty, done chan<- bool) {
	for x := range input { // loop until input is closed
		output <- doWork(x)
	}
	done <- true // finally send done
}
When we have received nWorkers done messages, we know all work is completed and that the workers will not send on output anymore, so it's safe to close output:
go func() {
	log.Println("counting done workers")
	var doneCtr int
	for {
		<-done
		log.Println("got done")
		doneCtr++
		if doneCtr == nWorkers {
			close(output) // signal the results appender to stop
			log.Println("closed output")
			return
		}
	}
}()
Closing output is the signal to main that it can stop trying to receive and accumulate the results:
results := make([]result, 0)
for result := range output {
	results = append(results, result)
}
Finally: all other goroutines have terminated, and main can carry on with the accumulated results.
As for getting the results in the original order, that's just a matter of sending the original order with each job, sending that order back with the result, then sorting on the order:
type row struct {
	num  int
	text string
}

type result struct {
	lang language
	row  row
}

...

input <- row{rowNum, record[0]}
rowNum++

...

output <- result{detect(row.text), row}

...

results = append(results, result)

...

sort.Slice(results, func(i, j int) bool { return results[i].row.num < results[j].row.num })
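Condensed into a self-contained sketch of the same technique (the detect function and records slice here are hypothetical stand-ins for the real detector and CSV reader):

package main

import (
	"fmt"
	"sort"
	"strings"
)

type row struct {
	num  int
	text string
}

type result struct {
	lang string // stand-in for the detected-language type
	row  row
}

// detect is a hypothetical stand-in for the real language detection call.
func detect(text string) string {
	if strings.Contains(text, "bonjour") {
		return "French"
	}
	return "English"
}

func worker(input <-chan row, output chan<- result, done chan<- bool) {
	for r := range input {
		output <- result{detect(r.text), r}
	}
	done <- true
}

func main() {
	records := []string{"hello there", "bonjour tout le monde", "good morning"}

	nWorkers := 4
	input := make(chan row, nWorkers*2)
	output := make(chan result)
	done := make(chan bool)

	for i := 0; i < nWorkers; i++ {
		go worker(input, output, done)
	}

	go func() { // feed numbered jobs, then signal there are no more
		for i, text := range records {
			input <- row{i, text}
		}
		close(input)
	}()

	go func() { // close output once all workers have reported done
		for i := 0; i < nWorkers; i++ {
			<-done
		}
		close(output)
	}()

	var results []result
	for r := range output {
		results = append(results, r)
	}

	// Restore the original CSV order.
	sort.Slice(results, func(i, j int) bool { return results[i].row.num < results[j].row.num })
	for _, r := range results {
		fmt.Printf("%d: %q => %s\n", r.row.num, r.row.text, r.lang)
	}
}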
I made a complete, working mock-up in The Go Playground.
My reasoning on buffering may be off, but as I see it, the only real letdown would be to discover that a worker was stalled waiting for input. Buffering input at 2x the number of workers ensures that, on average, each worker has two jobs waiting at any instant.