I have a question regarding Spark operations in Python.
A nicely formatted version of the question is enclosed in the following note:
https://cernbox.cern.ch/index.php/s/W3zWvparRP2WGJc
It's about how to perform, on a Spark DataFrame, an operation that works perfectly fine on a pandas DataFrame.
Basically given the function:
def split(arr, size):
    # Chop a flat array into consecutive pieces of length `size`
    arrs = []
    while len(arr) > size:
        piece = arr[:size]
        arrs.append(piece)
        arr = arr[size:]
    arrs.append(arr)
    return arrs
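For context, split just chunks a flat list into fixed-size pieces, e.g.:

split(list(range(10)), 4)
# -> [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]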
What is the equivalent of this cell in Spark:
df_list = []
for i in range(0, len(p_df.index)):
    # Chop this row's flat 'elements' array into chunks of 1024
    ars = split(p_df.iloc[i][0]['elements'], 1024)
    final_df = pd.DataFrame(ars)
    final_df.insert(0, 'timestamp', p_df.iloc[i][1])
    # Magnet current: mean of the two readings closest in time to this profile
    time = p_df.iloc[i][1]
    magCurr = m_df.iloc[(m_df['__record_timestamp__'] - time).abs().argsort()[:2]].value.mean()
    final_df.insert(1, 'magnetcurrent', round(magCurr))
    final_df.insert(2, 'cycle', range(0, 90))
    df_list.append(final_df)
all_profiles = pd.concat(df_list, ignore_index=True)
?
As you may be able to guess, the Python solution is too slow and memory-inefficient to use on ALL my data, but I just don't know Spark well enough to convert this pandas operation into a Spark one.
I don't need a full solution, but pointing me to some of the functions that basically do the same thing I'm doing here would be amazing. TIA.
CodePudding user response:
Here is a solution. The idea is to cut each profile's flat array into fixed-size slices with F.slice, turn those slices into one row each with posexplode, union the result with the magnet-current data, and then use a window ordered by timestamp to pull the last preceding magnet reading into every profile row:
# Imports (assuming F and Window are not already in scope)
from pyspark.sql import functions as F
from pyspark.sql.window import Window

SPLIT_COUNT = 90
SPLIT_SIZE = 1024

# Keep only the profile array and its timestamp
spark_p = data.select("profiles", "__record_timestamp__")
spark_p = spark_p.withColumn("profiles", F.col("profiles").getField("elements"))

# Cut the flat "elements" array into SPLIT_COUNT slices of SPLIT_SIZE (F.slice is 1-based, hence the "+ 1")
slices = [F.slice(F.col("profiles"), i * SPLIT_SIZE + 1, SPLIT_SIZE) for i in range(SPLIT_COUNT)]

# posexplode gives one row per slice: the slice index lands in "pos", the slice itself in "col"
spark_p = spark_p.select(F.posexplode(F.array(*slices)), F.col("__record_timestamp__"))
spark_p = spark_p.withColumn("cycle", F.col("pos"))
spark_p = spark_p.withColumn("profiles", F.col("col"))
spark_p = spark_p.drop("pos").drop("col")

# Give both DataFrames the same set of columns so they can be unioned by name
spark_m = magnetData.select("value", "__record_timestamp__")
spark_p = spark_p.withColumn("value", F.lit(None))
spark_m = spark_m.withColumn("profiles", F.lit(None))
spark_m = spark_m.withColumn("cycle", F.lit(None))
final_df = spark_p.unionByName(spark_m)

# Fill each profile row with the last preceding magnet-current value, then drop the magnet-only rows
w = Window.orderBy("__record_timestamp__").rowsBetween(Window.unboundedPreceding, -1)
final_df = final_df.withColumn("value", F.last("value", True).over(w)).filter(~F.isnull("profiles"))
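Note that this attaches the last magnet reading before each profile, which only approximates the pandas logic (mean of the two readings nearest in time). If you also want to mirror the rounding and the column name from the pandas version, something along these lines should work (untested sketch, reusing the names above):

# Round the filled-in reading and rename it to match the pandas output
final_df = final_df.withColumn("magnetcurrent", F.round(F.col("value"))).drop("value")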