Conditions:
- The code should read messages from Kafka topics and write them as Parquet files to S3.

Here I am using PySpark SQL to write to Kafka, and I am able to write successfully as a JSON file to the S3 sink.
Working code:
Spark 2.4.4, package: org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python-EMR-KAFKA") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.shuffle.compress", "true") \
    .config("spark.io.compression.codec", "snappy") \
    .config("spark.scheduler.mode", "FAIR") \
    .config("spark.speculation", "false") \
    .getOrCreate()

# Read the existing Parquet data from S3
final_df = spark.read.format("parquet").load("s3://sample-bucket/prod/cleansed/s4/")

# Serialize each row as a JSON string and publish it to the Kafka topic
final_df.selectExpr("to_json(struct(*)) AS value") \
    .write \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "abc-qa.129.abc.com:9500") \
    .option("topic", "v1-pdr-auw2-ems-c5aecf6e-ndf") \
    .option("kafka.linger.ms", 10) \
    .option("kafka.retries", 3) \
    .save()

spark.stop()
Now I want to store it as a Parquet file in S3 instead of JSON. I found the solution below on the internet, but my data is several TB and the conversion to Avro takes a lot of time.
df = spark.read.json("src/main/resources/zipcodes.json")
# convert to Avro
df.write.format("avro").save("/tmp/avro/zipcodes.avro")
CodePudding user response:
You don't need to convert from/to Avro. Spark's DataFrame writer defaults to Parquet, so remove any other .format(...) call and just call write.save() to get Parquet files.
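For the DataFrame in the question, a minimal sketch could look like this (the output path is a placeholder, not from the question):

# Parquet is the default source, so no .format() call is needed;
# the output path below is hypothetical
final_df.write.save("s3://sample-bucket/prod/parquet-out/")

# Or, being explicit about the format:
final_df.write.parquet("s3://sample-bucket/prod/parquet-out/")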
However, it appears final_df is already read as a Parquet file from S3, and you're never modifying it, so there should be no need to duplicate it...
"read the messages from kafka topics"

You're currently reading from S3, not Kafka. I'm not sure I see the need for Kafka here when your data is already in S3.
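If you do need to consume the topic and land it in S3 as Parquet, a minimal Structured Streaming sketch might look like the following; the broker and topic are taken from the question, while the output and checkpoint paths are hypothetical:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kafka-to-parquet").getOrCreate()

# Read the topic as a stream; Kafka keys/values arrive as binary
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "abc-qa.129.abc.com:9500") \
    .option("subscribe", "v1-pdr-auw2-ems-c5aecf6e-ndf") \
    .option("startingOffsets", "earliest") \
    .load()

# Cast the binary payload to string and write it out as Parquet;
# both paths below are placeholders
query = kafka_df.selectExpr("CAST(value AS STRING) AS value") \
    .writeStream \
    .format("parquet") \
    .option("path", "s3://sample-bucket/prod/kafka-parquet/") \
    .option("checkpointLocation", "s3://sample-bucket/prod/kafka-parquet-chk/") \
    .trigger(once=True) \
    .start()

query.awaitTermination()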
CodePudding user response:
Spark doesn't need any additional packages or libraries to use Parquet, as it is provided with Spark by default.
// read the JSON file into a DataFrame
val df = spark.read.json("src/main/resources/file.json")
df.write.parquet("s3://S3_Path/file.parquet")
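The snippet above is Scala; since the question uses PySpark, a sketch of the same two steps in Python would be (paths unchanged from the example above):

# read the JSON file into a DataFrame
df = spark.read.json("src/main/resources/file.json")

# write it back out as Parquet; no extra package needed
df.write.parquet("s3://S3_Path/file.parquet")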