Home > Net >  Apache Beam Select Top N rows from PCollection in Go
Apache Beam Select Top N rows from PCollection in Go

Time:02-02

I'am having a PCollection from which I need to choose n largest rows. I'am trying to create a Dataflow pipeline using Go and stuck at this.

package main

import (
    "context"
    "flag"
    "fmt"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

type User struct {
    Name string
    Age  int
}

func printRow(ctx context.Context, list User) {
    fmt.Println(list)
}

func main() {

    flag.Parse()
    beam.Init()

    ctx := context.Background()

    p := beam.NewPipeline()
    s := p.Root()

    var userList = []User{
        {"Bob", 5},
        {"Adam", 8},
        {"John", 3},
        {"Ben", 1},
        {"Jose", 1},
        {"Bryan", 1},
        {"Kim", 1},
        {"Tim", 1},
    }
    initial := beam.CreateList(s, userList)

    pc2 := beam.ParDo(s, func(row User, emit func(User)) {
        emit(row)
    }, initial)

    beam.ParDo0(s, printRow, pc2)

    if err := beamx.Run(ctx, p); err != nil {
        log.Exitf(ctx, "Failed to execute job: %v", err)
    }

}

From the above code I need to choose top 5 rows based on User.Age I found the link top package which has a function does the same but it says it returns a single element PCollection. How is it different?

package main

import (
    "context"
    "flag"
    "fmt"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/top"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

func init() {
    beam.RegisterFunction(less)
}

type User struct {
    Name string
    Age  int
}

func printRow(ctx context.Context, list User) {
    fmt.Println(list)
}

func less(a, b User) bool {
    return a.Age < b.Age
}

func main() {

    flag.Parse()
    beam.Init()

    ctx := context.Background()

    p := beam.NewPipeline()
    s := p.Root()

    var userList = []User{
        {"Bob", 5},
        {"Adam", 8},
        {"John", 3},
        {"Ben", 1},
        {"Jose", 1},
        {"Bryan", 1},
        {"Kim", 1},
        {"Tim", 1},
    }
    initial := beam.CreateList(s, userList)

    best := top.Largest(s, initial, 5, less)

    pc2 := beam.ParDo(s, func(row User, emit func(User)) {
        emit(row)
    }, best)

    beam.ParDo0(s, printRow, pc2)

    if err := beamx.Run(ctx, p); err != nil {
        log.Exitf(ctx, "Failed to execute job: %v", err)
    }

}

I added the function to select the top 5 rows like above, but I get an error []main.User is not assignable to main.User

I need the PCollection in the same format as before since I have further processing to do. I suspect this is because the top.Largest function is returning a single-element PCollection. Any ideas on how I can convert the format?

CodePudding user response:

best PCollection is []User

so try...

pc2 := beam.ParDo(s, func(rows []User, emit func(User)) {
    for _, row := range rows {
        emit(row)
    }
}, best)
  • Related