How to filter rows in a pyspark dataframe with values from another?

Time:08-09

The goal is to filter the first dataframe based on how close its x and y values are to the zones in the second dataframe. For each x and y in df1, a delta is applied to create boundaries (i.e. x_minus = x - 2 and x_plus = x + 2). The function then filters df2 to the rows whose x is less than x_plus and greater than x_minus, and likewise for y. The issue I'm running into is how to access two dataframes in a udf, or how to read one dataframe inside another.

The actual datasets have grown to hundreds of GB, so Python alone isn't sufficient. The solution was initially developed in Python on smaller versions of the data and now must be converted to PySpark. I'm currently running these processes on EMR clusters, with a Jupyter notebook for testing. Below is a sample of fake data to demonstrate the process.


df1
id ; x ; y
1  ;19;11
2  ;29;9
3  ;39;3
4  ;49;6


df2
id ; x ; y ; zone
1  ;20; 12 ; GG
2  ;30; 9 ; AA
3  ;40; 3 ; BB
4  ;50; 6 ; TT

df3 - desired results
id ; x ; y ; zone
1  ;19; 11 ; GG
2  ;29; 9 ; AA
3  ;39; 3 ; BB
4  ;49; 6 ; TT

# x and y are gathered on a row-by-row basis, like .apply for pandas dataframes
def find_zone(x, y, lookup):
    x_plus = x + 2
    x_minus = x - 2
    y_plus = y + 2
    y_minus = y - 2
    # each comparison needs its own parentheses because & binds tighter than < and >
    zone = lookup.loc[(lookup['x'] < x_plus) & (lookup['x'] > x_minus) & (lookup['y'] < y_plus) & (lookup['y'] > y_minus)]['zone']
    return zone

# zone for each row, converted to a udf for pyspark
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

find_zone = udf(find_zone, StringType())  # zone is a string such as 'GG'

# first way tried, with df2 as a parameter
df1 = df1.withColumn('zone', find_zone('x', 'y', df2))

# second way tried, creating df2 within the udf
def find_zone(x, y):
    x_plus = x + 2
    x_minus = x - 2
    y_plus = y + 2
    y_minus = y - 2

    # importing as a spark df and converting to pandas because it was the only method that partially worked while testing
    lookup = spark.read.parquet('s3_uri').toPandas()
    zone = lookup.loc[(lookup['x'] < x_plus) & (lookup['x'] > x_minus) & (lookup['y'] < y_plus) & (lookup['y'] > y_minus)]['zone']
    return zone

df1 = df1.withColumn('zone', find_zone('x', 'y'))

My apologies for any syntax issues; I'm trying to reproduce something similar without revealing my actual functions or values.

CodePudding user response:

Where possible, stop using pandas if you want this to work on hundreds of gigabytes.

I wrote this code from memory, so it should be considered pseudocode.

You also can't reference the Spark context inside a udf, which means you can't use a Spark dataframe inside a udf. To see why, consider what it would require: the entire dataset behind the dataframe would have to be shipped to every executor, and it wouldn't fit in memory.
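As a rough illustration of what a udf can work with, here is a plain-Python sketch (no Spark required) of the row-level lookup from the question, run against an ordinary list instead of a dataframe. It assumes the zones table is small enough to hold in memory as a list of tuples; in that case such a list could be captured by the udf or broadcast to the executors. The names `zones`, `points` and the `delta` parameter are illustrative and not from the original post.

```python
from typing import List, Optional, Tuple

# (x, y, zone) rows, taken from the sample df2 in the question
Zone = Tuple[int, int, str]


def find_zone(x: int, y: int, zones: List[Zone], delta: int = 2) -> Optional[str]:
    """Return the first zone strictly within `delta` of (x, y) on both axes."""
    for zx, zy, name in zones:
        # same strict bounds as the question: x - delta < zx < x + delta
        if abs(zx - x) < delta and abs(zy - y) < delta:
            return name
    return None  # no zone is close enough


zones = [(20, 12, "GG"), (30, 9, "AA"), (40, 3, "BB"), (50, 6, "TT")]
points = [(1, 19, 11), (2, 29, 9), (3, 39, 3), (4, 49, 6)]  # (id, x, y) from df1

result = [(pid, x, y, find_zone(x, y, zones)) for pid, x, y in points]
```

This only makes sense while the lookup side stays small. Once both sides are large, the join described next is the better fit.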

If you are only doing this once, you can just do a join and it would likely be efficient.

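from pyspark.sql.functions import lit

# df is the first dataframe (df1 in the question)
# range join: keep pairs where df.x and df.y fall within 2 of the zone's x and y
# note: between() is inclusive at both ends
df.join(
    df2,
    df.x.between(df2.x - lit(2), df2.x + lit(2))
    & df.y.between(df2.y - lit(2), df2.y + lit(2)),
).select(df.id, df.x, df.y, df2.zone)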

WARNING: this join can return more than one zone per row, so make sure you have logic to handle that.
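One possible way to handle multiple matches, sketched in plain Python to show the logic only: keep the zone whose centre is closest to the point. The "nearest wins" rule is an assumption, not something the original post specifies, and the second zone `ZZ` below is hypothetical data added to force a tie-break. In Spark, a similar result could be obtained with a window partitioned by id and ordered by distance, keeping the first row.

```python
def nearest_zone_per_id(matches):
    """Keep one zone per id: the one with the smallest squared distance.

    `matches` holds (id, x, y, zone_x, zone_y, zone) tuples, i.e. the rows
    a range join would produce before any de-duplication.
    """
    best = {}
    for pid, x, y, zx, zy, zone in matches:
        dist = (zx - x) ** 2 + (zy - y) ** 2
        # replace the stored zone only when this one is strictly closer
        if pid not in best or dist < best[pid][0]:
            best[pid] = (dist, zone)
    return {pid: zone for pid, (_, zone) in best.items()}


matches = [
    (1, 19, 11, 20, 12, "GG"),
    (1, 19, 11, 19, 10, "ZZ"),  # hypothetical overlapping zone
    (2, 29, 9, 30, 9, "AA"),
]
resolved = nearest_zone_per_id(matches)
```

Other rules (first match, a priority column on the zones table, or keeping all matches) may suit the data better; the point is only that the choice has to be made explicitly.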

If you are going to run this a lot and the zones table is small, consider reworking it so that it holds a row for every x and y value a zone covers, using sequence and explode.

from pyspark.sql.functions import explode, sequence, lit

# assumes x and y are integers, since sequence() generates whole-number steps
new_df2 = (
    df2.withColumn('x',  # clobber x column with new data
        explode(sequence(df2.x - lit(2), df2.x + lit(2))))  # create multiple rows with all possible x values
    .withColumn('y',  # clobber y column with new data
        explode(sequence(df2.y - lit(2), df2.y + lit(2))))  # create multiple rows with all possible y values
    .drop('id')  # avoid a duplicate id column after the join
)

df.join(new_df2, ['x', 'y'])  # now you can do a simple equi-join, which is more efficient
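To make the expansion concrete, here is a plain-Python sketch (no Spark required) of what the sequence and explode step does to the sample zones, followed by the exact-match lookup that replaces the range comparison. It assumes integer coordinates and a delta of 2; `expand_zones` and `lookup` are illustrative names, not part of the original answer.

```python
def expand_zones(zones, delta=2):
    """Map every integer (x, y) covered by a zone to the zone names covering it."""
    lookup = {}
    for zx, zy, name in zones:
        # inclusive on both ends, like sequence(x - delta, x + delta)
        for x in range(zx - delta, zx + delta + 1):
            for y in range(zy - delta, zy + delta + 1):
                lookup.setdefault((x, y), []).append(name)
    return lookup


zones = [(20, 12, "GG"), (30, 9, "AA"), (40, 3, "BB"), (50, 6, "TT")]
points = [(1, 19, 11), (2, 29, 9), (3, 39, 3), (4, 49, 6)]  # (id, x, y) from df1

lookup = expand_zones(zones)

# equi-join: an exact match on (x, y) replaces the range comparison
joined = [
    (pid, x, y, zone)
    for pid, x, y in points
    for zone in lookup.get((x, y), [])
]
```

Each zone grows from 1 row to 25 rows here, so the trade-off is a larger zones table in exchange for a cheaper join. With a wide delta or non-integer coordinates the expansion stops being practical.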