I want to convert my CSV file to a Parquet file. My code below causes my kernel to be KILLED regardless of the chunksize
parameter. I do not know the number of rows and columns in my file, but I suspect it has many columns.
What is the ideal solution?
With Pandas:
import pandas as pd
import dask.dataframe as dd
import pyarrow as pa
import pyarrow.parquet as pq

csv_file = "kipan_exon.csv.gz"
parquet_file = "kipan_exon.csv.gz"
chunksize = 1000000

df = pd.read_csv(csv_file, sep="\t", chunksize=chunksize, low_memory=False, compression="gzip")
for i, chunk in enumerate(df):
    print("Chunk", i)
    if i == 0:
        # infer the schema from the first chunk and open the writer once
        parquet_schema = pa.Table.from_pandas(df=chunk).schema
        parquet_writer = pq.ParquetWriter(parquet_file, parquet_schema, compression="gzip")
    # append each chunk to the parquet file
    table = pa.Table.from_pandas(chunk, schema=parquet_schema)
    parquet_writer.write_table(table)
parquet_writer.close()
With dask:
df = dd.read_csv(csv_file, sep="\t", compression="gzip", blocksize=None)
df = df.repartition(partition_size="100MB")
df.to_parquet(parquet_file, write_index=False)
CodePudding user response:
Another (more recent) solution is to use a LazyFrame approach in polars:
from polars import scan_csv

csv_file = "kipan_exon.csv"  # this doesn't work with compressed files right now
parquet_file = "kipan_exon.parquet"  # per @MichaelDelgado's comment, not the same value as `csv_file`

ldf = scan_csv(csv_file)
ldf.sink_parquet(parquet_file)
This should work well in memory-constrained situations, since the data is not loaded fully into memory but streamed into the parquet file.
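Since the question's file is gzip-compressed and tab-separated (the original code passes sep="\t"), while scan_csv expects an uncompressed file, one workaround is to stream-decompress to disk first and then scan the plain CSV. Below is a minimal sketch under those assumptions; the filenames and the separator argument are taken from the question, and the exact scan_csv keyword may vary with your polars version:

import gzip
import shutil

from polars import scan_csv

compressed_csv = "kipan_exon.csv.gz"
decompressed_csv = "kipan_exon.csv"
parquet_file = "kipan_exon.parquet"

# Stream-decompress the gzip file to disk without loading it all into memory
with gzip.open(compressed_csv, "rb") as src, open(decompressed_csv, "wb") as dst:
    shutil.copyfileobj(src, dst)

# Lazily scan the tab-separated CSV and stream it into a parquet file
scan_csv(decompressed_csv, separator="\t").sink_parquet(parquet_file)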
CodePudding user response:
When using dask for CSV-to-Parquet conversion, I'd recommend avoiding .repartition. It introduces additional data shuffling that can strain workers and the scheduler. The simpler approach would look like this:
from dask.dataframe import read_csv

csv_file = "kipan_exon.csv.gz"
parquet_file = "kipan_exon.parquet"  # per @MichaelDelgado's comment, not the same value as `csv_file`

df = read_csv(csv_file, sep="\t", compression="gzip")
df.to_parquet(parquet_file, write_index=False)
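To confirm the conversion worked, and to answer the open question about the file's dimensions, the resulting parquet dataset can be inspected afterwards. A minimal sketch, assuming the parquet_file name used above:

import dask.dataframe as dd

parquet_file = "kipan_exon.parquet"

# Reading parquet is lazy; only metadata is touched until something is computed
df = dd.read_parquet(parquet_file)

print("columns:", len(df.columns))  # column count comes from the parquet schema
print("rows:", len(df))             # triggers a count over the partitions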