I have an existing implementation of a procedure that uses the Unix/Linux sort utility: it first sorts the whole file, then sorts again keeping only unique elements (the -u option).
The organization I'm working with wants this done completely in PySpark, but I don't see how to do it. I tried intersect, subtract, and select distinct, and none of them quite fills the bill. (Select distinct came close, but it returns only the fields involved in the sort, and I need an ID field as well.)
Does anybody have a suggested PySpark-only solution for this? Thanks much.
CodePudding user response:
Something like this should do the trick:
import os
import shutil
import tempfile

from pyspark.sql import functions as F, SparkSession


def unix_sort(spark, input_filepath, out_filepath):
    """Read a file, unix-sort it and save to another file"""
    file_content_df = spark.read.text(input_filepath)
    sorted_content_df = (
        file_content_df
        .dropDuplicates()
        .orderBy(F.col('value').asc())
    )
    temp_dir = tempfile.mkdtemp()
    print(temp_dir)
    # Write to output file
    # 1. save in a temporary folder
    sorted_content_df.coalesce(1).write.format("text").option("header", "false").mode("append").save(temp_dir)
    # 2. Find the file in the temporary folder
    temporary_file = next(filter(
        lambda f: os.path.splitext(f)[1] == '.txt',
        next(os.walk(temp_dir))[2]
    ))
    # 3. Move the file in the final destination path
    shutil.move(os.path.join(temp_dir, temporary_file), out_filepath)
    # 4. Remove the temporary folder
    shutil.rmtree(temp_dir)
Suppose that the input file input_file.txt is:
1haao
AAAA
BBB
alpha
Beta
Gamma
delta epsilon
theta
2iota
running the following code:
spark = SparkSession.builder.getOrCreate()
unix_sort(spark, "input_file.txt", "out_file.txt")
a file called out_file.txt will be created with the following content:
1haao
2iota
AAAA
BBB
Beta
Gamma
alpha
delta epsilon
theta
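Note the order of that output: digits first, then uppercase, then lowercase. Spark compares strings by their binary values, so the result corresponds to what sort -u gives under LC_ALL=C, not necessarily to what a locale-aware sort produces. The plain-Python sketch below reproduces the ordering without Spark. The case-folded variant is only a rough stand-in for a locale-style ordering and is an assumption, not the exact collation of any particular locale.

```python
lines = [
    "1haao", "AAAA", "BBB", "alpha", "Beta",
    "Gamma", "delta epsilon", "theta", "2iota",
]

# sorted() on str compares code points; for this ASCII input that equals byte order.
byte_order = sorted(set(lines))
print(byte_order)
# ['1haao', '2iota', 'AAAA', 'BBB', 'Beta', 'Gamma', 'alpha', 'delta epsilon', 'theta']

# Rough approximation of a case-insensitive ordering (assumption, see above).
folded = sorted(set(lines), key=lambda s: (s.casefold(), s))
print(folded)
# ['1haao', '2iota', 'AAAA', 'alpha', 'BBB', 'Beta', 'delta epsilon', 'Gamma', 'theta']
```

If the existing sort step runs under a non-C locale, it is worth comparing its output with the Spark output on a sample file before switching over.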