Spark Read From Mysql Force Schema


I have a Spark job that reads a table from MySQL, but for some reason Spark is inferring an int column as boolean. How can I force the data type during the table read?

Spark Session:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("spark.sql.autoBroadcastJoinThreshold", -1)
         .config("spark.sql.adaptive.enabled", "true")
         .config("spark.jars.packages", "mysql:mysql-connector-java:8.0.22")
         .enableHiveSupport()
         .getOrCreate())

Spark Reading:

spark.read.format("jdbc") \
    .option("url", f"jdbc:mysql://host:port/dbname?zeroDateTimeBehavior=convertToNull") \
    .option("query", f"SELECT * FROM {table}") \
    .option("user", user) \
    .option("password", password) \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .load() \
    .registerTempTable(table_name) 

CodePudding user response:

There are a few ways to tackle this problem.

Supply customSchema option

The jdbc data source accepts a number of read options, one of which is customSchema. It lets you supply a column name (as it appears in the result set) and the Spark SQL type it should be read as. In the example case:

spark.read.format("jdbc") \
    .option("url", f"jdbc:mysql://host:port/dbname?zeroDateTimeBehavior=convertToNull") \
    .option("query", f"SELECT * FROM {table}") \
    .option("user", user) \
    .option("password", password) \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("customSchema", "COLUMN_NAME INT") \
    .load() \
    .registerTempTable(table_name) 
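
customSchema takes a comma-separated, DDL-style list, so more than one column can be overridden in a single option, and printSchema is a quick way to confirm the override took effect. A minimal sketch of that, where flag_col and amount_col are placeholder names rather than columns from the original table:

# flag_col and amount_col are hypothetical column names used only for illustration
df = spark.read.format("jdbc") \
    .option("url", f"jdbc:mysql://host:port/dbname?zeroDateTimeBehavior=convertToNull") \
    .option("query", f"SELECT * FROM {table}") \
    .option("user", user) \
    .option("password", password) \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("customSchema", "flag_col INT, amount_col DECIMAL(10, 2)") \
    .load()

df.printSchema()  # confirm the overridden types before registering the temp table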

Casting (Query)

In the query option, you can cast the column on the MySQL side, but it will require juggling some column names afterwards (an alternative that avoids the rename step is sketched after this example).

import pyspark.sql.functions as F

# MySQL's CAST accepts SIGNED/UNSIGNED rather than INT; the cast result arrives in Spark as a long.
df = spark.read.format("jdbc") \
    .option("url", f"jdbc:mysql://host:port/dbname?zeroDateTimeBehavior=convertToNull") \
    .option("query", f"SELECT t.*, CAST(COLUMN_NAME AS SIGNED) AS COLUMN_NAME_INT FROM {table} t") \
    .option("user", user) \
    .option("password", password) \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .load()
df = df.drop(F.col("COLUMN_NAME"))                           # drop the original (boolean) column
df = df.withColumnRenamed("COLUMN_NAME_INT", "COLUMN_NAME")  # restore the original name
df.registerTempTable(table)
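
If you would rather skip the drop/rename step, the cast can be aliased back to the original name by listing the columns explicitly instead of using t.*. A sketch of that, where other_col stands in for the remaining columns and is only illustrative; as above, CAST ... AS SIGNED comes back as a long on the Spark side:

df = spark.read.format("jdbc") \
    .option("url", f"jdbc:mysql://host:port/dbname?zeroDateTimeBehavior=convertToNull") \
    .option("query", f"SELECT CAST(COLUMN_NAME AS SIGNED) AS COLUMN_NAME, other_col FROM {table}") \
    .option("user", user) \
    .option("password", password) \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .load()
df.registerTempTable(table)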

Casting (Dataframe)

Alternatively, casting could occur after reading the data into the dataframe:

from pyspark.sql.types import IntegerType
import pyspark.sql.functions as F

df = spark.read.format("jdbc") \
    .option("url", f"jdbc:mysql://host:port/dbname?zeroDateTimeBehavior=convertToNull") \
    .option("query", f"SELECT * FROM {table}") \
    .option("user", user) \
    .option("password", password) \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .load() 
df = df.withColumn("COLUMN_NAME", F.col("COLUMN_NAME").cast(IntegerType()))
df.registerTempTable(table)
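
Connection property (tinyInt1isBit)

If the mis-read column is a MySQL TINYINT(1), this mapping typically comes from Connector/J, which reports TINYINT(1) as BIT/boolean by default. Assuming that is the case here, disabling the tinyInt1isBit property in the JDBC URL makes the driver return the column as an integer in the first place, so no cast is needed; a minimal sketch under that assumption:

# Only applies if the mis-read column really is TINYINT(1) in MySQL.
spark.read.format("jdbc") \
    .option("url", f"jdbc:mysql://host:port/dbname?zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false") \
    .option("query", f"SELECT * FROM {table}") \
    .option("user", user) \
    .option("password", password) \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .load() \
    .registerTempTable(table_name)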