I'm building a filtering pipeline with channels by composing functions, rather than putting 120 LOC in a single method, since I might reuse parts of this pipeline later.
I haven't been able to make it work as intended. I suspect that the function readValuesFromFile is exiting before scanner.Scan() puts a value in the inputStream channel (i.e., that function's main goroutine is exiting before goroutine (1)).
If I replace scanner.Scan() with just putting some random string in the channel, the whole pipeline works as expected.
Is this the problem, or am I missing something? How can this be fixed in an elegant way?
Thanks!
func readValuesFromFile(filename string) <-chan string {
	file, err := os.Open(filename)
	if err != nil {
		log.Fatal(err)
	}
	defer file.Close()
	inputStream := make(chan string)
	go func() { // (1)
		count := 0
		scanner := bufio.NewScanner(file)
		for scanner.Scan() { // (2)
			inputStream <- strings.TrimSpace(scanner.Text())
			count = count + 1
		}
		close(inputStream)
	}()
	return inputStream
}

func validateValues(inputStream <-chan string) <-chan string {
	// read from the input stream, validate & filter, creating and putting values in an output stream
}

func writeResults(validStream <-chan string) {
	// read from the validated stream and write data to file
}

func main() {
	valueStream := readValuesFromFile("myfile.txt")
	validatedStream := validateValues(valueStream)
	writeResults(validatedStream)
}
Answer:
The function readValuesFromFile is guaranteed to return before the first value is sent to inputStream. Communication on the unbuffered channel inputStream does not succeed until both a sender and a receiver are ready. There is no receive on inputStream until after readValuesFromFile returns, so the send from the goroutine cannot succeed until after readValuesFromFile returns.
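To make the blocking behaviour concrete, here is a minimal sketch, separate from the pipeline code. The helper name sendCompletedBeforeReceive and the 50 ms wait are assumptions chosen only for illustration. It starts a goroutine that sends on an unbuffered channel and checks whether that send finishes while nobody is receiving.

```go
package main

import (
	"fmt"
	"time"
)

// wait is an arbitrary illustration value: how long we hold off receiving.
const wait = 50 * time.Millisecond

// sendCompletedBeforeReceive (hypothetical helper) reports whether a send on
// an unbuffered channel finished before any receiver was ready.
func sendCompletedBeforeReceive() bool {
	ch := make(chan string)     // unbuffered, like inputStream
	sent := make(chan struct{}) // closed once the send has completed

	go func() {
		ch <- "value" // blocks until a receiver is ready
		close(sent)
	}()

	select {
	case <-sent:
		// The send finished even though nobody received.
		return true
	case <-time.After(wait):
		// Still blocked. Receive now so the goroutine can finish.
		<-ch
		<-sent
		return false
	}
}

func main() {
	fmt.Println("send completed before receive:", sendCompletedBeforeReceive())
}
```

In the question's code, the first receive happens only after readValuesFromFile has returned, which is why the deferred Close always runs before the first send can complete.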
When readValuesFromFile returns, the deferred file.Close() closes the file used by the scanner. It's possible that the scanner buffers some data before the file is closed underneath it, but it's also possible that the scanner does not read any data.
Fix by closing the file from the goroutine.
The error returned from the scanner describes the problem. Always handle errors.
func readValuesFromFile(filename string) <-chan string {
	file, err := os.Open(filename)
	if err != nil {
		log.Fatal(err)
	}
	inputStream := make(chan string)
	go func() {
		// Close the file and the channel only once the goroutine has finished reading.
		defer file.Close()
		defer close(inputStream)
		count := 0
		scanner := bufio.NewScanner(file)
		for scanner.Scan() {
			inputStream <- strings.TrimSpace(scanner.Text())
			count = count + 1
		}
		if err := scanner.Err(); err != nil {
			// Handle error as appropriate for your application.
			log.Print("scan error: ", err)
		}
	}()
	return inputStream
}
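To see why checking scanner.Err() matters, the following is a small sketch that is not part of the original pipeline. It scans a file that has already been closed, which is roughly what the goroutine in the question ends up doing. The helper name scanClosedFile, the temporary file, and its contents are assumptions made for the demonstration.

```go
package main

import (
	"bufio"
	"errors"
	"fmt"
	"os"
)

// scanClosedFile (hypothetical helper) writes two lines to a temporary file,
// reopens it, closes it before scanning, and returns the number of lines the
// scanner produced together with the error it reported.
func scanClosedFile() (int, error) {
	tmp, err := os.CreateTemp("", "pipeline-*.txt")
	if err != nil {
		return 0, err
	}
	defer os.Remove(tmp.Name())
	if _, err := tmp.WriteString("a\nb\n"); err != nil {
		tmp.Close()
		return 0, err
	}
	if err := tmp.Close(); err != nil {
		return 0, err
	}

	file, err := os.Open(tmp.Name())
	if err != nil {
		return 0, err
	}
	// Simulates the deferred Close running before the goroutine reads anything.
	file.Close()

	lines := 0
	scanner := bufio.NewScanner(file)
	for scanner.Scan() {
		lines++
	}
	return lines, scanner.Err()
}

func main() {
	lines, err := scanClosedFile()
	fmt.Println("lines read:", lines)
	fmt.Println("scanner error:", err)
	fmt.Println("is os.ErrClosed:", errors.Is(err, os.ErrClosed))
}
```

Without the scanner.Err() check the loop simply ends with no lines and no hint as to why, so the pipeline looks as if the file were empty.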