Home > Blockchain >  io.Pipe() causes WaitGroup to get stuck
io.Pipe() causes WaitGroup to get stuck

Time:05-26

I am processing a huge data file of approximately 100 GB. Each line in the file is a JSON document that I'd like to read, compress, and store in an in-memory database.

var wg sync.WaitGroup
for {
    line, err := reader.ReadString('\n')
    if err != nil {
        break
    }
    go func(index int) {
        // BUG: Add runs inside the goroutine, so the wg.Wait below can
        // observe a zero counter and return before this goroutine registers.
        wg.Add(1)
        // BUG: io.Pipe is synchronous. A Write on pw blocks until pr is read,
        // but the only read (io.ReadAll below) runs later in this same
        // goroutine, so any write that reaches pw before then blocks forever
        // and wg.Wait never returns. (lzw appears to buffer its output, so
        // presumably this only bites on long lines; verify.)
        pr, pw := io.Pipe()
        zw := lzw.NewWriter(pw, lzw.LSB, 8)
        _, err := io.Copy(zw, strings.NewReader(line))
        // NOTE: pw is closed before zw, so whatever zw.Close flushes goes to
        // an already-closed pipe; zw.Close's error is discarded.
        pw.Close()
        zw.Close()
        if err != nil {
            fmt.Println(err.Error())
        }
        b, err := io.ReadAll(pr)
        if err != nil {
            fmt.Println(err.Error())
        }
        client.Set(ctx, fmt.Sprintf("%d", index), base64.StdEncoding.EncodeToString(b), time.Hour*1000)
        pr.Close()
        wg.Done()
    }(index)
    if index%10000 == 0 {
        fmt.Println(index)
        wg.Wait()
    }
    index += 1
}
// NOTE: there is no wg.Wait() after the loop, so goroutines started after
// the last multiple of 10000 are never waited for.

However, this code gets stuck after processing the first 10000 lines. When I move wg.Add(1) down below zw.Close(), it keeps processing the remaining lines (but becomes unstable). Without lzw and io.Pipe(), when I store the values uncompressed, everything works without any issue.

I am not sure whether I am using the WaitGroup incorrectly or whether there is something about io.Pipe() that I am not aware of yet.

Answer:

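1. Remove `pr, pw := io.Pipe()`; it is superfluous, and removing it makes the code simpler. It also removes a deadlock: a Write on an io.Pipe blocks until the data is read from the other end, but here the only read (io.ReadAll) happens later in the same goroutine. Write the compressed output into a bytes.Buffer instead. Try this: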

package main

import (
    "bufio"
    "bytes"
    "compress/lzw"
    "context"
    "encoding/base64"
    "fmt"
    "io"
    "log"
    "strings"
    "sync"
    "time"
)

// main reads newline-delimited input from file, LZW-compresses each line in
// its own goroutine, and hands the base64-encoded result to client.Set keyed
// by the 1-based line number. At most batchSize lines are in flight before
// main waits for them to finish.
func main() {
    // file, err := os.Open("big.txt")
    // if err != nil {
    //  log.Fatal(err)
    // }

    // batchSize bounds how many compression goroutines run at once; kept
    // tiny for the playground (the question uses 10000).
    const batchSize = 2

    index := 1
    client := &myClient{}
    reader := bufio.NewReader(file)
    var wg sync.WaitGroup
    for {
        line, err := reader.ReadString('\n')
        fmt.Print("line =", line)
        if err != nil && err != io.EOF {
            log.Fatal(err)
        }
        // ReadString returns a final line that lacks '\n' together with
        // io.EOF, so process any data before acting on EOF.
        if line != "" {
            // Add before starting the goroutine: calling it inside the
            // goroutine races with the wg.Wait calls below.
            wg.Add(1)
            go func(index int, line string) {
                defer wg.Done()
                b, err := compressLine(line)
                if err != nil {
                    log.Fatal(err)
                }
                fmt.Println("index =", index, b)
                ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
                defer cancel()
                client.Set(ctx, fmt.Sprintf("%d", index), base64.StdEncoding.EncodeToString(b), 1000*time.Hour)
            }(index, line)
        }
        if err == io.EOF {
            fmt.Println("\n**** All done **** index =", index)
            wg.Wait()
            break
        }
        if index%batchSize == 0 {
            fmt.Println("\n**** Wait **** index =", index)
            wg.Wait()
        }
        index++
    }
}

// compressLine LZW-compresses line (LSB bit order, 8-bit literals) into an
// in-memory buffer and returns the compressed bytes. No io.Pipe is needed:
// everything happens in one goroutine, writing into a bytes.Buffer.
func compressLine(line string) ([]byte, error) {
    var buf bytes.Buffer
    zw := lzw.NewWriter(&buf, lzw.LSB, 8)
    n, err := io.Copy(zw, strings.NewReader(line))
    if err != nil {
        zw.Close()
        return nil, err
    }
    if int(n) != len(line) {
        zw.Close()
        return nil, fmt.Errorf("short copy: wrote %d of %d bytes", n, len(line))
    }
    // Close emits the final code and flushes it into buf, so buf is only
    // complete after Close returns.
    if err := zw.Close(); err != nil {
        return nil, err
    }
    return buf.Bytes(), nil
}

// myClient is a stateless stand-in for the question's database client; it
// only prints what it is asked to store.
type myClient struct{}

// Set prints the key (a), the value (b) and the TTL (t) it receives, then
// prints ctx's error if ctx has already been cancelled or timed out.
func (p *myClient) Set(ctx context.Context, a, b string, t time.Duration) {
    fmt.Println("a =", a, ", b =", b, ", t =", t)
    if err := ctx.Err(); err != nil {
        fmt.Println(err)
    }
}

// just for the Go Playground: file stands in for the opened input file, and
// myw is the write end that init feeds with sample lines.
var file, myw = io.Pipe()

// init starts a producer goroutine that writes three sample lines into the
// pipe and then closes it, so readers of file see io.EOF after the third
// line. The writes must not happen synchronously here: an io.Pipe write
// blocks until the data is read, and nothing reads until main runs.
func init() {
    go func() {
        for i := 0; i < 3; i++ {
            if _, err := fmt.Fprintln(myw, "text to compress aaaaaaaaaaaaaa"); err != nil {
                // A pipe write only fails once an end has been closed;
                // stop producing and pass the error along.
                myw.CloseWithError(err)
                return
            }
        }
        myw.Close()
    }()
}

2. Call wg.Add(1) before `go func(index int) {`, not inside the goroutine; otherwise wg.Wait() can run before the goroutine has called Add and return too early:

    // Register the goroutine with the WaitGroup before starting it.
    wg.Add(1)
    go func(index int) {

3. Check the wg.Wait() logic:

// Waits only on every 10000th line; see the question below about the tail.
if index%10000 == 0 {
    fmt.Println(index)
    wg.Wait()
}

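What happens to the last iterations when index%10000 != 0? On EOF the loop breaks without calling wg.Wait(), so nothing waits for the goroutines started after the last multiple of 10000; call wg.Wait() once more after the loop.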

  •  Tags:  
  • go
  • Related