I have an error on the Py4j side of PyFlink. The code is below:
env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file:/" + os.getcwd() + "/jar_files/" + "flink-sql-connector-kafka-1.15.0.jar")
type_info = Types.ROW_NAMED(['id', 't', 'l', 't', 'm', 'a', 'e'],
                            [Types.STRING(), Types.LIST(Types.STRING()), Types.STRING(),
                             Types.FLOAT(), Types.STRING(), Types.FLOAT(), Types.STRING()])
json_row_schema = JsonRowDeserializationSchema.builder().type_info(type_info).build()
kafka_props = {'bootstrap.servers': 'localhost:9092'}
kafka_consumer = FlinkKafkaConsumer("test", json_row_schema, kafka_props)
kafka_consumer.set_start_from_earliest()
## Stream
type_info = Types.ROW([Types.STRING(), Types.PICKLED_BYTE_ARRAY()])
serialization_schema = JsonRowSerializationSchema.Builder() \
    .with_type_info(type_info) \
    .build()
kafka_producer = FlinkKafkaProducer(
    topic='testmodels',
    serialization_schema=serialization_schema,
    producer_config={'bootstrap.servers': 'localhost:9092'}
)
The error is:
py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.
: java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.setDeserializer(FlinkKafkaConsumer.java:323)
..more..
I also have a dummy map function, but it is not relevant here, since the error comes from PyFlink's internal `_get_kafka_consumer`:
def _get_kafka_consumer(topics, properties, deserialization_schema, j_consumer_clz):
    if not isinstance(topics, list):
        topics = [topics]
    gateway = get_gateway()
    j_properties = gateway.jvm.java.util.Properties()
    for key, value in properties.items():
        j_properties.setProperty(key, value)
    j_flink_kafka_consumer = j_consumer_clz(topics,
                                            deserialization_schema._j_deserialization_schema,
                                            j_properties)
    return j_flink_kafka_consumer
The things I tried are:

- Adding
  env.add_jars("file:/" + os.getcwd() + "/jar_files/" + "kafka-clients-3.2.0.jar")
  to the code.
- Adding another java_import as
  java_import(gateway.jvm, "org.apache.kafka.common.serialization.*")
  in the `import_flink_view` method.
- Using `flink-connector-kafka-1.15.0.jar` and `kafka-clients-3.2.0.jar` together.

None of these solved the issue.
CodePudding user response:
PyFlink links the JVM against the .jar files under ~/miniconda3/envs/<env_name>/lib/<python_version>/site-packages/pyflink/lib/. Moving flink-connector-kafka-1.15.0.jar and kafka-clients-3.2.0.jar into this folder may solve your issue.
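If you are unsure where that folder lives on your machine, you can locate it from the installed package and copy the jars there. A minimal sketch (the jar names and the jar_files source directory are taken from your question; adjust them to your setup):

import os
import shutil
import pyflink

# PyFlink ships its runtime jars in the "lib" folder next to the package
lib_dir = os.path.join(os.path.dirname(os.path.abspath(pyflink.__file__)), "lib")
print(lib_dir)

# Copy the Kafka connector and client jars into PyFlink's lib folder
for jar in ["flink-connector-kafka-1.15.0.jar", "kafka-clients-3.2.0.jar"]:
    shutil.copy(os.path.join(os.getcwd(), "jar_files", jar), lib_dir)

After restarting the Python process, you can quickly check whether the missing class is now visible to the JVM (assuming the default Py4J gateway that PyFlink starts):

from pyflink.java_gateway import get_gateway

# Raises an error if kafka-clients is still not on the classpath
get_gateway().jvm.java.lang.Class.forName(
    "org.apache.kafka.common.serialization.ByteArrayDeserializer")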