We are using a PySpark function on a data frame, and it throws an error. The error is most likely caused by a faulty row in the data frame.
The schema of the data frame looks like this:
root
|-- geo_name: string (nullable = true)
|-- geo_latitude: double (nullable = true)
|-- geo_longitude: double (nullable = true)
|-- geo_bst: integer (nullable = true)
|-- geo_bvw: integer (nullable = true)
|-- geometry_type: string (nullable = true)
|-- geometry_polygon: string (nullable = true)
|-- geometry_multipolygon: string (nullable = true)
|-- polygon: geometry (nullable = false)
I converted the string column "geometry_polygon" from the CSV into the geometry-type column "polygon" like this:
station_groups_gdf.createOrReplaceTempView("station_gdf")
spatial_station_groups_gdf = spark_sedona.sql("SELECT *, ST_PolygonFromText(station_gdf.geometry_polygon, ',') AS polygon FROM station_gdf")
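For reference, ST_PolygonFromText with the ',' delimiter reads the string as a flat list x1, y1, x2, y2, ... and pairs consecutive numbers into points. The helper below is a hypothetical plain-Python illustration (not Sedona's implementation) that builds the equivalent WKT string, which can be handy for eyeballing a suspicious value.

```python
def to_wkt_polygon(text: str, delimiter: str = ",") -> str:
    """Turn 'x1, y1, x2, y2, ...' into 'POLYGON ((x1 y1, x2 y2, ...))'.

    Hypothetical helper: it only shows how the flat list is paired into
    points; it does NOT check that the ring is closed or valid.
    """
    tokens = [t.strip() for t in text.split(delimiter)]
    if len(tokens) % 2:
        raise ValueError(f"odd number of coordinates: {len(tokens)}")
    # Even positions are x (longitude), odd positions are y (latitude).
    points = [f"{x} {y}" for x, y in zip(tokens[0::2], tokens[1::2])]
    return f"POLYGON (({', '.join(points)}))"


print(to_wkt_polygon("-74, 40, -73, 39, -75, 38, -74, 40"))
# POLYGON ((-74 40, -73 39, -75 38, -74 40))
```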
Example input data looks like this:
-RECORD 0-------------------------------------
geo_name | Neckarkanal
geo_latitude | 49.486697
geo_longitude | 8.504944
geo_bst | 0
geo_bvw | 0
geometry_type | Polygon
geometry_polygon | 8.4937, 49.4892, ...
geometry_multipolygon | null
polygon | POLYGON ((8.4937 ...
The error occurs as soon as we call:
df.show()
The error:
java.lang.IllegalArgumentException: Points of LinearRing do not form a closed linestring
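The message comes from JTS, the geometry library Sedona builds on: a polygon ring has to end on the same point it starts on, and, as far as I know, current JTS versions also reject rings with fewer than four points (counting the repeated closing point). Under those assumptions, a rough pure-Python diagnostic can be run on the raw geometry_polygon strings to report why a value would be rejected. The function name and messages are made up for illustration, and it does not cover every polygon validity rule (e.g. self-intersections).

```python
from typing import Optional


def ring_problem(text: Optional[str], delimiter: str = ",") -> Optional[str]:
    """Return a reason the ring text would be rejected, or None if it looks OK.

    Hypothetical diagnostic: mirrors basic parsing plus two LinearRing rules
    (closed ring, at least 4 points), not full polygon validity.
    """
    if not text:
        return "empty or null"
    try:
        # float() tolerates surrounding spaces such as ' 40'
        nums = [float(t) for t in text.split(delimiter)]
    except ValueError:
        return "non-numeric coordinate"
    if len(nums) % 2:
        return "odd number of coordinates"
    points = list(zip(nums[0::2], nums[1::2]))
    if len(points) < 4:
        return f"only {len(points)} point(s); a ring needs at least 4"
    if points[0] != points[-1]:
        return "ring is not closed (first point != last point)"
    return None


for value in ["-74, 40, -73, 39, -75, 38, -74, 40",
              "-74, 40, -73, 39, -75, 38, -72, 41",
              "-11",
              None]:
    print(repr(value), "->", ring_problem(value))
```

To pinpoint the offending rows in Spark, it should be possible to wrap this with pyspark.sql.functions.udf(ring_problem, 'string') and filter on the string column before ST_PolygonFromText is ever applied.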
To pinpoint these rows, we would like to iterate through the data frame and apply a function that deletes invalid values, something like this:
dataframe.where(dataframe.polygon == valid).show()
dataframe.filter(dataframe.polygon == valid).show()
Do you know the best way to iterate row by row and delete invalid values without having to evaluate the whole PySpark data frame at once (which raises the error and aborts the job)?
Answer:
Since you have a dataframe, a check inside a pandas_udf should work well. The function itself may not look very nice, but it seems to work. In the example below, the "geometry_polygon" value for "geo_name" = X is not a valid polygon, so no polygon is created for that row in the output (the same happens for the null value in row Y).
Input:
df = spark_sedona.createDataFrame(
[('A', '-74, 40, -73, 39, -75, 38, -74, 40'),
('X', '-11'),
('Y', None),
('B', '-33, 50, -30, 38, -40, 27, -33, 50')],
['geo_name', 'geometry_polygon']
)
Script:
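from pyspark.sql import functions as F
import pandas as pd
from shapely.geometry import Polygon

@F.pandas_udf('string')
def nullify_invalid_polygon(s: pd.Series) -> pd.Series:
    def is_digit(lst):
        # True for tokens like '-74' or '49.4892'; None for empty tokens
        return [e.strip().lstrip('-').replace('.', '', 1).isdigit() if e else None for e in lst]

    coords = s.str.split(',')
    # keep only complete (x, y) pairs
    even_cnt = coords.map(lambda x: x if x and len(x) % 2 == 0 else None)
    # a closed ring needs at least 4 points, i.e. 8 numbers
    point_cnt_ge4 = even_cnt.map(lambda x: x if x and len(x) >= 8 else None)
    # every token must be numeric
    all_digits = point_cnt_ge4.map(lambda x: x if x and all(is_digit(x)) else None)
    # pair the flat list into (x, y) tuples
    polygon_shell = all_digits.map(lambda x: list(zip(*[iter(map(float, x))]*2)) if x else None)
    # shapely silently closes open rings, but Sedona/JTS does not, so check first == last
    is_closed = polygon_shell.map(lambda x: bool(x) and x[0] == x[-1])
    is_valid = polygon_shell.map(lambda x: Polygon(x).is_valid if x else False)
    return s.where(is_closed & is_valid, None)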
df = df.withColumn('geometry_polygon', nullify_invalid_polygon('geometry_polygon'))
df.createOrReplaceTempView("station_gdf")
df = spark_sedona.sql("SELECT *, CASE WHEN isnull(geometry_polygon) THEN null ELSE ST_PolygonFromText(geometry_polygon, ',') END AS polygon FROM station_gdf")
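The polygon_shell line in the script pairs the flat number list with zip(*[iter(...)]*2): the same iterator is passed to zip twice, so each tuple consumes two consecutive items. A small standalone sketch of that idiom (the name pairs is hypothetical) also shows why the even-count check must come first: zip stops at the shortest input, so an odd trailing number would be dropped silently instead of raising an error.

```python
def pairs(values):
    """Group a flat sequence into consecutive 2-tuples: [a, b, c, d] -> [(a, b), (c, d)]."""
    it = iter(values)
    # [it] * 2 is a list holding the *same* iterator twice, so zip(it, it)
    # pulls two consecutive items for every tuple it builds.
    return list(zip(*[it] * 2))


print(pairs([-74.0, 40.0, -73.0, 39.0]))  # [(-74.0, 40.0), (-73.0, 39.0)]
print(pairs([1, 2, 3]))                   # [(1, 2)]  (the trailing 3 is dropped)
```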
Result:
df.printSchema()
# root
# |-- geo_name: string (nullable = true)
# |-- geometry_polygon: string (nullable = true)
# |-- polygon: geometry (nullable = true)
df.show(truncate=0)
# +--------+----------------------------------+------------------------------------------+
# |geo_name|geometry_polygon                  |polygon                                   |
# +--------+----------------------------------+------------------------------------------+
# |A       |-74, 40, -73, 39, -75, 38, -74, 40|POLYGON ((-74 40, -73 39, -75 38, -74 40))|
# |X       |null                              |null                                      |
# |Y       |null                              |null                                      |
# |B       |-33, 50, -30, 38, -40, 27, -33, 50|POLYGON ((-33 50, -30 38, -40 27, -33 50))|
# +--------+----------------------------------+------------------------------------------+