Split rows in train test based on user id PySpark


I have a PySpark dataframe containing multiple rows for each user:

userId  action  time
1       buy     8 AM
1       buy     9 AM
1       sell    2 PM
1       sell    3 PM
2       sell    10 AM
2       buy     11 AM
2       sell    2 PM
2       sell    3 PM

My goal is to split this dataset into a training set and a test set so that, for each userId, N% of the rows go to the training set and the remaining (100-N)% go to the test set. For example, with N=75, the training set will be

userId  action  time
1       buy     8 AM
1       buy     9 AM
1       sell    2 PM
2       sell    10 AM
2       buy     11 AM
2       sell    2 PM

and the test set will be

userId  action  time
1       sell    3 PM
2       sell    3 PM

Any suggestions? Rows are ordered by the time column, and I don't think Spark's randomSplit can help here, since it cannot stratify the split on specific columns.

CodePudding user response:

We had a similar requirement and solved it in the following way:

# Sample data (the time column is omitted here; rows are assumed to
# already be in chronological order).
data = [
    (1, "buy"),
    (1, "buy"),
    (1, "sell"),
    (1, "sell"),
    (2, "sell"),
    (2, "buy"),
    (2, "sell"),
    (2, "sell"),
]

df = spark.createDataFrame(data, ["userId", "action"])

Use a window function to assign sequential row numbers within each userId, and compute the number of records per userId. The count is needed to work out how many rows make up N% for each user.

from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number

# NOTE: ordering by userId inside a userId partition leaves the per-user row
# order unspecified; with a real time column, order by that column instead.
window = Window.partitionBy(df["userId"]).orderBy(df["userId"])

# Per-user record counts, joined back onto the original rows.
df_count = df.groupBy("userId").count().withColumnRenamed("userId", "userId_grp")
df = df.join(df_count, col("userId") == col("userId_grp"), "left").drop("userId_grp")
df = df.select("userId", "action", "count", row_number().over(window).alias("row_number"))

df.show()
+------+------+-----+----------+
|userId|action|count|row_number|
+------+------+-----+----------+
|     1|   buy|    4|         1|
|     1|   buy|    4|         2|
|     1|  sell|    4|         3|
|     1|  sell|    4|         4|
|     2|  sell|    4|         1|
|     2|   buy|    4|         2|
|     2|  sell|    4|         3|
|     2|  sell|    4|         4|
+------+------+-----+----------+
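As an aside, the groupBy/join round-trip isn't strictly necessary. A minimal sketch of the same preparation using a window aggregate for the count (the same caveat about the ordering column applies):

from pyspark.sql.window import Window
from pyspark.sql.functions import col, count, row_number

# Per-user count as a window aggregate over the whole partition,
# so no separate groupBy/join is needed.
w_all = Window.partitionBy("userId")
w_ord = Window.partitionBy("userId").orderBy("userId")
df = (df.withColumn("count", count("*").over(w_all))
        .withColumn("row_number", row_number().over(w_ord)))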

Filter the training rows to the required percentage:

n = 75
df_train = df.filter(col("row_number") <= col("count") * n / 100)
df_train.show()
+------+------+-----+----------+
|userId|action|count|row_number|
+------+------+-----+----------+
|     1|   buy|    4|         1|
|     1|   buy|    4|         2|
|     1|  sell|    4|         3|
|     2|  sell|    4|         1|
|     2|   buy|    4|         2|
|     2|  sell|    4|         3|
+------+------+-----+----------+

The remaining records go to the test set:

df_test = df.alias("df").join(
    df_train.alias("tr"),
    (col("df.userId") == col("tr.userId"))
    & (col("df.row_number") == col("tr.row_number")),
    "leftanti",
)
df_test.show()
+------+------+-----+----------+
|userId|action|count|row_number|
+------+------+-----+----------+
|     1|  sell|    4|         4|
|     2|  sell|    4|         4|
+------+------+-----+----------+
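Since row_number and count are already columns on df, the anti-join can also be skipped entirely; a sketch that simply takes the complement of the training filter:

from pyspark.sql.functions import col

# Complement of the training condition: everything past the first n% per user.
df_test = df.filter(col("row_number") > col("count") * n / 100)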

CodePudding user response:

You can use the ntile window function, which splits each partition into a given number of equally sized, ordered buckets:

from pyspark.sql.functions import col, expr

# ntile(4) buckets each user's rows into 4 equal groups. In practice, order
# by a real timestamp column; ordering by id within an id partition leaves
# the order within each user unspecified.
ds = ds.withColumn("tile", expr("ntile(4) over (partition by id order by id)"))

The dataset where tile=4 is your test set, and tile<4 is your train set:

test = ds.filter(col("tile") == 4)
train = ds.filter(col("tile") < 4)

test.show()
+---+------+----+----+
| id|action|time|tile|
+---+------+----+----+
|  1|  sell|3 PM|   4|
|  2|  sell|3 PM|   4|
+---+------+----+----+

train.show()
+---+------+-----+----+
| id|action| time|tile|
+---+------+-----+----+
|  1|   buy| 8 AM|   1|
|  1|   buy| 9 AM|   2|
|  1|  sell| 2 PM|   3|
|  2|  sell|10 AM|   1|
|  2|   buy|11 AM|   2|
|  2|  sell| 2 PM|   3|
+---+------+-----+----+
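Note that ntile(4) hardcodes a 75/25 split. For an arbitrary training percentage n, a sketch using cume_dist should work for any partition size, assuming the time column sorts chronologically (string times like "8 AM" would need parsing to timestamps first):

from pyspark.sql.functions import col, expr

n = 75  # training percentage
# cume_dist() returns the fraction of partition rows at or before the
# current row, so <= n/100 keeps roughly the first n% of each user's rows.
ds = ds.withColumn("cd", expr("cume_dist() over (partition by id order by time)"))
train = ds.filter(col("cd") <= n / 100).drop("cd")
test = ds.filter(col("cd") > n / 100).drop("cd")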

Good luck!
