Can anyone help ?
I have an application I am running via exec.CommandContext (so I can cancel it via ctx). it would normally not stop unless it errors out.
I currently have it relaying its output to os.stdOut which is working great. But I also want to get each line via a channel - the idea behind this is I will look for a regular expression on the line and if its true then I will set an internal state of "ERROR" for example.
Although I can't get it to work, I tried NewSscanner. Here is my code.
As I say, it does output to os.StdOut which is great but I would like to receive each line as it happens in my channel I setup.
Any ideas ?
Thanks in advance.
func (d *Daemon) Start() {
ctx, cancel := context.WithCancel(context.Background())
d.cancel = cancel
go func() {
args := "-x f -a 1"
cmd := exec.CommandContext(ctx, "mydaemon", strings.Split(args, " ")...)
var stdoutBuf, stderrBuf bytes.Buffer
cmd.Stdout = io.MultiWriter(os.Stdout, &stdoutBuf)
cmd.Stderr = io.MultiWriter(os.Stderr, &stderrBuf)
lines := make(chan string)
go func() {
scanner := bufio.NewScanner(os.Stdin)
for scanner.Scan() {
fmt.Println("I am reading a line!")
lines <- scanner.Text()
}
}()
err := cmd.Start()
if err != nil {
log.Fatal(err)
}
select {
case outputx := <-lines:
// I will do somethign with this!
fmt.Println("Hello!!", outputx)
case <-ctx.Done():
log.Println("I am done!, probably cancelled!")
}
}()
}
Also tried using this
go func() {
scanner := bufio.NewScanner(&stdoutBuf)
for scanner.Scan() {
fmt.Println("I am reading a line!")
lines <- scanner.Text()
}
}()
Even with that, the "I am reading a line" never gets out, I also debugged it and it neve enters the "for scanner.."
Also tried scanning on &stderrBuf
, same, nothing enters.
CodePudding user response:
cmd.Start()
does not wait for the command to finish. Also, cmd.Wait()
needs to be called to be informed about the end of the process.
reader, writer := io.Pipe()
cmdCtx, cmdDone := context.WithCancel(context.Background())
scannerStopped := make(chan struct{})
go func() {
defer close(scannerStopped)
scanner := bufio.NewScanner(reader)
for scanner.Scan() {
fmt.Println(scanner.Text())
}
}()
cmd := exec.Command("ls")
cmd.Stdout = writer
_ = cmd.Start()
go func() {
_ = cmd.Wait()
cmdDone()
writer.Close()
}()
<-cmdCtx.Done()
<-scannerStopped
scannerStopped
is added to demonstrate that the scanner goroutine stops now.
reader, writer := io.Pipe()
scannerStopped := make(chan struct{})
go func() {
defer close(scannerStopped)
scanner := bufio.NewScanner(reader)
for scanner.Scan() {
fmt.Println(scanner.Text())
}
}()
cmd := exec.Command("ls")
cmd.Stdout = writer
_ = cmd.Run()
go func() {
_ = cmd.Wait()
writer.Close()
}()
<-scannerStopped
And handle the lines as it helps.
Note: wrote this in a bit of hurry. Let me know if anything is unclear or not correct.
CodePudding user response:
You have to scan stdoutBuf
instead of os.Stdin
:
scanner := bufio.NewScanner(&stdoutBuf)
CodePudding user response:
The command is terminated when the context canceled. If it's OK to read all output from the command until the command is terminated, then use this code:
func (d *Daemon) Start() {
ctx, cancel := context.WithCancel(context.Background())
d.cancel = cancel
args := "-x f -a 1"
cmd := exec.CommandContext(ctx, "mydaemon", strings.Split(args, " ")...)
stdout, err := cmd.StdoutPipe()
if err != nil {
log.Fatal(err)
}
err = cmd.Start()
if err != nil {
log.Fatal(err)
}
go func() {
defer cmd.Wait()
scanner := bufio.NewScanner(stdout)
for scanner.Scan() {
s := scanner.Text()
fmt.Println(s) // echo to stdout
// Do something with s
}
}()
}
The command is terminated when the context is canceled.
Read on stdout
returns io.EOF when the command is terminated. The goroutine breaks out of the scan loop when stdout
returns an error.
CodePudding user response:
For a correct program using concurrency and goroutines, we should try to show there are no data races, the program can't deadlock, and goroutines don't leak. Let's try to achieve this.
Full code
Playground: https://play.golang.org/p/Xv1hJXYQoZq. I recommend copying and running locally, because the playground doesn't stream output afaik and it has timeouts.
Note that I've changed the test command to % find /usr/local
, a typically long-running command (>3 seconds) with plenty of output lines, since it is better suited for the scenarios we should test.
Walkthrough
Let's look at the Daemon.Start
method. At the start, it is mostly the same. Most noticeably, though, the new code doesn't have a goroutine around a large part of the method. Even without this, the Daemon.Start
method remains non-blocking and will return "immediately".
The first noteworthy fix is these updated lines.
outR, outW := io.Pipe()
cmd.Stdout = io.MultiWriter(outW, os.Stdout)
Instead of constructing a bytes.Buffer variable, we call io.Pipe
. If we didn't make this change and stuck with a bytes.Buffer, then scanner.Scan()
will return false as soon as there is no more data to read. This can happen if the command writes to stdout only occasionally (even a millisecond apart, for this matter). After scanner.Scan()
returns false, the goroutine exits and we miss processing future output.
Instead, by using the read end of io.Pipe
, scanner.Scan()
will wait for input from the pipe's read end until the pipe's write end is closed.
This fixes the race issue between the scanner and the command output.
Next, we construct two closely-related goroutines: the first to consume from <-lines
, and the second to produce into lines<-
.
go func() {
for line := range lines {
fmt.Println("output line from channel:", line)
...
}
}()
go func() {
defer close(lines)
scanner := bufio.NewScanner(outR)
for scanner.Scan() {
lines <- scanner.Text()
}
...
}()
The consumer goroutine will exit when the lines
channel is closed, as the closing of the channel would naturally cause the range loop to terminate; the producer goroutine closes lines
upon exit.
The producer goroutine will exit when scanner.Scan()
returns false, which happens when the write end of the io.Pipe
is closed. This closing happens in upcoming code.
Note from the two paragraphs above that the two goroutines are guaranteed to exit (i.e. will not leak).
Next, we start the command. Standard stuff, it's a non-blocking call, and it returns immediately.
// Start the command.
if err := cmd.Start(); err != nil {
log.Fatal(err)
}
Moving on to the final piece of code in Daemon.Start
. This goroutine waits for the command to exit via cmd.Wait()
. Handling this is important because the command may for reasons other than Context cancellation.
Particularly, we want to close the write end of the io.Pipe
(which, in turn, closes the output lines producer goroutine as mentioned earlier).
go func() {
err := cmd.Wait()
fmt.Println("command exited; error is:", err)
outW.Close()
...
}()
As a side note, by waiting on cmd.Wait()
, we don't have to separately wait on ctx.Done()
. Waiting on cmd.Wait()
handles both exits caused by natural reasons (command successfully finished, command ran into internal error etc.) and exits caused by Context-cancelation.
This goroutine, too, is guaranteed to exit. It will exit when cmd.Wait()
returns. This can happen either because the command exited normally with success; exited with failure due to a command error; or exited with failure due to Context cancelation.
That's it! We should have no data races, no deadlocks, and no leaked goroutines.
The lines elided ("...
") in the snippets above are geared towards the Done()
, CmdErr()
, and Cancel()
methods of the Daemon type. These methods are fairly well-documented in the code, so these elided lines are hopefully self-explanatory.
Besides that, look for the TODO
comments for error handling you may want to do based on your needs!
Test it!
Use this driver program to test the code.
func main() {
var d Daemon
d.Start()
// Enable this code to test Context cancellation:
// time.AfterFunc(100*time.Millisecond, d.Cancel)
<-d.Done()
fmt.Println("d.CmdErr():", d.CmdErr())
}