Go producer consumer avoiding deadlock

Time:01-22

I have code for a producer and consumer in Go. I have already asked for a code review of it elsewhere, and a good part of the idea was derived from another thread. The code is also in the Go playground.

  • This code has multiple producers and consumers sharing the same channel.
  • This code has an error handling mechanism: if any of the workers (producer or consumer) errors out, then all the workers should be halted.

I am concerned about a deadlock scenario where all consumers have shut down but a producer is still adding data to the shared channel. To "mitigate" this I have added a context check right before adding data to the data queue, specifically Line 85 in the Go playground.

However, is a deadlock still possible if the producer checks context.Done() in Line 85, then the context is cancelled, causing all consumers to shut down, and then the producer tries to insert data into the queue?

If so, how can I mitigate it?

Reposting the code:

package main

import (
    "context"
    "fmt"
    "sync"
)

func main() {
    a1 := []int{1, 2, 3, 4, 5}
    a2 := []int{5, 4, 3, 1, 1}
    a3 := []int{6, 7, 8, 9}
    a4 := []int{1, 2, 3, 4, 5}
    a5 := []int{5, 4, 3, 1, 1}
    a6 := []int{6, 7, 18, 9}
    arrayOfArray := [][]int{a1, a2, a3, a4, a5, a6}

    ctx, cancel := context.WithCancel(context.Background())
    ch1 := read(ctx, arrayOfArray)

    messageCh := make(chan int)
    errCh := make(chan error)

    producerWg := &sync.WaitGroup{}
    for i := 0; i < 3; i++ {
        producerWg.Add(1)
        producer(ctx, producerWg, ch1, messageCh, errCh)
    }

    consumerWg := &sync.WaitGroup{}
    for i := 0; i < 3; i++ {
        consumerWg.Add(1)
        consumer(ctx, consumerWg, messageCh, errCh)
    }

    firstError := handleAllErrors(ctx, cancel, errCh)

    producerWg.Wait()
    close(messageCh)

    consumerWg.Wait()
    close(errCh)

    fmt.Println(<-firstError)
}

func read(ctx context.Context, arrayOfArray [][]int) <-chan []int {
    ch := make(chan []int)

    go func() {
        defer close(ch)

        for i := 0; i < len(arrayOfArray); i++ {
            select {
            case <-ctx.Done():
                return
            case ch <- arrayOfArray[i]:
            }
        }
    }()

    return ch
}

func producer(ctx context.Context, wg *sync.WaitGroup, in <-chan []int, messageCh chan<- int, errCh chan<- error) {
    go func() {
        defer wg.Done()
        for {
            select {
            case <-ctx.Done():
                return
            case arr, ok := <-in:
                if !ok {
                    return
                }

                for i := 0; i < len(arr); i++ {

                    // simulating an error.
                    //if arr[i] == 10 {
                    //  errCh <- fmt.Errorf("producer interrupted")
                    //}

                    select {
                    case <-ctx.Done():
                        return
                    case messageCh <- 2 * arr[i]:
                    }
                }
            }
        }
    }()
}

func consumer(ctx context.Context, wg *sync.WaitGroup, messageCh <-chan int, errCh chan<- error) {
    go func() {
        defer wg.Done()

        for {
            select {
            case <-ctx.Done():
                return
            case n, ok := <-messageCh:
                if !ok {
                    return
                }
                fmt.Println("consumed: ", n)

                // simulating errors
                //if n == 10 {
                //  errCh <- fmt.Errorf("output error during write")
                //}
            }
        }
    }()
}

func handleAllErrors(ctx context.Context, cancel context.CancelFunc, errCh chan error) <-chan error {
    firstErrCh := make(chan error, 1)
    isFirstError := true
    go func() {
        defer close(firstErrCh)
        for err := range errCh {
            select {
            case <-ctx.Done():
            default:
                cancel()
            }
            if isFirstError {
                firstErrCh <- err
                isFirstError = !isFirstError
            }
        }
    }()

    return firstErrCh
}

CodePudding user response:

Regarding deadlock: it doesn't appear to be possible in the scenario you describe. Because messageCh is unbuffered, a send completes only when a consumer is receiving at the same moment, and that handoff happens atomically. So it is not possible for a producer to commit to a send and then have the consumer stop: the send case in the select is chosen only if a consumer can consume, and otherwise the producer falls through to the ctx.Done() case.

But then, there are of course possible improvements. Since you are only interested in the first error, you can simply do:

func main() {
  ...
  var firstErr error
  go func() {
      for err := range errCh {
         if firstErr == nil {
           firstErr=err
         }
      }
  }()
  ...
  close(errCh)
  ...

CodePudding user response:

Nope, you're fine, this won't deadlock on a producer write because you're wrapping the channel writes in a select statement, so even if the channel write can't happen because the consumers have terminated, you'll still hit the context cancellation clause and terminate your producer.

Just to demonstrate the concept, you can run this and see it doesn't deadlock despite the fact that it's attempting a channel write with no readers.

package main

import (
    "context"
    "fmt"
    "time"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    ch := make(chan struct{})

    go func() {
        time.Sleep(1 * time.Second)
        cancel()
    }()

    select {
    case ch <- struct{}{}:
    case <-ctx.Done():
        fmt.Println("context canceled")
    }
    fmt.Println("bye!")
}

Here's the playground link for it.

As for simplifying the code: if this is for any sort of real-life code, I'd probably just use a Group from golang.org/x/sync/errgroup. Alternatively, take a cue from that package and use sync.Once: wrap all of your producers and consumers in a function that spawns the goroutines and records the first error, which avoids the more complex error-channel draining code in your error handling function.
