Spark: writing to kafka gives error: org.apache.kafka.common.errors.TimeoutException: Topic RANDOM_S

Time:12-16

I have PySpark streaming code that reads from a socket and writes to a Kafka topic.
When I write to the console, the output prints fine, but when I write to the Kafka topic, I get org.apache.kafka.common.errors.TimeoutException: Topic RANDOM_STRING not present in metadata after 60000 ms.
The log is:

2022-12-13 09:18:22,631 - my_module_lib_io.streaming.kafka.spark.producer - INFO - Open Socket on localhost 9999 by using command on another shell: nc -l localhost 9999
22/12/13 09:18:22 WARN sources.TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.
22/12/13 09:18:25 WARN sources.TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.
[Stage 1:>                                                          (0 + 3) / 3]
22/12/13 09:19:34 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 1.0 (TID 2, server.com, executor 1): org.apache.kafka.common.errors.TimeoutException: Topic RANDOM_STRING not present in metadata after 60000 ms.

I am running Apache Kafka (kafka_2.11-2.0.0) on localhost:9092, with SSL not enabled. My server.properties looks like this:

broker.id=1
group.initial.rebalance.delay.ms=0
listeners=PLAINTEXT://:9092
log.dirs=/home/aiman/kafka_local/kafka_2.11-2.0.0/kafka-logs
log.retention.check.interval.ms=300000
log.retention.hours=168
log.segment.bytes=1073741824
max.poll.interval.ms=5
num.io.threads=8
num.network.threads=3
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
session.timeout.ms=3
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
transaction.state.log.min.isr=1
transaction.state.log.replication.factor=1
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000

The topic (RANDOM_STRING) exists:

./kafka-topics.sh --zookeeper localhost:2181 --list
TEST_TOPIC
RANDOM_STRING
__consumer_offsets

My code is:

kafka_config = {
    "kafka.bootstrap.servers": "localhost:9092",
    "checkpointLocation": "/user/aiman/checkpoint/kafka_local/random_string",
    "topic": "RANDOM_STRING"
}
data = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()
# This prints the DataFrame to the console:
# data.writeStream \
#     .format("console") \
#     .outputMode("append") \
#     .option("truncate", False) \
#     .start() \
#     .awaitTermination()
data.writeStream \
    .format("kafka") \
    .outputMode("append") \
    .options(**kafka_config) \
    .start() \
    .awaitTermination()

I am using the following spark-sql connector: spark-sql-kafka-0-10_2.11-2.4.0-cdh6.3.4.jar
Spark version 2.4.0-cdh6.3.4.
And Apache Kafka version is kafka_2.11-2.0.0.

UPDATE:
I changed the Apache Kafka version to kafka_2.11-0.10.0.0.

Is there any configuration I am missing?

CodePudding user response:

Cannot reproduce. Is RANDOM really random? If so, I suspect that is the problem. Otherwise, Spark is distributed, but you're referring to localhost for Kafka, so do you have more than one Kafka cluster, one of which doesn't have the topic you're using?
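
To test the localhost theory, a rough sketch like the one below could be run on the machine that actually executes the tasks (server.com in the log above) to see whether anything is listening on the broker address from that host's point of view. The helper name can_reach is made up for illustration, and the host and port are the values from the question's kafka_config. A successful connection only shows that some process accepts TCP connections there; it does not prove that broker knows the topic.

```python
import socket


def can_reach(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout seconds."""
    try:
        # create_connection resolves the host and attempts a TCP handshake.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and name resolution failures.
        return False


if __name__ == "__main__":
    # Assumption: broker address taken from the question's kafka_config.
    print(can_reach("localhost", 9092))
```

If this prints False on an executor host while it prints True on the machine where Kafka was started, "localhost" is pointing at different machines, and the bootstrap server should be given as a hostname the executors can resolve.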

I tested with the latest Spark (that probably isn't your issue, although I'd advise against using a CDH-specific version) and Kafka 3.3 (again, the version shouldn't really matter).

Create and check topics

$ kafka-topics --list --bootstrap-server localhost:9092 | grep RANDOM_STRING
RANDOM_STRING

Start a server

$ nc -lk 9999

Run the code, using a package for my Spark version. Note: master is local, rather than YARN as in CDH.

spark-submit --master=local --packages 'org.apache.spark:spark-sql-kafka-0-10_2.13:3.3.1' example.py

(type something in the nc terminal)

Then consume it.

kcat -C -b localhost:9092 -t 'RANDOM_STRING' -o earliest
hello world
% Reached end of topic RANDOM_STRING [2] at offset 1

Note: Python has native libraries for TCP socket servers and Kafka producers, so if your code is just forwarding requests from one to the other, Spark is unnecessary.
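
As a minimal sketch of that idea, the standard library's socketserver module can accept newline-delimited text and hand each line to a callback. The names make_handler, serve and main are hypothetical. The Kafka part assumes the third-party kafka-python package is installed and reuses the broker address and topic from the question; it is untested against that setup.

```python
import socketserver


def make_handler(send):
    """Build a handler class that passes each received line to send(bytes)."""

    class LineForwarder(socketserver.StreamRequestHandler):
        def handle(self):
            # self.rfile is a buffered file object wrapping the client socket;
            # iterating it yields one line at a time until the client disconnects.
            for line in self.rfile:
                payload = line.rstrip(b"\r\n")
                if payload:  # skip empty lines
                    send(payload)

    return LineForwarder


def serve(host, port, send):
    """Create (but do not start) a TCP server that forwards lines to send."""
    return socketserver.TCPServer((host, port), make_handler(send))


def main():
    # Assumption: kafka-python is installed (pip install kafka-python).
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers="localhost:9092")
    with serve("localhost", 9999, lambda v: producer.send("RANDOM_STRING", v)) as server:
        try:
            server.serve_forever()
        finally:
            producer.flush()  # push out any buffered records before exiting
```

Calling main() starts the forwarder. In this variant the Python process is the listening side, so you would connect with nc localhost 9999 rather than nc -l.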
