I have the following prelude code that is shared between my two scenarios:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
import pandas as pd
import numpy as np
spark = SparkSession.builder.getOrCreate()
df = pd.DataFrame({"col1": [1, 2, 3], "col2": [22.0, 88.0, np.nan]})
Now, I would like to convert df into a pyspark dataframe (sdf). When I try to "cast" "col2" implicitly into LongType via a schema during the creation of sdf, it fails:
schema = StructType([StructField("col1", LongType()), StructField("col2", LongType())])
sdf = spark.createDataFrame(df[schema.fieldNames()], schema=schema)
Error:
TypeError: field col2: LongType can not accept object 22.0 in type <class 'float'>
But if I do the following, it works just fine:
schema_2 = StructType(
[StructField("col1", LongType()), StructField("col2", FloatType())]
)
sdf = spark.createDataFrame(df[schema.fieldNames()], schema=schema_2)
cast_sdf = sdf.withColumn("col2", F.col("col2").cast(LongType()))
cast_sdf.show()
with the output:
+----+----+
|col1|col2|
+----+----+
|   1|  22|
|   2|  88|
|   3|   0|
+----+----+
CodePudding user response:
Transforming my comment into an answer.
This is actually how Spark works with schemas. It is not specific to a pandas dataframe being converted into a pyspark dataframe. You'll get the same error when using the createDataFrame method with a list of tuples:
import numpy as np
from pyspark.sql.types import StructType, StructField, LongType

schema = StructType([StructField("col1", LongType()), StructField("col2", LongType())])
df = spark.createDataFrame([(1, 22.0), (2, 88.0), (3, np.nan)], schema)
# TypeError: field col2: LongType can not accept object 22.0 in type <class 'float'>
This is also the behavior with data sources like CSV when you pass a schema (although when reading CSV it does not fail in PERMISSIVE mode; the mismatched values are simply loaded as null). The schema does not perform any automatic casting of types; it only tells Spark which data type is expected for each column in the rows.
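As a rough sketch of that CSV behaviour (the file path, file contents, and variable names here are made up just for the illustration):
csv_path = "/tmp/example.csv"  # assumed scratch location
with open(csv_path, "w") as f:
    f.write("col1,col2\n1,22.0\n2,88.0\n3,NaN\n")

csv_schema = StructType([StructField("col1", LongType()), StructField("col2", LongType())])
csv_df = (
    spark.read
    .option("header", True)
    .option("mode", "PERMISSIVE")  # default mode: mismatched fields become null instead of failing
    .schema(csv_schema)
    .csv(csv_path)
)
csv_df.show()
# values like "22.0" cannot be parsed as long, so col2 comes back as null for those rows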
So when using a schema, you either have to pass data that already matches the specified types, or use StringType, which does not fail, and then explicitly cast the columns to the desired types.
schema = StructType([StructField("col1", LongType()), StructField("col2", StringType())])
df = spark.createDataFrame([(1, 22.0), (2, 88.0), (3, np.nan)], schema)
df = df.withColumn("col2", F.col("col2").cast("long"))
df.show()
# +----+----+
# |col1|col2|
# +----+----+
# |   1|  22|
# |   2|  88|
# |   3|null|
# +----+----+
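And for completeness, a minimal sketch of the other option mentioned above, passing data that already matches the schema (the names matching_schema and df2 are just for this illustration):
matching_schema = StructType([StructField("col1", LongType()), StructField("col2", LongType())])
# Rows already contain plain Python ints, with None for the missing value,
# so they match the LongType fields directly and no cast is needed
df2 = spark.createDataFrame([(1, 22), (2, 88), (3, None)], matching_schema)
df2.show()
# +----+----+
# |col1|col2|
# +----+----+
# |   1|  22|
# |   2|  88|
# |   3|null|
# +----+----+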
CodePudding user response:
I don't see how your operation would succeed as-is. To my (limited) knowledge, Double is a high-precision floating-point type, while Long is an integer type with a large range.
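To make that concrete, a small sketch (my own illustration, not from the question) of what Spark's cast from double to long does: the fractional part is truncated and NaN becomes 0, which is why the question's output shows 0 in the last row:
from pyspark.sql import functions as F
from pyspark.sql.types import LongType

# Illustrative dataframe; col2 is inferred as double
demo = spark.createDataFrame([(22.7,), (88.0,), (float("nan"),)], ["col2"])
demo.select(
    "col2",
    F.col("col2").cast(LongType()).alias("col2_as_long"),  # 22.7 -> 22, NaN -> 0
).show()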
Where NaNs are mixed with integers or numeric floats, I would coerce the column to integer in pandas and impose LongType() on conversion to a Spark dataframe, because the two go together.
Otherwise, I would impose float/double on conversion to the Spark dataframe, like you did, and then cast to LongType(). Why? Because float is a relative of double.
import pandas as pd
import numpy as np
from pyspark.sql.types import StructType, StructField, StringType, LongType

# Fill the NaN and coerce col2 to integer on the pandas side first
df = pd.DataFrame({"col1": [1, 2, 3], "col2": [22.0, 88.0, np.nan]}).assign(
    col2=lambda d: d["col2"].fillna(0).astype(int)
)

mySchema = StructType([StructField("col1", StringType(), True),
                       StructField("col2", LongType(), True)])
sdf = spark.createDataFrame(df, schema=mySchema)
sdf.show()
+----+----+
|col1|col2|
+----+----+
|   1|  22|
|   2|  88|
|   3|   0|
+----+----+