I have PySpark code running in Azure Databricks, with a Spark DataFrame that has 20 numerical columns named Column1, Column2, ..., Column20.
I need to calculate the z-score of these 20 columns using scipy.stats.zscore, and to do that I am reading the 20 columns into a NumPy array with collect().
However, this collect() is causing the Spark cluster to restart. I understand that collect() tries to bring the entire dataset onto the driver; is there an alternative approach to this problem?
I could increase the driver node memory, or use a memory-optimized VM for the driver, but is there an alternative that does not require bigger infrastructure?
Below is the code snippet that creates a sample DataFrame.
import databricks.koalas as ks
import pandas as pd
import random
import numpy as np
from scipy.stats import zscore

# Sample data: 20 columns of 15 million random integers each.
df = ks.DataFrame.from_dict({
    f'Column{i}': [random.randint(0, 100000) for _ in range(15000000)]
    for i in range(1, 21)
})
df_spark = df.to_spark()

cols = [f'Column{i}' for i in range(1, 21)]

stats_array = np.array(df_spark.select(*cols).collect())  # causing the out-of-memory error
normalized_data = zscore(stats_array, axis=0)
normalized_data_remnan = np.nan_to_num(normalized_data)
normalized_df = pd.DataFrame(data=normalized_data_remnan, columns=cols)
normalized_df['sq_dist'] = [np.linalg.norm(row) for row in normalized_data_remnan]
Is there a better way to do this without bringing all the columns into a NumPy array on the driver? I would appreciate your suggestions.
CodePudding user response:
To keep all your results in Spark and avoid the collect step, you can use a for loop with window aggregate functions computed over the entire DataFrame:
import pyspark.sql.functions as F
from pyspark.sql.window import Window

# An unpartitioned window, so mean and stddev are computed over the whole DataFrame.
w = Window.partitionBy()

for c in df_spark.columns:
    df_spark = df_spark.withColumn(c, (F.col(c) - F.mean(c).over(w)) / F.stddev(c).over(w))
Here I assumed that all of your columns need to be standardized; otherwise, iterate over an explicit list of the columns you need instead of df_spark.columns.
The empty window Window.partitionBy() is what tells PySpark to compute the mean and standard deviation over the entire column. Note that F.stddev is the sample standard deviation (stddev_samp), while scipy.stats.zscore uses the population standard deviation by default, so use F.stddev_pop if you need results that match scipy exactly.
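One caveat with an unpartitioned window is that Spark moves all rows into a single partition to evaluate it, which can itself become a bottleneck at 15 million rows. A minimal alternative sketch, assuming the same df_spark as above (the variable names and the sq_dist step are just carried over from the question): compute the 20 means and population standard deviations once with a single aggregation, so only 40 scalar values are collected to the driver, then apply them in ordinary column expressions.
import pyspark.sql.functions as F

cols = df_spark.columns  # or an explicit list of the columns to standardize

# One pass over the data; only 2 * 20 scalar statistics come back to the driver.
stats = df_spark.agg(
    *[F.mean(c).alias(f'{c}_mean') for c in cols],
    *[F.stddev_pop(c).alias(f'{c}_std') for c in cols]
).collect()[0]

# Build the z-score expressions from the precomputed statistics.
normalized_df_spark = df_spark.select(
    *[((F.col(c) - stats[f'{c}_mean']) / stats[f'{c}_std']).alias(c) for c in cols]
)

# Rough equivalent of np.nan_to_num: zero out nulls produced by zero-variance columns.
normalized_df_spark = normalized_df_spark.na.fill(0.0)

# Equivalent of the np.linalg.norm step, still computed inside Spark.
normalized_df_spark = normalized_df_spark.withColumn(
    'sq_dist', F.sqrt(sum(F.col(c) ** 2 for c in cols))
)
This keeps every step distributed; the only collect is on the 40 aggregated statistics.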
CodePudding user response:
You can save your data as Parquet or CSV to disk, restart your kernel, and then load the data outside of the PySpark environment. The main problem with .collect() is that the data ends up held by Spark and by your Python process at the same time, which requires a huge amount of memory.
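A minimal sketch of that idea, assuming a writable DBFS path such as dbfs:/tmp/normalized_input (the path is just an example): write the columns to Parquet from Spark, then read the files back with pandas in a separate step.
# Write the data to Parquet with Spark; this work runs on the executors, not the driver.
df_spark.write.mode('overwrite').parquet('dbfs:/tmp/normalized_input')

# Later, e.g. after a kernel restart, read the files back with pandas.
import numpy as np
import pandas as pd
from scipy.stats import zscore

pdf = pd.read_parquet('/dbfs/tmp/normalized_input')  # Databricks exposes DBFS under /dbfs
normalized_data = np.nan_to_num(zscore(pdf.to_numpy(), axis=0))
The data still has to fit in the memory of whatever process reads the Parquet files, but you avoid holding the Spark copy and the Python copy at the same time.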