I have the following issue, since I am a bit of a noob in PySpark. Based on the values of two columns I want to populate a third column. The conditions are:
- if col2 contains a semicolon, check col1:
  - if col1 == 1, take the value before the semicolon
  - if col1 == 2, take the value after the semicolon
- if col2 contains no semicolon, take the value from col2 as-is
This is what the dataframe looks like:

| col1 | col2      | col3 |
|------|-----------|------|
| 1    | 24.9;34.9 | 24.9 |
| 2    | 24.9;34.9 | 34.9 |
| 1    | 80.8;90.9 | 80.8 |
| 2    | 80.8;90.9 | 90.9 |
| 1    | 777       | 777  |
I wrote the following UDF, which gives the error `Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.`
import pyspark.sql.functions as F

def split_by_semicolon_if_exists(col1, col2):
    if (col1.contains(';') == True):
        if col2 == 1:
            result = F.substring(col1, 0, (F.length(col1) - F.expr('locate('';'', col1) - 1')))
        if col2 == 2:
            result = F.substring(col1, F.expr('locate('';'', col1) - 1'), (F.length(col1) - F.expr('locate('';'', col1) - 1')))
        return result
    else:
        return col1

df = df.withColumn('col3',
                   split_by_semicolon_if_exists(df['col1'],
                                                df['col2']))
I built this UDF by googling the various functions, so there are probably multiple issues with it. Can you please help me build a UDF for this case?
CodePudding user response:
Take a look at the `split` function.
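As a quick illustration of what `split` returns (a minimal sketch; assumes an active `spark` session):

import pyspark.sql.functions as F

# split turns a delimited string into an array of its parts
spark.range(1).select(
    F.split(F.lit("24.9;34.9"), ";").alias("parts")
).show(truncate=False)
# +------------+
# |parts       |
# +------------+
# |[24.9, 34.9]|
# +------------+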
Using UDF:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StringType

spark = SparkSession.builder.getOrCreate()

data = [
    {"col1": 1, "col2": "24.9;34.9"},
    {"col1": 2, "col2": "24.9;34.9"},
    {"col1": 1, "col2": "80.8;90.9"},
    {"col1": 1, "col2": "777"},
]
df = spark.createDataFrame(data)

def get_value(item, value):
    # col1 is 1-based, Python list indexing is 0-based
    if ";" in value:
        return value.split(";")[item - 1]
    return value

df = df.withColumn("col3", F.udf(get_value, StringType())(F.col("col1"), F.col("col2")))
Without UDF:
df = df.withColumn(
    "col3",
    F.when(
        F.col("col2").contains(";"), F.split("col2", ";").getItem(F.col("col1") - 1)
    ).otherwise(F.col("col2")),
)
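As a general note on the design choice: the second variant is usually preferable, because built-in column functions like `when` and `split` run inside the JVM, whereas a Python UDF has to serialize every row out to a Python worker, which is typically slower.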
Result:

root
 |-- col1: long (nullable = true)
 |-- col2: string (nullable = true)
 |-- col3: string (nullable = true)

+----+---------+----+
|col1|col2     |col3|
+----+---------+----+
|1   |24.9;34.9|24.9|
|2   |24.9;34.9|34.9|
|1   |80.8;90.9|80.8|
|1   |777      |777 |
+----+---------+----+
CodePudding user response:
You can use this code:

import pyspark.sql.functions as F
import pyspark.sql.types as T

df = spark.createDataFrame(
    data=[(1, "24.9;34.9"),
          (2, "24.9;34.9"),
          (1, "80.8;90.9"),
          (2, "80.8;90.9"),
          (1, "777")],
    schema=["col1", "col2"])
df.show()

def split_by_semicolon_if_exists(col1, col2):
    if ';' in col2:
        if col1 == 1:
            result = col2.split(';')[0]
        if col1 == 2:
            result = col2.split(';')[1]
        return result
    else:
        return col2

split_by_semicolon_if_exists_udf = F.udf(split_by_semicolon_if_exists, T.StringType())
df = df.withColumn('col3', split_by_semicolon_if_exists_udf(df['col1'], df['col2']))
df.show()
To apply a plain Python function over the columns of your DataFrame you have to register it as a UDF with F.udf(function, return_type).
You can check this documentation: https://sparkbyexamples.com/pyspark/pyspark-udf-user-defined-function/
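As an aside, F.udf also works as a decorator, which keeps the registration next to the function definition. A minimal sketch of the same UDF in that style (the body is condensed into a single indexing expression, equivalent for col1 values 1 and 2):

import pyspark.sql.functions as F
import pyspark.sql.types as T

# decorator form: equivalent to F.udf(split_by_semicolon_if_exists, T.StringType())
@F.udf(returnType=T.StringType())
def split_by_semicolon_if_exists(col1, col2):
    # for col1 in {1, 2}, index col1 - 1 picks the part before/after the semicolon
    if ';' in col2:
        return col2.split(';')[col1 - 1]
    return col2

df = df.withColumn('col3', split_by_semicolon_if_exists(df['col1'], df['col2']))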
Also, Python has simpler ways to work with strings inside a UDF, such as

if 'string' in stringVariable:

(True/False depending on whether the substring is present in the main string). You can also split a string on a given character using

string.split(';')

(returns a list of the separated parts).
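For example, in a plain Python session (values taken from the question's data):

>>> ';' in '24.9;34.9'
True
>>> '24.9;34.9'.split(';')
['24.9', '34.9']
>>> ';' in '777'
False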
CodePudding user response:
You can use expr without having to use a UDF here. Since array indexing starts from 0, you need to subtract 1 from your col1:
from pyspark.sql import functions as F
df.withColumn("Result",F.expr("""split(col2,';')[int(col1)-1]""")).show()
+----+---------+-----+------+
|col1|     col2| col3|Result|
+----+---------+-----+------+
|   1|24.9;34.9| 24.9|  24.9|
|   2|24.9;34.9| 34.9|  34.9|
|   1|80.8;90.9| 80.8|  80.8|
|   2|80.8;90.9| 90.9|  90.9|
|   1|      777|777.0|   777|
+----+---------+-----+------+
The new column Result is the same as your expected output in col3.
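If you want the result to land in col3 itself rather than in a new column, the same expression can simply overwrite it (a small sketch reusing the identical expr):

from pyspark.sql import functions as F

# same split-and-index expression, written to overwrite col3 in place
df = df.withColumn("col3", F.expr("split(col2,';')[int(col1)-1]"))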