I'm trying the following code:
import pandas as pd
from pymorphy2 import MorphAnalyzer
from pyspark.sql import SparkSession
from pyspark.sql import types as T
from pyspark.sql import functions as F
spark = SparkSession.builder.appName("udf").getOrCreate()
def gender(s):
    m = MorphAnalyzer()
    return m.parse(s)[0].tag.gender
gen = F.udf(gender, T.StringType())
df = spark.createDataFrame(pd.DataFrame({"name": ["кирилл", "вавила"]}))
df.select(gen("name").alias("gender")).show()
and, more or less expectedly, I get the following error message:
ERROR Executor: Exception in task 2.0 in stage 29.0 (TID 151)
net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pyspark.cloudpickle.cloudpickle._make_skeleton_class). This happens when an unsupported/unregistered class is being unpickled that requires construction arguments. Fix it by registering a custom IObjectConstructor for this class.
at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:759)
at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:199)
What is the easiest way to circumvent the error, if there is one?
Answer:
You could use pandas_udf, which is vectorized and therefore more efficient than a regular udf.
import pandas as pd
from pymorphy2 import MorphAnalyzer
from pyspark.sql import SparkSession, types as T, functions as F
spark = SparkSession.builder.appName("udf").getOrCreate()
@F.pandas_udf(T.StringType())
def gender(s: pd.Series) -> pd.Series:
    m = MorphAnalyzer()  # build the analyzer once per batch, not once per row
    return s.apply(lambda x: m.parse(x)[0].tag.gender)
df = spark.createDataFrame(pd.DataFrame({"name": ["кирилл", "вавила", "софия"]}))
df.withColumn("gender", gender("name")).show()
# +------+------+
# |  name|gender|
# +------+------+
# |кирилл|  masc|
# |вавила|  masc|
# | софия|  femn|
# +------+------+
If you don't otherwise need pyspark.sql.types, you can pass the return type to pandas_udf as a DDL-formatted string instead:
@F.pandas_udf('string')
def gender(s: pd.Series) -> pd.Series:
    m = MorphAnalyzer()  # build the analyzer once per batch, not once per row
    return s.apply(lambda x: m.parse(x)[0].tag.gender)
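Because the body of a Series-to-Series pandas_udf is just a function from one pd.Series batch to another of the same length, you can sanity-check its logic locally without Spark. A minimal sketch, assuming a hypothetical StubAnalyzer in place of MorphAnalyzer (its gender method and lookup table are made up for illustration; the table is copied from the output above). The counter shows the analyzer being built once per batch rather than once per row:

```python
import pandas as pd


class StubAnalyzer:
    """Hypothetical stand-in for pymorphy2's MorphAnalyzer; not its real API."""

    instances = 0  # how many times the (normally expensive) constructor ran

    # Toy lookup copied from the Spark output above, for illustration only.
    _GENDERS = {"кирилл": "masc", "вавила": "masc", "софия": "femn"}

    def __init__(self):
        StubAnalyzer.instances += 1

    def gender(self, word):
        # Unknown words map to None, which Spark would show as null.
        return self._GENDERS.get(word)


def gender_batch(s: pd.Series) -> pd.Series:
    # Same shape as the body of a Series-to-Series pandas_udf:
    # one batch in, one result per row out.
    m = StubAnalyzer()  # built once for the whole batch
    return s.apply(m.gender)


result = gender_batch(pd.Series(["кирилл", "вавила", "софия"]))
print(result.tolist())         # ['masc', 'masc', 'femn']
print(StubAnalyzer.instances)  # 1
```

Swapping the stub for the real MorphAnalyzer and m.gender for a lambda calling m.parse(x)[0].tag.gender gives back the body of the pandas_udf above.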
Answer:
The error means Spark can't pickle (serialize) the object your UDF returns and hand it back to the JVM. That object's class is a custom type (TypedGrammeme) defined by pymorphy2, not a plain Python str.
So convert the result to a plain str, like the following:
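def gender(s):
    ...  # rest of the function as in the question
    g = m.parse(s)[0].tag.gender
    return str(g) if g is not None else None  # keep a missing gender as null, not the string "None"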
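To see why str() is enough, here is a small sketch using a hypothetical FakeGrammeme class. For illustration only, it assumes pymorphy2's grammeme type behaves like a str subclass, which may not match its real implementation. Calling str() on such a value returns an exact built-in str, which pickles like any ordinary string literal and carries no reference to the custom class:

```python
import pickle


class FakeGrammeme(str):
    """Hypothetical stand-in for pymorphy2's grammeme type; the real class may differ."""


g = FakeGrammeme("masc")
print(isinstance(g, str), type(g) is str)  # True False

plain = str(g)  # str() of a str subclass returns an exact built-in str
print(type(plain) is str)  # True

# The converted value pickles exactly like an ordinary string literal,
# so nothing about the custom class has to be understood on the other side.
print(pickle.dumps(plain) == pickle.dumps("masc"))  # True
```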
Also, avoid doing resource-intensive work inside your UDF on every call, such as creating a MorphAnalyzer. Create a single instance and reuse it through a global variable.
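One way to follow that advice is a lazily created module-level instance. A minimal sketch, with a hypothetical ExpensiveAnalyzer standing in for MorphAnalyzer (the class, its gender method and the get_analyzer helper are all made up for illustration). Building it lazily means the analyzer itself never has to be pickled on the driver; on Spark the cached object lives in each Python worker, so you end up with roughly one analyzer per worker or task rather than one per row, though exactly how often it is rebuilt depends on how Spark ships and reuses the function:

```python
class ExpensiveAnalyzer:
    """Hypothetical stand-in for MorphAnalyzer, whose construction is costly."""

    created = 0  # counts how often the costly setup actually runs

    # Toy lookup for illustration only.
    _GENDERS = {"кирилл": "masc", "вавила": "masc", "софия": "femn"}

    def __init__(self):
        ExpensiveAnalyzer.created += 1

    def gender(self, word):
        return self._GENDERS.get(word)


_analyzer = None  # module-level cache, filled on first use


def get_analyzer():
    """Return the shared analyzer, building it only on the first call."""
    global _analyzer
    if _analyzer is None:
        _analyzer = ExpensiveAnalyzer()
    return _analyzer


def gender(word):
    # Plain Python function you would wrap with F.udf(gender, T.StringType()).
    g = get_analyzer().gender(word)
    return None if g is None else str(g)


results = [gender(w) for w in ["кирилл", "вавила", "софия", "дом"]]
print(results)                    # ['masc', 'masc', 'femn', None]
print(ExpensiveAnalyzer.created)  # 1
```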