- Figure out the correct way to scale up the remote function.
- Figure out how the number of remote function replicas, the Flink parallelism.default configuration, the ingress topic partition count, and the message partition keys relate to one another, and what the design intention behind this is.
As the docs suggest, one of the benefits of Flink StateFun remote functions is that they can scale independently of the Flink workers and task parallelism. To understand how messages are actually dispatched to the remote function processes, I tried the following scenarios.
Preparation
- Use https://github.com/apache/flink-statefun-playground/blob/main/deployments/k8s as the basis for my experiment.
- Modify https://github.com/apache/flink-statefun-playground/blob/main/deployments/k8s/03-functions/functions.py as follows, so the logs show how invocations are parallelized in practice:
...
import os
import time
from datetime import datetime

from statefun import StatefulFunctions, Context, Message

functions = StatefulFunctions()

@functions.bind(typename="example/hello")
async def hello(context: Context, message: Message):
    arg = message.raw_value().decode('utf-8')
    hostname = os.getenv('HOSTNAME')  # pod name, to see which replica handled the call
    # Simulate ~10 seconds of slow, blocking work so concurrency shows up in the logs.
    for _ in range(10):
        print(f"{datetime.utcnow()} {hostname}: Hello from {context.address.id}: you wrote {arg}!", flush=True)
        time.sleep(1)
...
- Play around with parallelism.default in flink-conf.yaml, the replica count in the functions Deployment, and different partition counts and partition keys for the ingress topic (names).
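For reference, this is roughly how I produce the test messages. This is a minimal sketch assuming kafka-python and a placeholder bootstrap address; the Kafka record key is what the routable ingress uses as the target function id (context.address.id).

from kafka import KafkaProducer

# Placeholder bootstrap address; use whatever the playground's Kafka service exposes.
producer = KafkaProducer(bootstrap_servers="localhost:9092")

# "key1:message3" in the observations below means key=key1, value=message3.
for i in range(1, 6):
    producer.send("names", key=b"key1", value=f"message{i}".encode())
producer.flush()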
Observations
- When sending messages with the same partition key, everything runs sequentially. If I send 5 messages such as "key1:message1", "key1:message2", "key1:message3", "key1:message4", "key1:message5", only one pod receives requests, even though the remote function Deployment has 5 replicas. No matter how I configure the parallelism or how far I increase the ingress topic partition count, the behavior stays the same.
- When sending messages with 10 partition keys (the topic configured with 5 partitions, parallelism set to 5, and 5 remote function replicas), which replicas receive requests appears random. Sometimes all 5 receive requests at the same time and work in parallel; other times only 2 are utilized while the other 3 sit idle.
- Parallelism seems to determine the number of consumers in the consumer group subscribed to the ingress topic. I suspect that if I configure more parallelism than the number of partitions in the ingress topic, the extra sub-tasks will just stay idle (see the sketch after this list).
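A tiny, simplified model of the partition-to-sub-task assignment illustrates that suspicion. The real assignment logic in Flink's Kafka connector is more involved, but the idle-sub-task effect is the same:

def assign_partitions(num_partitions: int, parallelism: int) -> dict:
    # Simplified round-robin model: partition i goes to sub-task i % parallelism.
    assignment = {subtask: [] for subtask in range(parallelism)}
    for partition in range(num_partitions):
        assignment[partition % parallelism].append(partition)
    return assignment

print(assign_partitions(num_partitions=5, parallelism=5))
# {0: [0], 1: [1], 2: [2], 3: [3], 4: [4]} -- every sub-task reads one partition

print(assign_partitions(num_partitions=5, parallelism=8))
# sub-tasks 5..7 get no partitions and simply sit idle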
My Expectations
- What I really expect is that all 5 remote function replicas should always be fully utilized as long as there is a backlog in the ingress topic.
- When the ingress topic is configured with multiple partitions, each partition should be batched separately and multiplexed with the batches of the other sub-tasks (or consumers), so that all of the remote function processes are utilized.
Can a Flink expert help me understand the behavior above and the design intentions behind it?
Answer:
There are two things happening here...
- Each partition in your topic is assigned to a sub-task. This is done round-robin, so if you have 5 topic partitions and 5 sub-tasks (your parallelism) then every sub-task is reading from a single different topic partition.
- Records being read from the topic are keyed and distributed (what Flink calls partitioning). If you only have one unique key, then every record is sent to the same sub-task, and thus only one sub-task gets any records. Any time you have low key cardinality relative to the number of sub-tasks, you can get a skewed distribution of data.
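To make the keying point concrete, here is a rough Python model of Flink's key-group assignment. The real implementation applies a murmur hash to the key's hashCode and uses the configured maxParallelism (128 by default), so the exact sub-task indices below will differ from a real job, but the structure is the same: every record with a given key always lands on the same sub-task, and with few distinct keys some sub-tasks receive nothing.

import hashlib
from collections import Counter

MAX_PARALLELISM = 128   # Flink's default maxParallelism (number of key groups)
PARALLELISM = 5         # parallelism.default in this experiment

def key_group(key: str) -> int:
    # Stand-in for Flink's murmurHash(key.hashCode()); any stable hash shows the effect.
    h = int.from_bytes(hashlib.md5(key.encode()).digest()[:4], "big")
    return h % MAX_PARALLELISM

def subtask_for_key(key: str) -> int:
    # Same shape as Flink's key-group-to-operator mapping: keyGroup * parallelism / maxParallelism.
    return key_group(key) * PARALLELISM // MAX_PARALLELISM

# One key -> always the same sub-task, no matter how many messages are sent.
print({f"message{i}": subtask_for_key("key1") for i in range(1, 6)})

# Ten keys -> often an uneven spread over the 5 sub-tasks; some may get little or nothing.
print(Counter(subtask_for_key(f"key{i}") for i in range(1, 11)))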
Usually in Statefun you'd scale up processing by having more parallel functions, versus scaling up the number of task managers that are running.
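One practical note on the experiment itself (this is an assumption about the playground's aiohttp-based handler rather than anything StateFun enforces): the blocking time.sleep(1) in the handler stalls the event loop, so each replica can only work on one request at a time. A non-blocking sleep lets a single replica serve invocations for many different addresses concurrently, which makes it much easier to keep all replicas busy when there is a backlog:

import asyncio
from statefun import StatefulFunctions, Context, Message

functions = StatefulFunctions()

@functions.bind(typename="example/hello")
async def hello(context: Context, message: Message):
    arg = message.raw_value().decode("utf-8")
    for _ in range(10):
        print(f"Hello from {context.address.id}: you wrote {arg}!", flush=True)
        await asyncio.sleep(1)  # yields the event loop instead of blocking it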