Remove rows with invalid polygon values in a PySpark data frame?

Time:10-28

We are using a PySpark function on a data frame, and it throws an error. The error is most likely caused by a faulty row in the data frame.

The schema of the data frame looks like this:

root
|-- geo_name: string (nullable = true)
|-- geo_latitude: double (nullable = true)
|-- geo_longitude: double (nullable = true)
|-- geo_bst: integer (nullable = true)
|-- geo_bvw: integer (nullable = true)
|-- geometry_type: string (nullable = true)
|-- geometry_polygon: string (nullable = true)
|-- geometry_multipolygon: string (nullable = true)
|-- polygon: geometry (nullable = false)

I converted the "geometry_polygon" column from the CSV into the geometry-type column "polygon" like this:

station_groups_gdf.createOrReplaceTempView("station_gdf")
spatial_station_groups_gdf = spark_sedona.sql("SELECT *, ST_PolygonFromText(station_gdf.geometry_polygon, ',') AS polygon FROM station_gdf")

Example input data looks like this:

-RECORD 0-------------------------------------
 geo_name              | Neckarkanal         
 geo_latitude          | 49.486697           
 geo_longitude         | 8.504944            
 geo_bst               | 0                   
 geo_bvw               | 0                   
 geometry_type         | Polygon             
 geometry_polygon      | 8.4937, 49.4892, ...
 geometry_multipolygon | null                
 polygon               | POLYGON ((8.4937 ...  

The error occurs as soon as we call:

df.show()

The error:

java.lang.IllegalArgumentException: Points of LinearRing do not form a closed linestring

To pinpoint these rows, we would like to iterate through the data frame and apply a function that deletes the invalid values. Something like this:

dataframe.where(dataframe.polygon == valid).show()
dataframe.filter(dataframe.polygon == valid).show()

Do you know the best way to go through the data frame row by row and delete the invalid values without evaluating the whole PySpark data frame at once (which triggers the error message and aborts the job)?

Answer:

Since you are working with a data frame, a pandas_udf check should work well. The function itself is not very pretty, but it seems to work. In the example below, the rows with "geo_name" X and Y do not hold a valid polygon string, and in the output no polygon is created for them.

Input:

df = spark_sedona.createDataFrame(
    [('A', '-74, 40, -73, 39, -75, 38, -74, 40'),
     ('X', '-11'),
     ('Y', None),
     ('B', '-33, 50, -30, 38, -40, 27, -33, 50')],
    ['geo_name', 'geometry_polygon']
)

Script:

from pyspark.sql import functions as F
import pandas as pd
from shapely.geometry import Polygon

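@F.pandas_udf('string')
def nullify_invalid_polygon(s: pd.Series) -> pd.Series:
    # Per token: optional leading '-', at most one '.', digits otherwise
    def is_digit(lst):
        return [e.strip().lstrip('-').replace('.', '', 1).isdigit() if e else None for e in lst]
    # Each step below passes a value on if it is still acceptable, else None
    coords = s.str.split(',')
    # Coordinates come in x, y pairs, so the token count must be even
    even_cnt = coords.map(lambda x: x if x and len(x) % 2 == 0 else None)
    # A polygon shell needs at least 3 points (6 numbers)
    point_cnt_ge3 = even_cnt.map(lambda x: x if x and len(x) >= 6 else None)
    # Every token must look like a number
    all_digits = point_cnt_ge3.map(lambda x: x if x and all(is_digit(x)) else None)
    # Pair up consecutive numbers into (x, y) tuples
    polygon_shell = all_digits.map(lambda x: list(zip(*[iter(map(float, x))]*2)) if x else None)
    # Let Shapely judge the resulting shell
    is_valid = polygon_shell.map(lambda x: Polygon(x).is_valid if x else False)
    # Keep the original string only where the polygon is valid
    return s.where(is_valid, None)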

df = df.withColumn('geometry_polygon', nullify_invalid_polygon('geometry_polygon'))

df.createOrReplaceTempView("station_gdf")
df = spark_sedona.sql("SELECT *, CASE WHEN isnull(geometry_polygon) THEN null ELSE ST_PolygonFromText(geometry_polygon, ',') END AS polygon FROM station_gdf")
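
The structural checks inside the UDF can also be tried on single strings without Spark. The following is a minimal plain-Python sketch of the same steps (even token count, at least three points, numeric tokens, pairing into (x, y) tuples). The names to_float and parse_shell are hypothetical and not part of the script above, and the Shapely validity test is left out so that the sketch needs only the standard library.

```python
from typing import List, Optional, Tuple


def to_float(token: str) -> Optional[float]:
    """Return the token as a float, or None if it does not look like a plain number.

    Assumption: slightly stricter than is_digit in the UDF, because only
    one leading '-' is accepted here.
    """
    t = token.strip()
    body = t[1:] if t.startswith('-') else t
    # At most one '.', decimal digits otherwise
    if not body.replace('.', '', 1).isdecimal():
        return None
    return float(t)


def parse_shell(text: Optional[str]) -> Optional[List[Tuple[float, float]]]:
    """Turn 'x1, y1, x2, y2, ...' into [(x1, y1), (x2, y2), ...], or None."""
    if not text:
        return None
    tokens = text.split(',')
    # Coordinates come in pairs, and a shell needs at least 3 points
    if len(tokens) % 2 != 0 or len(tokens) < 6:
        return None
    values = [to_float(t) for t in tokens]
    if any(v is None for v in values):
        return None
    # Pair up even-indexed x values with odd-indexed y values
    return list(zip(values[0::2], values[1::2]))


print(parse_shell('-74, 40, -73, 39, -75, 38, -74, 40'))
# [(-74.0, 40.0), (-73.0, 39.0), (-75.0, 38.0), (-74.0, 40.0)]
print(parse_shell('-11'))
# None
```

A helper like this is mainly useful for unit-testing the rules on a few sample strings before wrapping them in a pandas_udf.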

Result:

df.printSchema() 
# root
#  |-- geo_name: string (nullable = true)
#  |-- geometry_polygon: string (nullable = true)
#  |-- polygon: geometry (nullable = true)

df.show(truncate=0)
# +--------+----------------------------------+------------------------------------------+
# |geo_name|geometry_polygon                  |polygon                                   |
# +--------+----------------------------------+------------------------------------------+
# |A       |-74, 40, -73, 39, -75, 38, -74, 40|POLYGON ((-74 40, -73 39, -75 38, -74 40))|
# |X       |null                              |null                                      |
# |Y       |null                              |null                                      |
# |B       |-33, 50, -30, 38, -40, 27, -33, 50|POLYGON ((-33 50, -30 38, -40 27, -33 50))|
# +--------+----------------------------------+------------------------------------------+
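
Both sample polygons A and B are closed rings, meaning their first and last points are equal. The error in the question, "Points of LinearRing do not form a closed linestring", is about rings where that is not the case. As far as I know, Shapely's Polygon constructor closes such a ring implicitly, so Polygon(x).is_valid on its own may not flag it. Below is a minimal sketch of an explicit closure test in plain Python; is_closed_ring is a hypothetical helper name, and the minimum of four points (three distinct vertices plus the repeated first one) is an assumption about what a usable closed ring needs.

```python
from typing import Optional


def is_closed_ring(text: Optional[str]) -> bool:
    """True if 'x1, y1, x2, y2, ...' starts and ends with the same point."""
    if not text:
        return False
    try:
        values = [float(t) for t in text.split(',')]
    except ValueError:
        # At least one token is not a number
        return False
    # Need complete x, y pairs and at least 4 points (8 numbers)
    if len(values) % 2 != 0 or len(values) < 8:
        return False
    # Compare the first (x, y) pair with the last one
    return values[:2] == values[-2:]


print(is_closed_ring('-74, 40, -73, 39, -75, 38, -74, 40'))  # True
print(is_closed_ring('-74, 40, -73, 39, -75, 38, -70, 41'))  # False
```

The same comparison could be added to nullify_invalid_polygon as one more .map step on polygon_shell (for example, keeping x only if x[0] == x[-1]). Once the invalid strings are nullified, the affected rows can be dropped with a filter such as df.filter("polygon IS NOT NULL").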