Home > Net >  How to write to two different csv files concurrently in Go?
How to write to two different csv files concurrently in Go?

Time:11-03

I've created a minimal reproducible example:

package main

import (
    "encoding/csv"
    "fmt"
    "os"
    "strconv"
    "sync/atomic"
    "time"
)

// Paths of the CSV files written by the two recorder services.
var (
    // csvOnePath is the file created by (*A).record.
    csvOnePath = "test.csv"
    // csvTwoPath is the file created by (*B).record.
    csvTwoPath = "test_two.csv"
)

// A is a background service whose record goroutine writes rows to
// csvOnePath.
type A struct {
    // Running is 1 while the service is started and 0 otherwise. Start and
    // Stop flip it with an atomic compare-and-swap.
    Running     int32 // used atomically
    // QuitChan is closed by Stop to tell the record goroutine to shut down.
    QuitChan    chan struct{}
}

// NewA returns a stopped A whose QuitChan is an open, unbuffered channel.
func NewA() *A {
	svc := new(A)
	svc.QuitChan = make(chan struct{})
	return svc
}

// Start marks the service as running and launches the record goroutine.
// It returns an error, without launching anything, if the service is already
// marked as running.
func (a *A) Start() error {
	started := atomic.CompareAndSwapInt32(&a.Running, 0, 1)
	if started {
		go a.record()
		return nil
	}
	return fmt.Errorf("Cannot start service A: service already started")
}

// Stop marks the service as stopped and closes QuitChan to signal the record
// goroutine. It returns an error, leaving QuitChan untouched, if the service
// is not currently marked as running. Stop does not wait for the goroutine to
// finish.
func (a *A) Stop() error {
	wasRunning := atomic.CompareAndSwapInt32(&a.Running, 1, 0)
	if wasRunning {
		close(a.QuitChan)
		return nil
	}
	return fmt.Errorf("Cannot stop service A: service already stopped")
}

// record creates csvOnePath, writes a header row, and then appends one
// timestamp row every ten seconds until QuitChan is closed or a write fails.
//
// Every row is flushed as soon as it is written, so the data reaches the file
// even if the process exits before this goroutine runs its shutdown path.
// Errors are printed to standard output; the method has no return value
// because it runs as a goroutine.
func (a *A) record() {
	file, err := os.Create(csvOnePath)
	if err != nil {
		fmt.Println(err)
		return
	}
	// Close on every exit path, including the early returns below.
	defer func() {
		if closeErr := file.Close(); closeErr != nil {
			fmt.Println(closeErr)
		}
	}()

	writer := csv.NewWriter(file)

	// writeRow writes a single record and flushes it straight away.
	// csv.Writer buffers internally, so write errors only surface through
	// Error() after a Flush.
	writeRow := func(row []string) error {
		if writeErr := writer.Write(row); writeErr != nil {
			return writeErr
		}
		writer.Flush()
		return writer.Error()
	}

	if headerErr := writeRow([]string{"this", "is", "a", "test"}); headerErr != nil {
		fmt.Println(headerErr)
		return
	}

	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case t := <-ticker.C:
			stamp := fmt.Sprintf("%d:%d:%d", t.Hour(), t.Minute(), t.Second())
			if rowErr := writeRow([]string{stamp}); rowErr != nil {
				// Return directly instead of sending on QuitChan: this
				// goroutine is the channel's only receiver, so a send would
				// block forever and panic once Stop closes the channel.
				fmt.Println(rowErr)
				fmt.Println("Stopped recording.")
				return
			}
		case <-a.QuitChan:
			// A bare break would only leave the select and keep looping, so
			// return to end the goroutine.
			fmt.Println("Stopped recording.")
			return
		}
	}
}

// B is a background service whose record goroutine writes rows to
// csvTwoPath.
type B struct {
    // Running is 1 while the service is started and 0 otherwise. Start and
    // Stop flip it with an atomic compare-and-swap.
    Running     int32 // used atomically
    // QuitChan is closed by Stop to tell the record goroutine to shut down.
    QuitChan    chan struct{}
}

