I have a PySpark dataframe with a column named "subnet". I want to add a column containing the first IP of that subnet. I've tried many solutions, including:
def get_first_ip(prefix):
    n = ipaddress.IPv4Network(prefix)
    first, last = n[0], n[-1]
    return first

df.withColumn("first_ip", get_first_ip(F.col("subnet")))
But I'm getting this error:
-> 1161         raise AddressValueError("Expected 4 octets in %r" % ip_str)
   1162
   1163     try:

AddressValueError: Expected 4 octets in "Column<'subnet'>"
I understand that this is a Column object and that I can't use it as a plain string here, but how can I solve this with PySpark?
I could do the same in pandas and then convert back to PySpark, but I'm wondering whether there's a more elegant way.
CodePudding user response:
It's hard to tell what the issue is without knowing what the input dataframe looks like, but something is probably wrong with the column values, as @samkart suggested.
Here's an example that I tested:
import ipaddress

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

def get_first_ip(x):
    # Network (first) address of the subnet
    n = ipaddress.IPv4Network(x)
    return str(n[0])

def get_last_ip(x):
    # Broadcast (last) address of the subnet
    n = ipaddress.IPv4Network(x)
    return str(n[-1])

# Wrap the plain Python functions as string-returning Spark UDFs
first_ip_udf = F.udf(get_first_ip, StringType())
last_ip_udf = F.udf(get_last_ip, StringType())

spark = SparkSession.builder.getOrCreate()

data = [
    {"IP": "10.10.128.123"},
    {"IP": "10.10.128.0/17"},
]
df = spark.createDataFrame(data=data)

df = df.withColumn("first_ip", first_ip_udf(F.col("IP")))
df = df.withColumn("last_ip", last_ip_udf(F.col("IP")))
Outputs:
+--------------+-------------+-------------+
|IP            |first_ip     |last_ip      |
+--------------+-------------+-------------+
|10.10.128.123 |10.10.128.123|10.10.128.123|
|10.10.128.0/17|10.10.128.0  |10.10.255.255|
+--------------+-------------+-------------+
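One caveat worth noting: IPv4Network raises a ValueError when a prefix has host bits set (e.g. "10.10.128.5/17", which is not in the sample data above). If such values can occur in your column, passing strict=False masks them down to the network address; a minimal sketch:

import ipaddress

def get_first_ip(x):
    # strict=False tolerates host bits set in the prefix,
    # e.g. "10.10.128.5/17" is masked to the 10.10.128.0/17 network
    n = ipaddress.IPv4Network(x, strict=False)
    return str(n[0])

print(get_first_ip("10.10.128.5/17"))  # 10.10.128.0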
CodePudding user response:
You cannot directly apply a Python-native function to a Spark dataframe column. As demonstrated in this answer, you could create a udf from your function. Since a udf is slow for big dataframes, you could use a pandas_udf instead, which is a lot faster.
Input:
import ipaddress
import pandas as pd
from pyspark.sql import functions as F
df = spark.createDataFrame([("10.10.128.123",), ("10.10.128.0/17",)], ["subnet"])
Script:
@F.pandas_udf('string')
def get_first_ip(prefix: pd.Series) -> pd.Series:
    # Map each subnet string to its network (first) address
    return prefix.apply(lambda s: str(ipaddress.IPv4Network(s)[0]))
df = df.withColumn("first_ip", get_first_ip("subnet"))
df.show()
# +--------------+-------------+
# |        subnet|     first_ip|
# +--------------+-------------+
# | 10.10.128.123|10.10.128.123|
# |10.10.128.0/17|  10.10.128.0|
# +--------------+-------------+
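For completeness, the same pandas_udf pattern extends to the last address of the subnet; this is a sketch mirroring the get_last_ip helper from the first answer (the last_ip column name is my own):

@F.pandas_udf('string')
def get_last_ip(prefix: pd.Series) -> pd.Series:
    # Broadcast (last) address of each subnet
    return prefix.apply(lambda s: str(ipaddress.IPv4Network(s)[-1]))

df = df.withColumn("last_ip", get_last_ip("subnet"))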