Home > Net >  Pyspark with Joblib giving me ambiguous result
Pyspark with Joblib giving me ambiguous result

Time:04-09

I am trying to fetch data from teradata--

select ... from table1_1
union all
select .. from table_2
union all
select ... from table_3

NOTE: One or more select .. may fail which should not cause whole union to fail.

from .base import Base
from joblib import Parallel, delayed
import re
import pandas as pd

class TeradataWithSpark(Base):
    def __init__(self, spark, host=None, port=None, database=None, username=None, password=None):
        super().__init__(spark, host, port, database, username, password)
        self._reader = self._spark.read.format("jdbc") \
                .option("url", f'jdbc:teradata://{self._host}/Database={self._database},LOGMECH=LDAP') \
                .option("user", self._username) \
                .option("password", self._password) \
                .option("driver", "com.teradata.jdbc.TeraDriver")

    def run_query(self, query, return_pandasDF=True):
        spark_df = self._reader.option('dbtable', f"({query}) as tbl").load()
        if return_pandasDF:
            return spark_df.toPandas()
        else:
            return spark_df

    def run_queries_and_union_all(self, queries, return_pandasDF=True):
        def run(query):
            try:
                return self._reader.option('dbtable', f"({query}) as tbl").load().toPandas()
            except Exception as e:
                return None
        
        dfs = Parallel(n_jobs=10, prefer='threads')(delayed(run)(q) for q in queries)
        concat_df = pd.concat(dfs).reset_index(drop=True)
        if return_pandasDF:
            return concat_df
        else:
            return self._spark.createDataFrame(concat_df)

    def split_query_and_run_individually(self, query, separator='union all', return_pandasDF=True):
        queries = re.split(separator, query, flags=re.IGNORECASE)
        return self.run_queries_and_union_all(queries, return_pandasDF)

As you can see split_query_and_run_individually method splits the query based on union all then runs all of the sub queries in parallel threads n_jobs=10.

But the problem I am facing is that the data is corrupted like this

n_jobs = 1
                               src_tbl  total_count   data_date
0     dsl_dim_mdm_....................           61  2022-03-17
1     dsl_agg_call....................      3992202  2022-03-27
2      dsl_call_ac....................       924719  2022-03-27
3      dsl_dim_acc....................         4762  2022-03-31
4                 ....................         6821  2022-03-31
5     dsl_dim_geo_....................      8610038  2022-04-05
6              dsl....................        67116  2022-03-31
7           dsl_rl....................      2087669  2022-04-06
8             dsl_....................          154  2022-04-01
9             dsl_....................        85630  2022-03-27
10  dsl_selling_da....................           53  2021-03-03
11  dsl_speaker_ev....................        17765  2022-03-31
12       dsl_speak....................        26269  2022-08-24
13   dsl_speaker_e....................         4202  2022-04-05
14              ds....................          268  2022-03-31
15      dsl_rltn_r....................       255794  2022-03-18
16     dsl_rltn_nr....................        12088  2022-03-18
17        dsl_rapp....................        81182  2022-01-01
18   dsl_dim_physi....................       109299  2022-03-31
19             dsl....................         4265  2022-02-01
20         dsl_fac....................       117978  2022-04-03
21      dsl_coachi....................          242  2022-03-31
22   dsl_speaker_e....................        16653  2022-03-31
23     dsl_dim_cal....................        17817  2099-12-31
24    dsl_rltn_nrt....................         3304  2022-02-01
Time took: 3.4742537260055544 minutes
-----------
n_jobs=10
                              src_tbl  total_count   data_date
0             dsl_sel................        85630  2022-03-27
1             dsl_sel................        85630  2022-03-27
2             dsl_sel................        85630  2022-03-27
3             dsl_sel................        85630  2022-03-27
4             dsl_sel................        85630  2022-03-27
5             dsl_sel................        85630  2022-03-27
6             dsl_sel................        85630  2022-03-27
7             dsl_sel................        85630  2022-03-27
8             dsl_sel................        85630  2022-03-27
9             dsl_sel................        85630  2022-03-27
10  dsl_speaker_event................        17765  2022-03-31
11   dsl_speaker_even................         4202  2022-04-05
12   dsl_speaker_even................         4202  2022-04-05
13              dsl_s................          268  2022-03-31
14        dsl_rapper_................        81182  2022-01-01
15        dsl_rapper_................        81182  2022-01-01
16     dsl_rltn_nrtl_................        12088  2022-03-18
17        dsl_rapper_................        81182  2022-01-01
18   dsl_dim_physicia................       109299  2022-03-31
19             dsl_cu................         4265  2022-02-01
20         dsl_fact_f................       117978  2022-04-03
21      dsl_coaching_................          242  2022-03-31
22   dsl_speaker_even................        16653  2022-03-31
23     dsl_dim_call_c................        17817  2099-12-31
24    dsl_rltn_nrtl_r................         3304  2022-02-01
Time took: 1.8048373858133953 minutes
-----------
n_jobs=-1
                            src_tbl  total_count   data_date
