I am trying to fetch data from teradata--
select ... from table1_1
union all
select .. from table_2
union all
select ... from table_3
NOTE: One or more select ..
may fail which should not cause whole union to fail.
from .base import Base
from joblib import Parallel, delayed
import re
import pandas as pd
class TeradataWithSpark(Base):
def __init__(self, spark, host=None, port=None, database=None, username=None, password=None):
super().__init__(spark, host, port, database, username, password)
self._reader = self._spark.read.format("jdbc") \
.option("url", f'jdbc:teradata://{self._host}/Database={self._database},LOGMECH=LDAP') \
.option("user", self._username) \
.option("password", self._password) \
.option("driver", "com.teradata.jdbc.TeraDriver")
def run_query(self, query, return_pandasDF=True):
spark_df = self._reader.option('dbtable', f"({query}) as tbl").load()
if return_pandasDF:
return spark_df.toPandas()
else:
return spark_df
def run_queries_and_union_all(self, queries, return_pandasDF=True):
def run(query):
try:
return self._reader.option('dbtable', f"({query}) as tbl").load().toPandas()
except Exception as e:
return None
dfs = Parallel(n_jobs=10, prefer='threads')(delayed(run)(q) for q in queries)
concat_df = pd.concat(dfs).reset_index(drop=True)
if return_pandasDF:
return concat_df
else:
return self._spark.createDataFrame(concat_df)
def split_query_and_run_individually(self, query, separator='union all', return_pandasDF=True):
queries = re.split(separator, query, flags=re.IGNORECASE)
return self.run_queries_and_union_all(queries, return_pandasDF)
As you can see split_query_and_run_individually
method splits the query based on union all
then runs all of the sub queries in parallel threads n_jobs=10
.
But the problem I am facing is that the data is corrupted like this
n_jobs = 1
src_tbl total_count data_date
0 dsl_dim_mdm_.................... 61 2022-03-17
1 dsl_agg_call.................... 3992202 2022-03-27
2 dsl_call_ac.................... 924719 2022-03-27
3 dsl_dim_acc.................... 4762 2022-03-31
4 .................... 6821 2022-03-31
5 dsl_dim_geo_.................... 8610038 2022-04-05
6 dsl.................... 67116 2022-03-31
7 dsl_rl.................... 2087669 2022-04-06
8 dsl_.................... 154 2022-04-01
9 dsl_.................... 85630 2022-03-27
10 dsl_selling_da.................... 53 2021-03-03
11 dsl_speaker_ev.................... 17765 2022-03-31
12 dsl_speak.................... 26269 2022-08-24
13 dsl_speaker_e.................... 4202 2022-04-05
14 ds.................... 268 2022-03-31
15 dsl_rltn_r.................... 255794 2022-03-18
16 dsl_rltn_nr.................... 12088 2022-03-18
17 dsl_rapp.................... 81182 2022-01-01
18 dsl_dim_physi.................... 109299 2022-03-31
19 dsl.................... 4265 2022-02-01
20 dsl_fac.................... 117978 2022-04-03
21 dsl_coachi.................... 242 2022-03-31
22 dsl_speaker_e.................... 16653 2022-03-31
23 dsl_dim_cal.................... 17817 2099-12-31
24 dsl_rltn_nrt.................... 3304 2022-02-01
Time took: 3.4742537260055544 minutes
-----------
n_jobs=10
src_tbl total_count data_date
0 dsl_sel................ 85630 2022-03-27
1 dsl_sel................ 85630 2022-03-27
2 dsl_sel................ 85630 2022-03-27
3 dsl_sel................ 85630 2022-03-27
4 dsl_sel................ 85630 2022-03-27
5 dsl_sel................ 85630 2022-03-27
6 dsl_sel................ 85630 2022-03-27
7 dsl_sel................ 85630 2022-03-27
8 dsl_sel................ 85630 2022-03-27
9 dsl_sel................ 85630 2022-03-27
10 dsl_speaker_event................ 17765 2022-03-31
11 dsl_speaker_even................ 4202 2022-04-05
12 dsl_speaker_even................ 4202 2022-04-05
13 dsl_s................ 268 2022-03-31
14 dsl_rapper_................ 81182 2022-01-01
15 dsl_rapper_................ 81182 2022-01-01
16 dsl_rltn_nrtl_................ 12088 2022-03-18
17 dsl_rapper_................ 81182 2022-01-01
18 dsl_dim_physicia................ 109299 2022-03-31
19 dsl_cu................ 4265 2022-02-01
20 dsl_fact_f................ 117978 2022-04-03
21 dsl_coaching_................ 242 2022-03-31
22 dsl_speaker_even................ 16653 2022-03-31
23 dsl_dim_call_c................ 17817 2099-12-31
24 dsl_rltn_nrtl_r................ 3304 2022-02-01
Time took: 1.8048373858133953 minutes
-----------
n_jobs=-1
src_tbl total_count data_date
0 dsl_dim_acc.................... 4762 2022-03-31
1 dsl_dim_acc.................... 4762 2022-03-31
2 dsl_dim_acc.................... 4762 2022-03-31
3 dsl_dim_acc.................... 4762 2022-03-31
4 dsl_dim_acc.................... 4762 2022-03-31
5 dsl_dim_acc.................... 4762 2022-03-31
6 dsl_dim_acc.................... 4762 2022-03-31
7 dsl_dim_acc.................... 4762 2022-03-31
8 dsl_dim_acc.................... 4762 2022-03-31
9 dsl_dim_acc.................... 4762 2022-03-31
10 dsl_dim_acc.................... 4762 2022-03-31
11 dsl_dim_acc.................... 4762 2022-03-31
12 dsl_dim_acc.................... 4762 2022-03-31
13 dsl_dim_acc.................... 4762 2022-03-31
14 dsl_dim_acc.................... 4762 2022-03-31
15 dsl_dim_acc.................... 4762 2022-03-31
16 dsl_dim_acc.................... 4762 2022-03-31
17 dsl_dim_acc.................... 4762 2022-03-31
18 dsl_dim_acc.................... 4762 2022-03-31
19 dsl_dim_acc.................... 4762 2022-03-31
20 dsl_dim_acc.................... 4762 2022-03-31
21 dsl_dim_acc.................... 4762 2022-03-31
22 dsl_dim_acc.................... 4762 2022-03-31
23 dsl_dim_acc.................... 4762 2022-03-31
24 dsl_dim_acc.................... 4762 2022-03-31
25 dsl_dim_acc.................... 4762 2022-03-31
-----------
As you can see as I increase the number of threads the result becomes ambiguous. What is happening is that the results from each query are overlapping with each other.
I have also implemented the same class with teradatasql
library which works just fine with n_jobs=-1. I think self._reader.option('dbtable', f"({query}) as tbl").load()
is getting messed up in threads. I tried with ThreadpoolExecutor
but similar result. Does anyone know how to solve this issue?
Versions
Python 3.6.8
Spark 2.4.0-cdh6.3.4
CodePudding user response:
Thanks to @pltc, here is one solution. Although it is very slow compared to teradatasql
library with multithreading although FAIR schedulers on
from .base import Base
import re
import pandas as pd
from pyspark.sql import DataFrame
from functools import reduce
class TeradataWithSpark(Base):
def __init__(self, spark, host=None, port=None, database=None, username=None, password=None):
super().__init__(spark, host, port, database, username, password)
self._reader = self._spark.read.format("jdbc") \
.option("url", f'jdbc:teradata://{self._host}/Database={self._database},LOGMECH=LDAP') \
.option("user", self._username) \
.option("password", self._password) \
.option("driver", "com.teradata.jdbc.TeraDriver")
def run_query(self, query, return_pandasDF=True):
# spark_df = self._reader.option('dbtable', f"({query}) as tbl").load()
# if return_pandasDF:
# return spark_df.toPandas()
# else:
# return spark_df
return self.split_query_and_run_individually(query, r'union all', return_pandasDF)
def run_queries_and_union_all(self, queries, return_pandasDF=True):
dataframes = []
for each_query in queries:
try:
spark_df = self._reader.option('dbtable', f"({each_query}) as tbl").load()
dataframes.append(spark_df)
except Exception as e:
# simply ignoring the query
print(f'Error while reading the query {each_query}')
concat_sparkDf = reduce(DataFrame.unionAll, dataframes)
if return_pandasDF:
return concat_sparkDf.toPandas()
else:
return concat_sparkDf
def split_query_and_run_individually(self, query, separator=r'union all', return_pandasDF=True):
queries = re.split(separator, query, flags=re.IGNORECASE)
return self.run_queries_and_union_all(queries, return_pandasDF)