Using regular expression to consume multiple topic in Flink

Time:02-01

I know that Flink is able to consume multiple topics using a regular expression, as described in the Flink documentation.

I have topic names such as the following:

sclee-10343434
sclee-10342432
sclee-34234
sclee-3343423432424
....

When I set the topic pattern as shown below, using the regular expression sclee-[\\d+], I get an exception.

Is this regular expression correct for my case? And does Flink actually support this?

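import java.util.regex.Pattern
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

val source = new FlinkKafkaConsumer[T](
  Pattern.compile("sclee-[\\d+]"), // subscribe to every topic matching this pattern
  deserializer,
  consumerProps
)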

The error is below.

Caused by: java.lang.RuntimeException: Unable to retrieve any partitions with KafkaTopicsDescriptor: Topic Regex Pattern (dev-plexer-10507689[\d+])
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.discoverPartitions(AbstractPartitionDiscoverer.java:153)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:553)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:750)

Answer:

I think the problem is that the regex does not match the topics you listed. The regex sclee-[\\d+] matches sclee- followed by exactly one character, which must be either a digit or a + sign, so a name like sclee-10343434 is not matched.

In this case you most probably need sclee-[\\d]+ (one or more digits after sclee-).
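One way to check this outside Flink is to test both patterns against the topic names with plain java.util.regex. This is a minimal sketch, assuming that the legacy FlinkKafkaConsumer selects a topic only when the whole name matches the pattern (matcher(topic).matches(), not find()). The topics sclee-7 and other-12345 are hypothetical additions used only for contrast.

```scala
import java.util.regex.Pattern

// Sketch: which topic names does each pattern select?
// Assumption: a topic is selected only if the WHOLE name matches,
// i.e. pattern.matcher(topic).matches().
object TopicRegexCheck {
  val topics: Seq[String] = Seq(
    "sclee-10343434",
    "sclee-10342432",
    "sclee-34234",
    "sclee-3343423432424",
    "sclee-7",     // hypothetical single-digit topic
    "other-12345"  // hypothetical topic that should never match
  )

  // Returns the topics whose full name matches the given regex
  def matchingTopics(regex: String): Seq[String] = {
    val pattern = Pattern.compile(regex)
    topics.filter(t => pattern.matcher(t).matches())
  }

  def main(args: Array[String]): Unit = {
    // Original pattern: exactly one character (a digit or '+') after "sclee-"
    println(matchingTopics("sclee-[\\d+]"))
    // Suggested pattern: one or more digits after "sclee-"
    println(matchingTopics("sclee-[\\d]+"))
  }
}
```

With the original pattern only the hypothetical sclee-7 is selected, so none of the real topics match, which is consistent with Flink finding no partitions. With sclee-[\\d]+ (equivalently sclee-\\d+) every sclee- topic is selected and other-12345 is still excluded.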

Answer:

I would recommend using the latest stable Flink version (currently Flink 1.16). The documentation link you included points to Flink 1.4, which is no longer supported, and the Kafka consumer has changed since then. See https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kafka/#topic-partition-subscription for more details.
