I'm converting Scala code to Python (PySpark). The Scala code uses a UDF:
def getVectors(searchTermsToProcessWithTokens: Dataset[Person]): Dataset[Person] = {
  import searchTermsToProcessWithTokens.sparkSession.implicits._

  def addVectors(
    tokensToSearchFor: String,
    tokensToSearchIn: String
  ): Seq[Int] = {
    tokensToSearchFor.map(token => if (tokensToSearchIn.contains(token)) 1 else 0)
  }

  val addVectorsUdf: UserDefinedFunction = udf(addVectors _)

  searchTermsToProcessWithTokens
    .withColumn("search_term_vector", addVectorsUdf($"name", $"age"))
    .withColumn("keyword_text_vector", addVectorsUdf($"name", $"age"))
    .as[Person]
}
My Python conversion:
def getVectors(searchTermsToProcessWithTokens):
    def addVectors(tokensToSearchFor: str, tokensToSearchIn: str):
        tokensToSearchFor = [1 if (token in tokensToSearchIn) else 0 for token in tokensToSearchIn]
        return tokensToSearchFor

    addVectorsUdf = udf(addVectors, ArrayType(StringType()))
    TokenizedSearchTerm = searchTermsToProcessWithTokens \
        .withColumn("search_term_vector", addVectorsUdf(col("name"), col("age"))) \
        .withColumn("keyword_text_vector", addVectorsUdf(col("name"), col("age")))
    return TokenizedSearchTerm
Defining a simple Dataset in Scala:
case class Person(name: String, age: Int)
val personDS = Seq(Person("Max", 33), Person("Adam", 32), Person("Muller", 62)).toDS()
personDS.show()
// +------+---+
// |  name|age|
// +------+---+
// |   Max| 33|
// |  Adam| 32|
// |Muller| 62|
// +------+---+
I get this output from the Scala function:
val x= getVectors(personDS)
x.show()
// +------+---+------------------+-------------------+
// |  name|age|search_term_vector|keyword_text_vector|
// +------+---+------------------+-------------------+
// |   Max| 33|         [0, 0, 0]|          [0, 0, 0]|
// |  Adam| 32|      [0, 0, 0, 0]|       [0, 0, 0, 0]|
// |Muller| 62|[0, 0, 0, 0, 0, 0]| [0, 0, 0, 0, 0, 0]|
// +------+---+------------------+-------------------+
But for a similarly defined PySpark DataFrame
%python
personDF = spark.createDataFrame([["Max", 32], ["Adam", 33], ["Muller", 62]], ['name', 'age'])
+------+---+
|  name|age|
+------+---+
|   Max| 32|
|  Adam| 33|
|Muller| 62|
+------+---+
I'm getting this from the Python version:
An exception was thrown from a UDF: 'TypeError: 'int' object is not iterable'
What is wrong with this conversion?
CodePudding user response:
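It's because your tokensToSearchIn receives the age column, which is an integer, while the function treats it as a string and tries to iterate over it. Convert it with str() and iterate over tokensToSearchFor instead, as the Scala version does. The following works: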
from pyspark.sql.functions import col, udf
from pyspark.sql.types import ArrayType, IntegerType

def getVectors(searchTermsToProcessWithTokens):
    def addVectors(tokensToSearchFor: str, tokensToSearchIn: str):
        # age arrives as an int, so convert it before the membership test
        tokensToSearchFor = [1 if token in str(tokensToSearchIn) else 0 for token in tokensToSearchFor]
        return tokensToSearchFor

    # IntegerType matches the Seq[Int] returned by the Scala UDF
    addVectorsUdf = udf(addVectors, ArrayType(IntegerType()))
    TokenizedSearchTerm = searchTermsToProcessWithTokens \
        .withColumn("search_term_vector", addVectorsUdf(col("name"), col("age"))) \
        .withColumn("keyword_text_vector", addVectorsUdf(col("name"), col("age")))
    return TokenizedSearchTerm
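To see the cause without Spark, here is a minimal plain-Python sketch of the two list comprehensions. The names add_vectors_buggy and add_vectors are made up for this illustration, and the integer argument stands in for a value from the age column:

```python
def add_vectors_buggy(tokens_to_search_for, tokens_to_search_in):
    # Mirrors the question's comprehension: it iterates over the second
    # argument, which is an int when it comes from the age column.
    return [1 if token in tokens_to_search_in else 0 for token in tokens_to_search_in]


def add_vectors(tokens_to_search_for, tokens_to_search_in):
    # Convert once to a string, then test each character of the first argument.
    haystack = str(tokens_to_search_in)
    return [1 if token in haystack else 0 for token in tokens_to_search_for]


try:
    add_vectors_buggy("Max", 32)
except TypeError as err:
    print(err)  # 'int' object is not iterable

print(add_vectors("Max", 32))     # [0, 0, 0]
print(add_vectors("Muller", 62))  # [0, 0, 0, 0, 0, 0]
```

The fixed version returns one 0/1 flag per character of the name, which is the same shape as the Scala output.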
As a curiosity, you don't need a UDF at all, though the result doesn't look any simpler:
from pyspark.sql.functions import array_contains, expr, transform, when

def getVectors(searchTermsToProcessWithTokens):
    def addVectors(tokensToSearchFor: str, tokensToSearchIn: str):
        def arr(s: str):
            # Split the column into single characters; the slice drops the last
            # element, assumed here to be the trailing empty string left by split
            split = f"split({s}, '')"
            return expr(f'slice({split}, 1, size({split})-1)')
        return transform(arr(tokensToSearchFor), lambda token: when(array_contains(arr(tokensToSearchIn), token), 1).otherwise(0))

    TokenizedSearchTerm = searchTermsToProcessWithTokens \
        .withColumn("search_term_vector", addVectors("name", "age")) \
        .withColumn("keyword_text_vector", addVectors("name", "age"))
    return TokenizedSearchTerm