Apache Beam ParDo Filter in Go

Time:02-01

I'm a Python developer but am supposed to build a Dataflow pipeline using Go. I couldn't find as many Apache Beam examples for Go as for Python or Java.

I have the code below, which defines a user struct with a name and an age. The task is to increment the age and then filter on Age. I found a way to increment the age but am stuck on the filtering part.

package main

import (
    "context"
    "flag"
    "fmt"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

func init() {
    beam.RegisterFunction(incrementAge)
}

type user struct {
    Name string
    Age  int
}

func printRow(ctx context.Context, list user) {
    fmt.Println(list)
}

func incrementAge(list user) user {
    list.Age++
    return list
}

func main() {

    flag.Parse()
    beam.Init()

    ctx := context.Background()

    p := beam.NewPipeline()
    s := p.Root()

    var userList = []user{
        {"Bob", 40},
        {"Adam", 50},
        {"John", 35},
        {"Ben", 8},
    }
    initial := beam.CreateList(s, userList)

    pc := beam.ParDo(s, incrementAge, initial)

    pc1 := beam.ParDo(s, func(row user, emit func(user)) {
        emit(row)
    }, pc)

    beam.ParDo0(s, printRow, pc1)

    if err := beamx.Run(ctx, p); err != nil {
        log.Exitf(ctx, "Failed to execute job: %v", err)
    }

}

I tried writing a function like the one below, but it returns a bool and not a user object. I know I'm missing something simple, but I can't figure out what.

func filterAge(list user) user {
    return list.Age > 40    
}

In Python I could write something like this:

beam.Filter(lambda line: line["Age"] >= 40)

Answer:

You need to add an emitter in the function to emit user:

func filterAge(list user, emit func(user)) {
    if list.Age > 40 {
        emit(list)
    }
}

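In your current code, the expression list.Age > 40 in the return statement evaluates to a boolean, and that boolean is what gets returned. Since filterAge is declared to return user, this does not compile. With an emitter, the function returns nothing and simply calls emit for the elements it wants to keep.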
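
To see the emitter pattern in isolation, here is a minimal sketch that uses only the standard library. The user struct, incrementAge and filterAge follow the code above; applyLocally is a hypothetical helper, not part of Beam, that stands in for the runner by supplying an emit callback and collecting whatever gets emitted.

```go
package main

import "fmt"

// user mirrors the struct from the question.
type user struct {
	Name string
	Age  int
}

// incrementAge returns a copy of u with Age increased by one.
func incrementAge(u user) user {
	u.Age++
	return u
}

// filterAge has the emitter shape from the answer: it calls emit only
// for users older than 40 and emits nothing for everyone else.
func filterAge(u user, emit func(user)) {
	if u.Age > 40 {
		emit(u)
	}
}

// applyLocally is a hypothetical stand-in for the runner (not a Beam API).
// It increments each age, then passes the element to filterAge together
// with an emit callback that appends to the output slice.
func applyLocally(in []user) []user {
	var out []user
	emit := func(u user) { out = append(out, u) }
	for _, u := range in {
		filterAge(incrementAge(u), emit)
	}
	return out
}

func main() {
	users := []user{{"Bob", 40}, {"Adam", 50}, {"John", 35}, {"Ben", 8}}
	for _, u := range applyLocally(users) {
		fmt.Println(u.Name, u.Age)
	}
}
```

In the pipeline itself, the same filterAge would be passed to beam.ParDo(s, filterAge, pc) in place of the pass-through function, and registered in init() with beam.RegisterFunction alongside incrementAge. If a predicate that returns bool reads more naturally, the Go SDK's transforms/filter package offers filter.Include, which is probably the closest counterpart to Python's beam.Filter.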
