How to randomly shuffle the values of only one column in pyspark?

Time:02-08

I want to break the correlation between one column and the rest of the dataframe, while preserving the distribution of the values in that column.

In pandas, I used to achieve this by simply shuffling the values of the column and assigning them back to it. It is not so straightforward in PySpark because the data is partitioned. I do not think there is even a way in PySpark to set a new column in a dataframe from a column of another dataframe.

So, in PySpark, how do I achieve the following?

import pandas as pd
df = pd.DataFrame({'a':[1,2,3,4],'b':[3,4,5,6]})
df['b'] = df['b'].sample(frac=1).reset_index(drop=True)

Also, I'm hoping for a solution that does not include unnecessary shuffles.

One way to do it is:

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import row_number, lit, rand
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()
df = pd.DataFrame({'a':[1,2,3,4],'b':[3,4,5,6]})
dfs = spark.createDataFrame(df)

# Window with no partitionBy: every row ends up in a single partition
w = Window.orderBy(lit('A'))
# Number the rows in their current order
dfs = dfs.withColumn("row_num", row_number().over(w))
# Take column b on its own and put it in random order
dfs_ts = dfs.select('b')
dfs_ts = dfs_ts.withColumn('o', rand()).orderBy('o').drop('o')
dfs = dfs.drop('b')
# Number the shuffled values, then join them back on the row number
dfs_ts = dfs_ts.withColumn("row_num", row_number().over(w))
dfs = dfs.join(dfs_ts, on='row_num').drop('row_num')

But I do not need the shuffles that come with the join; they are unnecessary. If a blind hstack were possible on a per-partition basis in PySpark, that would be enough. Also, the window function warns me that I have not defined any partitions, so all my data would be collected into one partition. I might as well use pandas in that case.

CodePudding user response:

I think you could achieve something like that on the RDD level. Not sure if it would be worth all the extra steps, and depending on the size of the partitions it might not perform really well because you'd have to hold all the values of the partition in memory for the shuffle.

import pandas as pd
from random import shuffle
from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.getOrCreate()

df = pd.DataFrame({'a':[x for x in range(10)],'b':[x for x in range(10)]})
dfs = spark.createDataFrame(df)
dfs.show()
# Split the two columns into separate RDDs
rdd_a = dfs.select("a").rdd
rdd_b = dfs.select("b").rdd

def f(iterator):
    # Hold one whole partition in memory and shuffle it locally
    items = list(iterator)
    shuffle(items)
    for x in items:
        yield x

def reconstruct(x):
    # x is a pair: (row from rdd_a, row from rdd_b_reordered)
    return Row(a=x[0].a, b=x[1].b)

rdd_b_reordered = rdd_b.mapPartitions(f)
# zip needs the same number of partitions and of elements per partition
df_reordered = spark.createDataFrame(rdd_a.zip(rdd_b_reordered).map(reconstruct))
df_reordered.show()
"""
My output:
+---+---+
|  a|  b|
+---+---+
|  0|  2|
|  1|  4|
|  2|  0|
|  3|  1|
|  4|  3|
|  5|  7|
|  6|  9|
|  7|  5|
|  8|  6|
|  9|  8|
+---+---+
"""

Maybe you can tweak that to your needs. This only shuffles the values within each partition, which avoids a shuffle across partitions. It would probably also be better to do it in Scala.
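
To make the per-partition behaviour concrete, here is a minimal plain-Python sketch that simulates it without Spark. The three-partition layout and the name shuffle_column_within_partitions are assumptions made for illustration; a real job decides the layout itself. It shows that the overall set of b values is preserved, but a value never leaves its partition (in the output above, the values 0 to 4 all stay in the first five rows, which is consistent with two partitions).

```python
import random

def shuffle_column_within_partitions(partitions, rng):
    """Shuffle the second field of each (a, b) row, one partition at a time."""
    result = []
    for rows in partitions:
        a_values = [a for a, _ in rows]
        b_values = [b for _, b in rows]
        rng.shuffle(b_values)  # permutes values only inside this partition
        result.append(list(zip(a_values, b_values)))
    return result

# Hypothetical layout: 10 rows spread over 3 partitions.
partitions = [
    [(0, 0), (1, 1), (2, 2)],
    [(3, 3), (4, 4), (5, 5)],
    [(6, 6), (7, 7), (8, 8), (9, 9)],
]
shuffled = shuffle_column_within_partitions(partitions, random.Random(0))
for part in shuffled:
    print(part)
```

If the values of b are correlated with the partition they sit in (for example, because the data was written in sorted order), this kind of shuffle breaks the correlation with a only partially.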

CodePudding user response:

I am not fully confident in this answer, but I think it's right.

We made the Fugue project to port native Python or Pandas code to Spark or Dask. This lets you keep the logic very readable by expressing it in native Python. Fugue can then port it to Spark for you with one function call.

So if we get your Pandas example:

import pandas as pd
df = pd.DataFrame({'a':[1,2,3,4],'b':[3,4,5,6]})
df['b'] = df['b'].sample(frac=1).reset_index(drop=True)

We can just wrap the .sample expression in a function.

def shuffle(df: pd.DataFrame) -> pd.DataFrame:
    df['b'] = df['b'].sample(frac=1).reset_index(drop=True)
    return df

And then we can bring it to Spark using the transform function. The `transform` function accepts both Pandas and Spark DataFrames, and converts the input to Spark if you are using the Spark engine.

from fugue import transform
import fugue_spark
transform(df, shuffle, schema="*", engine="spark")

But of course, this transform is applied per partition. If you don't specify the partitions, it uses the default partitions. If you want the shuffle to really randomize the data, you can do:

transform(df, shuffle, schema="*", engine="spark", partition={"algo":"rand"}).show()

And Fugue will partition your data randomly before this operation.

You may not see the shuffle in your test case because the data is really small. If you end up with 4 partitions of 1 row each, each partition just returns its single value unchanged.
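
The small-data caveat can be checked without Spark or Fugue. The sketch below is plain Python under assumed partition layouts (the helper shuffle_partition is hypothetical, not part of Fugue): shuffling four one-row partitions cannot change anything, while a single four-row partition can be permuted.

```python
import random

def shuffle_partition(values, rng):
    """Return a shuffled copy of one partition's values."""
    shuffled = list(values)
    rng.shuffle(shuffled)
    return shuffled

rng = random.Random(42)

# Assumed layout 1: four partitions with one row each.
single_row_partitions = [[3], [4], [5], [6]]
unchanged = [shuffle_partition(p, rng) for p in single_row_partitions]

# Assumed layout 2: all four rows in one partition.
one_partition = shuffle_partition([3, 4, 5, 6], rng)

print(unchanged)
print(one_partition)
```

The first result is always identical to the input, whatever the seed; the second holds the same four values in some order.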
