Home > Software design >  Sending value into Channel and Reading output when Ready
Sending value into Channel and Reading output when Ready

Time:02-04

I am trying to construct a receiver and sender pattern using two channels in Golang. I am doing a task (API call), and receiving back a Response struct. My goal is that when a response is received I'd like to send it to another channel (writeChan) for additional processing.

I'd like to continuously read/listen on that receiver channel (respChan) and process anything that comes through (such as a Response). Then I'd like to spin up a thread to go and do a further operation with that Response in another goroutine.

I'd like to understand how I can chain together this pattern to allow data to flow from the API calls and concurrently write it (each Response will be written to a separate file destination which the Write() func handles.

Essentially my current pattern is the following:

package main

import (
    "fmt"
    "sync"
)

func main() {

    var wg sync.WaitGroup
    respChan := make(chan Response) // Response is a struct that contains API response metadata
    defer close(respChan)
    // requests is just a slice of requests to be made to an API
    // This part is working well
    for _, req := range requests {
        wg.Add(1)
        go func(r Request) {
            defer wg.Done()
            resp, _ := r.Get() // Make the API call and receive back a Response struct
            respChan <- resp // Put the response into our channel
        }(req)
    }

    // Now, I want to extract the responses as they become available and send them to another function to do some processing. This I am unsure of how to handle properly
    writeChan := make(chan string)
    defer close(writeChan)
    select {
        case resp := <-respChan: // receive from response channel
            go func(response Response) {
                signal, _ := Write(response) // Separate func to write the response to a file. Not important here in this context.
                writeChan <- signal // Put the signal data into the channel which is a string file path of where the file was written (will be used for a later process)

            }(resp)
        case <-time.After(15 *time.Second):
            fmt.Println("15 seconds have passed without receiving anything...")

    }
    wg.Wait()
}

CodePudding user response:

Let me share with you a working example that you can benefit from. First, I'm gonna present the code, then, I'm gonna walk you through all the relevant sections.

package main

import (
    "fmt"
    "net/http"
    "os"
    "strings"
    "time"
)

type Request struct {
    Url            string
    DelayInSeconds time.Duration
}

type Response struct {
    Url        string
    StatusCode int
}

func main() {
    requests := []Request{
        {"https://www.google.com", 0},
        {"https://stackoverflow.com", 1},
        {"https://www.wikipedia.com", 4},
    }

    respChan := make(chan Response)
    defer close(respChan)

    for _, req := range requests {
        go func(r Request) {
            fmt.Printf("%q - %v\n", r.Url, strings.Repeat("#", 30))
            // simulate heavy work
            time.Sleep(time.Second * r.DelayInSeconds)
            resp, _ := http.Get(r.Url)
            res := Response{r.Url, resp.StatusCode}
            fmt.Println(time.Now())
            respChan <- res
        }(req)
    }

    writeChan := make(chan struct{})
    defer close(writeChan)

    for i := 0; i < len(requests); i   {
        select {
        case res := <-respChan:
            go func(r Response) {
                f, err := os.Create(fmt.Sprintf("%v.txt", strings.Replace(r.Url, "https://", "", 1)))
                if err != nil {
                    panic(err)
                }
                defer f.Close()
                f.Write([]byte(fmt.Sprintf("%q OK with %d\n", r.Url, r.StatusCode)))
                writeChan <- struct{}{}
            }(res)
        case <-time.After(time.Second * 2):
            fmt.Println("Timeout")
        }
    }
}

Set up

First, I've defined the two structs that will be used in the example: Request and Response. In the former, I put a DelayInSeconds to mock some heavy loads and time-consuming operations. Then, I defined the requests variable that contains all the requests that have to be done.

The writing part

Here, I range over the requests variable. For each request, I'm gonna issue an HTTP request to the target URL. The time.Sleep emulate the heavy load. Then, I write the response to the respChan channel which is unbuffered.

The reading part

Here, the major change is to wrap the select construct into a for loop. Thanks to this, we'll make sure to iterate the right times (based on the length of the requests variable).

Final notes

First of all, bear in mind that the code is oversimplified just to show off the relevant parts. Due to this, a lot of error handling is missing and some inline functions could be extrapolated into named functions. You don't need to use sync.WaitGroup to achieve what you need, the usage of channels will be enough.
Feel free to play with delays and check which files are written!

Let me know if this helps you!

Edit

As requested, I'm gonna provide you with a more accurate solution based on your needs. The new reading part will be something like the following:

count := 0
for {
    // this check is need to exit the for loop and not wait indefinitely
    // it can be removed based on your needs
    if count == 3 {
        fmt.Println("all responses arrived...")
        return
    }
    res := <-respChan
    count  
    go func(r Response) {
        f, err := os.Create(fmt.Sprintf("%v.txt", strings.Replace(r.Url, "https://", "", 1)))
        if err != nil {
            panic(err)
        }
        defer f.Close()
        f.Write([]byte(fmt.Sprintf("%q OK with %d\n", r.Url, r.StatusCode)))
        writeChan <- struct{}{}
    }(res)
}

Here, the execution is waiting indefinitely within the for loop. No matter how long each request takes to complete, it will be fetched as soon as it arrives. I put, at the top of the for loop, an if to exit after it processed the requests that we need. However, you can avoid it and let the code run till a cancellation signal comes in (it's up to you).

Let me know if this better meets your requirements, thanks!

  • Related