Add column with the first IP address of the subnet

I have a PySpark dataframe with a column named "subnet". I want to add a column containing the first IP address of that subnet. I've tried several approaches, including:

import ipaddress
from pyspark.sql import functions as F

def get_first_ip(prefix):
    n = ipaddress.IPv4Network(prefix)
    first, last = n[0], n[-1]
    return first

df.withColumn("first_ip", get_first_ip(F.col("subnet")))

But I get this error:

-> 1161             raise AddressValueError("Expected 4 octets in %r" % ip_str)

AddressValueError: Expected 4 octets in "Column<'subnet'>"

I understand that this is a Column object, not a plain string, so I can't pass it to a regular Python function. But how do I solve my problem in PySpark?

I could do the same in pandas and then convert back to PySpark, but I'm wondering if there's a more elegant way.
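
For reference, the pandas detour would look roughly like this (a sketch only, assuming the data fits in driver memory, since toPandas() collects everything to the driver):

import ipaddress

# Round trip through pandas: collect, compute, convert back.
pdf = df.toPandas()
pdf["first_ip"] = pdf["subnet"].apply(lambda s: str(ipaddress.IPv4Network(s)[0]))
df = spark.createDataFrame(pdf)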

CodePudding user response:

It's hard to tell what the issue is when we don't know what the input dataframe looks like, but something is probably wrong with the column values, as @samkart suggested.

Here's an example that I tested:

import ipaddress
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType


def get_first_ip(x):
    # The network address is the first IP of the subnet.
    n = ipaddress.IPv4Network(x)
    return str(n[0])


def get_last_ip(x):
    # The broadcast address is the last IP of the subnet.
    n = ipaddress.IPv4Network(x)
    return str(n[-1])


# Wrap the plain Python functions as UDFs so Spark can apply them to columns.
first_ip_udf = F.udf(get_first_ip, StringType())
last_ip_udf = F.udf(get_last_ip, StringType())

spark = SparkSession.builder.getOrCreate()
data = [
    {"IP": "10.10.128.123"},
    {"IP": "10.10.128.0/17"},
]
df = spark.createDataFrame(data=data)
df = df.withColumn("first_ip", first_ip_udf(F.col("IP")))
df = df.withColumn("last_ip", last_ip_udf(F.col("IP")))

Outputs:

+--------------+-------------+-------------+
|IP            |first_ip     |last_ip      |
+--------------+-------------+-------------+
|10.10.128.123 |10.10.128.123|10.10.128.123|
|10.10.128.0/17|10.10.128.0  |10.10.255.255|
+--------------+-------------+-------------+
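
If some rows hold malformed values (which could be what triggers the asker's error in practice), a defensive variant of the UDF returns null instead of failing the whole job. This is only a sketch, not part of the original answer, and get_first_ip_safe is a hypothetical name:

def get_first_ip_safe(x):
    # Return None (null in Spark) for values that aren't valid IPv4 networks.
    # AddressValueError and NetmaskValueError both subclass ValueError.
    try:
        return str(ipaddress.IPv4Network(x)[0])
    except ValueError:
        return None

first_ip_safe_udf = F.udf(get_first_ip_safe, StringType())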

CodePudding user response:

You cannot directly apply a native Python function to a Spark dataframe column. As demonstrated in the other answer, you could create a udf from your function.

Since a plain udf is slow on big dataframes, you could use pandas_udf, which is vectorized and usually a lot faster.

Input:

import ipaddress
import pandas as pd
from pyspark.sql import functions as F
df = spark.createDataFrame([("10.10.128.123",), ("10.10.128.0/17",)], ["subnet"])

Script:

@F.pandas_udf('string')
def get_first_ip(prefix: pd.Series) -> pd.Series:
    # Runs on whole batches of rows (pandas Series) instead of one row at a time.
    return prefix.apply(lambda s: str(ipaddress.IPv4Network(s)[0]))

df = df.withColumn("first_ip", get_first_ip("subnet"))

df.show()
# +--------------+-------------+
# |        subnet|     first_ip|
# +--------------+-------------+
# | 10.10.128.123|10.10.128.123|
# |10.10.128.0/17|  10.10.128.0|
# +--------------+-------------+
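
As a side note (not from the original answers): if every value is either a bare IP or a canonical network string like 10.10.128.0/17, the part before the slash already is the first IP, so a udf-free sketch using the built-in substring_index works too:

from pyspark.sql import functions as F

# Assumes the address part before "/" equals the network's first IP,
# i.e. the subnet strings are in canonical network form.
df = df.withColumn("first_ip", F.substring_index("subnet", "/", 1))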