Home > database >  pySpark --> NoSuchTableException: Table or view 'my_test_table_thread_1' not found in d
pySpark --> NoSuchTableException: Table or view 'my_test_table_thread_1' not found in d

Time:06-14

I run a spark job and it works without any error. In my pyspark code , I run 3 machine learning job sequently. But when I try them work in a thread concurently i got an error. It gives error on this part:

def run(.....):
(
......



   sc = SparkContext.getOrCreate(conf=conf)
   sc.setCheckpointDir("/tmp/ersing/")
   spark = SparkSession(sc)


    temp_name = "my_test_table_thread_" str(thread_id)
    my_table.createOrReplaceTempView(temp_name)

    print(temp_name  " count(*) --> "   str(my_table.count()))
    print("""spark.catalog.tableExists(""" temp_name """) = """   str(spark._jsparkSession.catalog().tableExists(temp_name))) 

    model_sql = """select id from {sample_table_name} where 
                   id= {id} """.format(id=id, sample_table_name=temp_name)
    my_df= spark.sql(model_sql).select("id",)  #this part gives error --> no such table
    my_df= broadcast(my_df)
......
)

my main code is :

....

from multiprocessing.pool import ThreadPool
import threading
    
def run_worker(job): 
     returned_sample_table= run('sampling',...)  # i call run method twice. First run get df and I call second run for modeling
     run('modeling',...,returned_sample_table)

def mp_handler():
    p = ThreadPool(8)
    p.map(run_worker, jobs)
    p.join()
    p.close()

mp_handler()

I run 3 jobs concurently and every time just one job createOrReplaceTempView works fine because i logged this : print("""spark.catalog.tableExists(""" temp_name """) = """ str(spark._jsparkSession.catalog().tableExists(temp_name))) and I saw one of jobs is exists and others not.

So what i am missing?

Thanks in advance.

CodePudding user response:

Finally i got the solution.

the problem is spark context. When one of threads works done and it closes the context , others dont find the tables on spark.

What i did is I moved the spark context to the main like this :

def run_worker(job): 

     sc = SparkContext.getOrCreate(conf=conf)
     sc.setCheckpointDir("/tmp/ersing/")
     spark = SparkSession(sc)

     returned_sample_table= run(spark ,'sampling',...)  # i call run method twice. First run get df and I call second run for modeling
     run(spark ,'modeling',...,returned_sample_table)
  • Related