I run a Spark job and it works without any error. In my PySpark code, I run 3 machine learning jobs sequentially. But when I try to run them concurrently in threads, I get an error. The error occurs in this part:
def run(.....):
(
......
sc = SparkContext.getOrCreate(conf=conf)
sc.setCheckpointDir("/tmp/ersing/")
spark = SparkSession(sc)
temp_name = "my_test_table_thread_" str(thread_id)
my_table.createOrReplaceTempView(temp_name)
print(temp_name " count(*) --> " str(my_table.count()))
print("""spark.catalog.tableExists(""" temp_name """) = """ str(spark._jsparkSession.catalog().tableExists(temp_name)))
model_sql = """select id from {sample_table_name} where
id= {id} """.format(id=id, sample_table_name=temp_name)
my_df= spark.sql(model_sql).select("id",) #this part gives error --> no such table
my_df= broadcast(my_df)
......
)
My main code is:
....
from multiprocessing.pool import ThreadPool
import threading
def run_worker(job):
    """Run the two stages for one job: a sampling pass, then a modeling pass."""
    # run is called twice: the first call returns the sample table,
    # and the second call receives that table for modeling.
    sample_table = run('sampling', ...)
    run('modeling', ..., sample_table)
def mp_handler():
    """Run ``run_worker`` over every entry of ``jobs`` on a pool of 8 threads.

    Blocks until all jobs have finished. An exception raised by any worker
    is re-raised here by ``map`` after the pool has been shut down.
    """
    p = ThreadPool(8)
    try:
        p.map(run_worker, jobs)
    finally:
        # close() must come before join(): calling join() on a pool that is
        # still accepting work raises ValueError.
        p.close()
        p.join()
mp_handler()
I run 3 jobs concurrently, and every time createOrReplaceTempView works for only one of them. I know this because I logged: print("""spark.catalog.tableExists(""" + temp_name + """) = """ + str(spark._jsparkSession.catalog().tableExists(temp_name)))
and I saw that the table exists for one of the jobs but not for the others.
So what am I missing?
Thanks in advance.
CodePudding user response:
Finally I found the solution.
The problem is the Spark context. When one of the threads finishes and closes the context, the other threads can no longer find their tables in Spark.
What I did was move the Spark context creation to the main code, like this:
def run_worker(job):
    """Build the Spark session here and hand it to both ``run`` calls."""
    spark_context = SparkContext.getOrCreate(conf=conf)
    spark_context.setCheckpointDir("/tmp/ersing/")
    spark = SparkSession(spark_context)
    # run is called twice with the same session: the first call returns the
    # sample table, and the second call receives that table for modeling.
    sample_table = run(spark, 'sampling', ...)
    run(spark, 'modeling', ..., sample_table)