Suppose we have a pyspark.sql.dataframe.DataFrame object:
df = sc.parallelize([['John', 'male', 26],
                     ['Teresa', 'female', 25],
                     ['Jacob', 'male', 6]]).toDF(['name', 'gender', 'age'])
I have a function that runs an SQL query for each row of the DataFrame:
def getInfo(data):
    param_name = data['name']
    param_gender = data['gender']
    param_age = data['age']
    sql_query = "SELECT * FROM people_info WHERE name = '{0}' AND gender = '{1}' AND age = {2}".format(param_name, param_gender, param_age)
    info = info.append(spark.sql(sql_query))
    return info
I am trying to run the function on each row with map:
df_info = df.rdd.map(lambda x: getInfo(x))
but I get this error:
PicklingError: Could not serialize object: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
CodePudding user response:
The error message is telling you exactly what is wrong. Your function is trying to use the SparkContext (spark.sql(sql_query)) from inside a transformation (df.rdd.map(lambda x: getInfo(x))), and the SparkContext is only available on the driver, not on the workers.
Here's what I think you are trying to do:
df = sc.parallelize([['John', 'male', 26],
                     ['Teresa', 'female', 25],
                     ['Jacob', 'male', 6]]).toDF(['name', 'gender', 'age'])

people = spark.table("people_info")
people.join(df, on=[people.name == df.name, people.gender == df.gender, people.age == df.age], how="inner")
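The join replaces the per-row queries entirely, so all the work stays in DataFrame transformations that Spark can run on the workers. As a minimal sketch (assuming the people_info table exists in the current catalog), you would assign the result and call an action on it to see the matched rows:

matched = people.join(
    df,
    on=[people.name == df.name, people.gender == df.gender, people.age == df.age],
    how="inner",
)
matched.show()  # triggers the job and prints the rows that matched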
Here are a couple of other ways to do a join.
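For example, a rough sketch of two equivalent approaches, assuming people_info uses the same name, gender, and age column names:

# Join on a list of column names, which avoids duplicated key columns in the output
people.join(df, on=['name', 'gender', 'age'], how='inner')

# Or register df as a temporary view and express the join in SQL
df.createOrReplaceTempView('lookup')
spark.sql("""
    SELECT p.*
    FROM people_info p
    JOIN lookup l
      ON p.name = l.name AND p.gender = l.gender AND p.age = l.age
""")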