I am trying to replace Latin/other non-English characters with English letters for a large number of records using PySpark; something like the mapping below:
'ä':'a',
'ö':'o',
'ü':'u',
'ẞ':'s',
'á':'a',
and more
In Python, I could find built-in functions for this in the unicodedata library, for a single string:
s1 = unicodedata.normalize('NFKD', 'string').encode('ASCII', 'ignore')
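For a single string this gives bytes back, so a decode is needed to get a str again; for example, with a made-up input:
import unicodedata
# 'Mötörhead' -> 'Motorhead': NFKD splits off the combining marks, the ASCII
# encode drops them, and decode turns the bytes back into a str.
s1 = unicodedata.normalize('NFKD', 'Mötörhead').encode('ASCII', 'ignore').decode('ASCII')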
For a pandas DataFrame there is pyspark.pandas.Series.str.normalize.
In that case, it requires converting the PySpark DataFrame to pandas -> performing the normalization of the strings -> converting back into a PySpark DataFrame; all of these steps consume memory and time.
I would like to know if there is a way to perform these replacements of Latin/other characters directly in PySpark.
The following is what I have tried:
a. Manually mapping all the characters to English letters using .replace
pySpark_df = pySpark_df.replace(to_replace=normalize_mapping, subset=['col_name'])
However, the mapping is incomplete, as the number of such mappings/variations far outweighs what I could collect manually.
b. Use of regular expressions and replacement using regexp_replace
Using regex, I could not find any pattern that lets me make these replacements with English letters. I can see this thread being helpful: What is the best way to remove accents with Apache Spark dataframes in PySpark?
However, the dependencies mentioned there are not supported on the platform I work on (Foundry by Palantir), i.e. bin/pyspark --jars path-to/target/scala-2.11/unicode-normalization_2.11-1.0.jar \ --driver-class-path path-to/target/scala-2.11/unicode-normalization_2.11-1.0.jar
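To be concrete, the only pattern-based approach I can picture is chaining hand-built character classes, which runs into the same completeness problem as (a). A rough sketch of what I mean (the column name and class contents are just examples):
from pyspark.sql import functions as F
# Illustration only: every character class below is assembled by hand, so it
# covers no more than the variations I manage to enumerate myself.
cleaned = F.regexp_replace(F.col('col_name'), '[äáàâã]', 'a')
cleaned = F.regexp_replace(cleaned, '[öóòôõ]', 'o')
cleaned = F.regexp_replace(cleaned, '[üúùû]', 'u')
pySpark_df = pySpark_df.withColumn('col_name', cleaned)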
I will appreciate any insights, especially on whether these replacements can be achieved via regex in PySpark.
Edit: Currently we have a UDF which does the above task. However, we would like to check whether it can be done with native PySpark functions in a distributed way.
CodePudding user response:
I would simply use a UDF, but it does not work for every char (ẞ for example):
import unicodedata
from pyspark.sql import functions as F
a = [
("ä", "a"),
("ö", "o"),
("ü", "u"),
("ẞ", "s"),
("á", "a"),
]
b = ["to_change", "ref"]
df = spark.createDataFrame(a, b)
normalize_udf = F.udf(
lambda x: unicodedata.normalize("NFKD", x).encode("ASCII", "ignore").decode("ASCII")
)
df.withColumn("new", normalize_udf(F.col("to_change"))).show()
+---------+---+---+
|to_change|ref|new|
+---------+---+---+
|        ä|  a|  a|
|        ö|  o|  o|
|        ü|  u|  u|
|        ẞ|  s|   |
|        á|  a|  a|
+---------+---+---+
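If the handful of characters that NFKD leaves untouched (ẞ here) is known, one option is to pre-map them with the built-in translate before applying the UDF. A minimal sketch; the character lists are only an example and would need extending for real data:
# Stroked/uncomposed letters such as ẞ, Ø, Đ, Ł have no NFKD decomposition, so
# map them first with translate(), then let the UDF handle the rest.
pre_mapped = F.translate(F.col("to_change"), "ẞØĐŁ", "sodl")
df.withColumn("new", normalize_udf(pre_mapped)).show()  # ẞ now becomes "s" instead of ""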
CodePudding user response:
Using an F.pandas_udf() will provide much better performance than a regular F.udf(), since it uses Apache Arrow to send data to the Python process in batches and operates on it in a vectorized manner using pandas.
Something along the lines of:
from pyspark.sql import functions as F, types as T
import pandas as pd
@F.pandas_udf(T.StringType())
def remove_accents(s: pd.Series) -> pd.Series:
    # I'm not a pandas expert; maybe there's a better native way to do the encode
    # step than with a lambda like this. The decode makes sure the column holds
    # strings rather than bytes.
    return s.str.normalize("NFKD").map(lambda x: x.encode("ASCII", "ignore").decode("ASCII"))
df = df.withColumn("cleaned_column", remove_accents("some_column"))
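If you prefer to stay inside the vectorized pandas string API (avoiding the lambda in the sketch above), the encode/decode steps also exist as .str accessors; an alternative version, assuming the column contains no nulls:
from pyspark.sql import functions as F, types as T
import pandas as pd
@F.pandas_udf(T.StringType())
def remove_accents(s: pd.Series) -> pd.Series:
    # Same idea as above, but using the built-in .str.encode / .str.decode
    # accessors instead of a Python-level lambda per value.
    return s.str.normalize("NFKD").str.encode("ascii", errors="ignore").str.decode("ascii")
df = df.withColumn("cleaned_column", remove_accents("some_column"))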