I have a specific problem where I want to derive the value of a bu_id field from the id and matched_id columns.
When there is a value in the matched_id column, bu_id should be the same for that particular id and for the ids listed in its matched_id.
When matched_id is blank, bu_id should simply be the same as id.
input
+---+----------+
|id |matched_id|
+---+----------+
|0  |7,8       |
|1  |          |
|2  |4         |
|3  |5,9       |
|4  |2         |
|5  |3,9       |
|6  |          |
|7  |0,8       |
|8  |0,7       |
|9  |3,5       |
+---+----------+
output
+---+----------+-----+
|id |matched_id|bu_id|
+---+----------+-----+
|0  |7,8       |0    |
|1  |          |1    |
|2  |4         |2    |
|3  |5,9       |3    |
|4  |2         |2    |
|5  |3,9       |3    |
|6  |          |6    |
|7  |0,8       |0    |
|8  |0,7       |0    |
|9  |3,5       |3    |
+---+----------+-----+
Can anyone help me with how to approach this problem? Thanks in advance.
CodePudding user response:
To answer this question I assumed that the logic you are looking to implement is:
- If the matched_id column is null, then bu_id should be the same as id.
- If the matched_id column is not null, we should consider the values listed in both the id and matched_id columns, and bu_id should be the minimum of those values.
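As a quick sanity check of that assumed rule against your expected output (this is plain Python only, just to illustrate the logic before building it in Spark):

# assumed rule: bu_id = min(id together with all matched_id values)
min([3, 5, 9])   # row id=3, matched_id="5,9" -> 3, matching the expected bu_id
min([8, 0, 7])   # row id=8, matched_id="0,7" -> 0, matching the expected bu_id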
The Set-Up
# imports to include
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
# making your dataframe
df = spark.createDataFrame(
    [
        ('0', '7,8'),
        ('1', ''),
        ('2', '4'),
        ('3', '5,9'),
        ('4', '2'),
        ('5', '3,9'),
        ('6', ''),
        ('7', '0,8'),
        ('8', '0,7'),
        ('9', '3,5'),
    ],
    ['id', 'matched_id'])
print(df.schema.fields)
df.show(truncate=False)
In this df, both the id and matched_id columns are StringType data types. The code that follows builds off this assumption. You can check the column types in your df by running print(df.schema.fields).
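For reference, on this df print(df.schema.fields) should return something along these lines (the exact repr varies by Spark/PySpark version, so treat this as illustrative):

[StructField('id', StringType(), True), StructField('matched_id', StringType(), True)]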
| id | matched_id |
|----|------------|
| 0  | 7,8        |
| 1  |            |
| 2  | 4          |
| 3  | 5,9        |
| 4  | 2          |
| 5  | 3,9        |
| 6  |            |
| 7  | 0,8        |
| 8  | 0,7        |
| 9  | 3,5        |
The Logic
To implement the logic for bu_id, we create a function called bu_calculation that defines the logic, then wrap the function in a pyspark sql UDF. The bu_id column is created by passing the columns we need to evaluate (the id and matched_id columns) into the UDF.
# create custom function with the logic for bu_id
def bu_calculation(id_col, matched_id_col):
    id_int = int(id_col)
    # turn the string in the matched_id column into a list and remove empty values from the list
    matched_id_list = list(filter(None, matched_id_col.split(",")))
    if len(matched_id_list) > 0:
        # if matched_id column has values, convert strings to ints
        all_ids = [int(x) for x in matched_id_list]
        # join the id column value with the matched_id column values
        all_ids.append(id_int)
        # return the minimum value
        return min(all_ids)
    else:
        # if matched_id column is empty return the id column value
        return id_int

# wrap the custom bu_calculation function in a pyspark sql udf
# the use of IntegerType() here enforces that bu_calculation has to return an int
bu_udf = F.udf(bu_calculation, IntegerType())

# make a new column called bu_id using the pyspark sql udf we created called bu_udf
df = df.withColumn('bu_id', bu_udf('id', 'matched_id'))
df.show(truncate=False)
| id | matched_id | bu_id |
|----|------------|-------|
| 0  | 7,8        | 0     |
| 1  |            | 1     |
| 2  | 4          | 2     |
| 3  | 5,9        | 3     |
| 4  | 2          | 2     |
| 5  | 3,9        | 3     |
| 6  |            | 6     |
| 7  | 0,8        | 0     |
| 8  | 0,7        | 0     |
| 9  | 3,5        | 3     |
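If you want to sanity-check the logic before (or after) wrapping it in the UDF, the plain Python bu_calculation function can be called directly on a few sample rows; this is just an optional check, not part of the Spark job:

# optional sanity check of bu_calculation on a few sample rows
assert bu_calculation("1", "") == 1      # blank matched_id -> bu_id equals id
assert bu_calculation("8", "0,7") == 0   # minimum of 8, 0, 7
assert bu_calculation("3", "5,9") == 3   # minimum of 3, 5, 9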
More about the pyspark sql udf function here: https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.sql.functions.udf.html
CodePudding user response:
I recreated your input pyspark DataFrame assuming that all of your columns are of StringType:
df = spark.createDataFrame(
    [
        ("0", "7,8"),
        ("1", ""),
        ("2", "4"),
        ("3", "5,9"),
        ("4", "2"),
        ("5", "3,9"),
        ("6", ""),
        ("7", "0,8"),
        ("8", "0,7"),
        ("9", "3,5"),
    ],
    ["id", "matched_id"]
)
This looks like the following:
+---+----------+
| id|matched_id|
+---+----------+
|  0|       7,8|
|  1|          |
|  2|         4|
|  3|       5,9|
|  4|         2|
|  5|       3,9|
|  6|          |
|  7|       0,8|
|  8|       0,7|
|  9|       3,5|
+---+----------+
To produce the output, we should try to use functions from the pyspark.sql.functions module exclusively, because these are optimized for pyspark dataframes (see here), whereas udfs are not and should be avoided when possible.
To achieve the desired output pyspark dataframe, we can concatenate the "id" and "matched_id" columns together, convert that string into a list of strings using split, cast the result as an array of integers, and take the minimum of the array. We can get away with not having to worry about the blank strings because they get converted into null, and F.array_min drops nulls from consideration. This can be done with the following line of code (and while it is a little hard to read, it gets the job done):
import pyspark.sql.functions as F

df.withColumn(
    "bu_id",
    F.array_min(F.split(F.concat(F.col("id"), F.lit(","), F.col("matched_id")), ",").cast("array<int>"))
).show()
Output:
+---+----------+-----+
| id|matched_id|bu_id|
+---+----------+-----+
|  0|       7,8|    0|
|  1|          |    1|
|  2|         4|    2|
|  3|       5,9|    3|
|  4|         2|    2|
|  5|       3,9|    3|
|  6|          |    6|
|  7|       0,8|    0|
|  8|       0,7|    0|
|  9|       3,5|    3|
+---+----------+-----+
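If readability is a concern, a roughly equivalent version can be split into intermediate steps; this is only a sketch (it assumes Spark 2.4+ for F.array_min, and uses F.concat_ws, which for these non-null string columns produces the same concatenated string as the F.concat call above; the df_out and all_ids names are just for illustration):

import pyspark.sql.functions as F

# equivalent, more readable variant of the one-liner above
df_out = (
    df
    # combine id and matched_id into one comma-separated string, then split it into an array
    .withColumn("all_ids", F.split(F.concat_ws(",", "id", "matched_id"), ","))
    # cast to integers (blank strings become null) and take the minimum; array_min skips nulls
    .withColumn("bu_id", F.array_min(F.col("all_ids").cast("array<int>")))
    .drop("all_ids")
)
df_out.show()

Splitting the transformation like this also makes it easier to inspect the intermediate all_ids column while debugging.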