This is a bit strange.
When I create a dataframe and apply some transformations that use the function pow, it works. But when I run the same code in a real-world scenario (a production job), it fails. The columns in my dummy dataframe and in the real-world dataframe have the same datatypes.
Error
Method pow([class java.lang.Double, class java.lang.Double]) does not exist
This works (with made-up data):
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType
from pyspark.sql import functions as f
columns = ["CounterpartID","Year","Month","Day","churnprobability", "deadprobability"]
data = [(1234, 2021,5,12, 0.85,0.6),(1224, 2022,6,12, 0.75,0.6),(1345, 2022,5,13, 0.8,0.2),(234, 2021,7,12, 0.9,0.8), (1654, 2021,7,12, 1.40,20.0), (7548, 2021,7,12, -1.40,20.0), (6582, 2021,7,12, -1.40,20.0)]
schema = StructType([ \
StructField("CounterpartID",IntegerType(),False), \
StructField("Year",IntegerType(),False), \
StructField("Month",IntegerType(),False), \
StructField("Day", IntegerType(), False), \
StructField("churnprobability", DoubleType(), False), \
StructField("deadprobability", DoubleType(), False) \
])
df = spark.createDataFrame(data=data,schema=schema)
df.printSchema()
df.show(truncate=False)
abc = df.withColumn("client_id", f.col("CounterpartID"))\
    .withColumn("year", f.col("Year"))\
    .withColumn("month", f.col("Month"))\
    .withColumn("day", f.col("Day"))\
    .withColumn("churn_probability_unit", f.col("churnprobability").cast(IntegerType()))\
    .withColumn("churn_probability_nanos", ((f.col("churnprobability") - f.col("churnprobability").cast(IntegerType())) * pow(10, 9)).cast(IntegerType()))\
    .withColumn("dead_probability_unit", f.col("deadprobability").cast(IntegerType()))\
    .withColumn("dead_probability_nanos", ((f.col("deadprobability") % 1) * pow(10, 9)).cast(IntegerType()))\
    .select("client_id", "year", "month", "day", "churn_probability_unit", "churn_probability_nanos", "dead_probability_unit", "dead_probability_nanos")
abc.show()
However, in the real-world scenario (a production job), df is a real dataframe (of course), and all of its columns have the same datatypes as the dummy dataframe above.
When I do the same transformation there, it complains that it cannot find a pow function with double parameters. Here's the stack trace (below). I have also looked up the docs for pow, but they don't say anything about datatypes, and some S.O. posts suggest Double should be fine. Where is it going wrong? I can of course multiply by the actual number instead of using pow, but I would still like to understand this better. Any questions, and I can help answer.
----> 7 .withColumn("churn_probability_nanos", ((f.col("churnprobability") % 1.0) * pow(10,9)).cast(IntegerType()))\
8 .withColumn("dead_probability_unit", f.col("deadprobability").cast(IntegerType()))\
9 .withColumn("dead_probability_nanos", (f.col("deadprobability") %1 * pow(10,9)).cast(IntegerType()))\
/databricks/spark/python/pyspark/sql/functions.py in pow(col1, col2)
737 Returns the value of the first argument raised to the power of the second argument.
738 """
--> 739 return _invoke_binary_math_function("pow", col1, col2)
740
741
/databricks/spark/python/pyspark/sql/functions.py in _invoke_binary_math_function(name, col1, col2)
73 and wraps the result with :class:`~pyspark.sql.Column`.
74 """
---> 75 return _invoke_function(
76 name,
77 # For legacy reasons, the arguments here can be implicitly converted into floats,
/databricks/spark/python/pyspark/sql/functions.py in _invoke_function(name, *args)
57 """
58 jf = _get_get_jvm_function(name, SparkContext._active_spark_context)
---> 59 return Column(jf(*args))
60
61
/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
1302
1303 answer = self.gateway_client.send_command(command)
-> 1304 return_value = get_return_value(
1305 answer, self.gateway_client, self.target_id, self.name)
1306
/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
108 def deco(*a, **kw):
109 try:
--> 110 return f(*a, **kw)
111 except py4j.protocol.Py4JJavaError as e:
112 converted = convert_exception(e.java_exception)
/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
328 format(target_id, ".", name), value)
329 else:
--> 330 raise Py4JError(
331 "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
332 format(target_id, ".", name, value))
Py4JError: An error occurred while calling z:org.apache.spark.sql.functions.pow. Trace:
py4j.Py4JException: Method pow([class java.lang.Double, class java.lang.Double]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:341)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:362)
at py4j.Gateway.invoke(Gateway.java:289)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:251)
at java.lang.Thread.run(Thread.java:748)
CodePudding user response:
Functions in (py)Spark are implemented in Scala and are therefore strongly typed: the signature, including the parameter types, is part of the function, so the same name can exist with several different signatures. For example:
def foo(bar: str):
    ...
# AND
def foo(bar: int):
    ...
In Python these two definitions are the same function (the second simply replaces the first). In Scala they are two different functions, because the input types differ.
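A quick, runnable way to see that Python does not overload on annotations (a plain Python sketch, not Spark-specific):
def foo(bar: str):
    return "str version"

def foo(bar: int):
    return "int version"

# The second definition has replaced the first; the annotation plays no part in dispatch.
print(foo("hello"))   # prints "int version"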
In your case, there is no pyspark pow on the JVM side that accepts two plain doubles, which is exactly what the error message says; the existing overloads expect Column arguments. It probably works in your first example because you are not calling pyspark's pow at all, but Python's built-in pow.
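A minimal sketch of the difference, assuming a running SparkSession (the built-in pow evaluates eagerly to a plain number, while pyspark's pow builds a Column expression):
import builtins
from pyspark.sql import functions as F

# Python's built-in pow evaluates immediately to a plain integer.
builtins.pow(10, 9)            # 1000000000

# pyspark's pow returns a Column expression (roughly POWER(10, 9)) for Spark to evaluate.
F.pow(F.lit(10), F.lit(9))

# On the Spark version in the traceback, passing two plain Python numbers to pyspark's
# pow sends two java.lang.Double values to the JVM, which is what raises
# "Method pow([class java.lang.Double, class java.lang.Double]) does not exist".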
I'd advise you to always import the functions module under an alias and call F.function_name. Many pyspark functions share their name with a Python built-in (pow, sum, min, max, round, ...), so a wildcard import like from pyspark.sql.functions import * shadows the built-ins, and code that used to call Python's pow silently starts calling the pyspark one.
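That is most likely what differs in the production job. A hedged sketch of the shadowing, assuming df is the dataframe from the question:
# Without a wildcard import, pow is Python's built-in, so pow(10, 9) is just
# the constant 1000000000 and the column arithmetic works.
from pyspark.sql import functions as f
df.withColumn("nanos", ((f.col("churnprobability") % 1) * pow(10, 9)).cast("int"))

# After a wildcard import, pow refers to pyspark.sql.functions.pow, and
# pow(10, 9) now tries to call the JVM function with two plain doubles,
# reproducing the Py4JError from the stack trace (on that Spark version).
from pyspark.sql.functions import *
pow(10, 9)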
You should simply change your code to:
from pyspark.sql import functions as F
F.pow(F.lit(10), F.lit(9))
NB: lit wraps a literal value in a Column, so pow receives Column arguments, a signature that does exist on the JVM side.
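Applied to the expression from the question, a sketch of the fix, assuming df is the dataframe built above (multiplying by the constant 10 ** 9 works just as well, since the value only needs to be a literal):
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

fixed = (
    df
    # Explicit pyspark pow with Column arguments.
    .withColumn("churn_probability_nanos",
                ((F.col("churnprobability") % 1) * F.pow(F.lit(10), F.lit(9))).cast(IntegerType()))
    # Equivalent and simpler: skip pow entirely and multiply by the constant.
    .withColumn("dead_probability_nanos",
                ((F.col("deadprobability") % 1) * F.lit(10 ** 9)).cast(IntegerType()))
)
fixed.show()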