Equivalent method for replacing Latin accents with English letters using PySpark


I am trying to replace Latin and other non-English characters with English letters for a large number of records using PySpark; something like the following:

'ä':'a',
'ö':'o',
'ü':'u',
'ẞ':'s',
'á':'a',
 and more

In Python, I could find the built-in functions from the unicodedata library for a string:

s1 = unicodedata.normalize('NFKD', 'string').encode('ASCII', 'ignore')

and, for a pandas DataFrame, pyspark.pandas.Series.str.normalize.

In the case above, it requires converting the PySpark DataFrame to pandas -> performing the string normalization -> converting back into a PySpark DataFrame; all of these steps consume memory and time (roughly as sketched below).
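
For reference, this is roughly what that round trip looks like; a minimal sketch, assuming the column is named col_name and spark is an active SparkSession:

import unicodedata

# Collect the Spark DataFrame to the driver as pandas (expensive for large data)
pdf = pySpark_df.toPandas()

# Normalize the strings on the driver with unicodedata
pdf["col_name"] = pdf["col_name"].map(
    lambda s: unicodedata.normalize("NFKD", s).encode("ASCII", "ignore").decode("ASCII")
    if s is not None else None
)

# Convert back into a PySpark DataFrame
pySpark_df = spark.createDataFrame(pdf)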

I would like to know if there is a way to perform these replacements of Latin/other characters directly in PySpark.

The following is what I have tried:

a. Manually mapping all the characters to English letters using .replace

pySpark_df = pySpark_df.replace(to_replace=normalize_mapping, subset=['col_name'])

However, the mapping is incomplete, as the total number of such mappings/variations far exceeds what I could manually collect (a partial example is sketched just below).
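
For illustration, a partial version of such a mapping (hypothetical and far from exhaustive) used with .replace:

# Partial, manually collected mapping - many more variants exist
normalize_mapping = {
    "ä": "a",
    "ö": "o",
    "ü": "u",
    "ẞ": "s",
    "á": "a",
}

pySpark_df = pySpark_df.replace(to_replace=normalize_mapping, subset=["col_name"])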

b. Use of regular expressions and replacement using regexp_replace

Using regex, I could not find any pattern that would make these replacements with English letters (roughly what I attempted is sketched after this list). I found this thread helpful: What is the best way to remove accents with Apache Spark dataframes in PySpark?

However, the dependencies mentioned there are not supported on the platform I work on (Foundry by Palantir), i.e. bin/pyspark --jars path-to/target/scala-2.11/unicode-normalization_2.11-1.0.jar \ --driver-class-path path-to/target/scala-2.11/unicode-normalization_2.11-1.0.jar
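
For context, the regex attempt amounts to chaining regexp_replace calls with character classes; a sketch only, with illustrative and very incomplete character classes:

from pyspark.sql import functions as F

# Each regexp_replace maps one (incomplete) character class to its ASCII letter
pySpark_df = (
    pySpark_df
    .withColumn("col_name", F.regexp_replace("col_name", "[äáàâã]", "a"))
    .withColumn("col_name", F.regexp_replace("col_name", "[öóòôõ]", "o"))
    .withColumn("col_name", F.regexp_replace("col_name", "[üúùû]", "u"))
)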

I will appreciate any insights, especially on whether these replacements can be achieved via regex in PySpark.

Edit: Currently we have a UDF which performs the above task. However, we would like to check whether this task can be done by PySpark itself in a distributed way.

CodePudding user response:

I would simply use a UDF, but it does not work for every character (ẞ, for example):

import unicodedata

from pyspark.sql import functions as F

a = [
    ("ä", "a"),
    ("ö", "o"),
    ("ü", "u"),
    ("ẞ", "s"),
    ("á", "a"),
]

b = ["to_change", "ref"]


df = spark.createDataFrame(a, b)


normalize_udf = F.udf(
    lambda x: unicodedata.normalize("NFKD", x).encode("ASCII", "ignore").decode("ASCII")
)


df.withColumn("new", normalize_udf(F.col("to_change"))).show()
+---------+---+---+
|to_change|ref|new|
+---------+---+---+
|        ä|  a|  a|
|        ö|  o|  o|
|        ü|  u|  u|
|        ẞ|  s|   |
|        á|  a|  a|
+---------+---+---+
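
If ẞ matters, one workaround is to substitute such characters explicitly before normalizing, since NFKD does not decompose them to ASCII (as the empty value above shows). A minimal sketch; special_cases and to_ascii are illustrative names, and the mapping follows the question's 'ẞ' -> 's':

import unicodedata

from pyspark.sql import functions as F

# Characters NFKD will not decompose; substitute them explicitly first (sample mapping)
special_cases = {"ẞ": "s", "ß": "s"}

def to_ascii(s):
    if s is None:
        return None
    for src, dst in special_cases.items():
        s = s.replace(src, dst)
    return unicodedata.normalize("NFKD", s).encode("ASCII", "ignore").decode("ASCII")

normalize_udf = F.udf(to_ascii)

df.withColumn("new", normalize_udf(F.col("to_change"))).show()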

CodePudding user response:

Using an F.pandas_udf() will provide much better performance than a regular F.udf(), since it uses Apache Arrow to send data to the Python process in batches and operates on it in a vectorized manner using pandas.

Something along the lines of:

from pyspark.sql import functions as F, types as T
import pandas as pd

@F.pandas_udf(T.StringType())
def remove_accents(s: pd.Series) -> pd.Series:
    # I'm not a pandas expert, maybe there's a better native way to do the encode step than with a lambda like this.
    return s.str.normalize("NFKD").map(lambda v: v.encode("ASCII", "ignore").decode("ASCII"))

df = df.withColumn("cleaned_column", remove_accents("some_column"))
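
Regarding the comment about a more native way to do the encode step: pandas has Series.str.encode and Series.str.decode, so the same body can be written without a lambda. A sketch, not tested on Foundry:

from pyspark.sql import functions as F, types as T
import pandas as pd

@F.pandas_udf(T.StringType())
def remove_accents(s: pd.Series) -> pd.Series:
    # Vectorized: normalize, drop non-ASCII bytes, then decode back to str
    return s.str.normalize("NFKD").str.encode("ASCII", "ignore").str.decode("ASCII")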