Flink Python Datastream API Kafka Consumer - NoClassDefFoundError ByteArrayDeserializer Error


I'm getting an error on the Py4J side of PyFlink. The code is below:

# imports assumed for PyFlink 1.15 (these module paths moved in later releases)
import os

from pyflink.common.serialization import (JsonRowDeserializationSchema,
                                          JsonRowSerializationSchema)
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer

env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file:/" + os.getcwd() + "/jar_files/" + "flink-sql-connector-kafka-1.15.0.jar")
type_info = Types.ROW_NAMED(['id', 't', 'l', 't', 'm', 'a', 'e'],
                            [Types.STRING(), Types.LIST(Types.STRING()), Types.STRING(), Types.FLOAT(),
                             Types.STRING(), Types.FLOAT(),
                             Types.STRING()])
json_row_schema = JsonRowDeserializationSchema.builder().type_info(type_info).build()
kafka_props = {'bootstrap.servers': 'localhost:9092'}
kafka_consumer = FlinkKafkaConsumer("test", json_row_schema, kafka_props)
kafka_consumer.set_start_from_earliest()

## Stream
type_info = Types.ROW([Types.STRING(), Types.PICKLED_BYTE_ARRAY()])
serialization_schema = JsonRowSerializationSchema.builder() \
    .with_type_info(type_info) \
    .build()
kafka_producer = FlinkKafkaProducer(
    topic='testmodels',
    serialization_schema=serialization_schema,
    producer_config={'bootstrap.servers': 'localhost:9092'}
)

The error is:

py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.
: java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.setDeserializer(FlinkKafkaConsumer.java:323)
..more..

I also have a dummy map function, but it is not relevant here, because the error comes from PyFlink's own `_get_kafka_consumer` helper:

def _get_kafka_consumer(topics, properties, deserialization_schema, j_consumer_clz):
    if not isinstance(topics, list):
        topics = [topics]
    gateway = get_gateway()
    j_properties = gateway.jvm.java.util.Properties()
    for key, value in properties.items():
        j_properties.setProperty(key, value)

    j_flink_kafka_consumer = j_consumer_clz(topics,
                                            deserialization_schema._j_deserialization_schema,
                                            j_properties)
    return j_flink_kafka_consumer

The things I tried are:

  • Adding env.add_jars("file:/" + os.getcwd() + "/jar_files/" + "kafka-clients-3.2.0.jar") to the code (see the sketch after this list).
  • Adding another java_import, java_import(gateway.jvm, "org.apache.kafka.common.serialization.*"), in the `import_flink_view` method.
  • Using flink-connector-kafka-1.15.0.jar and kafka-clients-3.2.0.jar together.

They did not solve the issue.
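Aside: env.add_jars() expects well-formed file:// URLs. A minimal sketch of how the jar URLs above could be built (assuming, as in the question, that the jars sit in a jar_files directory under the working directory):

import os
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

# os.getcwd() already starts with "/" on Linux, so "file://" + cwd
# produces a three-slash URL such as file:///home/user/jar_files/x.jar
jar_dir = os.path.join(os.getcwd(), "jar_files")
env.add_jars(
    "file://" + os.path.join(jar_dir, "flink-sql-connector-kafka-1.15.0.jar"),
    "file://" + os.path.join(jar_dir, "kafka-clients-3.2.0.jar"),
)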

CodePudding user response:

PyFlink links the .jar files under ~/miniconda3/envs/<env_name>/lib/<python_version>/site-packages/pyflink/lib/ into the JVM classpath. Moving flink-connector-kafka-1.15.0.jar and kafka-clients-3.2.0.jar into this folder may solve your issue.
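
For reference, a minimal sketch (assuming a standard pip/conda install) that locates PyFlink's bundled lib/ directory and copies the two jars into it; the jar_files source directory is taken from the question:

import os
import shutil

import pyflink

# PyFlink ships its runtime jars next to the package: <site-packages>/pyflink/lib
pyflink_lib = os.path.join(os.path.dirname(os.path.abspath(pyflink.__file__)), "lib")
print("PyFlink lib dir:", pyflink_lib)

for jar in ("flink-connector-kafka-1.15.0.jar", "kafka-clients-3.2.0.jar"):
    shutil.copy(os.path.join(os.getcwd(), "jar_files", jar), pyflink_lib)

After copying, restart the Python process so the jars are on the classpath when the JVM gateway is launched.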
