I am processing a huge data file, approx. 100 GB. Each line in that file is a piece of JSON data which I'd like to read, compress, and store in an in-memory database.
var wg sync.WaitGroup
for {
	line, err := reader.ReadString('\n')
	if err != nil {
		break
	}
	go func(index int) {
		wg.Add(1)
		pr, pw := io.Pipe()
		zw := lzw.NewWriter(pw, lzw.LSB, 8)
		_, err := io.Copy(zw, strings.NewReader(line))
		pw.Close()
		zw.Close()
		if err != nil {
			fmt.Println(err.Error())
		}
		b, err := io.ReadAll(pr)
		if err != nil {
			fmt.Println(err.Error())
		}
		client.Set(ctx, fmt.Sprintf("%d", index), base64.StdEncoding.EncodeToString(b), time.Hour*1000)
		pr.Close()
		wg.Done()
	}(index)
	if index%10000 == 0 {
		fmt.Println(index)
		wg.Wait()
	}
	index++
}
However, this code stops after processing the first 10000 lines. When I move the wg.Add(1) down, after the zw.Close(), it keeps on processing the rest of the lines (but becomes unstable). Without the lzw and the io.Pipe(), when I store the exact values uncompressed, everything works without any issue. I am not sure whether I am using the WaitGroup incorrectly or whether there is something about io.Pipe() that I am not aware of yet.
CodePudding user response:
1- Removing pr, pw := io.Pipe() makes the code simpler, since the pipe is superfluous here. It also explains the hang: each Write to a PipeWriter blocks until a Read on the PipeReader consumes it, and in your goroutine io.ReadAll(pr) only runs after io.Copy returns, so a goroutine can block forever on the pipe and wg.Wait() never returns. Compressing into a bytes.Buffer avoids that. Try this:
package main

import (
	"bufio"
	"bytes"
	"compress/lzw"
	"context"
	"encoding/base64"
	"fmt"
	"io"
	"log"
	"strings"
	"sync"
	"time"
)
func main() {
	// file, err := os.Open("big.txt")
	// if err != nil {
	// 	log.Fatal(err)
	// }
	index := 1
	client := &myClient{}
	reader := bufio.NewReader(file)
	var wg sync.WaitGroup
	for {
		line, err := reader.ReadString('\n')
		fmt.Print("line =", line)
		if err == io.EOF {
			fmt.Println("\n**** All done **** index =", index)
			wg.Wait()
			break
		}
		if err != nil {
			log.Fatal(err)
		}

		wg.Add(1)
		go func(index int) {
			// pr, pw := io.Pipe()
			var buf bytes.Buffer
			pw := bufio.NewWriter(&buf)
			{ // lexical scoping (static scoping)
				// It is the caller's responsibility to call Close on the WriteCloser when finished writing.
				zw := lzw.NewWriter(pw, lzw.LSB, 8)
				n, err := io.Copy(zw, strings.NewReader(line))
				if err != nil {
					log.Fatal(err)
				}
				if int(n) != len(line) {
					log.Fatal(n, len(line))
				}
				if err = zw.Close(); err != nil {
					log.Fatal(err)
				}
			}
			// flush the bufio.Writer so all compressed bytes reach buf
			if err := pw.Flush(); err != nil {
				log.Fatal(err)
			}
			// b, err := io.ReadAll(pr)
			// if err != nil {
			// 	log.Fatal(err)
			// }
			b := buf.Bytes()
			fmt.Println("index =", index, b)
			ctx, cancelFunc := context.WithTimeout(context.Background(), 100*time.Millisecond)
			client.Set(ctx, fmt.Sprintf("%d", index), base64.StdEncoding.EncodeToString(b), 1000*time.Hour)
			// if err = pr.Close(); err != nil {
			// 	log.Fatal(err)
			// }
			cancelFunc()
			wg.Done()
		}(index)

		if index%2 == 0 {
			fmt.Println("\n**** Wait **** index =", index)
			wg.Wait()
		}
		index++
	}
}
type myClient struct {
}

func (p *myClient) Set(ctx context.Context, a, b string, t time.Duration) {
	fmt.Println("a =", a, ", b =", b, ", t =", t)
	if ctx.Err() != nil {
		fmt.Println(ctx.Err())
	}
}

// just for the Go Playground:
var file, myw = io.Pipe()

func init() {
	go func() {
		for i := 0; i < 3; i++ {
			fmt.Fprintln(myw, "text to compress aaaaaaaaaaaaaa")
		}
		myw.Close()
	}()
}
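Not part of the original answer, but for completeness: once a value is stored base64-encoded like this, reading it back is just the reverse path, base64-decode and then wrap the bytes in an lzw.Reader with the same order and literal width. A minimal sketch (the sample JSON line and the helper name decode are placeholders, not anything from the post):

package main

import (
	"bytes"
	"compress/lzw"
	"encoding/base64"
	"fmt"
	"io"
	"log"
)

// decode reverses the storage path: base64 -> LZW (LSB, 8) -> original line.
func decode(stored string) (string, error) {
	raw, err := base64.StdEncoding.DecodeString(stored)
	if err != nil {
		return "", err
	}
	zr := lzw.NewReader(bytes.NewReader(raw), lzw.LSB, 8)
	defer zr.Close()
	line, err := io.ReadAll(zr)
	if err != nil {
		return "", err
	}
	return string(line), nil
}

func main() {
	// Round-trip check using the same parameters as when storing.
	var buf bytes.Buffer
	zw := lzw.NewWriter(&buf, lzw.LSB, 8)
	if _, err := zw.Write([]byte(`{"msg":"text to compress"}`)); err != nil {
		log.Fatal(err)
	}
	if err := zw.Close(); err != nil {
		log.Fatal(err)
	}
	stored := base64.StdEncoding.EncodeToString(buf.Bytes())

	line, err := decode(stored)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(line) // {"msg":"text to compress"}
}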
2- You need to put the wg.Add(1) before the go func(index int) { statement:
wg.Add(1)
go func(index int) {
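To see why this matters, here is a minimal, self-contained illustration (my own sketch, not from the post): if Add ran inside the goroutine, the main goroutine could reach Wait before any Add had executed, so Wait would observe a zero counter and return while work is still in flight. Calling Add before the go statement guarantees the counter is incremented first.

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1) // counter is incremented before Wait can possibly run
		go func(i int) {
			defer wg.Done()
			time.Sleep(10 * time.Millisecond) // stand-in for compress + store
			fmt.Println("done", i)
		}(i)
	}
	// With Add moved inside the goroutines, this Wait could return
	// before any worker had even started.
	wg.Wait()
	fmt.Println("all workers finished")
}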
3- The wg.Wait() logic:
if index%10000 == 0 {
	fmt.Println(index)
	wg.Wait()
}
What happens for the last iterations when index%10000 != 0? Those goroutines are never waited for, which is why the corrected code above also calls wg.Wait() when it sees io.EOF, before breaking out of the loop.
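As a further option (my own sketch, not from the original answer), you can drop the every-N batching entirely and bound the number of in-flight goroutines with a buffered channel used as a semaphore; a single wg.Wait() after the loop then covers the final partial batch as well. The names sem and the worker count 8 are arbitrary, and the compress-and-store step is left as a placeholder:

package main

import (
	"bufio"
	"fmt"
	"strings"
	"sync"
)

func main() {
	reader := bufio.NewReader(strings.NewReader("line 1\nline 2\nline 3\n"))

	var wg sync.WaitGroup
	sem := make(chan struct{}, 8) // at most 8 lines processed concurrently

	index := 0
	for {
		line, err := reader.ReadString('\n')
		if err != nil {
			break // io.EOF (or a real error) ends the loop
		}
		index++

		wg.Add(1)
		sem <- struct{}{} // blocks while 8 workers are already running
		go func(index int, line string) {
			defer wg.Done()
			defer func() { <-sem }()
			// compress line and store it here, keyed by index
			fmt.Println("processed", index, "len", len(line))
		}(index, line)
	}

	wg.Wait() // covers the final partial batch, whatever index ends up being
	fmt.Println("all", index, "lines processed")
}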