I have a PCollection from which I need to choose the n largest rows. I'm building a Dataflow pipeline in Go and am stuck on this.
package main
import (
"context"
"flag"
"fmt"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)
// User is one input row for the example pipeline.
type User struct {
// Name of the user.
Name string
// Age of the user; the question asks to select the top 5 rows by this field.
Age int
}
// printRow is a sink DoFn for beam.ParDo0: it prints a single User to
// standard output using fmt's default struct formatting (e.g. "{Bob 5}").
// The context argument is accepted to match the DoFn shape but is unused.
func printRow(_ context.Context, user User) {
	fmt.Println(user)
}
// main builds a pipeline that turns a fixed list of users into a
// PCollection<User>, passes every element through an identity ParDo, and
// prints each element with printRow.
func main() {
	flag.Parse()
	beam.Init()

	ctx := context.Background()
	pipeline := beam.NewPipeline()
	scope := pipeline.Root()

	users := beam.CreateList(scope, []User{
		{"Bob", 5},
		{"Adam", 8},
		{"John", 3},
		{"Ben", 1},
		{"Jose", 1},
		{"Bryan", 1},
		{"Kim", 1},
		{"Tim", 1},
	})

	// Identity transform: each User is re-emitted unchanged.
	passthrough := beam.ParDo(scope, func(u User, emit func(User)) {
		emit(u)
	}, users)
	beam.ParDo0(scope, printRow, passthrough)

	if err := beamx.Run(ctx, pipeline); err != nil {
		log.Exitf(ctx, "Failed to execute job: %v", err)
	}
}
From the code above, I need to choose the top 5 rows based on `User.Age`. I found the `top` package, which has a function that does exactly this, but its documentation says it returns a single-element PCollection. How is that different?
package main
import (
"context"
"flag"
"fmt"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/top"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)
// init registers every named function this program hands to Beam transforms.
// Registration lets Beam resolve the functions without falling back to the
// binary's symbol table, and beam.RegisterFunction must be called from init().
func init() {
	beam.RegisterFunction(less)     // comparator passed to top.Largest
	beam.RegisterFunction(printRow) // sink DoFn passed to beam.ParDo0
}
// User is one input row for the pipeline.
type User struct {
// Name of the user; not used for ordering.
Name string
// Age is the sole ranking key: less compares Users by Age only.
Age int
}
// printRow is a sink DoFn for beam.ParDo0: it prints a single User to
// standard output using fmt's default struct formatting (e.g. "{Adam 8}").
// The context argument is accepted to match the DoFn shape but is unused.
func printRow(_ context.Context, user User) {
	fmt.Println(user)
}
// less is the ordering used by top.Largest: it reports whether a is strictly
// younger than b. Users with the same Age compare as equal, so which of several
// tied users ends up in the result is not decided by this function.
func less(a, b User) bool {
	return b.Age > a.Age
}
func main() {
flag.Parse()
beam.Init()
ctx := context.Background()
p := beam.NewPipeline()
s := p.Root()
var userList = []User{
{"Bob", 5},
{"Adam", 8},
{"John", 3},
{"Ben", 1},
{"Jose", 1},
{"Bryan", 1},
{"Kim", 1},
{"Tim", 1},
}
initial := beam.CreateList(s, userList)
best := top.Largest(s, initial, 5, less)
pc2 := beam.ParDo(s, func(row User, emit func(User)) {
emit(row)
}, best)
beam.ParDo0(s, printRow, pc2)
if err := beamx.Run(ctx, p); err != nil {
log.Exitf(ctx, "Failed to execute job: %v", err)
}
}
I added the call that selects the top 5 rows, as shown above, but I get the error `[]main.User is not assignable to main.User`.
I need the PCollection in the same format as before since I have further processing to do. I suspect this is because the top.Largest function is returning a single-element PCollection. Any ideas on how I can convert the format?
Answer:
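`top.Largest` does not return a `PCollection<User>`. It returns a `PCollection<[]User>` containing a single element: the slice of the top N users. That is why a `ParDo` that expects a `User` fails with `[]main.User is not assignable to main.User`. To get a `PCollection<User>` back for further processing, add a `ParDo` that takes the slice and emits each element individually: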
pc2 := beam.ParDo(s, func(rows []User, emit func(User)) {
	// top.Largest yields one []User element; re-emit each User on its own.
	for i := range rows {
		emit(rows[i])
	}
}, best)