0   dsl_dim_acc....................         4762  2022-03-31
1   dsl_dim_acc....................         4762  2022-03-31
2   dsl_dim_acc....................         4762  2022-03-31
3   dsl_dim_acc....................         4762  2022-03-31
4   dsl_dim_acc....................         4762  2022-03-31
5   dsl_dim_acc....................         4762  2022-03-31
6   dsl_dim_acc....................         4762  2022-03-31
7   dsl_dim_acc....................         4762  2022-03-31
8   dsl_dim_acc....................         4762  2022-03-31
9   dsl_dim_acc....................         4762  2022-03-31
10  dsl_dim_acc....................         4762  2022-03-31
11  dsl_dim_acc....................         4762  2022-03-31
12  dsl_dim_acc....................         4762  2022-03-31
13  dsl_dim_acc....................         4762  2022-03-31
14  dsl_dim_acc....................         4762  2022-03-31
15  dsl_dim_acc....................         4762  2022-03-31
16  dsl_dim_acc....................         4762  2022-03-31
17  dsl_dim_acc....................         4762  2022-03-31
18  dsl_dim_acc....................         4762  2022-03-31
19  dsl_dim_acc....................         4762  2022-03-31
20  dsl_dim_acc....................         4762  2022-03-31
21  dsl_dim_acc....................         4762  2022-03-31
22  dsl_dim_acc....................         4762  2022-03-31
23  dsl_dim_acc....................         4762  2022-03-31
24  dsl_dim_acc....................         4762  2022-03-31
25  dsl_dim_acc....................         4762  2022-03-31
-----------

As you can see as I increase the number of threads the result becomes ambiguous. What is happening is that the results from each query are overlapping with each other.

I have also implemented the same class with teradatasql library which works just fine with n_jobs=-1. I think self._reader.option('dbtable', f"({query}) as tbl").load() is getting messed up in threads. I tried with ThreadpoolExecutor but similar result. Does anyone know how to solve this issue? Versions

Python 3.6.8
Spark 2.4.0-cdh6.3.4

CodePudding user response:

Thanks to @pltc, here is one solution. Although it is very slow compared to teradatasql library with multithreading although FAIR schedulers on

from .base import Base
import re
import pandas as pd
from pyspark.sql import DataFrame
from functools import reduce

class TeradataWithSpark(Base):
    def __init__(self, spark, host=None, port=None, database=None, username=None, password=None):
        super().__init__(spark, host, port, database, username, password)
        self._reader = self._spark.read.format("jdbc") \
                .option("url", f'jdbc:teradata://{self._host}/Database={self._database},LOGMECH=LDAP') \
                .option("user", self._username) \
                .option("password", self._password) \
                .option("driver", "com.teradata.jdbc.TeraDriver")

    def run_query(self, query, return_pandasDF=True):
        # spark_df = self._reader.option('dbtable', f"({query}) as tbl").load()
        # if return_pandasDF:
        #     return spark_df.toPandas()
        # else:
        #     return spark_df
        return self.split_query_and_run_individually(query, r'union all', return_pandasDF)

    def run_queries_and_union_all(self, queries, return_pandasDF=True):     
        dataframes = []
        for each_query in queries:
            try:
                spark_df = self._reader.option('dbtable', f"({each_query}) as tbl").load()
                dataframes.append(spark_df)
            except Exception as e:
                # simply ignoring the query
                print(f'Error while reading the query {each_query}')
            
        concat_sparkDf = reduce(DataFrame.unionAll, dataframes)
        if return_pandasDF:
            return concat_sparkDf.toPandas()
        else:
            return concat_sparkDf

    def split_query_and_run_individually(self, query, separator=r'union all', return_pandasDF=True):
        queries = re.split(separator, query, flags=re.IGNORECASE)
        return self.run_queries_and_union_all(queries, return_pandasDF)
  • Related