RuntimeError: SparkContext should only be created and accessed on the driver


I am trying to execute the code below: I need to look values up in a table and create a new column from them, so I went with a UDF because a join didn't work out.

With that, I get the error RuntimeError: SparkContext should only be created and accessed on the driver.

To avoid this error I added config('spark.executor.allowSparkContext', 'true') inside the UDF function.

But now I get pyspark.sql.utils.AnalysisException: Table or view not found: ser_definition; line 3 pos 5; because the temp view is not available on the executors.

How can I overcome this error, or is there a better approach?

Below is the code.

    from pyspark.sql import SparkSession
    import pyspark.sql.functions as F
    from pyspark.sql.types import StringType

    spark = SparkSession.builder.getOrCreate()

    df_subsbill_label = spark.read.format("csv").option("inferSchema", True).option("header", True).option("multiLine", True)\
                            .load("file:///C://Users//test_data.csv")

    df_service_def = spark.read.format("csv").option("inferSchema", True).option("header", True).option("multiLine", True)\
                            .load("file:///C://Users//test_data2.csv")

    df_service_def.createGlobalTempView("ser_definition")

    query = '''
    SELECT mnthlyfass
    FROM ser_definition
    WHERE uid = {0}
    AND u_soc = '{1}'
    AND ser_type = 'SOC'
    AND t_type = '{2}'
    AND c_type = '{3}'
    ORDER BY d_fass DESC, mnthlyfass DESC
    LIMIT 1
    '''

    def lookup_fas(uid, u_soc, t_type, c_type, query):
        spark = SparkSession.builder.config('spark.executor.allowSparkContext', 'true').getOrCreate()
        query = query.format(uid, u_soc, t_type, c_type)
        df = spark.sql(query)
        return df.rdd.flatMap(lambda x: x).collect()

    udf_lookup = F.udf(lookup_fas, StringType())

    df_subsbill_label = df_subsbill_label.withColumn("mnthlyfass", udf_lookup(F.col("uid"), F.col("u_soc"), F.col("t_type"), F.col("c_type"), F.lit(query)))
    df_subsbill_label.show(20, False)

Error:

    pyspark.sql.utils.AnalysisException: Table or view not found: ser_definition; line 3 pos 5;
    'GlobalLimit 1
    +- 'LocalLimit 1
       +- 'Sort ['d_fass DESC NULLS LAST, 'mnthlyfass DESC NULLS LAST], true

CodePudding user response:

Please qualify the table name with "global_temp", the database that global temporary views are registered under, in your SQL:

FROM global_temp.ser_definition

This should work.
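Applied to the query from the question, only the FROM line needs to change:

query = '''
SELECT mnthlyfass
FROM global_temp.ser_definition
WHERE uid = {0}
AND u_soc = '{1}'
AND ser_type = 'SOC'
AND t_type = '{2}'
AND c_type = '{3}'
ORDER BY d_fass DESC, mnthlyfass DESC
LIMIT 1
'''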

CodePudding user response:

First, you should not get the Spark session on an executor when running Spark in cluster mode: the SparkSession object cannot be serialized, so it cannot be sent to an executor. It is also against Spark's design principles to do so.

What you can do here is broadcast your dataframe instead. This creates a copy of it inside each executor, which you can then access from the UDF:

df_service_def = spark.read.format("csv").option("inferSchema", True).option("header", True).option("multiLine", True)\
                        .load("file:///C://Users//test_data2.csv")
broadcasted_df_service_def = spark.sparkContext.broadcast(df_service_def)

then inside your udf:

def lookup_fas(uid, u_soc, t_type, c_type):
    df = broadcasted_df_service_def.value
    # apply your lookup logic on the broadcast value here ...
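For reference, a minimal self-contained sketch of this idea follows. One caveat: a PySpark DataFrame object itself is not picklable, so this variant broadcasts a driver-local copy of the rows (collected from df_service_def) rather than the DataFrame itself; the helper name lookup_fas_bc and the assumption that the columns uid, u_soc, ser_type, t_type, c_type, d_fass and mnthlyfass all exist in the lookup CSV are mine, not from the original post.

import pyspark.sql.functions as F
from pyspark.sql.types import StringType

# Collect the (small) lookup table on the driver and broadcast the plain rows;
# a list of Rows can be pickled and shipped to every executor.
bc_lookup = spark.sparkContext.broadcast(df_service_def.collect())

def lookup_fas_bc(uid, u_soc, t_type, c_type):
    # Reproduce the WHERE clause of the original query on the broadcast rows.
    matches = [r for r in bc_lookup.value
               if r["uid"] == uid and r["u_soc"] == u_soc
               and r["ser_type"] == "SOC"
               and r["t_type"] == t_type and r["c_type"] == c_type]
    if not matches:
        return None
    # ORDER BY d_fass DESC, mnthlyfass DESC LIMIT 1
    best = max(matches, key=lambda r: (r["d_fass"], r["mnthlyfass"]))
    return str(best["mnthlyfass"])

udf_lookup = F.udf(lookup_fas_bc, StringType())

df_subsbill_label = df_subsbill_label.withColumn(
    "mnthlyfass",
    udf_lookup(F.col("uid"), F.col("u_soc"), F.col("t_type"), F.col("c_type"))
)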

PS: Even though this should work, I think it may impact performance since the UDF is called for every row, so maybe you should change the design of your solution.
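One common redesign, assuming the lookup table is small enough to be broadcast, is to let Spark do a broadcast join instead of calling a UDF per row. A rough sketch (the column names come from the query in the question, and the generated row id / window is only there to mimic the ORDER BY ... LIMIT 1 per input row):

import pyspark.sql.functions as F
from pyspark.sql import Window

# Tag each input row so exactly one match per row survives after the join.
left = df_subsbill_label.withColumn("_row_id", F.monotonically_increasing_id())

# Broadcast hash join against the small lookup table.
joined = left.join(
    F.broadcast(df_service_def.filter(F.col("ser_type") == "SOC")),
    on=["uid", "u_soc", "t_type", "c_type"],
    how="left",
)

# Keep the best match per input row: ORDER BY d_fass DESC, mnthlyfass DESC LIMIT 1.
w = Window.partitionBy("_row_id").orderBy(F.col("d_fass").desc(), F.col("mnthlyfass").desc())
result = (joined.withColumn("_rn", F.row_number().over(w))
                .filter(F.col("_rn") == 1)
                .drop("_rn", "_row_id"))

result.show(20, False)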
