Home > database >  Deadlock when trying to code a pool of worker methods
Deadlock when trying to code a pool of worker methods

Time:12-03

In the code hereunder, I don't understand why the "Worker" methods seem to exit instead of pulling values from the input channel "in" and processing them.

I had assumed they would only return after having consumed all input from the input channel "in" and processing them

package main

import (
    "fmt"
    "sync"
)

type ParallelCallback func(chan int, chan Result, int, *sync.WaitGroup)

type Result struct {
    i   int
    val int
}

func Worker(in chan int, out chan Result, id int, wg *sync.WaitGroup) {
    for item := range in {
        item *= item // returns the square of the input value
        fmt.Printf("=> %d: %d\n", id, item)
        out <- Result{item, id}
    }
    wg.Done()
    fmt.Printf("%d exiting ", id)
}

func Run_parallel(n_workers int, in chan int, out chan Result, Worker ParallelCallback) {
    wg := sync.WaitGroup{}
    for id := 0; id < n_workers; id   {
        fmt.Printf("Starting : %d\n", id)
        wg.Add(1)
        go Worker(in, out, id, &wg)
    }
    wg.Wait()  // wait for all workers to complete their tasks
    close(out) // close the output channel when all tasks are completed
}

const (
    NW = 4
)

func main() {
    in := make(chan int)
    out := make(chan Result)

    go func() {
        for i := 0; i < 100; i   {
            in <- i
        }
        close(in)
    }()
    Run_parallel(NW, in, out, Worker)

    for item := range out {
        fmt.Printf("From out : %d: %d", item.i, item.val)
    }
}


The output is

Starting : 0
Starting : 1
Starting : 2
Starting : 3
=> 3: 0
=> 0: 1
=> 1: 4
=> 2: 9
fatal error: all goroutines are asleep - deadlock!

CodePudding user response:

fatal error: all goroutines are asleep - deadlock!

The full error shows where each goroutine is "stuck". If you run this in the playground, it will even show you the line number. That made it easy for me to diagnose.

Your Run_parallel runs in the main groutine, so before main can read from out, Run_parallel must return. Before Run_parallel can return, it must wg.Wait(). But before the workers call wg.Done(), they must write to out. That's what causes a deadlock.

One solution is simple: just run Run_parallel concurrently in its own Goroutine.

    go Run_parallel(NW, in, out, Worker)

Now, main ranges over out, waiting on outs closure to signal completion. Run_parallel waits for the workers with wg.Wait(), and the workers will range over in. All the work will get done, and the program won't end until it's all done. (https://go.dev/play/p/oMrgH2U09tQ)

CodePudding user response:

Alternative formulation of the solution:

In that alternative formulation , it is not necessary to start Run_parallel as a goroutine (it triggers its own goroutine). I prefer that second solution, because it automates the fact that Run_parallel() has to run parallel to the main function. Also, for the same reason it's safer, less error-prone (no need to remember to run Run_parallel with the go keyword).

package main

import (
    "fmt"
    "sync"
)

type ParallelCallback func(chan int, chan Result, int, *sync.WaitGroup)

type Result struct {
    id  int
    val int
}

func Worker(in chan int, out chan Result, id int, wg *sync.WaitGroup) {
    defer wg.Done()
    for item := range in {
        item *= 2 // returns the double of the input value (Bogus handling of data)
        out <- Result{id, item}
    }
}

func Run_parallel(n_workers int, in chan int, out chan Result, Worker ParallelCallback) {
    go func() {
        wg := sync.WaitGroup{}
        defer close(out) // close the output channel when all tasks are completed
        for id := 0; id < n_workers; id   {
            wg.Add(1)
            go Worker(in, out, id, &wg)
        }
        wg.Wait() // wait for all workers to complete their tasks *and* trigger the -differed- close(out)
    }()
}

const (
    NW = 8
)

func main() {

    in := make(chan int)
    out := make(chan Result)

    go func() {
        defer close(in)
        for i := 0; i < 10; i   {
            in <- i
        }
    }()

    Run_parallel(NW, in, out, Worker)

    for item := range out {
        fmt.Printf("From out [%d]: %d\n", item.id, item.val)
    }

    println("- - - All done - - -")
}

CodePudding user response:

Solution :

Run_parallel has to run in it’s own goroutine:

package main

import (
    "fmt"
    "sync"
)

type ParallelCallback func(chan int, chan Result, int, *sync.WaitGroup)

type Result struct {
    id  int
    val int
}

func Worker(in chan int, out chan Result, id int, wg *sync.WaitGroup) {
    defer wg.Done()
    for item := range in {
        item *= 2 // returns the double of the input value (Bogus handling of data)
        out <- Result{id, item}
    }
}

func Run_parallel(n_workers int, in chan int, out chan Result, Worker ParallelCallback) {
    wg := sync.WaitGroup{}
    for id := 0; id < n_workers; id   {
        wg.Add(1)
        go Worker(in, out, id, &wg)
    }
    wg.Wait()  // wait for all workers to complete their tasks
    close(out) // close the output channel when all tasks are completed
}

const (
    NW = 8
)

func main() {

    in := make(chan int)
    out := make(chan Result)

    go func() {
        for i := 0; i < 10; i   {
            in <- i
        }
        close(in)
    }()

    go Run_parallel(NW, in, out, Worker)

    for item := range out {
        fmt.Printf("From out [%d]: %d\n", item.id, item.val)
    }

    println("- - - All done - - -")

}

  • Related