PySpark dataframe not getting saved in Hive due to file format mismatch

Time:03-25

I want to write streaming data from a Kafka topic to a Hive table.
I am able to create DataFrames by reading the Kafka topic, but the data is not written to the Hive table because of a file-format mismatch. I have specified dataframe.format("parquet"), and the Hive table is created with STORED AS PARQUET.
Below is the code snippet:

import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

def hive_write_batch_data(data, batchId):
    data.write.format("parquet").mode("append").saveAsTable(table)

def write_to_hive(data, kafka_sink_name):
    global table
    table = kafka_sink_name
    data.select(col("key"), col("value"), col("offset")) \
        .writeStream.foreachBatch(hive_write_batch_data) \
        .start().awaitTermination()

if __name__ == '__main__':
    kafka_sink_name = sys.argv[1]
    kafka_config = {
                     ....
                     ..
                   }
    spark = SparkSession.builder.appName("Test Streaming").enableHiveSupport().getOrCreate()
    df = spark.readStream \
        .format("kafka") \
        .options(**kafka_config) \
        .load()
    
    df1 = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)","offset","timestamp","partition")
    write_to_hive(df1,kafka_sink_name)

Hive table is created as Parquet:

CREATE TABLE test.kafka_test(
  key string,
  value string,
  offset bigint)
STORED AS PARQUET;

It gives me the error:

pyspark.sql.utils.AnalysisException: "The format of the existing table test.kafka_test is `HiveFileFormat`. It doesn't match the specified format `ParquetFileFormat`.;"

How do I write the dataframe to the Hive table?
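The mismatch arises because a table created with Hive DDL is registered in the metastore with Hive's SerDe (reported by Spark as `HiveFileFormat`), while `saveAsTable` with `.format("parquet")` expects a table whose provider is Spark's native `ParquetFileFormat`. One way to see what the metastore actually recorded is to describe the table (a sketch, assuming an existing Hive-enabled session bound to the name `spark`):

```python
# Sketch: inspect the catalog entry for the table. For a table created
# with Hive DDL, the output shows a Hive "SerDe Library" entry; for a
# table created by Spark's saveAsTable, it shows Provider: parquet.
spark.sql("DESCRIBE FORMATTED test.kafka_test").show(100, truncate=False)
```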

CodePudding user response:

I dropped the Hive table and ran the Spark streaming job again. Since the table no longer existed, saveAsTable() created it itself, and the newly created table had the matching Parquet format.
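If dropping the table is not an option (for example, it already holds data), an alternative is to write through the existing table definition with `insertInto`, which resolves the storage format from the metastore instead of checking it against the writer's declared format. A minimal sketch, adapted from the batch function above (note that `insertInto` matches columns by position, so the select order must follow the table schema):

```python
def hive_write_batch_data(data, batchId):
    # insertInto() writes using the format recorded in the metastore,
    # so the HiveFileFormat / ParquetFileFormat check performed by
    # saveAsTable() does not apply. Columns are resolved by position:
    # key, value, offset must match the table's column order.
    data.select("key", "value", "offset") \
        .write.mode("append") \
        .insertInto(table)
```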
