pyspark when condition is true, insert some words with column variable


I have a CSV as described:

s_table | s_name  | t_cast | t_d  |
aaaaaa  |  juuoo  |  TRUE  |float |
aaaaaa  |  juueo  |  TRUE  |float |
aaaaaa  |  ju4oo  |        |      |
aaaaaa  |  juuoo  |        |      |
aaaaaa  |  thyoo  |        |      |
aaaaaa  |  juioo  |        |      |
aaaaaa  |  rtyoo  |        |      |

I am trying to use a PySpark `when` condition to check `t_cast` together with `s_table` and, if it is TRUE, return a statement in a new column.

What I've tried is:

filters = filters.withColumn("p3", f.when((f.col("s_table") == "aaaaaa") & (f.col("t_cast").isNull()),f.col("s_name")).
                                     when((f.col("s_table") == "aaaaaa") & (f.col("t_cast") == True),
                                          f"CAST({f.col('s_table')} AS {f.col('t_d')}) AS {f.col('s_table')}"))

What I am trying to achieve is for the column p3 to return this:

s_table | s_name  | t_cast | t_d  |   p_3                                  |
aaaaaa  |  juuoo  |  TRUE  |float | cast ('juuoo' as float) as 'juuoo'     |
aaaaaa  |  juueo  |  TRUE  |float | cast ('juueo' as float) as 'juuoo'     |
aaaaaa  |  ju4oo  |        |      |                             ju4oo      |
aaaaaa  |  juuoo  |        |      |                             juuoo      |
aaaaaa  |  thyoo  |        |      |                             thyoo      |
aaaaaa  |  juioo  |        |      |                             juioo      |
aaaaaa  |  rtyoo  |        |      |                             rtyoo      |

But the result that I get is:

CAST(Column<'s_field'> AS Column<'t_data_type'>) AS Column<'s_field'>, 
CAST(Column<'s_field'> AS Column<'t_data_type'>) AS Column<'s_field'>,
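
That output appears because Python evaluates the f-string eagerly on the driver, interpolating each `Column` object's string form instead of any per-row value. A minimal sketch of the mechanism, using a hypothetical stand-in class rather than real PySpark:

```python
class Column:
    """Stand-in mimicking how a pyspark.sql.Column renders as a string."""

    def __init__(self, name):
        self.name = name

    def __str__(self):
        return f"Column<'{self.name}'>"


c = Column("s_name")
# The f-string calls str(c) immediately, so the object's textual form is
# baked into the result -- no row data is ever consulted.
print(f"CAST({c} AS float)")  # CAST(Column<'s_name'> AS float)
```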

I feel like I am almost there, but I can't quite figure it out.

CodePudding user response:

You need to use the Spark `concat` function instead of Python string formatting to build the expected string. Something like:

import pyspark.sql.functions as F

filters = filters.withColumn(
    "p3",
    (F.when((F.col("s_table") == "aaaaaa") & (F.col("t_cast").isNull()), F.col("s_name"))
     .when((F.col("s_table") == "aaaaaa") & F.col("t_cast"),
           F.expr(r"concat('CAST(\'', s_name, '\' AS ', t_d, ') AS \'', s_table, '\'')")
           )
     )
)

filters.show(truncate=False)

# +-------+------+------+-----+----------------------------------+
# |s_table|s_name|t_cast|t_d  |p3                                |
# +-------+------+------+-----+----------------------------------+
# |aaaaaa |juuoo |true  |float|CAST('juuoo' AS float) AS 'aaaaaa'|
# |aaaaaa |juueo |true  |float|CAST('juueo' AS float) AS 'aaaaaa'|
# |aaaaaa |ju4oo |null  |null |ju4oo                             |
# |aaaaaa |juuoo |null  |null |juuoo                             |
# |aaaaaa |thyoo |null  |null |thyoo                             |
# |aaaaaa |juioo |null  |null |juioo                             |
# |aaaaaa |rtyoo |null  |null |rtyoo                             |
# +-------+------+------+-----+----------------------------------+