Home > Blockchain >  Pyspark Dataframe Lambda Map Function of SQL Query
Pyspark Dataframe Lambda Map Function of SQL Query

Time:05-27

Suppose we have a pyspark.sql.dataframe.DataFrame object:

df = sc.parallelize([['John', 'male', 26], 
                 ['Teresa', 'female', 25], 
                 ['Jacob', 'male', 6]]).toDF(['name', 'gender', 'age'])

I have a function that runs sql query for each rows of the DataFrame:

def getInfo(data):
    param_name = data['name']
    param_gender = data['gender']
    param_age = data['age']
    
    sql_query = "SELECT * FROM people_info WHERE name = '{0}' AND gender = '{1}' AND age = {2}".format(param_name, param_gender, param_age)
    info = info.append(spark.sql(sql_query))
    
    return info

I am trying to run function each rows by map:

df_info = df.rdd.map(lambda x: getInfo(x))

I got errors

PicklingError: Could not serialize object: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

CodePudding user response:

The error message is actually telling you what exactly what is wrong. Your function is trying to access SparkContext(sparck.sql(sql_query)) from inside a transformation( df.rdd.map(lambda x: getInfo(x))).

Here's what I think you are trying to do:

df = sc.parallelize([['John', 'male', 26], 
                 ['Teresa', 'female', 25], 
                 ['Jacob', 'male', 6]]).toDF(['name', 'gender', 'age'])
people = spark.table("people_info")

people.join(df, on=[people.name == df.name, people.gender == df.gender, people.age == df.age], how="inner")

Here's a couple other ways to do a join.

  • Related