Home > Blockchain >  Go timeout for Read
Go timeout for Read

Time:02-11

I'd like to do something like unix's tail -f, but on the output produced by a process run through Go's Cmd facility.

My google-fu is not up to par, evidently, but I did find this article which lead me to write the following code, which almost works, with a bizarre twist I'm hoping I can get help with.

If it matters, I'm running this on a Mac.

First, here's the minimal program that's compiled to be the slowroll executable:

package main

import (
    "fmt"
    "time"
)

func main() {
    line := 1
    for {
        fmt.Println("This is line", line)
        line  = 1
        time.Sleep(2 * time.Second)
    }
}

When run, it produces the following output, one line every 2 seconds:

    > ./slowroll
    This is line 1
    This is line 2
    This is line 3
    This is line 4

And so on.

Here's the package code that attempts to read this, but allowing timeouts so other things can be done:

package timeout_io

import (
    "bufio"
    "bytes"
    "context"
    "errors"
    "time"
)

const BufferSize = 4096

var ErrTimeout = errors.New("timeout")

type TimeoutReader struct {
    b *bufio.Reader
    t time.Duration
}

func NewTimeoutReader(stdOut *bytes.Buffer) *TimeoutReader {
    return &TimeoutReader{b: bufio.NewReaderSize(stdOut, BufferSize), t: 0}
}

func (r *TimeoutReader) SetTimeout(t time.Duration) time.Duration {
    prev := r.t
    r.t = t
    return prev
}

type CallResponse struct {
    Resp string
    Err  error
}

func helper(r *bufio.Reader) <-chan *CallResponse {
    respChan := make(chan *CallResponse, 1)

    go func() {
        resp, err := r.ReadString('\n')

        if err != nil {
            respChan <- &CallResponse{resp, err}
        } else {
            respChan <- &CallResponse{resp, nil}
        }

        return
    }()

    return respChan
}

func (r *TimeoutReader) ReadLineCtx(ctx context.Context) (string, error) {
    select {
    case <-ctx.Done():
        return "", ErrTimeout
    case respChan := <-helper(r.b):
        return respChan.Resp, respChan.Err
    }
}

func (r *TimeoutReader) ReadLine() (string, error) {
    ctx, cancel := context.WithTimeout(context.Background(), r.t)
    defer cancel()

    s, err := r.ReadLineCtx(ctx)
    if err != nil {
        return "", err
    }

    return s, nil
}

Finally, here's the main code that calls ReadLine with timeout:

package main

import (
    "bytes"
    "fmt"
    "io"
    "os"
    "os/exec"
    "sync"
    "time"

    "watcher/timeout_io"
)

func main() {
    var stdOut bytes.Buffer
    var stdErr bytes.Buffer
    runCommand := &exec.Cmd{
        Path:   "./slowroll",
        Stdout: &stdOut,
        Stderr: &stdErr,
    }

    var wg sync.WaitGroup

    go func(wg *sync.WaitGroup) {
        defer wg.Done()

        err := runCommand.Run()
        if err != nil {
            fmt.Println("failed due to error:", err)
            os.Exit(1)
        }
    }(&wg)

    wg.Add(1)

    stdOutReader := timeout_io.NewTimeoutReader(&stdOut)
    stdOutReader.SetTimeout(10 * time.Millisecond)
    index := 1
    for {
        s, err := stdOutReader.ReadLine()
        if err != nil {
            if err != timeout_io.ErrTimeout && err != io.EOF {
                fmt.Println("ReadLine got error", err)
                break
            }
        } else if len(s) > 0 {
            fmt.Println("index: ", index, " s: ", s)
            index  = 1
            s = ""
        }
    }

    wg.Wait()
    fmt.Println("Done!")
}

When run, it produces the following output:

    > go run watcher.go
    index:  1  s:  This is line 1
    
    index:  2  s:  This is line 2
    
    index:  3  s:  This is line 2
    
    index:  4  s:  This is line 3
    
    index:  5  s:  This is line 2
    
    index:  6  s:  This is line 3
    
    index:  7  s:  This is line 4
    
    index:  8  s:  This is line 2
    
    index:  9  s:  This is line 3
    
    index:  10  s:  This is line 4
    
    index:  11  s:  This is line 5

And so on.

Occasionally, some slowroll output lines don't show up at all; which lines get repeated is random.

So that's my mystery... I don't see where the (apparent) loop is happening that causes the lines to be produced multiple times.

Thanks very much in advance for any help!

CodePudding user response:

if err != timeout_io.ErrTimeout && err != io.EOF { ...; break; }

With such a condition, an ErrTimeout will be silently ignored and will not interrupt your reading loop.

Also note that reaching io.EOF would send your program in an endless loop (try using echo "Hello" instead of ./slowroll as a command).


