Home > Mobile >  A problem of concurrency - sending streams of INT on two channels - reading from one
A problem of concurrency - sending streams of INT on two channels - reading from one

Time:07-11

What I should implement:

  1. a go routine (let's call it A) that generates random INT's and puts them on a channel and pauses after each channel push, 1 second.
  2. a second go routine (B) that does the same. Puts random INT's to channel B and pauses for 2 seconds.
  3. Now, I have to read from both channels, and create a SUM. For example. First element that comes from channel A with first element that comes from channel B - make a sum and put it on a channel C (and so on 1) until there are 100 sums created.
  4. When 100 sums are done (put in channel C and read) - close channel A , channel B and channel C.

What I have until now:

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func main() {

    a := make(chan int, 10)
    b := make(chan int, 10)
    c := make(chan string, 10)

    go func() {
        for {
            rand.Seed(time.Now().UnixNano())
            a <- rand.Intn(101)
            time.Sleep(time.Millisecond * 100)
        }
    }()

    go func() {
        for {
            rand.Seed(time.Now().UnixNano())
            b <- rand.Intn(101)
            time.Sleep(time.Millisecond * 300)
        }
    }()

    go func() {
        for {
            select {
            case ai := <-a:
                bi := <-b
                sum := ai   bi
                c <- fmt.Sprintf("%d   %d = %d", ai, bi, sum)

            }
        }
    }()

    sums := 0
    for val := range c {
        if sums == 10 {
            close(c)
            close(b)
            close(a)
        }
        println(val)
        sums  
    }

}

For testing purposes I changed seconds to milliseconds and instead of 100 sums, I verify for 10 but you get the idea.

Extra info: channel A and channel B have to be buffered at 100 items. As well, for testing purposes I only put 10 here.

I keep receiving deadlocks every now and then and I get why. My problem is that, I don't understand how can I close two sending channels from a receiver channel. Can anyone solve this mistery and explain a bit to me.

Thank you!

CodePudding user response:

Not a deadlock, but you should receive a panic when you write to the closed channels a and b.

You have to use another channel to let the goroutines know that processing is finished.

done := make(chan struct{})

Change the goroutines to test for done:

go func() {
        for {
            rand.Seed(time.Now().UnixNano())
            select {
               case b <- rand.Intn(101):
                  time.Sleep(time.Millisecond * 300)
               case <-done:
                   return
             }
        }
    }()

go func() {
        for {
            select {
            case ai := <-a:
                bi := <-b
                sum := ai   bi
                c <- fmt.Sprintf("%d   %d = %d", ai, bi, sum)
             case <-done:
                close(c) // tell listeners that we are done
                return
            }
        }
    }()

When you're done, close the done channel. This will also cause c to be closed:

for val := range c {
        if sums == 10 {
            close(done)
        }
        println(val)
        sums  
    }

When c is closed, the for loop will terminate.

CodePudding user response:

This is how you can implement it:

Create not buffered channels.

a go routine:

Post 100 ints with sleep to channel a and CLOSE channel a when all 100 posted

b go routine:

Post 200 ints with sleep to channel b and CLOSE channel b when all 200 posted

c goroutine:

  1. Listen from channels a and b until they both closed
  2. Put values from a and b into two dedicated queues We need to use intermediate data storage to unblock channels.
  3. After adding data into any of the queues, check if there is a data in second one. If both queues contain data, get an element from each and send the sum to channel c.
  4. Close c when a and b closed and at least one queue is empty.

With this design, go-routines a and b controls data flow, and you can implement any number of items and delays ratio without listener making any assumptions about that.

Buffered channels can be used if you know in advance a number of items you are going to send into a and b. It is not memory efficient though because you have to allocate memory for worst case scenario. With dynamic queues, you only keep in memory data that was not sent to c.

CodePudding user response:

set channel to nil

func main() {
    a := make(chan int, 10)
    b := make(chan int, 10)
    c := make(chan string, 10)

    go func() {
        for {
            rand.Seed(time.Now().UnixNano())
            if a == nil {
                break
            } else {
                a <- rand.Intn(101)
            }
            time.Sleep(time.Millisecond * 100)
        }
        println("a loop is quit")
    }()

    go func() {
        for {
            rand.Seed(time.Now().UnixNano())
            if b == nil {
                break
            } else {
                b <- rand.Intn(101)
            }
            time.Sleep(time.Millisecond * 300)
        }
        println("b loop is quit")
    }()

    go func() {
        for {
            ai := <-a
            bi := <-b
            sum := ai   bi
            if a == nil || b == nil || c == nil {
                break
            } else {
                c <- fmt.Sprintf("%d   %d = %d", ai, bi, sum)
            }
        }
        println("sum loop is quit")
    }()

    sums := 0
    for val := range c {
        println(val)
        sums  
        if sums == 10 {
            close(a)
            close(b)
            close(c)
            a, b, c = nil, nil, nil
        }
    }
    time.Sleep(time.Second)
}

playground : https://go.dev/play/p/-Im_vCcessJ

  • Related