I have a Spark job that reads a table from MySQL, but for some reason Spark is reading an int column as boolean. How can I force the data type when reading the table?
Spark Session:
spark = (SparkSession.builder
.config("spark.sql.autoBroadcastJoinThreshold", -1)
.config("spark.sql.adaptive.enabled", "true")
.config("spark.jars.packages", "mysql:mysql-connector-java:8.0.22")
.enableHiveSupport().getOrCreate()
)
Spark Reading:
spark.read.format("jdbc") \
.option("url", f"jdbc:mysql://host:port/dbname?zeroDateTimeBehavior=convertToNull") \
.option("query", f"SELECT * FROM {table}") \
.option("user", user) \
.option("password", password) \
.option("driver", "com.mysql.cj.jdbc.Driver") \
.load() \
.registerTempTable(table_name)
CodePudding user response:
There are a few approaches to tackle this problem.
Supply customSchema option
For the jdbc data source, there are several read options that can be specified, one being customSchema, which lets you supply a column name (as it appears in the table) and the type to convert it to. In the example case:
spark.read.format("jdbc") \
.option("url", f"jdbc:mysql://host:port/dbname?zeroDateTimeBehavior=convertToNull") \
.option("query", f"SELECT * FROM {table}") \
.option("user", user) \
.option("password", password) \
.option("driver", "com.mysql.cj.jdbc.Driver") \
.option("customSchema", "COLUMN_NAME INT") \
.load() \
.createOrReplaceTempView(table_name)
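If more than one column needs overriding, the customSchema value is a comma-separated list of name/type pairs. Below is a minimal sketch of assembling that string; the helper name build_custom_schema and the column names in the usage line are hypothetical and not taken from the question.

```python
def build_custom_schema(overrides):
    """Join {column: spark_sql_type} pairs into a customSchema string."""
    # Columns left out of the mapping keep whatever type Spark infers.
    return ", ".join(f"{name} {sql_type}" for name, sql_type in overrides.items())


# Hypothetical column names, for illustration only.
custom_schema = build_custom_schema({"is_active": "INT", "retry_count": "INT"})
print(custom_schema)
```

The result can then be passed as .option("customSchema", custom_schema) in the read shown above.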
Casting (Query)
In the query option, you can cast the column in SQL itself, though it requires juggling some column names. Note that MySQL's CAST does not accept INT as a target type; use SIGNED (or UNSIGNED) instead.
df = spark.read.format("jdbc") \
.option("url", f"jdbc:mysql://host:port/dbname?zeroDateTimeBehavior=convertToNull") \
.option("query", f"SELECT t.*, CAST(t.COLUMN_NAME AS SIGNED) AS COLUMN_NAME_INT FROM {table} t") \
.option("user", user) \
.option("password", password) \
.option("driver", "com.mysql.cj.jdbc.Driver") \
.load()
# Drop the original column, then give the cast column its name.
df = df.drop("COLUMN_NAME")
df = df.withColumnRenamed("COLUMN_NAME_INT", "COLUMN_NAME")
df.createOrReplaceTempView(table)
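One way to avoid the drop-and-rename step is to list the columns explicitly and cast in place. This is a sketch that assumes the column list is known up front; build_cast_query and the example table and column names are hypothetical, and SIGNED is used because it is an integer target that MySQL's CAST accepts.

```python
def build_cast_query(table, columns, cast_columns, target="SIGNED"):
    """Build a SELECT that casts chosen columns and keeps their names."""
    parts = []
    for name in columns:
        if name in cast_columns:
            # Cast in MySQL and alias back to the original column name.
            parts.append(f"CAST(`{name}` AS {target}) AS `{name}`")
        else:
            parts.append(f"`{name}`")
    # Identifiers are not escaped here, so only pass trusted names.
    return f"SELECT {', '.join(parts)} FROM `{table}`"


# Hypothetical table and column names, for illustration only.
query = build_cast_query("users", ["id", "is_active"], {"is_active"})
print(query)
```

The resulting string could be passed to .option("query", query) in place of the SELECT used above.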
Casting (Dataframe)
Alternatively, casting could occur after reading the data into the dataframe:
from pyspark.sql.types import IntegerType
import pyspark.sql.functions as F
df = spark.read.format("jdbc") \
.option("url", f"jdbc:mysql://host:port/dbname?zeroDateTimeBehavior=convertToNull") \
.option("query", f"SELECT * FROM {table}") \
.option("user", user) \
.option("password", password) \
.option("driver", "com.mysql.cj.jdbc.Driver") \
.load()
df = df.withColumn("COLUMN_NAME", F.col("COLUMN_NAME").cast(IntegerType()))
df.createOrReplaceTempView(table)
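When several columns come back as boolean, the same cast can be applied in a loop driven by df.dtypes, which lists (name, type) pairs. The sketch below assumes that shape, and both helper names are hypothetical. Keep in mind that a column already read as boolean can only cast to 0 or 1 (or null), so if the table stores other values, one of the read-time approaches above is likely the safer choice.

```python
def boolean_columns(dtypes):
    """Return names of columns whose Spark type string is 'boolean'."""
    return [name for name, dtype in dtypes if dtype == "boolean"]


def cast_booleans_to_int(df):
    """Cast every boolean column of a Spark DataFrame to int."""
    for name in boolean_columns(df.dtypes):
        # Column.cast accepts a type name string, so no type import is needed.
        df = df.withColumn(name, df[name].cast("int"))
    return df


# df.dtypes has this shape; the column names here are made up.
print(boolean_columns([("id", "bigint"), ("is_active", "boolean")]))
```

With the dataframe from the read above, this would be called as df = cast_booleans_to_int(df) before registering the temp view.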