I am reading a Delta table like this:
df = (spark
.read
.format("delta")
.load("/path/to/foo/table")
.select("Foo_ID", "Bar")
)
Now when I look at the DataFrame schema, Spark has somehow inferred the wrong nullability:
>>> df.schema
...
StructType([StructField('Foo_ID', LongType(), True), StructField('Bar', TimestampType(), True)])
I know the values should not be null, and I want the schema to reflect that by imposing a NOT NULL constraint on the columns. If I ever load a table that actually does contain a null, I want a big fat error message rather than having it silently slip through. Is there a clean way to do this? I came up with the following workaround, but it doesn't feel very nice:
from pyspark.sql.types import StructType, StructField, LongType, TimestampType

my_schema = StructType([StructField('Foo_ID', LongType(), False), StructField('Bar', TimestampType(), False)])
df2 = spark.createDataFrame(df.rdd, schema=my_schema)
Is there a better way?
CodePudding user response:
If the table was created with the NOT NULL constraint, Spark picks up the nullability from the table metadata:
CREATE OR REPLACE TABLE delta.`/tmp/delta/table` (
foo_id LONG NOT NULL,
bar TIMESTAMP
) USING DELTA;
val df = spark.read.format("delta").load("/tmp/delta/table")
df.printSchema
root
|-- foo_id: long (nullable = false)
|-- bar: timestamp (nullable = true)
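If you are working from Python instead of SQL, the Delta Lake Python API ships a table builder that, as far as I know, lets you declare the nullability up front when you create the table. A rough sketch, assuming the delta-spark package is installed and reusing the example path from above:

from delta.tables import DeltaTable

# Create (or replace) the table with the NOT NULL constraint baked into the metadata.
(DeltaTable.createOrReplace(spark)
    .location("/tmp/delta/table")
    .addColumn("foo_id", dataType="LONG", nullable=False)
    .addColumn("bar", dataType="TIMESTAMP")
    .execute())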
Apparently it wasn't. But you can always change it:
CREATE OR REPLACE TABLE delta.`/tmp/delta/table` (
foo_id LONG,
bar TIMESTAMP
) USING DELTA;
ALTER TABLE delta.`/tmp/delta/table` CHANGE COLUMN foo_id foo_id LONG NOT NULL;
ALTER TABLE delta.`/tmp/delta/table` CHANGE COLUMN bar bar TIMESTAMP NOT NULL;
val df = spark.read.format("delta").load("/tmp/delta/table")
df.printSchema
root
|-- foo_id: long (nullable = false)
|-- bar: timestamp (nullable = false)
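If you prefer to stay in PySpark, the same ALTER statements can be issued through spark.sql. A minimal sketch, reusing the syntax and example path from above; Delta should validate the existing rows when the constraint is added, which gives you exactly the loud failure you asked for:

# Delta checks existing rows when the constraint is added,
# so this should fail loudly if any nulls are already present.
spark.sql("ALTER TABLE delta.`/tmp/delta/table` CHANGE COLUMN foo_id foo_id LONG NOT NULL")
spark.sql("ALTER TABLE delta.`/tmp/delta/table` CHANGE COLUMN bar bar TIMESTAMP NOT NULL")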
Or, if you don't want to or cannot change the table, then what you came up with is not a workaround, it is a solution: you just explicitly specify the schema you want to read the data with.
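For completeness, your own snippet already gives you the loud failure you asked for: createDataFrame verifies rows against the schema by default (verifySchema=True), so a null in a non-nullable field should make the job fail rather than slip through. Roughly, reusing my_schema from the question:

# verifySchema defaults to True, so a None in a non-nullable field
# should raise an error at runtime instead of being silently accepted.
df2 = spark.createDataFrame(df.rdd, schema=my_schema, verifySchema=True)
df2.printSchema()  # both fields now report nullable = false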