Given the dataframe below:
|Row|Address_StoreHours_Phone_RemoveHrTag|
+---+------------------------------------+
|  1|                Ph 148-01 Metro P...|
|  2|                Store Hours: 7:00...|
|  3|                              <hr />|
|  4|                Ground Floor Vict...|
|  5|                        632-82833778|
|  6|                        Store Hours:|
|  7|                              <hr />|
|  8|                Phase 1 Package 1...|
|  9|                        632-83722847|
| 10|                Store Hours: 7:00...|
| 11|                              <hr />|
How do I add an empty row one row above <hr /> when the row number of <hr /> is not divisible by 4? For example:
|Row|Address_StoreHours_Phone_RemoveHrTag|
+---+------------------------------------+
|  1|                Ph 148-01 Metro P...|
|  2|                                    |
|  3|                Store Hours: 7:00...|
|  4|                              <hr />|
|  5|                Ground Floor Vict...|
|  6|                        632-82833778|
|  7|                        Store Hours:|
|  8|                              <hr />|
|  9|                Phase 1 Package 1...|
| 10|                        632-83722847|
| 11|                Store Hours: 7:00...|
| 12|                              <hr />|
Here's what I've done so far. The hr_tag column returns true if <hr /> is in a row whose number is divisible by 4:
spark.sql("""
    WITH create_row AS (
        SELECT CAST(ROW_NUMBER() OVER (ORDER BY (SELECT 1)) AS INTEGER) AS Row, *
        FROM ssd_others
    ), filter_hr_tag AS (
        SELECT
            *,
            CASE
                WHEN `Address_StoreHours_Phone_RemoveHrTag` = '<hr />' AND `Row` % 4 = 0
                THEN True
                ELSE False
            END AS hr_tag
        FROM create_row
    )
    SELECT *
    FROM filter_hr_tag
""")
And here's the code's output:
|Row|Address_StoreHours_Phone_RemoveHrTag|hr_tag|
+---+------------------------------------+------+
|  1|                Ph 148-01 Metro P...| false|
|  2|                Store Hours: 7:00...| false|
|  3|                              <hr />| false|
|  4|                Ground Floor Vict...| false|
|  5|                        632-82833778| false|
|  6|                        Store Hours:| false|
|  7|                              <hr />| false|
|  8|                Phase 1 Package 1...| false|
|  9|                        632-83722847| false|
| 10|                Store Hours: 7:00...| false|
| 11|                              <hr />| false|
CodePudding user response:
I couldn't do it with Spark, so my workaround is to collect the data and use plain Python lists:
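# Pull both columns back to the driver as plain Python lists
ssd_row = ssd_cleaning.select("Row").rdd.flatMap(lambda x: x).collect()
ssd_other_info = ssd_cleaning.select("Address_StoreHours_Phone_RemoveHrTag").rdd.flatMap(lambda x: x).collect()
ssd_list = [list(x) for x in zip(ssd_row, ssd_other_info)]
# Iterate over a copy so that appending does not change the list being looped over
for row in list(ssd_list):
    if row[1] == '<hr />' and row[0] % 4 != 0:
        # The fractional row number sorts the empty row in front of the row just above <hr />
        new_empty_row = [row[0] - 1.5, ""]
        ssd_list.append(new_empty_row)
ssd_list.sort(key=lambda x: x[0])
# Renumber the rows starting from 1
for index, x in enumerate(ssd_list):
    ssd_list[index][0] = index + 1
reordered_data_distribution_cleaning_ssd = spark.createDataFrame(ssd_list)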
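Note that the loop above tests every <hr /> against its original row number, so on the sample data all three separators (rows 3, 7 and 11) get an empty row, not just the first one. A possible single-pass variant, sketched in plain Python without Spark, tests the position each <hr /> would have in the output instead. The function name pad_before_hr and its block and hr parameters are made up for this illustration, and the sketch assumes each record is missing at most one row.

```python
def pad_before_hr(values, block=4, hr="<hr />"):
    """Return (row, value) pairs, adding one "" above a misaligned separator."""
    out = []
    for value in values:
        # 1-based position this value would take in the output so far
        position = len(out) + 1
        if value == hr and position % block != 0:
            # Put the blank in front of the row directly above the separator
            out.insert(max(len(out) - 1, 0), "")
        out.append(value)
    # Renumber from 1 once all blanks are in place
    return [(i + 1, v) for i, v in enumerate(out)]


sample = [
    "Ph 148-01 Metro P...", "Store Hours: 7:00...", "<hr />",
    "Ground Floor Vict...", "632-82833778", "Store Hours:", "<hr />",
    "Phase 1 Package 1...", "632-83722847", "Store Hours: 7:00...", "<hr />",
]
rows = pad_before_hr(sample)
for row in rows:
    print(row)
```

The resulting list of pairs could then be passed to spark.createDataFrame in the same way as ssd_list above.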
CodePudding user response:
This can be done in three steps:
- Identify when an empty row has to be added and store the result in a flag column, add_empty_row.
- For every row whose succeeding row has add_empty_row set, convert Address_StoreHours_Phone_RemoveHrTag to an array holding an empty string followed by Address_StoreHours_Phone_RemoveHrTag; for all other rows, convert it to a single-element array.
- Finally, explode the column from step 2 and recompute the row number.
Window functions without partitionBy move all data into a single partition, which degrades performance. Since the code snippet in the question already does this, I assume it is acceptable here.
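To make the three steps easier to trace, here is a rough plain-Python sketch of the same flag, look-ahead, explode and renumber logic, with no Spark involved. The names add_empty_rows, sample_rows and result are invented for this illustration. The flag condition copies the one in the query that follows, including the `n % 4 == n` check, which only holds for rows 1 to 3.

```python
HR = "<hr />"


def add_empty_rows(rows):
    """rows: list of (row_number, value) pairs sorted by row_number."""
    # Step 1: flag separators on a row not divisible by 4
    # (n % 4 == n restricts the flag to rows 1-3, as in the query)
    flags = [v == HR and n % 4 != 0 and n % 4 == n for n, v in rows]
    # Step 2: look one row ahead (LEAD with default False) and wrap each
    # value in a list, with "" in front when the next row is flagged
    lead = flags[1:] + [False]
    arrays = [["", v] if nxt else [v] for (_, v), nxt in zip(rows, lead)]
    # Step 3: flatten the lists (EXPLODE) and renumber from 1
    flat = [v for arr in arrays for v in arr]
    return list(enumerate(flat, start=1))


sample_rows = [
    (1, "Ph 148-01 Metro P..."), (2, "Store Hours: 7:00..."), (3, HR),
    (4, "Ground Floor Vict..."), (5, "632-82833778"), (6, "Store Hours:"),
    (7, HR), (8, "Phase 1 Package 1..."), (9, "632-83722847"),
    (10, "Store Hours: 7:00..."), (11, HR),
]
result = add_empty_rows(sample_rows)
for pair in result:
    print(pair)
```

Because of the `n % 4 == n` check, only a separator in the first three rows is ever flagged, so this sketch, like the query, handles a missing row in the first record only.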
data = [(1, 'Ph 148-01 Metro P...'),
        (2, 'Store Hours: 7:00...'),
        (3, '<hr />',),
        (4, 'Ground Floor Vict...'),
        (5, ' 632-82833778",',),
        (6, 'Store Hours:'),
        (7, '<hr />',),
        (8, 'Phase 1 Package 1...'),
        (9, ' 632-83722847",',),
        (10, 'Store Hours: 7:00...'),
        (11, '<hr />',), ]
df = spark.createDataFrame(data, ("Row", "Address_StoreHours_Phone_RemoveHrTag", ))
df.createOrReplaceTempView("create_row")
query = """
SELECT ROW_NUMBER() OVER(ORDER BY `row`, length(`address_storehours_phone_removehrtag`)) as `row`,
       EXPLODE(CASE
                   WHEN add_empty_row THEN array("", `address_storehours_phone_removehrtag`)
                   ELSE array(`address_storehours_phone_removehrtag`)
               END) AS `address_storehours_phone_removehrtag`
FROM
  (SELECT `row`,
          `address_storehours_phone_removehrtag`,
          LEAD(add_empty_row, 1, FALSE) OVER (
                                              ORDER BY `row`) AS add_empty_row
   FROM
     (SELECT *,
             CASE
                 WHEN `address_storehours_phone_removehrtag` = '<hr />'
                      AND `row` % 4 != 0 AND `row` % 4 = `row` THEN TRUE
                 ELSE FALSE
             END AS add_empty_row
      FROM create_row) t) w
"""
spark.sql(query).show()
"""
+---+------------------------------------+
|row|address_storehours_phone_removehrtag|
+---+------------------------------------+
|  1|                Ph 148-01 Metro P...|
|  2|                                    |
|  3|                Store Hours: 7:00...|
|  4|                              <hr />|
|  5|                Ground Floor Vict...|
|  6|                      632-82833778",|
|  7|                        Store Hours:|
|  8|                              <hr />|
|  9|                Phase 1 Package 1...|
| 10|                      632-83722847",|
| 11|                Store Hours: 7:00...|
| 12|                              <hr />|
+---+------------------------------------+
"""