df = spark.createDataFrame([("A", "X-X-------------------------------X--X---XX-X--X-------")],["id", "value"])
From the above DataFrame, find the positions of all occurrences of X in the value column.
Expected output:
id | value |
---|---|
A | [1, 3, 35, 38, 42, 43, 45, 48] |
CodePudding user response:
You can use a custom UDF to achieve this, e.g.:
from pyspark.sql import functions as F
from pyspark.sql import types as T

@F.udf(T.ArrayType(T.IntegerType()))
def udf_val_indexes(str_val):
    indexes = []
    for index, val in enumerate(str_val):
        if val == "X":
            # enumerate is 0-based, so add 1 to get the expected 1-based positions
            indexes.append(index + 1)
    return indexes

df.withColumn("value", udf_val_indexes(F.col("value"))).show(truncate=False)
+---+------------------------------+
|id |value                         |
+---+------------------------------+
|A  |[1, 3, 35, 38, 42, 43, 45, 48]|
+---+------------------------------+
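Since the body of the UDF is plain Python, the 1-based indexing can be sanity-checked without Spark. A minimal check (the sample string here is illustrative, not the one from the question):

s = "X-X------X"
print([i + 1 for i, c in enumerate(s) if c == "X"])  # [1, 3, 10]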
Alternatively, split the value by your character and, with the assistance of posexplode, take a running sum of the segment lengths to find the indexes before grouping them back into one row, as shown below.
NB: the orderBy clauses help maintain the order of the indexes. A short plain-Python trace of the same idea follows the Spark output below.
from pyspark.sql import functions as F
from pyspark.sql import Window

(
    df.withColumn("val_split", F.split("value", "X"))
    .select(
        F.col("id"),
        F.posexplode("val_split")
    )
    # drop the last segment (everything after the final X)
    .withColumn("row_pos_to_exclude", F.max("pos").over(Window.partitionBy("id")))
    .filter(F.col("pos") != F.col("row_pos_to_exclude"))
    # each X sits one character past the segment that precedes it
    .withColumn("val_split_len", F.length("col") + 1)
    # running sum of segment lengths gives the 1-based position of each X
    .withColumn(
        "val_split_len",
        F.sum("val_split_len").over(
            Window.partitionBy("id")
            .orderBy("pos")
            .rowsBetween(Window.unboundedPreceding, Window.currentRow)
        )
    )
    # collect the running positions in order; the last (longest) list has them all
    .withColumn(
        "value",
        F.collect_list("val_split_len").over(
            Window.partitionBy("id")
            .orderBy("pos")
        )
    )
    .groupBy("id")
    .agg(
        F.max("value").alias("value")
    )
).show(truncate=False)
+---+------------------------------+
|id |value                         |
+---+------------------------------+
|A  |[1, 3, 35, 38, 42, 43, 45, 48]|
+---+------------------------------+
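To see why the running sum of segment lengths lines up with the X positions, here is a minimal plain-Python trace of the same idea (the sample string and variable names are illustrative only):

s = "X-X------X"
segments = s.split("X")[:-1]      # drop the trailing segment, like the filter on pos above
positions, running = [], 0
for seg in segments:
    running += len(seg) + 1       # each X sits one character past the preceding segment
    positions.append(running)
print(positions)                  # [1, 3, 10]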
Let me know if this works for you.