How to setup two different process functions for single flink job config?

Time:02-26

I have a single Flink job with three different (optional) inputs, and the same output type is emitted for each type of input.

input1 uses KeyedProcessFunction(); input2 uses ProcessWindowFunction().

Basically, the job's input is a union of three inputs with a single output. How do we configure the Flink job so that, within a single job, I can use both of the process functions above?

I want to use KeyedProcessFunction() for input1 and ProcessWindowFunction() for input2.

The code below is for input2 only. If I want to handle input1, I have to use .process(ProcessFunction()) instead of .process(MyProcessWindowFunction()) in the job config. How do we configure the job so that I can use both functions in a single job?

    fun setupJob(env: StreamExecutionEnvironment) {
        val testStream = env.sampleStream()
            .keyBy { it.f0 }
            .window(EventTimeSessionWindows.withGap(Time.seconds(10)))
            .process(MyProcessWindowFunction())

        testStream.map { it.toKafkaMessage() }
            .kafkaSink<SampleOutput>()
    }

CodePudding user response:

A single Flink job can contain several pipelines. E.g.,

env.fromSource(input1)
  .keyBy(...)
  .process(new MyKeyedProcessFunction())
  .sinkTo(sink1)

env.fromSource(input2)
  .keyBy(...)
  .window(...)
  .process(new MyProcessWindowFunction())
  .sinkTo(sink2)

env.execute()
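
Since the question wants a single output rather than two sinks, the two branches can also be unioned before sinking, because both emit the same output type. Below is a minimal Kotlin sketch of that idea; `sampleStream()`, `otherStream()`, `toKafkaMessage()`, `kafkaSink()`, and `MyKeyedProcessFunction` are assumed helpers from the question's codebase, not Flink APIs.

```kotlin
// Sketch: each input gets its own process function, then the resulting
// streams (same SampleOutput type) are unioned into one sink.
fun setupJob(env: StreamExecutionEnvironment) {
    // input1: keyed, non-windowed processing
    val stream1 = env.sampleStream()            // assumed helper for input1
        .keyBy { it.f0 }
        .process(MyKeyedProcessFunction())      // KeyedProcessFunction for input1

    // input2: keyed, session-windowed processing
    val stream2 = env.otherStream()             // assumed helper for input2
        .keyBy { it.f0 }
        .window(EventTimeSessionWindows.withGap(Time.seconds(10)))
        .process(MyProcessWindowFunction())     // ProcessWindowFunction for input2

    // Both branches emit SampleOutput, so union them and share one sink
    stream1.union(stream2)
        .map { it.toKafkaMessage() }
        .kafkaSink<SampleOutput>()              // assumed helper, single output
}
```

`DataStream.union` requires all streams to have the same element type, which matches the question's setup where every input produces the same output.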