Python: Stream gzip files from s3

I have files in s3 stored as gzip chunks, so I have to read the data sequentially and can't read them in random order. I always have to start with the first file.

For example, let's say I have 3 gzip files in s3: f1.gz, f2.gz, f3.gz. If I download them all locally, I can do cat * | gzip -d. If I do cat f2.gz | gzip -d, it fails with gzip: stdin: not in gzip format.

How can I stream this data from s3 using Python? I saw smart-open, which is able to decompress gz files with:

from smart_open import open

with open(path, compression='.gz') as f:
    for line in f:
        print(line.strip())

where path is the path to f1.gz. This works until it hits the end of the file, at which point it aborts. The same thing happens locally: if I do cat f1.gz | gzip -d, it errors with gzip: stdin: unexpected end of file when it reaches the end.

Is there a way to make it stream the files continuously using Python?

The following will not abort, and can iterate through f1.gz, f2.gz and f3.gz:

with open(path, 'rb', compression='disable') as f:
    for line in f:
        print(line.strip(), end="")

but the output is just raw bytes. I thought I could run python test.py | gzip -d with the above code, but I get gzip: stdin: not in gzip format. Is there a way to have Python, using smart-open, print output that gzip can read?
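My guess is that print() writes a textual representation of each bytes object rather than the raw bytes themselves; something along the following lines, writing straight to sys.stdout.buffer instead, is what I was aiming for (untested sketch, with the s3 paths as placeholders):

import sys
from smart_open import open

# Placeholder paths for the three gzip chunks in s3
paths = ["f1.gz", "f2.gz", "f3.gz"]

for path in paths:
    # compression='disable' so smart_open passes the raw gzip bytes through untouched
    with open(path, "rb", compression="disable") as f:
        while True:
            chunk = f.read(64 * 1024)
            if not chunk:
                break
            # Write the raw bytes to stdout rather than print()-ing their repr
            sys.stdout.buffer.write(chunk)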

CodePudding user response:

For example, let's say I have 3 gzip files in s3: f1.gz, f2.gz, f3.gz. If I download them all locally, I can do cat * | gzip -d.

One idea would be to write a file-like object that implements this: it reads from one filehandle, exhausts it, then reads from the next one, exhausts it, and so on. This is similar to how cat works internally.

The handy thing about this is that it behaves the same as concatenating all of your files, without the memory cost of reading them all in at the same time.

Once you have the combined file object wrapper, you can pass it to Python's gzip module to decompress the file.

Example:

import gzip

class ConcatFileWrapper:
    def __init__(self, files):
        self.files = iter(files)
        self.current_file = next(self.files)
    def read(self, *args):
        ret = self.current_file.read(*args)
        if len(ret) == 0:
            # EOF
            # Optional: close self.current_file here
            # self.current_file.close()
            # Advance to next file and try again
            try:
                self.current_file = next(self.files)
            except StopIteration:
                # Out of files
                # Return the empty bytes object (EOF for the whole wrapper)
                return ret
            # Recurse and try again
            return self.read(*args)
        return ret
    def write(self):
        raise NotImplementedError()

filenames = ["xaa", "xab", "xac", "xad"]
filehandles = [open(f, "rb") for f in filenames]
wrapper = ConcatFileWrapper(filehandles)

with gzip.open(wrapper) as gf:
    for line in gf:
        print(line.decode().strip())

# Close all files
for f in filehandles:
    f.close()

Here's how I tested this:

I created the test files with the following commands.

Create a file containing the numbers 1 through 1000.

$ seq 1 1000 > foo

Compress it.

$ gzip foo

Split the file. This produces four files named xaa-xad.

$ split -b 500 foo.gz

Run the above Python script on it, and it should print the numbers 1 through 1000.
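The same wrapper should also work directly against the S3 objects from the question, with smart_open providing the filehandles. A rough sketch (the s3:// paths are placeholders, ConcatFileWrapper is the class defined above, and compression='disable' keeps smart_open from trying to decompress the individual parts):

import gzip
from smart_open import open

# Placeholder S3 paths for the gzip chunks
paths = ["s3://my-bucket/f1.gz", "s3://my-bucket/f2.gz", "s3://my-bucket/f3.gz"]

# Open each part as raw bytes; ConcatFileWrapper stitches them into one stream
filehandles = [open(p, "rb", compression="disable") for p in paths]
wrapper = ConcatFileWrapper(filehandles)

with gzip.open(wrapper) as gf:
    for line in gf:
        print(line.decode().strip())

# Close all files
for f in filehandles:
    f.close()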

Edit: extra remark about lazy-opening the files

If you have a huge number of files, you might want to open only one file at a time. Here's an example:

def open_files(filenames):
    for filename in filenames:
        # Note: this will leak file handles unless you uncomment the code above that closes the file handles again.
        yield open(filename, "rb")
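Plugged into the wrapper and gzip.open call from above, usage might look like this (note that the handles opened by the generator are only closed if you uncomment the close() call inside read()):

filenames = ["xaa", "xab", "xac", "xad"]
wrapper = ConcatFileWrapper(open_files(filenames))

with gzip.open(wrapper) as gf:
    for line in gf:
        print(line.decode().strip())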