read pyspark json files and concat

Time:12-16

I have 999 gz files located in an S3 bucket. I wanted to read them all and convert the PySpark DataFrame into a pandas DataFrame, but that was impossible because the files are too large. I am now trying a different approach: read each .gz file individually, convert it to a pandas DataFrame, reduce the number of columns, and then concatenate everything into one big pandas DataFrame.

spark_df = spark.read.json(f"s3a://my_bucket/part-00000.gz")

part-00000.gz is gzipped JSON; part-00000 is the first file and part-00999 is the last one to read. Could you please help me read them all and then concatenate the pandas DataFrames?

Logic:

  1. Read all json files:

    spark_df = spark.read.json(f"s3a://my_bucket/part-00{}.gz")

  2. convert to pandas

    pandas_df = spark_df.toPandas()

  3. reduce columns (only a few columns are needed)

    pandas_df = pandas_df[["col1","col2","col3"]]

  4. merge all 999 pandas DataFrames into one: full_df = pd.concat(...), with a for loop going through all the pandas DataFrames

This is the logic in my head, but I am having difficulty coding it.

EDIT: I started writing the code but it does not show me pandas_df:

from pyspark.sql.functions import col, length, lower

for i in range(10,11):
    df_to_predict = spark.read.json(f"s3a://my_bucket/company_v20_dl/part-000{i}.gz")
    df_to_predict = df_to_predict.select('id','summary', 'website')
    df_to_predict = df_to_predict.withColumn('text', lower(col('summary')))
    df_to_predict = df_to_predict.select('id','text', 'website')
    df_to_predict = df_to_predict.withColumn("text_length", length("text"))
    df_to_predict.show()
    pandas_df = df_to_predict.toPandas()
    pandas_df.head()

Also, I've noticed this solution will break for part-00001, part-00100, etc., because range does not pad the numbers with zeros.
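
One way to handle the zero padding is Python's format specification: `{i:05d}` pads an integer to five digits. A minimal sketch, assuming the parts run from part-00000 to part-00999 under the `company_v20_dl` prefix used in the loop; the helper name `part_path` is hypothetical.

```python
# Build zero-padded part names; part_path is a hypothetical helper.
BASE = "s3a://my_bucket/company_v20_dl"  # prefix taken from the loop in the question


def part_path(i: int, base: str = BASE) -> str:
    """Return the path of part number i, padded to five digits."""
    return f"{base}/part-{i:05d}.gz"


# Assumes the parts are numbered 0 through 999, as described in the question.
paths = [part_path(i) for i in range(1000)]
```

`spark.read.json` also accepts a list of paths, so `spark.read.json(paths)` would read them in one call rather than in a Python loop.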

CodePudding user response:

Here is a possible solution:

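import os
import boto3

s3 = boto3.Session().resource('s3')
my_bucket = s3.Bucket("bucket-name")
for obj in my_bucket.objects.filter(Prefix="output/"):  # folder where the parts are located; S3 keys normally have no leading slash
    if obj.key.endswith(".gz"):
        # keep each part's own name so downloads do not overwrite each other; download/ must already exist
        my_bucket.download_file(obj.key, os.path.join("download", os.path.basename(obj.key)))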

Refer to "Listing contents of a bucket with boto3".

Then convert each downloaded file to a DataFrame and append it to the root DataFrame. I hope this helps.
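
The "convert and append" step could look roughly like this once the parts are downloaded. This is a minimal sketch, assuming each part is gzipped JSON Lines (one JSON object per line, which is what Spark normally writes); the helper names `read_part` and `read_parts` are hypothetical. It keeps only the needed fields while reading, so the full records never accumulate in memory.

```python
import gzip
import json


def read_part(path, columns):
    """Yield one dict per JSON line in a gzipped file, keeping only `columns`."""
    with gzip.open(path, "rt", encoding="utf-8") as fh:
        for line in fh:
            if line.strip():  # skip blank lines
                record = json.loads(line)
                yield {name: record.get(name) for name in columns}


def read_parts(paths, columns):
    """Collect the trimmed rows from every part into one list."""
    rows = []
    for path in paths:
        rows.extend(read_part(path, columns))
    return rows
```

Passing the result to `pd.DataFrame(read_parts(paths, ["id", "summary", "website"]))` would then build a single pandas DataFrame; whether it fits in memory still depends on the size of the data.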

CodePudding user response:

Your final DataFrame would be 200k rows * 4 cols * 999 files ~= 200M rows * 4 cols, which is still a large dataset for pandas.
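
To make the size estimate concrete, here is the arithmetic as a short sketch. The row, column, and file counts come from the estimate above; the 8 bytes per cell is purely an assumption for numeric data, and text columns would typically take far more.

```python
rows_per_file = 200_000
n_files = 999
n_cols = 4
bytes_per_cell = 8  # assumption: fixed-width numeric cells; strings are larger

total_rows = rows_per_file * n_files
total_cells = total_rows * n_cols
approx_gb = total_cells * bytes_per_cell / 1e9

print(f"{total_rows:,} rows, {total_cells:,} cells, about {approx_gb:.1f} GB")
```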

Now that PySpark can run pandas code in a distributed way, unless there is a specific reason not to, I would recommend keeping the data in a PySpark DataFrame, or converting it to a pandas-on-Spark DataFrame if you need specific pandas operations.

Pseudocode:

df = spark.read.json("s3a://my_bucket")  # Read all files; Spark distributes the data and applies the operations.
df = df.select('col1', 'col2', 'col3')
# df = df.withColumn('something', ...)

# Convert to a pandas-on-Spark DataFrame, which supports pandas operations but stays distributed.
# (to_pandas_on_spark() is the Spark 3.2 name; later releases also offer df.pandas_api().)
pdf = df.to_pandas_on_spark()
pdf.groupby('col1').min()

Ref: https://spark.apache.org/docs/3.2.0/api/python/user_guide/pandas_on_spark/pandas_pyspark.html
