Home > Blockchain >  Using PostgreSQL COPY FROM STDIN
Using PostgreSQL COPY FROM STDIN

Time:11-24

Is it possible to use PostgreSQL's COPY FROM STDIN statement in Go to load data from a CSV file by passing some sort of Reader or Writer object, the same way it's done in Java? What library should I use? Kotlin example for reference:

// Build a CopyManager from the existing connection, cast to BaseConnection.
val cm = CopyManager(conn as BaseConnection)
// Run the COPY statement with inputStream as its data source.
// NOTE(review): total is presumably the number of rows copied — confirm against the CopyManager documentation.
val total = cm.copyIn("COPY my_table FROM STDIN FORMAT csv", inputStream)

CodePudding user response:

Looking at the two most popular Golang postgres libraries:

lib/pq

The driver supports this via the standard database/sql methods, but you'll need to read from the io.Reader yourself and pass each CSV row to the prepared COPY statement one at a time.

package main

import (
    "bufio"
    "context"
    "database/sql"
    "fmt"
    "io"
    "log"
    "os"
    "os/signal"

    "github.com/lib/pq"
)

// main runs the import with a context that is cancelled when the process
// receives os.Interrupt, and terminates via log.Fatal if the import fails.
func main() {
    ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
    defer stop()

    err := run(ctx)
    if err != nil {
        log.Fatal(err)
    }
}

// run opens the CSV file and a database handle registered under the
// "postgres" driver name, then hands both to copyFrom. Any error from opening
// the file, opening the handle, or the copy itself is returned unchanged.
func run(ctx context.Context) error {
    // The CSV source; any io.Reader (os.Stdin, etc) would work here as well.
    csvFile, err := os.Open("/tmp/csv")
    if err != nil {
        return err
    }
    defer csvFile.Close()

    // Database handle backed by the pq driver.
    db, err := sql.Open("postgres", "postgres://postgres@localhost:5432/postgres?sslmode=disable")
    if err != nil {
        return err
    }
    defer db.Close()

    // Stream the file into my_table.
    return copyFrom(ctx, db, "my_table", csvFile)
}

// copyFrom loads the CSV data read from r into table by running a
// COPY ... FROM STDIN statement through lib/pq inside a single transaction.
//
// r is consumed line by line and each line is passed to the prepared COPY
// statement as one argument; a final ExecContext call without arguments
// flushes what lib/pq has buffered. The transaction is committed only when
// every step succeeds; on any failure it is rolled back and the error is
// returned wrapped with a description of the step that failed.
func copyFrom(ctx context.Context, db *sql.DB, table string, r io.Reader) error {
    // Largest single input line accepted. Without raising it, bufio.Scanner
    // stops with bufio.ErrTooLong on any line over bufio.MaxScanTokenSize
    // (64 KiB), which wide CSV rows can easily exceed.
    const maxLineSize = 64 << 20

    query := fmt.Sprintf("COPY %s FROM STDIN WITH (FORMAT csv)", pq.QuoteIdentifier(table))

    tx, err := db.BeginTx(ctx, nil)
    if err != nil {
        return fmt.Errorf("begin transaction: %w", err)
    }
    // After a successful Commit this returns sql.ErrTxDone, which is safe to
    // ignore; on every other path it undoes the partial copy.
    defer tx.Rollback()

    stmt, err := tx.PrepareContext(ctx, query)
    if err != nil {
        return fmt.Errorf("prepare copy into %s: %w", table, err)
    }
    // Deferred calls run last-in-first-out, so on an error path the statement
    // is closed before the transaction is rolled back. Closing an already
    // closed statement is a no-op, so the explicit Close below is not repeated.
    defer stmt.Close()

    sc := bufio.NewScanner(r)
    sc.Buffer(make([]byte, 0, bufio.MaxScanTokenSize), maxLineSize)
    for sc.Scan() {
        if _, err = stmt.ExecContext(ctx, sc.Text()); err != nil {
            return fmt.Errorf("copy row into %s: %w", table, err)
        }
    }
    if err = sc.Err(); err != nil {
        return fmt.Errorf("read csv input: %w", err)
    }

    // An Exec without arguments signals the end of the data and flushes it.
    if _, err = stmt.ExecContext(ctx); err != nil {
        return fmt.Errorf("flush copy into %s: %w", table, err)
    }
    // Close explicitly so that a failure while finishing the COPY is reported
    // before the transaction is committed.
    if err = stmt.Close(); err != nil {
        return fmt.Errorf("close copy statement: %w", err)
    }

    if err = tx.Commit(); err != nil {
        return fmt.Errorf("commit copy into %s: %w", table, err)
    }
    return nil
}

Note

  1. lib/pq is in maintenance mode
  2. Numerous data races have been reported in the underlying code that handles COPY statements, with at least one still open

jackc/pgx

The lower-level pgconn.PgConn type has a CopyFrom method that lets you pass an arbitrary COPY statement and an io.Reader. If you're connecting through the standard library's database/sql package, you can still get access to the underlying pgconn.PgConn as shown below. pgx also offers other ways of handling connections and pools, so it's worth taking a look at those too.

package main

import (
    "context"
    "database/sql"
    "fmt"
    "io"
    "log"
    "os"
    "os/signal"

    "github.com/jackc/pgx/v5"
    "github.com/jackc/pgx/v5/stdlib"
)

// main runs the import with a context that is cancelled when the process
// receives os.Interrupt, and terminates via log.Fatal if the import fails.
func main() {
    ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
    defer stop()

    err := run(ctx)
    if err != nil {
        log.Fatal(err)
    }
}

// run opens the CSV file and a database handle registered under the "pgx"
// driver name, then hands both to copyFrom. Any error from opening the file,
// opening the handle, or the copy itself is returned unchanged.
func run(ctx context.Context) error {
    // The CSV source; any io.Reader (os.Stdin, etc) would work here as well.
    csvFile, err := os.Open("/tmp/csv")
    if err != nil {
        return err
    }
    defer csvFile.Close()

    // Database handle backed by the pgx driver.
    db, err := sql.Open("pgx", "postgres://postgres@localhost:5432/postgres?sslmode=disable")
    if err != nil {
        return err
    }
    defer db.Close()

    // Stream the file into my_table.
    return copyFrom(ctx, db, "my_table", csvFile)
}

// copyFrom loads the CSV data read from r into table by running a
// COPY ... FROM STDIN statement through pgx's low-level PgConn.CopyFrom.
//
// A dedicated connection is taken from db and its underlying driver
// connection is unwrapped via Conn.Raw. db must therefore be backed by the
// pgx stdlib driver; if it is not, an error is returned instead of panicking
// on the type assertion. Errors are returned wrapped with a description of
// the step that failed.
func copyFrom(ctx context.Context, db *sql.DB, table string, r io.Reader) error {
    query := fmt.Sprintf("COPY %s FROM STDIN WITH (FORMAT csv)", pgx.Identifier{table}.Sanitize())

    conn, err := db.Conn(ctx)
    if err != nil {
        return fmt.Errorf("acquire connection: %w", err)
    }
    defer conn.Close()

    err = conn.Raw(func(driverConn any) error {
        // A panic here would escape Conn.Raw, so check the type explicitly.
        stdConn, ok := driverConn.(*stdlib.Conn)
        if !ok {
            return fmt.Errorf("unexpected driver connection type %T, want *stdlib.Conn", driverConn)
        }

        _, err := stdConn.Conn().PgConn().CopyFrom(ctx, r, query)
        return err
    })
    if err != nil {
        return fmt.Errorf("copy into %s: %w", table, err)
    }
    return nil
}
  • Related