Protobuf type for BigQuery TIMESTAMP field

I am streaming data to BigQuery from Go using the new Storage Write API. The schema of my BigQuery table contains a TIMESTAMP field, created as follows:

bq mk -t mydataset.mytable name:string,lastseen:timestamp

Separately I have defined a protocol buffer like this:

message Row {
    string Name = 1;
    google.protobuf.Timestamp LastSeen = 3;
}

However, when I submit this data to BigQuery, I get the following error:

rpc error: code = InvalidArgument desc = The proto field mismatched with BigQuery field at tutorial_Row.LastSeen, the proto field type message, BigQuery field type TIMESTAMP

It seems that the google.protobuf.Timestamp message does not correspond to the TIMESTAMP type in BigQuery. That would make sense, since the BigQuery docs say a TIMESTAMP carries time zone information while google.protobuf.Timestamp does not. But which protocol buffer type should I use instead?

I'm using code derived from this repository, which looks like this:

import (
    "context"
    "fmt"
    "log"

    storage "cloud.google.com/go/bigquery/storage/apiv1beta2"
    "cloud.google.com/go/bigquery/storage/managedwriter/adapt"
    storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2"
    "google.golang.org/protobuf/proto"
    timestamppb "google.golang.org/protobuf/types/known/timestamppb"
)

const (
    project = "myproject"
    dataset = "mydataset"
    table   = "mytable2"
)

func main() {
    ctx := context.Background()

    // the data we will stream to bigquery
    var rows = []*Row{
        {Name: "John Doe", Age: 104, LastSeen: timestamppb.Now()},
        {Name: "Jane Doe", Age: 69, LastSeen: timestamppb.Now()},
        {Name: "Adam Smith", Age: 33, LastSeen: timestamppb.Now()},
    }

    // create the bigquery client
    client, err := storage.NewBigQueryWriteClient(ctx)
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // create the write stream
    // a COMMITTED write stream inserts data immediately into bigquery
    resp, err := client.CreateWriteStream(ctx, &storagepb.CreateWriteStreamRequest{
        Parent: fmt.Sprintf("projects/%s/datasets/%s/tables/%s", project, dataset, table),
        WriteStream: &storagepb.WriteStream{
            Type: storagepb.WriteStream_COMMITTED,
        },
    })
    if err != nil {
        log.Fatal("CreateWriteStream: ", err)
    }

    // get the stream by calling AppendRows
    stream, err := client.AppendRows(ctx)
    if err != nil {
        log.Fatal("AppendRows: ", err)
    }

    // get the protobuf descriptor for our row type
    var row Row
    descriptor, err := adapt.NormalizeDescriptor(row.ProtoReflect().Descriptor())
    if err != nil {
        log.Fatal("NormalizeDescriptor: ", err)
    }

    // serialize the rows
    var opts proto.MarshalOptions
    var data [][]byte
    for _, row := range rows {
        buf, err := opts.Marshal(row)
        if err != nil {
            log.Fatal("protobuf.Marshal: ", err)
        }
        data = append(data, buf)
    }

    // send the rows to bigquery
    err = stream.Send(&storagepb.AppendRowsRequest{
        WriteStream: resp.Name,
        Rows: &storagepb.AppendRowsRequest_ProtoRows{
            ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
                // protocol buffer schema
                WriterSchema: &storagepb.ProtoSchema{
                    ProtoDescriptor: descriptor,
                },
                // protocol buffer data
                Rows: &storagepb.ProtoRows{
                    SerializedRows: data, // serialized protocol buffer data
                },
            },
        },
    })
    if err != nil {
        log.Fatal("AppendRows.Send: ", err)
    }

    // get the response, which will tell us whether it worked
    _, err = stream.Recv()
    if err != nil {
        log.Fatal("AppendRows.Recv: ", err)
    }

    log.Println("done")
}

CodePudding user response:

Yeah, the backend doesn't decode proto Timestamp messages into a TIMESTAMP column.

Quickest resolution: declare the field as int64 in your proto and populate it with microseconds since the Unix epoch, per the data type conversion table:

https://cloud.google.com/bigquery/docs/write-api#data_type_conversions
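
A minimal sketch of that change, assuming the LastSeen field is redeclared as int64 in the .proto (and the Row type regenerated); the helper name epochMicros is only illustrative, not part of any library:

// Sketch only: in the .proto, the field would change from
//
//     google.protobuf.Timestamp LastSeen = 3;
// to
//     int64 LastSeen = 3;
//
// since the Storage Write API interprets an int64 proto field written to a
// TIMESTAMP column as microseconds since the Unix epoch.
package main

import (
    "fmt"
    "time"
)

// epochMicros converts a time.Time into the epoch-microsecond value a
// TIMESTAMP column expects (time.Time.UnixMicro requires Go 1.17+).
func epochMicros(t time.Time) int64 {
    return t.UnixMicro()
}

func main() {
    // With the regenerated Row type, the rows from the question would be
    // populated like this (illustrative, not compiled against the real type):
    //
    //     rows := []*Row{
    //         {Name: "John Doe", LastSeen: epochMicros(time.Now())},
    //     }
    fmt.Println(epochMicros(time.Now()))
}

The rest of the question's code (stream creation, descriptor normalization, AppendRows) stays the same; only the field type and how it is populated change.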
