How to EFFICIENTLY upload a PySpark DataFrame as a zipped CSV or Parquet file (similar to .gz format)

Time:05-14

I have a 130 GB csv.gz dataset in S3 that was created by a parallel unload from Redshift to S3. Since it consists of multiple files, I wanted to reduce the number of files so that it's easier to read for my ML model (using sklearn).

I have managed to read the multiple files from S3 into a Spark DataFrame (called spark_df1) using: spark_df1 = spark.read.csv(path, header=False, schema=schema)
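
For reference, a minimal sketch of the read step, assuming a hypothetical bucket path and a schema trimmed to a few columns (the real schema has hundreds of features); Spark decompresses the .gz part files transparently:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

spark = SparkSession.builder.appName("redshift-unload-reader").getOrCreate()

# Hypothetical schema -- the real one has hundreds of feature columns
schema = StructType([
    StructField("customerID", StringType(), True),
    StructField("event_ts", TimestampType(), True),
    StructField("feature_1", DoubleType(), True),
])

# Hypothetical S3 prefix from the parallel Redshift unload; Spark picks up
# every part file under it and handles the .gz decompression automatically
path = "s3://my-bucket/redshift-unload/"
spark_df1 = spark.read.csv(path, header=False, schema=schema)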

spark_df1 contains hundreds of columns (features) and is my time-series inference data for millions of customer IDs. Since it is time-series data, I want to make sure that all the data points for a given 'customerID' end up in the same output file, because I will be reading each partition file as a chunk. I want to unload this data back into S3. I don't mind smaller partitions of data, but each partitioned file MUST contain the entire time series of a single customer; in other words, one customer's data cannot be split across two files.

Current code:

datasink3 = spark_df1.repartition(1).write.format("parquet").save(destination_path)

However, this takes forever to run, and the output is a single file that is not even compressed. I also tried using ".coalesce(1)" instead of ".repartition(1)", but it was slower in my case.

CodePudding user response:

You can partition the output by customerID:

spark_df1.write.partitionBy("customerID") \
         .format("parquet") \
         .save(destination_path)

You can read more about it here: https://sparkbyexamples.com/pyspark/pyspark-repartition-vs-partitionby/
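
For what it's worth, partitionBy writes one sub-directory per distinct customerID, so a single customer's rows can never straddle two directories. A rough sketch of the resulting layout and how one customer could be read back (paths are hypothetical):

# Layout produced by .write.partitionBy("customerID"):
#   s3://my-bucket/output/customerID=C001/part-*.parquet
#   s3://my-bucket/output/customerID=C002/part-*.parquet
#   ...

# Read a single customer's time series back; note that the customerID
# value lives in the directory name rather than inside the files
one_customer = spark.read.parquet("s3://my-bucket/output/customerID=C001/")

# Or read the whole dataset and let partition pruning do the filtering
all_df = spark.read.parquet("s3://my-bucket/output/")
one_customer = all_df.filter(all_df.customerID == "C001")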

CodePudding user response:

This code worked, and the runtime dropped to about 1/5 of the original. The only thing to note is to make sure the load is split roughly equally among the nodes (in my case I had to make sure that each customer ID had about the same number of rows):

spark_df1.repartition("customerID") \
         .write.partitionBy("customerID") \
         .format("csv") \
         .option("compression", "gzip") \
         .save(destination_path)
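
One way to sanity-check that balance (a sketch, not part of the original answer) is to look at the per-customer row counts before writing:

from pyspark.sql import functions as F

# Rows per customer -- heavy skew here means some partition files
# (and the tasks that write them) will be much larger than others
counts = spark_df1.groupBy("customerID").count()
counts.select(
    F.min("count").alias("min_rows"),
    F.avg("count").alias("avg_rows"),
    F.max("count").alias("max_rows"),
).show()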
