I have a script (kafka_producer.py) that reads a CSV >> creates a Pandas DataFrame >> converts the Pandas DataFrame to a Spark DataFrame >> calls foreachPartition on the Spark DataFrame to post each partition's rows to Kafka.
The call
df.foreachPartition(self.send_to_kafka)
is throwing PicklingError: Could not serialize object: TypeError: can't pickle _thread.RLock objects.
Code is below:
import json

import pandas as pd
from kafka import KafkaProducer

def get_kafka_producer(topic):
    kafkaBrokers = 'kafka.broker:9093'
    caRootLocation = '/path/to/CARoot.pem'
    certLocation = '/path/to/certificate.pem'
    keyLocation = '/path/to/key.pem'
    password = 'abc123'
    producer = KafkaProducer(bootstrap_servers=kafkaBrokers,
                             security_protocol='SSL',
                             ssl_check_hostname=False,
                             ssl_cafile=caRootLocation,
                             ssl_certfile=certLocation,
                             ssl_keyfile=keyLocation,
                             ssl_password=password)
    return producer

class SparkKafkaWriter:
    topic = None
    producer = None

    def __init__(self, topic):
        self.topic = topic

    def send_to_kafka(self, rows):
        print("Writing Data")
        for row in rows:
            json_str = json.dumps(row)
            self.producer.send(self.topic, key=None, value=bytes(json_str, 'utf-8'))
        self.producer.flush()

    def post_spark_to_kafka(self, df):
        producer = get_kafka_producer(self.topic)
        self.producer = producer
        df.foreachPartition(self.send_to_kafka)
        print("Dataframe Posted")

def run_kafka_producer(path, sep, topic):
    df = pd.read_csv(path, sep=sep)
    if isinstance(df, pd.DataFrame):
        print("Converting Pandas DF to Spark DF")
        spark = get_spark_session("session_name")  # get_spark_session is a helper defined elsewhere
        df = spark.createDataFrame(df)
    writer = SparkKafkaWriter(topic)
    writer.post_spark_to_kafka(df)

if __name__ == "__main__":
    path = "/path/to/data.csv"
    sep = "|"
    topic = "TEST_TOPIC"
    run_kafka_producer(path, sep, topic)
Error is:
File "/path/to/kafka_producer.py", line 45, in post_spark_to_kafka
df.foreachPartition(self.send_to_kafka)
File "/opt/cloudera/parcels/IMMUTA/python/pyspark/sql/dataframe.py", line 596, in foreachPartition
self.rdd.foreachPartition(f)
File "/opt/cloudera/parcels/IMMUTA/python/pyspark/rdd.py", line 806, in foreachPartition
self.mapPartitions(func).count() # Force evaluation
File "/opt/cloudera/parcels/IMMUTA/python/pyspark/rdd.py", line 1055, in count
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
File "/opt/cloudera/parcels/IMMUTA/python/pyspark/rdd.py", line 1046, in sum
return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
File "/opt/cloudera/parcels/IMMUTA/python/pyspark/rdd.py", line 917, in fold
vals = self.mapPartitions(func).collect()
File "/opt/cloudera/parcels/IMMUTA/python/pyspark/rdd.py", line 816, in collect
sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
File "/opt/cloudera/parcels/IMMUTA/python/pyspark/rdd.py", line 2532, in _jrdd
self._jrdd_deserializer, profiler)
File "/opt/cloudera/parcels/IMMUTA/python/pyspark/rdd.py", line 2434, in _wrap_function
pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
File "/opt/cloudera/parcels/IMMUTA/python/pyspark/rdd.py", line 2420, in _prepare_for_python_RDD
pickled_command = ser.dumps(command)
File "/opt/cloudera/parcels/IMMUTA/python/pyspark/serializers.py", line 600, in dumps
raise pickle.PicklingError(msg)
_pickle.PicklingError: Could not serialize object: TypeError: can't pickle _thread.RLock objects
CodePudding user response:
I think the piece you're missing is what's actually happening here.
You are creating a Kafka connection in your driver and then trying to ship that live connection from the driver, across the network, to the executors to do the work (the function you pass to foreachPartition is executed on the executors).
That is what Spark is telling you with "can't pickle _thread.RLock objects": it cannot serialize your live Kafka connection in order to ship it to an executor.
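Here is a minimal, Kafka-free sketch of the same failure (the class and names are made up for illustration; the threading.RLock stands in for the locks that live inside a real KafkaProducer):

import pickle
import threading

class Writer:
    def __init__(self):
        # stands in for the live KafkaProducer you store on self.producer
        self.producer = threading.RLock()

    def send_to_kafka(self, rows):
        pass

# To ship the bound method self.send_to_kafka to the executors, Spark has to
# serialize the instance it is bound to, and that instance holds the RLock:
pickle.dumps(Writer())  # TypeError: can't pickle _thread.RLock objects

(The exact wording of the TypeError differs between Python versions, but it is the same error that Spark wraps in its PicklingError.)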
You need to call get_kafka_producer() from inside the function you pass to foreachPartition; that way the connection to Kafka is initialized inside the executor (along with any other bookkeeping you need to do).
FYI: the worst part, and the one I want to call out, is that your original code will work on your local machine, because there the driver and the executor are the same process. Also be aware that the fixed version opens a connection to Kafka from each executor at more or less the same time (5 executors = 5 open connections); more precisely, it opens one connection per partition (the shuffle default is 200 partitions), so make sure you close each one when you are done.
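If you want to cap how many connections get opened at once, one option (not something from your code, just a sketch) is to reduce the number of partitions before writing:

# hypothetical: cap concurrent Kafka connections by capping the partition count
df = df.repartition(10)  # at most 10 partitions -> at most 10 producers at a time
df.foreachPartition(self.send_to_kafka)

With that in mind, the reworked methods look something like this: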
def send_to_kafka(self, rows):
    print("Writing Data")
    producer = get_kafka_producer(self.topic)
    self.producer = producer
    # do topic configuration
    for row in rows:
        json_str = json.dumps(row)
        self.producer.send(self.topic, key=None, value=bytes(json_str, 'utf-8'))
    self.producer.flush()
    # close the connection now that this partition is done
    self.producer.close()

def post_spark_to_kafka(self, df):
    df.foreachPartition(self.send_to_kafka)
    print("Dataframe Posted")