You probably want to place the break instruction after the if block :

if err != timeout_io.ErrTimeout && err != io.EOF {
    fmt.Println("ReadLine got error", err)
}
break

CodePudding user response:

Realized late last night that I was kind of fighting go's standard behavior.

Should have explained that the goal was to be able to watch stdout and stderr at the same time.

Taking @Zombo's advice above, I switched to cmd.StdoutPipe and cmd.StderrPipe.

The main idea is to just have goroutines that read the pipes and put content found into channels, and then select between the channels.

So slowroll.go does not produce infinite output, to show that EOF doesn't cause a infinite loop:

package main

import (
    "fmt"
    "os"
    "time"
)

func main() {
    line := 1
    for {
        fmt.Println("This is line", line)
        line  = 1
        time.Sleep(2 * time.Second)
        if line%3 == 0 {
            fmt.Fprintf(os.Stderr, "This is error %d\n", line)
        }
        if line > 10 {
            break
        }
    }
}

And the simpler, working watcher.go is now:

package main

import (
    "bufio"
    "fmt"
    "os"
    "os/exec"
    "sync"
)

func main() {
    runCommand := &exec.Cmd{
        Path: "./slowroll",
    }
    stdOut, err := runCommand.StdoutPipe()
    if err != nil {
        fmt.Println("Can't create StdoutPipe:", err)
        os.Exit(1)
    }
    stdErr, err := runCommand.StderrPipe()
    if err != nil {
        fmt.Println("Can't create StderrPipe:", err)
        os.Exit(1)
    }

    var wg sync.WaitGroup

    go func(wg *sync.WaitGroup) {
        defer wg.Done()

        err := runCommand.Run()
        if err != nil {
            fmt.Println("failed due to error:", err)
            os.Exit(1)
        }
    }(&wg)

    wg.Add(1)

    stdOutChan := make(chan string, 1)
    go func(wg *sync.WaitGroup) {
        defer wg.Done()

        scanner := bufio.NewScanner(stdOut)
        for scanner.Scan() {
            stdOutChan <- string(scanner.Bytes())
        }
        fmt.Println("Ran out of stdout input, read thread bailing.")
        close(stdOutChan)
    }(&wg)

    wg.Add(1)

    stdErrChan := make(chan string, 1)
    go func(wg *sync.WaitGroup) {
        defer wg.Done()

        scanner := bufio.NewScanner(stdErr)
        for scanner.Scan() {
            stdErrChan <- string(scanner.Bytes())
        }
        fmt.Println("Ran out of stderr input, read thread bailing.")
        close(stdErrChan)
    }(&wg)

    wg.Add(1)

    index := 1
    keepGoing := true
    for keepGoing {
        select {
        case res, isOpen := <-stdOutChan:
            if !isOpen {
                fmt.Println("stdOutChan is no longer open, main bailing.")
                keepGoing = false
            } else {
                fmt.Println(index, "s:", res)
                index  = 1
            }

        case res, isOpen := <-stdErrChan:
            if !isOpen {
                fmt.Println("stdErrChan is no longer open, main bailing.")
                keepGoing = false
            } else {
                fmt.Println(index, "error s:", res)
                index  = 1
            }
        }
    }

    wg.Wait()
    fmt.Println("Done!")
}

Output:

    > go run watcher.go
    1 s: This is line 1
    2 s: This is line 2
    3 error s: This is error 3
    4 s: This is line 3
    5 s: This is line 4
    6 s: This is line 5
    7 s: This is line 6
    8 error s: This is error 6
    9 s: This is line 7
    10 s: This is line 8
    11 s: This is line 9
    12 error s: This is error 9
    13 s: This is line 10
    Ran out of stdout input, read thread bailing.
    stdOutChan is no longer open, main bailing.
    Ran out of stderr input, read thread bailing.
    Done!

Could stand with some refactoring, obviously, but it works, which is the goal.

Thanks!

CodePudding user response:

Simplify the code by creating a pipe and reading from that pipe:

cmd := exec.Command("./slowroll")
stdout, _ := cmd.StdoutPipe()
if err := cmd.Start(); err != nil {
    log.Fatal(err)
}

s := bufio.NewScanner(stdout)
for s.Scan() {
    fmt.Printf("%s\n", s.Bytes())
}

If your goal is to monitor the combined output of stderr and stdin, then use the same pipe for both:

cmd := exec.Command("./slowroll")
combined, _ := cmd.StdoutPipe()
cmd.Stderr = cmd.Stdout // <-- use stdout pipe for stderr
if err := cmd.Start(); err != nil {
    log.Fatal(err)
}

s := bufio.NewScanner(combined)
for s.Scan() {
    fmt.Printf("%s\n", s.Bytes())
}

The code in the question has a data race on the stdOut bytes.Buffer.

  • Related