Passing a csv.NewWriter() to another func in Golang to Write to file Asynchronously

Time: 02-04

I am making API calls (potentially thousands in a single job) and as they return and complete, I'd like to be able to write them to a single shared file (say CSV for simplicity) instead of waiting for all of them to complete before writing.

How could I share a single csv.Writer so that many goroutines can safely write to a single shared file? This may be too daunting a task, but I was curious whether there was a way to go about it.

package main

import (
    "encoding/csv"
    "os"
)

type Row struct {
    Field1 string
    Field2 string
}

func main() {
    file, _ := os.Create("file.csv")
    w := csv.NewWriter(file)

    // Some operations to create a slice of Row structs that will contain the
    // rows to write
    var rowsToWrite []Row
    // Now lets iterate over and write to file
    // Ideally, I'd like to do this in a goroutine but not entirely sure about thread safe writes
    for _, r := range rowsToWrite {
        go func(row, writer) {
            err := writeToFile(row, writer)
            if err != nil {
                // Handle error
            }
        }(r, w)
    }

}

func writeToFile(row Row, writer ???) error {
    // Use the shared writer to maintain where I am at in the file so I can append to the CSV
    if err := writer.Write(row); err != nil {
        return err
    }
    return nil
}


CodePudding user response:

I would (personally) not have the same file open for writing at two separate points in the code. Depending on how the OS handles buffered writes, etc., you can end up with "interesting" things happening.

Given how you've described your goals, one might do something like (this is off the top of my head and not rigorously tested):

  1. Create a channel to queue blocks of text (I assume) to be written - make(chan []byte, depth) - depth could be tuneable based on some tests you'd run, presumably.
  2. Have a goroutine open a filehandle for writing on your file, then read from that queueing channel, writing whatever it gets from the channel to that file
  3. You could then have n goroutines writing to the queueing channel, and as long as you don't exceed the channel capacity (outrun your ability to write), you might never need to worry about locks.

If you did want to use locks instead, you'd need a sync.Mutex shared between all the goroutines that touch the writer.

Season to taste, obviously.

CodePudding user response:

Considering this question, and this question also, I think the simplest approach for you is:

  1. Make your API requests concurrently, storing results in a slice
  2. Process slice of results/write to CSV

I'll use the https://jsonplaceholder.typicode.com Todo API to exemplify what I suggest. The API returns a Todo object, modeled with this struct:

type Todo struct {
    UserID    int    `json:"userId"`
    ID        int    `json:"id"`
    Title     string `json:"title"`
    Completed bool   `json:"completed"`
}

and to convert that to a CSV record:

func (t Todo) toRecord() []string {
    userID := strconv.Itoa(t.UserID)
    id := strconv.Itoa(t.ID)
    completed := strconv.FormatBool(t.Completed)

    return []string{userID, id, t.Title, completed}
}

I then have a function to take one of those placeholder endpoints, make the HTTP GET, and unmarshal the response into the struct. Notice the log lines:

// getTodo gets endpoint and unmarshals the response JSON into todo.
func getTodo(endpoint string) (todo Todo) {
    log.Println("getting", endpoint)

    resp, err := http.Get(endpoint)
    if err != nil {
        log.Println("error:", err)
        return
    }
    defer resp.Body.Close()
    json.NewDecoder(resp.Body).Decode(&todo)

    log.Println("got Todo", todo.ID)

    return
}

To run it all, I:

  • start with just making 5 API requests and pre-allocate a slice of result Todos with a length of 5 (the number of expected final structs)
  • set a WaitGroup with the number of requests to make
  • iteratively call getTodo for each endpoint in its own goroutine and save the result to its preallocated slot in the slice (this is not a race, at least according to go run -race main.go)
    • each iteration decrements the WaitGroup
  • wait till all requests are complete—the WaitGroup is 0
  • write the results to CSV

func main() {
    const nAPICalls = 5
    todos := make([]Todo, nAPICalls)

    var wg sync.WaitGroup
    wg.Add(nAPICalls)

    for i := 0; i < nAPICalls; i++ {
        go func(x int) {
            s := fmt.Sprintf("https://jsonplaceholder.typicode.com/todos/%d", x+1)
            todos[x] = getTodo(s)
            wg.Done()
        }(i)
    }

    wg.Wait()

    w := csv.NewWriter(os.Stdout)
    w.Write([]string{"UserID", "ID", "Title", "Completed"})

    for _, todo := range todos {
        w.Write(todo.toRecord())
    }

    w.Flush()
}

Here's how that looks. Note the log and that the requests are happening concurrently, and, for these 5, all within one second:

2023/02/03 11:18:41 getting https://jsonplaceholder.typicode.com/todos/2
2023/02/03 11:18:41 getting https://jsonplaceholder.typicode.com/todos/5
2023/02/03 11:18:41 getting https://jsonplaceholder.typicode.com/todos/3
2023/02/03 11:18:41 getting https://jsonplaceholder.typicode.com/todos/4
2023/02/03 11:18:41 getting https://jsonplaceholder.typicode.com/todos/1
2023/02/03 11:18:41 got Todo 5
2023/02/03 11:18:41 got Todo 1
2023/02/03 11:18:41 got Todo 3
2023/02/03 11:18:41 got Todo 4
2023/02/03 11:18:41 got Todo 2
UserID,ID,Title,Completed
1,1,delectus aut autem,false
1,2,quis ut nam facilis et officia qui,false
1,3,fugiat veniam minus,false
1,4,et porro tempora,true
1,5,laboriosam mollitia et enim quasi adipisci quia provident illum,false

This approach also has the convenient side effect that the results are ordered by their position in the slice, even if the actual HTTP requests completed out of order.

If I run that for all 200 available Todos, it still only takes 2 seconds to make all the GETs:

2023/02/03 11:44:45 getting https://jsonplaceholder.typicode.com/todos/28
...
2023/02/03 11:44:46 got Todo 75
UserID,ID,Title,Completed
1,1,delectus aut autem,false

You mentioned the "receiver ... sender pattern" in your other question... have you read Go Concurrency Patterns: Pipelines and cancellation? It really lays out, and builds up well, the idea of creating a simple pipeline, fanning out to multiple workers, and then fanning in their results. It's taken me many reads and trials, but I've finally started to get at least a "muscle memory" (maybe even an intuition) for how to design concurrent processes.
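
For reference, the fan-out/fan-in shape from that article could be sketched like so (the function name `fanIn` and the fake "API call" are my own placeholders):

```go
package main

import (
	"encoding/csv"
	"fmt"
	"os"
	"sync"
)

// fanIn fans n job IDs out to `workers` goroutines and fans their
// records back in on a single results channel.
func fanIn(n, workers int) [][]string {
	jobs := make(chan int)
	results := make(chan []string)

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for id := range jobs {
				// stand-in for a real API call
				results <- []string{fmt.Sprint(id), "title"}
			}
		}()
	}

	// close results once every worker has finished (fan-in termination)
	go func() {
		wg.Wait()
		close(results)
	}()

	// feed the job IDs, then signal that no more are coming
	go func() {
		for id := 1; id <= n; id++ {
			jobs <- id
		}
		close(jobs)
	}()

	var records [][]string
	for rec := range results {
		records = append(records, rec)
	}
	return records
}

func main() {
	w := csv.NewWriter(os.Stdout)
	for _, rec := range fanIn(5, 3) {
		w.Write(rec)
	}
	w.Flush()
}
```

As in the answer above, a single consumer drains the results channel, so the CSV writer is never touched concurrently.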
