I'm trying to run the following flow:
- Get data from somewhere
- Create a new local CSV file and write the data into it
- Upload the CSV to BigQuery
- Delete the local file
But it seems to load empty data. This is the code:
func (c *Client) Do(ctx context.Context) error {
	bqClient, err := bigquerypkg.NewBigQueryUtil(ctx, "projectID", "datasetID")
	if err != nil {
		return err
	}
	data, err := c.GetSomeData(ctx)
	if err != nil {
		return err
	}
	file, err := os.Create("example.csv")
	if err != nil {
		return err
	}
	defer file.Close()
	// the file also needs to be deleted afterwards
	writer := csv.NewWriter(file)
	defer writer.Flush()
	timestamp := time.Now().UTC().Format("2006-01-02 03:04:05.000000000")
	for _, d := range data {
		csvRow := []string{
			d.ID,
			d.Name,
			timestamp,
		}
		err = writer.Write(csvRow)
		if err != nil {
			log.Printf("error writing data to CSV: %v\n", err)
		}
	}
	source := bigquery.NewReaderSource(file)
	source.Schema = bigquery.Schema{
		{Name: "id", Type: bigquery.StringFieldType},
		{Name: "name", Type: bigquery.StringFieldType},
		{Name: "createdAt", Type: bigquery.TimestampFieldType},
	}
	if _, err = bqClient.LoadCsv(ctx, "tableID", source); err != nil {
		return err
	}
	return nil
}
LoadCsv() looks like this:
func (c *Client) LoadCsv(ctx context.Context, tableID string, src bigquery.LoadSource) (string, error) {
	loader := c.bigQueryClient.Dataset(c.datasetID).Table(tableID).LoaderFrom(src)
	loader.WriteDisposition = bigquery.WriteTruncate
	job, err := loader.Run(ctx)
	if err != nil {
		return "", err
	}
	status, err := job.Wait(ctx)
	if err != nil {
		return job.ID(), err
	}
	if status.Err() != nil {
		return job.ID(), fmt.Errorf("job completed with error: %v", status.Err())
	}
	return job.ID(), nil
}
After running this, BigQuery does create the schema, but with no data.
If I change os.Create() to os.Open() and the file already exists, everything works. It's as if the file data has not yet been written when the CSV is loaded (?)
What's the reason?
CodePudding user response:
The problem I see here is that you never rewind the file handle's cursor to the beginning of the file after writing. The next read therefore starts at the end of the file and returns 0 bytes, which is why the file seems to have no content.
Calling file.Seek(0, io.SeekStart) before bigquery.NewReaderSource(file) handles this: https://pkg.go.dev/os#File.Seek
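The Flush is relevant too, though: csv.Writer buffers rows in memory, and because writer.Flush() is deferred it only runs when Do returns, which is after LoadCsv has already read the file. Call writer.Flush() and check writer.Error() once the loop finishes, then seek back to the start. Nothing more is needed at the file-handle level: you're using the same handle to read the file as you did to write it, so bytes that have reached the handle are visible to the next read without closing or reopening the file. This would not necessarily be the case if the file was opened by a different process or was reopened.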
Demonstration:
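package main

import (
	"fmt"
	"io"
	"os"
)

func main() {
	f, err := os.CreateTemp("", "data.csv")
	if err != nil {
		panic(err)
	}
	// Deferred calls run in reverse order: close first, then remove.
	defer os.Remove(f.Name())
	defer f.Close()

	fmt.Fprintf(f, "hello, world")

	// The cursor is at the end of the file after the write, so this copies 0 bytes.
	fmt.Fprintln(os.Stderr, "Before rewind: ")
	if _, err := io.Copy(os.Stderr, f); err != nil {
		panic(err)
	}

	// Move the cursor back to the start of the file before reading again.
	if _, err := f.Seek(0, io.SeekStart); err != nil {
		panic(err)
	}
	fmt.Fprintln(os.Stderr, "\nAfter rewind: ")
	if _, err := io.Copy(os.Stderr, f); err != nil {
		panic(err)
	}
	fmt.Fprintln(os.Stderr)
}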
% go run t.go
Before rewind:
After rewind:
hello, world