I have recently started learning Go.
I found the following Github implementation of a wrapper for database transaction processing and decided to try it out.
I am using PostgreSQL as the database.
Initially, it contains the following data.
testdb=> select * from products;
product_id | price
------------ -------
0001 | 200
0002 | 100
0003 | 150
0004 | 300
(4 rows)
After Process A succeeds, Process B is intentionally made to fail, and a rollback of transaction A is expected. However, when we run it, the rollback does not occur and we end up with the following
In truth, since B failed, the process A should be rolled back and there should be no change in the database value.
I have inserted Logs in places to confirm this, but I am not sure. Why is the rollback not executed?
package main
import (
"context"
"database/sql"
"fmt"
"log"
_ "github.com/jackc/pgx/v4/stdlib"
)
// transaction-wrapper-start
type txAdmin struct {
*sql.DB
}
type Service struct {
tx txAdmin
}
func (t *txAdmin) Transaction(ctx context.Context, f func(ctx context.Context) (err error)) error {
log.Printf("transaction")
tx, err := t.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
if err := f(ctx); err != nil {
log.Printf("transaction err")
return fmt.Errorf("transaction query failed: %w", err)
}
log.Printf("commit")
return tx.Commit()
}
func (s *Service) UpdateProduct(ctx context.Context, productID string) error {
updateFunc := func(ctx context.Context) error {
log.Printf("first process")
// Process A
if _, err := s.tx.ExecContext(ctx, "UPDATE products SET price = 200 WHERE product_id = $1", productID); err != nil {
log.Printf("first err")
return err
}
log.Printf("second process")
// Process B(They are intentionally failing.)
if _, err := s.tx.ExecContext(ctx, "...", productID); err != nil {
log.Printf("second err")
return err
}
return nil
}
log.Printf("update")
return s.tx.Transaction(ctx, updateFunc)
}
// transaction-wrapper-end
func main() {
data, err := sql.Open("pgx", "host=localhost port=5432 user=testuser dbname=testdb password=password sslmode=disable")
if nil != err {
log.Fatal(err)
}
database := Service {tx: txAdmin{data}}
ctx := context.Background()
database.UpdateProduct(ctx, "0004")
}
output
2022/05/26 13:28:55 update
2022/05/26 13:28:55 transaction
2022/05/26 13:28:55 first process
2022/05/26 13:28:55 second process
2022/05/26 13:28:55 second err
2022/05/26 13:28:55 transaction err
database changes(If the rollback works, the PRICE for id 0004 should remain 300.)
testdb=> select * from products;
product_id | price
------------ -------
0001 | 200
0002 | 100
0003 | 150
0004 | 200
(4 rows)
Please tell me how I can use the wrapper to correctly process transactions.
========= PS. The following code without the wrapper worked properly.
package main
import (
"context"
"database/sql"
"log"
_ "github.com/jackc/pgx/v4/stdlib"
)
// transaction-defer-start
type Service struct {
db *sql.DB
}
func (s *Service) UpdateProduct(ctx context.Context, productID string) (err error) {
tx, err := s.db.Begin()
if err != nil {
return err
}
defer tx.Rollback()
if _, err = tx.ExecContext(ctx, "UPDATE products SET price = 200 WHERE product_id = $1", productID); err != nil {
log.Println("update err")
return err
}
if _, err = tx.ExecContext(ctx, "...", productID); err != nil {
log.Println("update err")
return err
}
return tx.Commit()
}
// transaction-defer-end
func main() {
var database Service
dbConn, err := sql.Open("pgx", "host=localhost port=5432 user=testuser dbname=testdb password=passs sslmode=disable")
if nil != err {
log.Fatal(err)
}
database.db = dbConn
ctx := context.Background()
database.UpdateProduct(ctx, "0004")
}
CodePudding user response:
As @Richard Huxton said, pass tx
into a function f
here are the steps:
- add one field on
struct txAdmin
to accommodate*sql.Tx
, sotxAdmin
haveDB
andTx
fields - inside
Transaction
settx
to*txAdmin.Tx
- inside
UpdateProduct
use*Service.tx.Tx
for every query
so the final code looks like this:
package main
import (
"context"
"database/sql"
"fmt"
"log"
_ "github.com/jackc/pgx/v4/stdlib"
)
// transaction-wrapper-start
type txAdmin struct {
*sql.DB
*sql.Tx
}
type Service struct {
tx txAdmin
}
func (t *txAdmin) Transaction(ctx context.Context, f func(ctx context.Context) (err error)) error {
log.Printf("transaction")
tx, err := t.DB.BeginTx(ctx, nil)
if err != nil {
return err
}
// set tx to Tx
t.Tx = tx
defer tx.Rollback()
if err := f(ctx); err != nil {
log.Printf("transaction err")
return fmt.Errorf("transaction query failed: %w", err)
}
log.Printf("commit")
return tx.Commit()
}
func (s *Service) UpdateProduct(ctx context.Context, productID string) error {
updateFunc := func(ctx context.Context) error {
log.Printf("first process")
// Process A
if _, err := s.tx.Tx.ExecContext(ctx, "UPDATE products SET price = 200 WHERE product_id = $1", productID); err != nil {
log.Printf("first err")
return err
}
log.Printf("second process")
// Process B(They are intentionally failing.)
if _, err := s.tx.Tx.ExecContext(ctx, "...", productID); err != nil {
log.Printf("second err")
return err
}
return nil
}
log.Printf("update")
return s.tx.Transaction(ctx, updateFunc)
}
// transaction-wrapper-end
func main() {
data, err := sql.Open("pgx", "host=localhost port=5432 user=testuser dbname=testdb password=password sslmode=disable")
if nil != err {
log.Fatal(err)
}
database := Service{tx: txAdmin{DB: data}}
ctx := context.Background()
database.UpdateProduct(ctx, "0004")
}