Is it possible to use PostgreSQL's COPY FROM STDIN
statement to load data from CSV file by passing some sort of Reader
or Writer
object the same way it's done in Java? What library should I use? Kotlin example for reference:
val cm = CopyManager(conn as BaseConnection)
val total = cm.copyIn("COPY my_table FROM STDIN FORMAT csv", inputStream)
CodePudding user response:
Looking at the two most popular Golang postgres libraries:
lib/pq
The driver supports this via the standard library methods, but you'll need to consume the io.Reader
variable to pass each CSV row through.
package main
import (
"bufio"
"context"
"database/sql"
"fmt"
"io"
"log"
"os"
"os/signal"
"github.com/lib/pq"
)
func main() {
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancel()
if err := run(ctx); err != nil {
log.Fatal(err)
}
}
func run(ctx context.Context) error {
// Open CSV file (this could also be os.Stdin, etc)
f, err := os.Open("/tmp/csv")
if err != nil {
return err
}
defer f.Close()
// Open database connection using pq driver
db, err := sql.Open("postgres", "postgres://postgres@localhost:5432/postgres?sslmode=disable")
if err != nil {
return err
}
defer db.Close()
// Execute copy
if err = copyFrom(ctx, db, "my_table", f); err != nil {
return err
}
return nil
}
func copyFrom(ctx context.Context, db *sql.DB, table string, r io.Reader) error {
query := fmt.Sprintf("COPY %s FROM STDIN WITH (FORMAT csv)", pq.QuoteIdentifier(table))
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
stmt, err := tx.PrepareContext(ctx, query)
if err != nil {
return err
}
sc := bufio.NewScanner(r)
for sc.Scan() {
if _, err = stmt.ExecContext(ctx, sc.Text()); err != nil {
return err
}
}
if err = sc.Err(); err != nil {
return err
}
if _, err = stmt.ExecContext(ctx); err != nil {
return err
}
return tx.Commit()
}
Note
- lib/pq is in maintenance mode
- Numerous data races have been reported in the underlying code that handles
COPY
statements, with at least one still open
jackc/pgx
The lower level pgconn.PgConn
type has a CopyFrom
method that allows you to pass an arbitrary statement and io.Reader
. If you're connecting via the stdlib db
package, you can still get access to the underlying pgconn.PgConn
as shown below, although there are other ways of handling connections / pools, etc, when using pgx, so it's worth taking a look at those too.
package main
import (
"context"
"database/sql"
"fmt"
"io"
"log"
"os"
"os/signal"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/stdlib"
)
func main() {
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancel()
if err := run(ctx); err != nil {
log.Fatal(err)
}
}
func run(ctx context.Context) error {
// Open CSV file (this could also be os.Stdin, etc)
f, err := os.Open("/tmp/csv")
if err != nil {
return err
}
defer f.Close()
// Open database connection using pgx driver
db, err := sql.Open("pgx", "postgres://postgres@localhost:5432/postgres?sslmode=disable")
if err != nil {
return err
}
defer db.Close()
// Execute copy
if err = copyFrom(ctx, db, "my_table", f); err != nil {
return err
}
return nil
}
func copyFrom(ctx context.Context, db *sql.DB, table string, r io.Reader) error {
query := fmt.Sprintf("COPY %s FROM STDIN WITH (FORMAT csv)", pgx.Identifier{table}.Sanitize())
conn, err := db.Conn(ctx)
if err != nil {
return err
}
defer conn.Close()
return conn.Raw(func(driverConn any) error {
pgConn := driverConn.(*stdlib.Conn).Conn().PgConn()
_, err := pgConn.CopyFrom(ctx, r, query)
return err
})
}