I need to reduce the computation time for the following Python code, which contains multiple if/else statements. The code runs on Databricks, so I'm open to PySpark solutions as well. Currently it takes more than 1 hour to run, so any help would be appreciated.
unique_list_code
: list of unique codes from the concat_df['C_Code'] column, used to filter the rows of the DataFrame that contain each code.

concat_df
: pandas DataFrame with 4 million records.
```
unique_list_code = list(concat_df['C_Code'].unique())

MC_list = []
SN_list = []
AN_list = []
Nothing_list = []

for code in unique_list_code:
    print(code)
    code_filtered_df = concat_df[concat_df['C_Code'] == code]

    # SN_Filter
    SN_filter = code_filtered_df[(code_filtered_df['D_Type'] == 'SN') & (code_filtered_df['Comm_P'] == 'P-mail')]
    if len(SN_filter) > 0:
        print("Found SN")
        SN_list.append(code)
        clean_up(SN_filter)
    else:
        # AN_Filter
        AN_filter = code_filtered_df[(code_filtered_df['D_Type'] == 'AN') & (code_filtered_df['Comm_P'] == 'P-mail')]
        if len(AN_filter) > 0:
            print("Found AN")
            AN_list.append(code)
            clean_up(AN_filter)
        else:
            # MC_Check
            MF_filter = code_filtered_df[code_filtered_df['MC_Flag'] == 'Y']
            MF_DNS_filter = MF_filter[~(((MF_filter['D_Type'] == 'AN') | (MF_filter['D_Type'] == 'SN')) & (MF_filter['Comm_P'] == 'DNS'))]
            if len(MF_DNS_filter) > 0:
                print("Found MC")
                MC_list.append(code)
                clean_up(MF_DNS_filter)
            else:
                print("Nothing Found")
                Nothing_list.append(code)
```
CodePudding user response:
The reason it is taking so long is that you are working with a pandas DataFrame. If you want to benefit from distributed computation and improve performance, you need to work with a Spark DataFrame in this case:
Spark_DF = spark.createDataFrame(Pandas_DF)
You will then need to rewrite your code in PySpark to work with the Spark DataFrame.
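For illustration, here is a minimal sketch of what such a rewrite could look like, assuming the same column names as in the question (C_Code, D_Type, Comm_P, MC_Flag). The intermediate names (spark_df, flagged, per_code, classified) are just for illustration, and the clean_up step is left out because its definition isn't shown. The idea is to replace the per-code loop with a single groupBy, classifying every code with the same SN > AN > MC > nothing priority in one pass:
```
from pyspark.sql import functions as F

# Convert the 4M-row pandas DataFrame to a Spark DataFrame
spark_df = spark.createDataFrame(concat_df)

# Flag each row according to the three checks from the original loop
flagged = (
    spark_df
    .withColumn("is_sn", ((F.col("D_Type") == "SN") & (F.col("Comm_P") == "P-mail")).cast("int"))
    .withColumn("is_an", ((F.col("D_Type") == "AN") & (F.col("Comm_P") == "P-mail")).cast("int"))
    .withColumn(
        "is_mc",
        ((F.col("MC_Flag") == "Y")
         & ~(F.col("D_Type").isin("AN", "SN") & (F.col("Comm_P") == "DNS"))).cast("int"),
    )
)

# One aggregation over all codes instead of one pandas filter pass per code
per_code = flagged.groupBy("C_Code").agg(
    F.max("is_sn").alias("has_sn"),
    F.max("is_an").alias("has_an"),
    F.max("is_mc").alias("has_mc"),
)

# Same SN > AN > MC > nothing priority as the original if/else chain
classified = per_code.withColumn(
    "category",
    F.when(F.col("has_sn") == 1, "SN")
     .when(F.col("has_an") == 1, "AN")
     .when(F.col("has_mc") == 1, "MC")
     .otherwise("Nothing"),
)

# Pull one of the lists back to the driver; the others work the same way
SN_list = [r["C_Code"] for r in classified.filter(F.col("category") == "SN").collect()]
```
AN_list, MC_list and Nothing_list can be collected the same way by filtering on the other category values; the per-category rows that the original code passed to clean_up can be recovered by joining classified back to flagged on C_Code.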
CodePudding user response:
As mentioned above, you need to rewrite this code in PySpark. PySpark lets you distribute the data across the worker nodes of the cluster. Note that each comparison must be wrapped in parentheses and the conditions combined with the bitwise operators & and | (there is no OR keyword in these column expressions):
from pyspark.sql.functions import col
filter_condition = (((col('D_Type') == 'SN') & (col('Comm_P') == 'P-mail')) | ((col('D_Type') == 'AN') & (col('Comm_P') == 'P-mail')) | (...))
result_df = concat_df.where(filter_condition)
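As a small follow-up sketch (assuming concat_df has already been converted to a Spark DataFrame as shown in the previous answer), the distinct codes that match the combined filter can then be collected back to the driver:
```
# Collect the distinct matching codes into a plain Python list on the driver
matched_codes = [row["C_Code"] for row in result_df.select("C_Code").distinct().collect()]
```
Keep in mind that a single flat filter only selects the matching rows; reproducing the SN > AN > MC priority per code still needs a groupBy/when step like the sketch in the previous answer.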