Converting a pandas operation into a SPARK operation

Time:05-12

I have a question regarding Spark operations in Python.

The question is also written up more fully in the following document:

https://cernbox.cern.ch/index.php/s/W3zWvparRP2WGJc

It's about how to perform, on a Spark DataFrame, an operation that works perfectly fine on a pandas DataFrame.

Basically, given the function:

def split(arr, size):
    # split arr into consecutive chunks of length `size` (the last chunk may be shorter)
    arrs = []
    while len(arr) > size:
        piece = arr[:size]
        arrs.append(piece)
        arr = arr[size:]
    arrs.append(arr)
    return arrs
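
For reference, this is what split produces on a small list (a toy example of my own, not from my real data):

split(list(range(5)), 2)
# -> [[0, 1], [2, 3], [4]]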

What is the equivalent of this cell in Spark:

df_list = []

for i in range(0, len(p_df.index)):
    # split the 'elements' array of row i into chunks of 1024
    ars = split(p_df.iloc[i][0]['elements'], 1024)
    final_df = pd.DataFrame(ars)
    # attach the row's timestamp to every chunk
    final_df.insert(0, 'timestamp', p_df.iloc[i][1])
    time = p_df.iloc[i][1]
    # average the two magnet-current readings closest in time to this row
    magCurr = m_df.iloc[(m_df['__record_timestamp__'] - time).abs().argsort()[:2]].value.mean()
    final_df.insert(1, 'magnetcurrent', round(magCurr))
    # label the 90 chunks 0..89
    final_df.insert(2, 'cycle', range(0, 90))
    df_list.append(final_df)

all_profiles = pd.concat(df_list, ignore_index=True)

?
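
(To clarify the trickiest line above: the m_df lookup picks the two magnet readings closest in time to the profile's timestamp and averages their values. A toy pandas illustration of just that line, with made-up numbers:)

import pandas as pd

m_df = pd.DataFrame({'__record_timestamp__': [100, 200, 300], 'value': [1.0, 2.0, 4.0]})
time = 210
# positions of the two readings nearest in time, then the mean of their values
magCurr = m_df.iloc[(m_df['__record_timestamp__'] - time).abs().argsort()[:2]].value.mean()
# -> (2.0 + 4.0) / 2 = 3.0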

As you may be able to guess, the pandas solution is too slow and memory-inefficient to use on ALL my data, but I just don't know Spark well enough to convert this pandas operation into a Spark one.

I don't need a full solution, but pointing me to some of the functions that basically do the same thing I'm doing here would be amazing. TIA.

CodePudding user response:

Here is a solution based on F.slice, posexplode, and a window function:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

SPLIT_COUNT = 90
SPLIT_SIZE = 1024

# keep the profiles struct and the timestamp, then pull out the 'elements' array
spark_p = data.select("profiles", "__record_timestamp__")
spark_p = spark_p.withColumn("profiles", F.col("profiles").getField("elements"))

# one slice expression per chunk (note: F.slice is 1-indexed)
slices = [F.slice(F.col("profiles"), i * SPLIT_SIZE + 1, SPLIT_SIZE) for i in range(SPLIT_COUNT)]

# explode the array of slices into one row per chunk, keeping the chunk index ('pos')
spark_p = spark_p.select(F.posexplode(F.array(*slices)), F.col("__record_timestamp__"))
spark_p = spark_p.withColumn("cycle", F.col("pos"))
spark_p = spark_p.withColumn("profiles", F.col("col"))
spark_p = spark_p.drop("pos").drop("col")

spark_m = magnetData.select("value", "__record_timestamp__")

# give both frames the same set of columns so they can be unioned by name
spark_p = spark_p.withColumn("value", F.lit(None))

spark_m = spark_m.withColumn("profiles", F.lit(None))
spark_m = spark_m.withColumn("cycle", F.lit(None))

final_df = spark_p.unionByName(spark_m)

# for each row, look only at rows that come strictly before it in timestamp order
w = Window.orderBy("__record_timestamp__").rowsBetween(Window.unboundedPreceding, -1)

# fill 'value' with the last non-null magnet reading seen so far, then keep only the profile rows
final_df = final_df.withColumn("value", F.last("value", True).over(w)).filter(~F.isnull("profiles"))
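
If it helps to see the pattern in isolation, here is a minimal self-contained sketch on toy data (names like prof, mag, ts and elements are made up for illustration, not your real schema):

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.master("local[*]").getOrCreate()

# toy "profile" rows: one array per timestamp, to be cut into chunks of 3
prof = spark.createDataFrame(
    [(10, list(range(6))), (20, list(range(6, 12)))],
    ["ts", "elements"],
)
CHUNK, N_CHUNKS = 3, 2
slices = [F.slice(F.col("elements"), i * CHUNK + 1, CHUNK) for i in range(N_CHUNKS)]
prof = prof.select("ts", F.posexplode(F.array(*slices)).alias("cycle", "chunk"))

# toy magnet readings at their own timestamps
mag = spark.createDataFrame([(9, 1.0), (19, 2.0)], ["ts", "value"])

# give both sides the same columns, union them, and forward-fill the last reading seen
prof = prof.withColumn("value", F.lit(None).cast("double"))
mag = mag.withColumn("cycle", F.lit(None)).withColumn("chunk", F.lit(None))
both = prof.unionByName(mag)
w = Window.orderBy("ts").rowsBetween(Window.unboundedPreceding, -1)
both.withColumn("value", F.last("value", True).over(w)) \
    .filter(F.col("chunk").isNotNull()) \
    .show(truncate=False)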