I have a single Flink job with three different (optional) inputs, and the same output is emitted for each type of input.
input1 uses KeyedProcessFunction()
input2 uses ProcessWindowFunction()
Basically, the job input is a union of three inputs with a single output. How do we configure the Flink job so that a single job can use both of the process functions above?
I want to use KeyedProcessFunction() for input1 and ProcessWindowFunction() for input2.
The code below is for input2 only. If I want to use input1, I have to use .process(processFunction()) instead of .process(MyProcessWindowFunction()) in the job config. How do we configure it so that I can use both functions in a single job?
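fun setupJob(env: StreamExecutionEnvironment) {
    // Pipeline for input2: session windows with a 10-second gap, keyed by f0
    val testStream = env.sampleStream()
        .keyBy { it.f0 }
        .window(EventTimeSessionWindows.withGap(Time.seconds(10)))
        .process(MyProcessWindowFunction())
    // Convert each result to a Kafka message and write it to the sink
    testStream.map { it.toKafkaMessage() }
        .kafkaSink<SampleOutput>()
}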
CodePudding user response:
A single Flink job can contain several independent pipelines, each with its own process function. For example:
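// Pipeline 1: per-element processing with a KeyedProcessFunction.
// (Abbreviated: the real fromSource also takes a WatermarkStrategy and a source name.)
env.fromSource(input1)
    .keyBy(...)
    .process(new MyKeyedProcessFunction())
    .sinkTo(sink1);

// Pipeline 2: windowed processing with a ProcessWindowFunction.
env.fromSource(input2)
    .keyBy(...)
    .window(...)
    .process(new MyProcessWindowFunction())
    .sinkTo(sink2);

// One execute() call submits both pipelines as a single job.
env.execute();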
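
Since the question asks for one output shared by all inputs, the two pipelines could also be merged with union() before a single sink, as long as both process functions emit the same type. The following is only a minimal sketch under several assumptions: it targets the Flink 1.x DataStream API (where Time.seconds is available, as in the question), the class name TwoPipelinesOneSink and the String output type are hypothetical, the in-memory fromElements sources stand in for the real inputs, and print() stands in for the Kafka sink.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Hypothetical job class; names and element types are illustrative only.
public class TwoPipelinesOneSink {

    // Per-element logic for input1: emits one String per incoming record.
    public static class MyKeyedProcessFunction
            extends KeyedProcessFunction<String, Tuple2<String, Long>, String> {
        @Override
        public void processElement(Tuple2<String, Long> value, Context ctx,
                                   Collector<String> out) {
            out.collect("input1 key=" + value.f0 + " ts=" + value.f1);
        }
    }

    // Per-window logic for input2: emits one String per key and session window.
    public static class MyProcessWindowFunction
            extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {
        @Override
        public void process(String key, Context ctx,
                            Iterable<Tuple2<String, Long>> elements,
                            Collector<String> out) {
            int count = 0;
            for (Tuple2<String, Long> ignored : elements) {
                count++;
            }
            out.collect("input2 key=" + key + " count=" + count);
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Event-time windows need timestamps; here f1 is assumed to hold epoch millis.
        WatermarkStrategy<Tuple2<String, Long>> watermarks =
                WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
                        .withTimestampAssigner((event, ts) -> event.f1);

        // Pipeline 1 (input1): keyed, element-by-element processing.
        DataStream<String> out1 = env
                .fromElements(Tuple2.of("a", 1_000L), Tuple2.of("b", 2_000L))
                .keyBy(e -> e.f0)
                .process(new MyKeyedProcessFunction());

        // Pipeline 2 (input2): keyed session windows with a 10-second gap.
        DataStream<String> out2 = env
                .fromElements(Tuple2.of("a", 1_000L), Tuple2.of("a", 3_000L))
                .assignTimestampsAndWatermarks(watermarks)
                .keyBy(e -> e.f0)
                .window(EventTimeSessionWindows.withGap(Time.seconds(10)))
                .process(new MyProcessWindowFunction());

        // Both outputs have the same type, so they can share one sink.
        out1.union(out2).print();

        env.execute("two-pipelines-one-sink");
    }
}
```

Because the inputs are optional, one option is to build each pipeline only when its input is configured and union whichever result streams exist before attaching the sink.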