I would like to understand how to build a concurrent filtering pipeline in Go using a producer/consumer scheme.
I've managed to write a version that checks each value: if it is valid, it is sent to one channel, and if not, it is sent to another channel.
After all values are read and processed, two goroutines read the processed values and write them to files. This version works. But...
Suppose that I don't want the invalid values. Is there a way to change the select statement (or the consumer goroutine) so that only the valid values are output (i.e. using only one output channel)? I tried removing the invalidValues channel, but I did not succeed.
I also tried putting the select statement inside an "if valid" check, with the complete select statement (as in this version) in the true branch and only a wait on the done channel in the false branch. That way I could discard invalid values and use a single channel, but this approach did not work either.
Any ideas on how to fix this?
Also, in this scheme, why does the program never finish if I omit the goroutine that reads the values from the invalidValues channel? Is it because the channel needs to be emptied, otherwise it stays blocked? Is there a more elegant way to do that than ranging over the values?
Thanks!!
Here is the complete version of the code
done := make(chan struct{})
defer close(done)
inputStream := make(chan string)
outputStream := make(chan string)
invalidValues := make(chan string)
// Producer: reads a file line by line and sends each value on inputStream
go func() {
	count := 0
	scanner := bufio.NewScanner(file)
	for scanner.Scan() {
		inputStream <- strings.TrimSpace(scanner.Text())
		count = count + 1
	}
	if err := scanner.Err(); err != nil {
		log.Fatal(err)
	}
	close(inputStream)
}()
// Consumers
var wg sync.WaitGroup
wg.Add(Workers)
for i := 0; i < Workers; i++ {
	// Deploy Workers goroutines that read from inputStream, validate each value,
	// and send valid results to one channel and invalid ones to another
	go func() {
		defer wg.Done() // also runs when the worker returns early on <-done
		for value := range inputStream {
			// Channels are reference values, so no pointer is needed
			c := invalidValues
			if checkValue(value) {
				c = outputStream
			}
			select {
			case c <- value:
			case <-done:
				return
			}
			time.Sleep(5 * time.Second) // artificial per-value delay
		}
	}()
}
// Close both output channels once every worker has finished sending
go func() {
	wg.Wait()
	close(outputStream)
	close(invalidValues)
}()
// Result file for valid values
resultFile, err := os.Create("outputStream.txt")
if err != nil {
	log.Fatal(err)
}
// Error file for invalid values
errorFile, err := os.Create("errors.txt")
if err != nil {
	log.Fatal(err)
}
// Create two goroutines for writing the output files
var wg2 sync.WaitGroup
wg2.Add(2)
go func() {
	defer wg2.Done()
	// Write valid values to the result file
	for r := range outputStream {
		if _, err := resultFile.WriteString(r + "\n"); err != nil {
			log.Fatal(err)
		}
	}
	resultFile.Close()
}()
go func() {
	defer wg2.Done()
	// Write invalid values to the error file
	for r := range invalidValues {
		if _, err := errorFile.WriteString(r + "\n"); err != nil {
			log.Fatal(err)
		}
	}
	errorFile.Close()
}()
wg2.Wait()
Answer:
To remove the invalid channel, skip invalid values in the worker loop instead of sending them anywhere:
for value := range inputStream {
	// Invalid values are dropped here; only valid ones are sent on
	if checkValue(value) {
		select {
		case outputStream <- value:
		case <-done:
			return
		}
	}
}
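If you remove the invalid-value reader goroutine, you also have to change the wait group to:
wg2.Add(1)
so that wg2.Wait() does not wait forever for a second Done that never comes. Also remove close(invalidValues) and the errorFile code, since Go does not compile references to a deleted variable or unused local variables.
As for why the original version hangs without that reader: invalidValues is unbuffered, so a send on it blocks until another goroutine receives the value. Without a reader, each worker blocks on its first invalid value, wg.Wait() never returns, outputStream is never closed, and the writer goroutines never finish. The more elegant fix is the one above: don't send values you don't want.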