- I have a RichParallelSourceFunction (parallelism = 1) that queries MySQL every 5 seconds and emits a list of timestamps indicating when to start/stop writing to the sink.
- This timestamp list is broadcast to the original stream (parallelism = 10).
- I set the parallelism of the RichParallelSourceFunction to 1 to reduce the number of simultaneous requests to the MySQL server.
- I am confused about whether broadcast state is needed in this case. Why not just store the broadcast data in operator-local data structures?
- What is the difference between .broadcast(stateDesc) and .broadcast()?
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.util.Collector

class MyBroadcastProcessFunction(name: String)
    extends BroadcastProcessFunction[Log, TimestampList, Log]
    with CheckpointedFunction {

  // timestamps received from the broadcast side, kept in a plain field
  private var sortedTimestamps: IndexedSeq[Long] = IndexedSeq.empty

  // operator state buffering logs that should not be written yet
  @transient private var buffer: ListState[(Log, Long)] = _

  override def processElement(value: Log,
      ctx: BroadcastProcessFunction[Log, TimestampList, Log]#ReadOnlyContext,
      out: Collector[Log]): Unit = {
    if (sortedTimestamps.nonEmpty && ctx.timestamp() > sortedTimestamps(0))
      out.collect(value)
    else
      buffer.add((value, 0L)) // ListState has add(), not put()
  }

  override def processBroadcastElement(timestamps: TimestampList,
      ctx: BroadcastProcessFunction[Log, TimestampList, Log]#Context,
      out: Collector[Log]): Unit = {
    // add timestamps to sortedTimestamps
    // do I need to use the broadcast MapState here, or can I just use
    // operator-local data structures (e.g. sortedTimestamps)?
  }

  override def snapshotState(context: FunctionSnapshotContext): Unit = {}

  override def initializeState(context: FunctionInitializationContext): Unit = {
    val stateDesc = new ListStateDescriptor[(Log, Long)]("logBuffer", classOf[(Log, Long)])
    buffer = context.getOperatorStateStore.getListState(stateDesc)
  }
}
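
For reference, here is roughly how a job like this can be wired together with the DataStream API. This is only a sketch: env, the descriptor name timestampsDesc, and the source classes LogSource and MySqlTimestampSource are assumed names, not taken from the question.

import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.datastream.BroadcastStream
import org.apache.flink.streaming.api.scala._

// Descriptor for the broadcast state; also passed to .broadcast(...) below.
val timestampsDesc = new MapStateDescriptor[String, TimestampList](
  "timestamps", classOf[String], classOf[TimestampList])

// Main stream (parallelism 10) -- LogSource is a placeholder name.
val logs: DataStream[Log] = env.addSource(new LogSource).setParallelism(10)

// Control stream: a single instance polling MySQL every 5 seconds.
val timestampStream: DataStream[TimestampList] =
  env.addSource(new MySqlTimestampSource).setParallelism(1)

// Broadcast the control stream to all parallel instances downstream.
val control: BroadcastStream[TimestampList] = timestampStream.broadcast(timestampsDesc)

// Connect the two streams and apply the BroadcastProcessFunction.
val gated: DataStream[Log] =
  logs.connect(control).process(new MyBroadcastProcessFunction("gate"))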
CodePudding user response:
The transformation .broadcast() (with no arguments) emits every element to every parallel instance of the next operator, regardless of the parallelism of the stream. The Javadoc says: "Sets the partitioning of the DataStream so that the output elements are broadcasted to every parallel instance of the next operation."

.broadcast(stateDesc) is for the broadcast state pattern, where you evaluate the events of one stream against patterns or rules coming from another, usually much smaller, stream. The broadcast state section of the Flink documentation is a good reference for this.
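
As a minimal illustration of the difference, reusing the timestampStream and the timestampsDesc descriptor from the sketch in the question above (assumed names):

// Plain broadcast partitioning: still an ordinary DataStream; every
// parallel instance of the next operator receives every element.
// (In the Scala API this is a parameterless method, .broadcast() in Java.)
val replicated: DataStream[TimestampList] = timestampStream.broadcast

// Broadcast with a state descriptor: yields a BroadcastStream that must be
// connect()-ed to another stream and handled by a BroadcastProcessFunction.
val withState: BroadcastStream[TimestampList] = timestampStream.broadcast(timestampsDesc)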
The way you have created your BroadcastProcessFunction is not quite right, because you are effectively processing only one stream. The broadcast side, in your case the timestamps coming from MySQL, should be handled in the processBroadcastElement() method, and that is where you update the broadcast (global) state. In processElement() you then receive the regular (fast) stream and match its elements against the state that processBroadcastElement() updated.

Here is more or less how you could implement it. One consideration: broadcast state is always exposed as a MapState, so you register it with a MapStateDescriptor (the same descriptor you pass to .broadcast(...)) rather than a ListStateDescriptor, as described in the linked documentation.
// timestampsDesc is assumed to be a MapStateDescriptor[String, TimestampList]
// field, the same descriptor that was passed to .broadcast(timestampsDesc).

override def processBroadcastElement(timestamps: TimestampList,
    ctx: BroadcastProcessFunction[Log, TimestampList, Log]#Context,
    out: Collector[Log]): Unit = {
  // Update the broadcast state; it is a MapState, so use put().
  ctx.getBroadcastState(timestampsDesc).put("latest", timestamps)
}

override def processElement(value: Log,
    ctx: BroadcastProcessFunction[Log, TimestampList, Log]#ReadOnlyContext,
    out: Collector[Log]): Unit = {
  // Read-only view of the same broadcast state on the element side.
  val latest = ctx.getBroadcastState(timestampsDesc).get("latest")
  if (latest != null /* && value matches the timestamps */)
    out.collect(value) // MATCH
}
CodePudding user response:
.broadcast(stateDesc) needs a state descriptor so that it knows how to serialize the data being broadcast. You can use this regardless of whether or not you want to store the data being broadcast in MapState in your (Keyed)BroadcastProcessFunction.
If you don't use MapState to store this data, then it will be lost if the job fails and restarts. But perhaps this doesn't matter in your case, since you can get the latest data from MySQL when the job restarts.
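
To make that trade-off concrete, here is a sketch of the two options inside processBroadcastElement; timestampsDesc and latestTimestamps are assumed names, not from the answer.

override def processBroadcastElement(timestamps: TimestampList,
    ctx: BroadcastProcessFunction[Log, TimestampList, Log]#Context,
    out: Collector[Log]): Unit = {
  // Option 1: broadcast MapState -- checkpointed and restored after a failure.
  ctx.getBroadcastState(timestampsDesc).put("latest", timestamps)

  // Option 2: plain operator-local field -- lost if the job restarts, but the
  // MySQL source re-emits the current list within ~5 seconds anyway.
  latestTimestamps = timestamps
}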