I'd like to do something like unix's tail -f
, but on the output produced by a process run through Go's Cmd
facility.
My google-fu is not up to par, evidently, but I did find this article which lead me to write the following code, which almost works, with a bizarre twist I'm hoping I can get help with.
If it matters, I'm running this on a Mac.
First, here's the minimal program that's compiled to be the slowroll
executable:
package main
import (
"fmt"
"time"
)
func main() {
line := 1
for {
fmt.Println("This is line", line)
line = 1
time.Sleep(2 * time.Second)
}
}
When run, it produces the following output, one line every 2 seconds:
> ./slowroll
This is line 1
This is line 2
This is line 3
This is line 4
And so on.
Here's the package code that attempts to read this, but allowing timeouts so other things can be done:
package timeout_io
import (
"bufio"
"bytes"
"context"
"errors"
"time"
)
const BufferSize = 4096
var ErrTimeout = errors.New("timeout")
type TimeoutReader struct {
b *bufio.Reader
t time.Duration
}
func NewTimeoutReader(stdOut *bytes.Buffer) *TimeoutReader {
return &TimeoutReader{b: bufio.NewReaderSize(stdOut, BufferSize), t: 0}
}
func (r *TimeoutReader) SetTimeout(t time.Duration) time.Duration {
prev := r.t
r.t = t
return prev
}
type CallResponse struct {
Resp string
Err error
}
func helper(r *bufio.Reader) <-chan *CallResponse {
respChan := make(chan *CallResponse, 1)
go func() {
resp, err := r.ReadString('\n')
if err != nil {
respChan <- &CallResponse{resp, err}
} else {
respChan <- &CallResponse{resp, nil}
}
return
}()
return respChan
}
func (r *TimeoutReader) ReadLineCtx(ctx context.Context) (string, error) {
select {
case <-ctx.Done():
return "", ErrTimeout
case respChan := <-helper(r.b):
return respChan.Resp, respChan.Err
}
}
func (r *TimeoutReader) ReadLine() (string, error) {
ctx, cancel := context.WithTimeout(context.Background(), r.t)
defer cancel()
s, err := r.ReadLineCtx(ctx)
if err != nil {
return "", err
}
return s, nil
}
Finally, here's the main
code that calls ReadLine
with timeout:
package main
import (
"bytes"
"fmt"
"io"
"os"
"os/exec"
"sync"
"time"
"watcher/timeout_io"
)
func main() {
var stdOut bytes.Buffer
var stdErr bytes.Buffer
runCommand := &exec.Cmd{
Path: "./slowroll",
Stdout: &stdOut,
Stderr: &stdErr,
}
var wg sync.WaitGroup
go func(wg *sync.WaitGroup) {
defer wg.Done()
err := runCommand.Run()
if err != nil {
fmt.Println("failed due to error:", err)
os.Exit(1)
}
}(&wg)
wg.Add(1)
stdOutReader := timeout_io.NewTimeoutReader(&stdOut)
stdOutReader.SetTimeout(10 * time.Millisecond)
index := 1
for {
s, err := stdOutReader.ReadLine()
if err != nil {
if err != timeout_io.ErrTimeout && err != io.EOF {
fmt.Println("ReadLine got error", err)
break
}
} else if len(s) > 0 {
fmt.Println("index: ", index, " s: ", s)
index = 1
s = ""
}
}
wg.Wait()
fmt.Println("Done!")
}
When run, it produces the following output:
> go run watcher.go
index: 1 s: This is line 1
index: 2 s: This is line 2
index: 3 s: This is line 2
index: 4 s: This is line 3
index: 5 s: This is line 2
index: 6 s: This is line 3
index: 7 s: This is line 4
index: 8 s: This is line 2
index: 9 s: This is line 3
index: 10 s: This is line 4
index: 11 s: This is line 5
And so on.
Occasionally, some slowroll
output lines don't show up at all; which lines get repeated is random.
So that's my mystery... I don't see where the (apparent) loop is happening that causes the lines to be produced multiple times.
Thanks very much in advance for any help!
CodePudding user response:
if err != timeout_io.ErrTimeout && err != io.EOF { ...; break; }
With such a condition, an ErrTimeout
will be silently ignored and will not interrupt your reading loop.
Also note that reaching io.EOF
would send your program in an endless loop (try using echo "Hello"
instead of ./slowroll
as a command).
You probably want to place the break
instruction after the if block :
if err != timeout_io.ErrTimeout && err != io.EOF {
fmt.Println("ReadLine got error", err)
}
break
CodePudding user response:
Realized late last night that I was kind of fighting go's standard behavior.
Should have explained that the goal was to be able to watch stdout and stderr at the same time.
Taking @Zombo's advice above, I switched to cmd.StdoutPipe
and cmd.StderrPipe
.
The main idea is to just have goroutines that read the pipes and put content found into channels, and then select
between the channels.
So slowroll.go
does not produce infinite output, to show that EOF doesn't cause a infinite loop:
package main
import (
"fmt"
"os"
"time"
)
func main() {
line := 1
for {
fmt.Println("This is line", line)
line = 1
time.Sleep(2 * time.Second)
if line%3 == 0 {
fmt.Fprintf(os.Stderr, "This is error %d\n", line)
}
if line > 10 {
break
}
}
}
And the simpler, working watcher.go
is now:
package main
import (
"bufio"
"fmt"
"os"
"os/exec"
"sync"
)
func main() {
runCommand := &exec.Cmd{
Path: "./slowroll",
}
stdOut, err := runCommand.StdoutPipe()
if err != nil {
fmt.Println("Can't create StdoutPipe:", err)
os.Exit(1)
}
stdErr, err := runCommand.StderrPipe()
if err != nil {
fmt.Println("Can't create StderrPipe:", err)
os.Exit(1)
}
var wg sync.WaitGroup
go func(wg *sync.WaitGroup) {
defer wg.Done()
err := runCommand.Run()
if err != nil {
fmt.Println("failed due to error:", err)
os.Exit(1)
}
}(&wg)
wg.Add(1)
stdOutChan := make(chan string, 1)
go func(wg *sync.WaitGroup) {
defer wg.Done()
scanner := bufio.NewScanner(stdOut)
for scanner.Scan() {
stdOutChan <- string(scanner.Bytes())
}
fmt.Println("Ran out of stdout input, read thread bailing.")
close(stdOutChan)
}(&wg)
wg.Add(1)
stdErrChan := make(chan string, 1)
go func(wg *sync.WaitGroup) {
defer wg.Done()
scanner := bufio.NewScanner(stdErr)
for scanner.Scan() {
stdErrChan <- string(scanner.Bytes())
}
fmt.Println("Ran out of stderr input, read thread bailing.")
close(stdErrChan)
}(&wg)
wg.Add(1)
index := 1
keepGoing := true
for keepGoing {
select {
case res, isOpen := <-stdOutChan:
if !isOpen {
fmt.Println("stdOutChan is no longer open, main bailing.")
keepGoing = false
} else {
fmt.Println(index, "s:", res)
index = 1
}
case res, isOpen := <-stdErrChan:
if !isOpen {
fmt.Println("stdErrChan is no longer open, main bailing.")
keepGoing = false
} else {
fmt.Println(index, "error s:", res)
index = 1
}
}
}
wg.Wait()
fmt.Println("Done!")
}
Output:
> go run watcher.go
1 s: This is line 1
2 s: This is line 2
3 error s: This is error 3
4 s: This is line 3
5 s: This is line 4
6 s: This is line 5
7 s: This is line 6
8 error s: This is error 6
9 s: This is line 7
10 s: This is line 8
11 s: This is line 9
12 error s: This is error 9
13 s: This is line 10
Ran out of stdout input, read thread bailing.
stdOutChan is no longer open, main bailing.
Ran out of stderr input, read thread bailing.
Done!
Could stand with some refactoring, obviously, but it works, which is the goal.
Thanks!
CodePudding user response:
Simplify the code by creating a pipe and reading from that pipe:
cmd := exec.Command("./slowroll")
stdout, _ := cmd.StdoutPipe()
if err := cmd.Start(); err != nil {
log.Fatal(err)
}
s := bufio.NewScanner(stdout)
for s.Scan() {
fmt.Printf("%s\n", s.Bytes())
}
If your goal is to monitor the combined output of stderr and stdin, then use the same pipe for both:
cmd := exec.Command("./slowroll")
combined, _ := cmd.StdoutPipe()
cmd.Stderr = cmd.Stdout // <-- use stdout pipe for stderr
if err := cmd.Start(); err != nil {
log.Fatal(err)
}
s := bufio.NewScanner(combined)
for s.Scan() {
fmt.Printf("%s\n", s.Bytes())
}
The code in the question has a data race on the stdOut bytes.Buffer.