Write parquet files to S3 Sink Using Kafka

Time:04-24

Requirement:

  1. The code should read messages from Kafka topics and write them as Parquet files to S3.

Here I am using PySpark SQL to write to Kafka, and I am able to write the data successfully as JSON files to the S3 sink.

Working code:

Spark 2.4.4, package org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4

    from pyspark.sql import SparkSession

    spark = SparkSession \
        .builder \
        .appName("Python-EMR-KAFKA") \
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
        .config("spark.shuffle.compress", "true") \
        .config("spark.io.compression.codec", "snappy") \
        .config("spark.scheduler.mode", "FAIR") \
        .config("spark.speculation", "false") \
        .getOrCreate()

    # Source data, already stored as Parquet in S3
    final_df = spark.read.format("parquet").load("s3://sample-bucket/prod/cleansed/s4/")

    # Serialize each row to a JSON string and publish it to the Kafka topic
    final_df.selectExpr("to_json(struct(*)) AS value") \
        .write \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "abc-qa.129.abc.com:9500") \
        .option("topic", "v1-pdr-auw2-ems-c5aecf6e-ndf") \
        .option("kafka.linger.ms", 10) \
        .option("kafka.retries", 3) \
        .save()
    spark.stop()

Now I want to store the data as Parquet files in S3 instead of JSON. I found the solution below on the internet, but my data is in the terabyte range and converting it to Avro takes a lot of time.

    df = spark.read.json("src/main/resources/zipcodes.json")

    # convert to avro
    df.write.format("avro").save("/tmp/avro/zipcodes.avro")

CodePudding user response:

You don't need to convert from/to Avro. Spark's DataFrame writer defaults to Parquet, so remove any other .format call and just write to get Parquet files.
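As a minimal PySpark sketch of that suggestion: the helper name `write_as_parquet`, the `main` wrapper, the app name, and the output location `s3://sample-bucket/example-output/` are all hypothetical, chosen only for illustration. The input path is the one from the question's Avro snippet. PySpark is imported inside `main` so the helper itself does not require Spark to be loaded.

```python
def write_as_parquet(df, target_path, mode="append"):
    """Write a DataFrame to `target_path` as Parquet files.

    `df` is expected to be a pyspark.sql.DataFrame; the function only
    uses its standard `write` attribute (a DataFrameWriter).
    """
    # DataFrameWriter.parquet() is shorthand for .format("parquet").save()
    df.write.mode(mode).parquet(target_path)


def main():
    # Imported here so the helper above can be loaded without Spark installed.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("parquet-writer-sketch").getOrCreate()
    df = spark.read.json("src/main/resources/zipcodes.json")
    # Hypothetical output location; replace with a real bucket and prefix.
    write_as_parquet(df, "s3://sample-bucket/example-output/")
    spark.stop()
```

Calling `main()` from a script submitted with spark-submit would run the whole flow. No Avro step is involved at any point.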

However, it appears final_df is already read as a Parquet file in S3 and you're never modifying it, so there should be no need to duplicate it...

"read the messages from kafka topics"

You're currently reading from S3, not Kafka. I'm not sure I see the need for Kafka here when your data is already in S3.
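If the data really does have to come from Kafka, a batch read of the topic followed by a Parquet write might look roughly like the sketch below. The function name `kafka_topic_to_parquet` and its parameters are hypothetical. It assumes the spark-sql-kafka package from the question is on the classpath and that each message value is a JSON string, as produced by the `to_json(struct(*))` call in the question.

```python
def kafka_topic_to_parquet(spark, bootstrap_servers, topic, output_path):
    """Batch-read everything currently in `topic` and append it as Parquet.

    `spark` is expected to be an active pyspark.sql.SparkSession.
    """
    raw = (
        spark.read.format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_servers)
        .option("subscribe", topic)
        .option("startingOffsets", "earliest")
        .option("endingOffsets", "latest")
        .load()
    )
    # Kafka delivers key and value as binary; cast the payload to a string.
    messages = raw.selectExpr("CAST(value AS STRING) AS value")
    messages.write.mode("append").parquet(output_path)
```

This stores each message as a single string column. To get real columns in the Parquet files, the JSON string would first need to be parsed, for example with `from_json` and an explicit schema.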

CodePudding user response:

Spark doesn't need any additional packages or libraries to use Parquet, as Parquet support ships with Spark by default.

    // read JSON file into a DataFrame
    val df = spark.read.json("src/main/resources/file.json")

    df.write.parquet("://S3_Path/file.parquet")