I'm totally new to dataframes and pyspark in general.
In plain Python what I want to do is trivial, but I can't seem to find a way to do it in pyspark that doesn't take ages.
I have a pyspark dataframe with approx 4000 rows and the following schema:
root
 |-- waveformData: struct (nullable = true)
 |    |-- elements: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- dimensions: array (nullable = true)
 |    |    |-- element: integer (containsNull = true)
Each array is approx 20000 doubles.
Searching this array to find a max value and threshold (first instance of 50% of max value) takes very little time - but only once the data is in a 'normal' format (numpy array).
I am using a basic:
# pull the waveformData column out of Spark, then grab the first row's array
wav_df = temp_data.select("waveformData").toPandas()
wav = wav_df.to_numpy()[0][0].get("elements")
and then searching for max / threshold
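For reference, once the data is in numpy the search itself is trivial; roughly this (illustrative sketch, not my exact code):

import numpy as np

elements = np.asarray(wav)                             # ~20000 doubles
max_val = elements.max()
threshold_idx = np.argmax(elements >= 0.5 * max_val)   # first index at/above 50% of max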
But the 'toPandas' step takes forever (around 30 seconds for a single row).
Why?
I've been trying to operate on the pyspark dataframe directly to avoid this conversion, using .collect() and the like, but everything I try takes ages.
If pyspark is meant for big data, I must be doing it wrong; there's no way this is the normal processing time for this amount of data.
What am I missing?
CodePudding user response:
I created some random test data matching your shape (4000 rows of 20000-element arrays):
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions as F
import numpy as np
np.random.seed(123)
spark = SparkSession.builder.getOrCreate()
n = 20000
# 100 append writes of 40 rows each -> 4000 rows, one 20000-element array per row
for i in range(100):
    df = spark.createDataFrame([
        Row(waveFormData=Row(elements=[float(v) for v in np.random.randn(n)], dimensions=[n]))
        for _ in range(40)
    ])
    df.write.parquet('waveFormData.parquet', mode='append')
When I load the data and select the max of each array, it runs through in about 2 seconds:
df = spark.read.parquet('waveFormData.parquet')
df.select(F.array_max('waveFormData.elements')).toPandas()
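If you also want the threshold (the first element at or above 50% of the max) without leaving Spark, something along these lines should work too. This is a minimal sketch using the built-in higher-order functions transform and array_position (Spark 2.4+); note that array_position is 1-based and returns 0 if no element crosses the threshold:

# Sketch: max and first index at/above 50% of max, computed entirely in Spark
result = (
    df
    .withColumn('max_val', F.array_max('waveFormData.elements'))
    .withColumn(
        'threshold_idx',
        F.expr("array_position(transform(waveFormData.elements, x -> x >= 0.5 * max_val), true)"),
    )
    .select('max_val', 'threshold_idx')
)
result.show(5)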