// NewB returns a stopped B whose QuitChan is an open, unbuffered channel.
func NewB() *B {
	svc := new(B)
	svc.QuitChan = make(chan struct{})
	return svc
}

// Start marks the service as running and launches the record goroutine.
// It returns an error, without launching anything, if the service is already
// marked as running.
func (b *B) Start() error {
	started := atomic.CompareAndSwapInt32(&b.Running, 0, 1)
	if started {
		go b.record()
		return nil
	}
	return fmt.Errorf("Cannot start service B: service already started")
}

// Stop marks the service as stopped and closes QuitChan to signal the record
// goroutine. It returns an error, leaving QuitChan untouched, if the service
// is not currently marked as running. Stop does not wait for the goroutine to
// finish.
func (b *B) Stop() error {
	wasRunning := atomic.CompareAndSwapInt32(&b.Running, 1, 0)
	if wasRunning {
		close(b.QuitChan)
		return nil
	}
	return fmt.Errorf("Cannot stop service B: service already stopped")
}

// writeMsgToReport appends a two-field record to report: the current local
// time formatted as hour:minute:second, followed by msg. It returns whatever
// error report.Write returns. The record stays in the csv.Writer's buffer
// until the caller flushes it.
func writeMsgToReport(report *csv.Writer, msg string) error {
	now := time.Now()
	// The format string needs one verb per argument; without them Sprintf
	// appends "%!(EXTRA ...)" noise instead of the time.
	timestamp := fmt.Sprintf("%d:%d:%d", now.Hour(), now.Minute(), now.Second())
	return report.Write([]string{timestamp, msg})
}

func (b *B) record() {
    //file_two, err := os.OpenFile(csvTwoPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666)
    file_two, err := os.Create(csvTwoPath)
    if err != nil {
        fmt.Println(err)
        return
    }
    writer := csv.NewWriter(file_two)
    //writer, closeFileFunc, err := NewCsvWriter(csvTwoPath)
    if err != nil {
        fmt.Println(err)
        return
    }
    header := []string{"this", "is", "a", "second", "test"}
    err = writer.Write(header)
    if err != nil {
        fmt.Println(err)
        return
    }
    ticker := time.NewTicker(1*time.Second)
    ticks := 0
    for {
        select {
        case <-ticker.C:
            if ticks % 15 == 0 {
                err = writeMsgToReport(writer, "YEET " strconv.Itoa(ticks))
                if err != nil {
                    fmt.Println(err)
                    b.QuitChan <- struct{}{}
                }
            }
            ticks  
        case <-b.QuitChan:
            ticker.Stop()
            writer.Flush()
            file_two.Close()
            fmt.Println("Stopped recording.")
            break
        }
    }
}

// main starts service A and then service B, lets them run for ten minutes,
// and stops them through deferred Stop calls (B first, then A). If either
// Start fails, the error is printed and main returns early.
func main() {
	serviceA := NewA()
	if err := serviceA.Start(); err != nil {
		fmt.Println(err)
		return
	}
	defer serviceA.Stop()

	serviceB := NewB()
	if err := serviceB.Start(); err != nil {
		fmt.Println(err)
		return
	}
	defer serviceB.Stop()

	const runFor = 600 * time.Second
	time.Sleep(runFor)
}

Essentially, I have two different services that run a record method in two different goroutines. They each create and write to a different csv file at different times. When I run this, the csv files are created but never have data. No errors are ever raised while running this. I read that I should use a mutex which I've implemented but this hasn't worked either. What should I do here?

CodePudding user response:

As detailed in the comments, the program will exit when main() completes; the spec states: "It does not wait for other (non-main) goroutines to complete."

This means your goroutines are unlikely to reach the code that flushes the CSV writers and closes the files, so the buffered data may never be written.

I created a simplified version of your application in the playground to demonstrate this.

There are a number of ways to fix this, but the simplest is probably to add a sync.WaitGroup so your application can wait for the goroutines to exit before terminating.

  • Related