Home > Mobile >  Sorting a big file using its chunks
Sorting a big file using its chunks

Time:10-10

Suppose we want to sort a file that has 40000 rows around a column=X. Let us also assume that same values are widespread across the table, so that rows with same value in column=X are found not only in the top 1000 rows. Now if we read file by chunks and consider only 1000 rows, we might mess the other rows with same value found in column=X if we are to sort again the table around that column. So how we can solve this issue please? No code is needed since no data is available, but please I am looking for your opinion on the matter? Should we go with merge sort by giving each chunk to a merge sort algorithm parallelly and then recombine the results? I don't see that there is a way to do that with pandas, but I am not sure?

import pandas as pd
chunk_size = 1000
batch_no = 1
for chunk in pd.read_csv('data.csv', chunksize=chunk_size):
    chunk.sort_values(by='X', inplace=True)
    chunk.to_csv('data'  str(batch_no)   '.csv', index=False)
    batch_no  =1

CodePudding user response:

You need to merge the sorted csv files, luckily Python provides a function for it. Use it as below:

from operator import itemgetter

import pandas as pd
import numpy as np
import csv
import heapq

# generate test data
test_data = pd.DataFrame(data=[[f"label{i}", val] for i, val in enumerate(np.random.uniform(size=40000))],
                         columns=["label", "X"])
test_data.to_csv("data.csv", index=False)

# read and sort each chunk
chunk_size = 1000
file_names = []
for batch_no, chunk in enumerate(pd.read_csv("data.csv", chunksize=chunk_size), 1):
    chunk.sort_values(by="X", inplace=True)
    file_name = f"data_{batch_no}.csv"
    chunk.to_csv(file_name, index=False)
    file_names.append(file_name)

# merge the chunks
chunks = [csv.DictReader(open(file_name)) for file_name in file_names]
with open("data_sorted.csv", "w") as outfile:
    field_names = ["label", "X"]
    writer = csv.DictWriter(outfile, fieldnames=field_names)
    writer.writeheader()
    for row in heapq.merge(*chunks, key=itemgetter("X")):
        writer.writerow(row)

From the documentation on heapq.merge:

Merge multiple sorted inputs into a single sorted output (for example, merge timestamped entries from multiple log files). Returns an iterator over the sorted values.

Similar to sorted(itertools.chain(*iterables)) but returns an iterable, does not pull the data into memory all at once, and assumes that each of the input streams is already sorted (smallest to largest).

So using as you can read in the above quote (emphasis mine) using heapq.merge won't load all data into memory. Is also worth noting that the complexity of this function is O(n) where n is the size of the whole data. Therefore the overall sorting algorithm is O(nlogn)

  • Related