I am a Python developer, but I need to build a Dataflow pipeline in Go. I could not find nearly as many Apache Beam examples for Go as for Python or Java.
The code below defines a user struct with a name and an age. The task is to increment the age and then filter on Age. I found a way to increment the age, but I am stuck on the filtering part.
package main

import (
	"context"
	"flag"
	"fmt"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

func init() {
	beam.RegisterFunction(incrementAge)
}

type user struct {
	Name string
	Age  int
}

// printRow prints each element that reaches the end of the pipeline.
func printRow(ctx context.Context, list user) {
	fmt.Println(list)
}

// incrementAge returns a copy of the user with Age increased by one.
func incrementAge(list user) user {
	list.Age++
	return list
}

func main() {
	flag.Parse()
	beam.Init()
	ctx := context.Background()
	p := beam.NewPipeline()
	s := p.Root()

	var userList = []user{
		{"Bob", 40},
		{"Adam", 50},
		{"John", 35},
		{"Ben", 8},
	}
	initial := beam.CreateList(s, userList)
	pc := beam.ParDo(s, incrementAge, initial)

	// Pass-through step: this is where the filter on Age should go.
	pc1 := beam.ParDo(s, func(row user, emit func(user)) {
		emit(row)
	}, pc)
	beam.ParDo0(s, printRow, pc1)

	if err := beamx.Run(ctx, p); err != nil {
		log.Exitf(ctx, "Failed to execute job: %v", err)
	}
}
I tried writing a function like the one below, but the expression it returns is a bool, not a user. I know I am missing something simple, but I cannot figure out what.
func filterAge(list user) user {
	return list.Age > 40
}
In Python I could write a filter like the one below.
beam.Filter(lambda line: line["Age"] >= 40)
CodePudding user response:
You need to add an emitter parameter to the function and call it to emit each user you want to keep:
func filterAge(list user, emit func(user)) {
	if list.Age > 40 {
		emit(list)
	}
}
In your current code, return list.Age > 40 first evaluates list.Age > 40 to a boolean (true or false), and that boolean is what the function tries to return, which does not match the declared return type user.