PySpark: PicklingError on calling df.foreach method

Time:05-28

I have a script (kafka_producer.py) that reads a CSV file, creates a pandas DataFrame, converts it to a Spark DataFrame, and then calls foreachPartition on the Spark DataFrame to post each row to Kafka. The call df.foreachPartition(self.send_to_kafka) throws PicklingError: Could not serialize object: TypeError: can't pickle _thread.RLock objects.
Code is below:

import json

import pandas as pd
from kafka import KafkaProducer  # kafka-python client


def get_kafka_producer(topic):
    kafkaBrokers='kafka.broker:9093'
    caRootLocation='/path/to/CARoot.pem'
    certLocation='/path/to/certificate.pem'
    keyLocation='/path/to/key.pem'
    password='abc123'
    
    producer = KafkaProducer(bootstrap_servers=kafkaBrokers,
                              security_protocol='SSL',
                              ssl_check_hostname=False,
                              ssl_cafile=caRootLocation,
                              ssl_certfile=certLocation,
                              ssl_keyfile=keyLocation,
                              ssl_password=password)
    return producer


class SparkKafkaWriter:
    topic = None
    producer = None
    def __init__(self,topic):
        self.topic = topic
    
    def send_to_kafka(self,rows):
        print("Writing Data")
        for row in rows:
            json_str = json.dumps(row)
            self.producer.send(self.topic, key=None, value=bytes(json_str,'utf-8'))
            self.producer.flush()
    
    def post_spark_to_kafka(self,df):
        producer = get_kafka_producer(self.topic)
        self.producer = producer
        df.foreachPartition(self.send_to_kafka)
        print("Dataframe Posted")

    
def run_kafka_producer(path,sep,topic):
    df = pd.read_csv(path,sep=sep)
    if isinstance(df, pd.DataFrame):
        print("Converting Pandas DF to Spark DF")
        spark = get_spark_session("session_name")
        df = spark.createDataFrame(df)
    
    writer = SparkKafkaWriter(topic)
    writer.post_spark_to_kafka(df)


if __name__ == "__main__":
    path = "/path/to/data.csv"
    sep = "|"
    topic = "TEST_TOPIC"
    run_kafka_producer(path,sep,topic)

Error is:

File "/path/to/kafka_producer.py", line 45, in post_spark_to_kafka
    df.foreachPartition(self.send_to_kafka)
  File "/opt/cloudera/parcels/IMMUTA/python/pyspark/sql/dataframe.py", line 596, in foreachPartition
    self.rdd.foreachPartition(f)
  File "/opt/cloudera/parcels/IMMUTA/python/pyspark/rdd.py", line 806, in foreachPartition
    self.mapPartitions(func).count()  # Force evaluation
  File "/opt/cloudera/parcels/IMMUTA/python/pyspark/rdd.py", line 1055, in count
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/opt/cloudera/parcels/IMMUTA/python/pyspark/rdd.py", line 1046, in sum
    return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
  File "/opt/cloudera/parcels/IMMUTA/python/pyspark/rdd.py", line 917, in fold
    vals = self.mapPartitions(func).collect()
  File "/opt/cloudera/parcels/IMMUTA/python/pyspark/rdd.py", line 816, in collect
    sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
  File "/opt/cloudera/parcels/IMMUTA/python/pyspark/rdd.py", line 2532, in _jrdd
    self._jrdd_deserializer, profiler)
  File "/opt/cloudera/parcels/IMMUTA/python/pyspark/rdd.py", line 2434, in _wrap_function
    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
  File "/opt/cloudera/parcels/IMMUTA/python/pyspark/rdd.py", line 2420, in _prepare_for_python_RDD
    pickled_command = ser.dumps(command)
  File "/opt/cloudera/parcels/IMMUTA/python/pyspark/serializers.py", line 600, in dumps
    raise pickle.PicklingError(msg)
_pickle.PicklingError: Could not serialize object: TypeError: can't pickle _thread.RLock objects

CodePudding user response:

The problem is where the producer gets created.

You are creating a Kafka connection in your driver and then trying to ship that live connection across the network to the executors. The function you pass to foreachPartition runs on the executors, not on the driver.

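That is what Spark is telling you with "can't pickle _thread.RLock objects": it cannot serialize your live Kafka connection to ship it to an executor. Because you pass the bound method self.send_to_kafka, Spark has to pickle self, and self.producer comes along with it.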
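The failure can be reproduced without Spark or Kafka. The sketch below is only an illustration: it uses a plain dict as a stand-in for the state carried by the writer object, and a bare threading.RLock as a stand-in for the live producer (the traceback shows that an RLock is what pickle chokes on). The names try_pickle, plain_state and live_state are made up for this example.

```python
import pickle
import threading


def try_pickle(obj):
    """Return None if obj pickles cleanly, otherwise the error as a string."""
    try:
        pickle.dumps(obj)
        return None
    except Exception as exc:
        return f"{type(exc).__name__}: {exc}"


# State with no live connection attached: only the topic name.
plain_state = {"topic": "TEST_TOPIC", "producer": None}

# State with a "live" object attached; the RLock stands in for the producer.
live_state = {"topic": "TEST_TOPIC", "producer": threading.RLock()}

print(try_pickle(plain_state))  # pickles fine
print(try_pickle(live_state))   # a TypeError mentioning _thread.RLock
```

The exact wording of the TypeError differs between Python versions, but in both cases the lock is the object that cannot be serialized.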

You need to call get_kafka_producer inside the function you pass to foreachPartition. That creates the Kafka connection on the executor itself (along with any other bookkeeping you need to do), so nothing unpicklable has to leave the driver.

FYI: be careful with this kind of bug when you test on your local machine, where the same machine is both the driver and the executor, because behaviour there does not always match a real cluster. Also, the fixed version opens one Kafka connection per partition, not one per executor. Only as many are open at the same moment as there are tasks running concurrently, but a DataFrame can have many partitions (for example, spark.sql.shuffle.partitions defaults to 200 after a shuffle), so make sure you close each connection when its partition is done.

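def send_to_kafka(self, rows):
    # Runs on the executor: the producer is created here, so it is never pickled.
    print("Writing Data")
    producer = get_kafka_producer(self.topic)
    # do topic configuration
    try:
        for row in rows:
            json_str = json.dumps(row)
            producer.send(self.topic, key=None, value=bytes(json_str, 'utf-8'))
        producer.flush()  # flush once per partition, not once per row
    finally:
        producer.close()  # release the connection even if sending fails

def post_spark_to_kafka(self, df):
    # No producer is stored on self, so self pickles cleanly.
    df.foreachPartition(self.send_to_kafka)
    print("Dataframe Posted")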
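
The per-partition lifecycle (open one producer, send every row, always close) can be checked without a cluster. This is a minimal sketch under stated assumptions, not the real client: FakeProducer is a hypothetical stand-in for KafkaProducer that only records calls, send_partition is a made-up name, and the three lists simulate the iterators that foreachPartition would hand over.

```python
import json


class FakeProducer:
    """Hypothetical stand-in for KafkaProducer; records calls instead of sending."""
    opened = 0
    closed = 0

    def __init__(self):
        FakeProducer.opened += 1
        self.sent = []

    def send(self, topic, value):
        self.sent.append((topic, value))

    def flush(self):
        pass

    def close(self):
        FakeProducer.closed += 1


def send_partition(rows, topic="TEST_TOPIC", make_producer=FakeProducer):
    """Open one producer for this partition, send every row, always close."""
    producer = make_producer()
    count = 0
    try:
        for row in rows:
            producer.send(topic, json.dumps(row).encode("utf-8"))
            count += 1
        producer.flush()
    finally:
        producer.close()
    return count


# Three simulated partitions, one iterator each (the last one is empty).
partitions = [[{"id": 1}, {"id": 2}], [{"id": 3}], []]
counts = [send_partition(iter(p)) for p in partitions]
print(counts, FakeProducer.opened, FakeProducer.closed)
```

Every partition opens and closes exactly one producer, including the empty one. If that is too many connections, reducing the number of partitions first (for example with df.coalesce(n)) is one way to lower the count.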