I have files in S3 as gzip chunks, so I have to read the data sequentially and can't read an arbitrary chunk on its own. I always have to start with the first file.
For example, let's say I have 3 gzip files in S3: f1.gz, f2.gz, f3.gz. If I download them all locally, I can do cat * | gzip -d. If I do cat f2.gz | gzip -d, it fails with gzip: stdin: not in gzip format.
How can I stream this data from S3 using Python? I saw smart-open, which can decompress gz files with:
from smart_open import smart_open, open
with open(path, compression='.gz') as f:
    for line in f:
        print(line.strip())
where path is the path to f1.gz. This works until it hits the end of the file, where it aborts. The same thing happens locally: if I do cat f1.gz | gzip -d, it errors with gzip: stdin: unexpected end of file when it hits the end.
Is there a way to make it stream the files continuously using python?
This one will not abort, and can iterate through f1.gz, f2.gz and f3.gz:
with open(path, 'rb', compression='disable') as f:
    for line in f:
        print(line.strip(), end="")
but the output is just bytes. I thought it would work to run python test.py | gzip -d with the above code, but I get the error gzip: stdin: not in gzip format. Is there a way to have Python print, using smart-open, something that gzip can read?
CodePudding user response:
For example lets say I have 3 gzip file in s3, f1.gz, f2.gz, f3.gz. If I download all locally, I can do cat * | gzip -d.
One idea would be to make a file object to implement this. The file object reads from one filehandle, exhausts it, reads from the next one, exhausts it, etc. This is similar to how cat works internally.
The handy thing about this is that it does the same thing as concatenating all of your files, without the memory use of reading in all of your files at the same time.
Once you have the combined file object wrapper, you can pass it to Python's gzip module to decompress the file.
Example:
import gzip


class ConcatFileWrapper:
    def __init__(self, files):
        self.files = iter(files)
        self.current_file = next(self.files)

    def read(self, *args):
        ret = self.current_file.read(*args)
        if len(ret) == 0:
            # EOF
            # Optional: close self.current_file here
            # self.current_file.close()
            # Advance to next file and try again
            try:
                self.current_file = next(self.files)
            except StopIteration:
                # Out of files
                # Return an empty bytes object
                return ret
            # Recurse and try again
            return self.read(*args)
        return ret

    def write(self):
        raise NotImplementedError()


filenames = ["xaa", "xab", "xac", "xad"]
filehandles = [open(f, "rb") for f in filenames]
wrapper = ConcatFileWrapper(filehandles)
with gzip.open(wrapper) as gf:
    for line in gf:
        print(line)
# Close all files
for f in filehandles:
    f.close()
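If you would rather keep the decompression in the shell, as with python test.py | gzip -d in the question, the chunks have to reach stdout as raw bytes. print() on a bytes object writes its b'...' text representation, and reading line by line with strip() drops bytes that belong to the compressed stream, which is why gzip rejects that output. Below is a minimal sketch, assuming each chunk can be opened as a binary file object. copy_chunks and its opener parameter are hypothetical names; for S3, opener could be a small function that calls smart_open's open(path, 'rb', compression='disable') as in the question.

```python
import shutil
import sys


def copy_chunks(filenames, out=None, opener=open):
    """Copy each chunk to `out` byte for byte, in order (like `cat`)."""
    if out is None:
        # sys.stdout.buffer is the binary layer underneath the text-mode stdout.
        out = sys.stdout.buffer
    for filename in filenames:
        with opener(filename, "rb") as f:
            # copyfileobj copies in fixed-size blocks, so memory use stays small.
            shutil.copyfileobj(f, out)
    out.flush()
```

Calling copy_chunks(["f1.gz", "f2.gz", "f3.gz"]) from test.py should then let python test.py | gzip -d see the same byte stream that cat * produces.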
Here's how I tested this. I created a test file with the following commands.
Create a file containing the numbers 1 through 1000.
$ seq 1 1000 > foo
Compress it.
$ gzip foo
Split the file. This produces four files named xaa-xad.
$ split -b 500 foo.gz
Run the above Python script in the same directory, and it should print the lines 1 through 1000 (as bytes objects, since the file is read in binary mode).
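The same check can also be reproduced without the shell. This is only a sketch: in-memory io.BytesIO objects stand in for the xaa-xad files, make_chunks is a hypothetical helper, and the class is a condensed copy of the wrapper above (with the recursion written as a loop) so that the snippet runs on its own.

```python
import gzip
import io


class ConcatFileWrapper:
    """Condensed copy of the wrapper above, restated so this snippet is runnable."""

    def __init__(self, files):
        self.files = iter(files)
        self.current_file = next(self.files)

    def read(self, *args):
        while True:
            ret = self.current_file.read(*args)
            if ret:
                return ret
            try:
                # Current chunk is exhausted: move on to the next one.
                self.current_file = next(self.files)
            except StopIteration:
                return ret  # empty bytes: every chunk has been read


def make_chunks(chunk_size=500):
    """Gzip the numbers 1..1000 and split the compressed bytes into chunks."""
    data = "".join(f"{i}\n" for i in range(1, 1001)).encode()
    blob = gzip.compress(data)
    return [io.BytesIO(blob[i:i + chunk_size])
            for i in range(0, len(blob), chunk_size)]


chunks = make_chunks()
with gzip.open(ConcatFileWrapper(chunks)) as gf:
    numbers = [int(line.decode()) for line in gf]

print(numbers[0], numbers[-1], len(numbers))
```

Only the first chunk carries the gzip header, which matches the behaviour described in the question: the chunks decompress together, in order, but not individually.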
Edit: extra remark about lazy-opening the files
If you have a huge number of files, you might want to open only one file at a time. Here's an example of a generator that you can pass to ConcatFileWrapper in place of the list of file handles:
def open_files(filenames):
    for filename in filenames:
        # Note: this will leak file handles unless you uncomment the code above that closes the file handles again.
        yield open(filename, "rb")
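One way to combine lazy opening with closing is to let the reader own the file handles. The sketch below is an assumption-laden variant, not the wrapper above: LazyConcatReader, iter_lines and the opener parameter are hypothetical names. It opens each file only when needed, closes it once exhausted, and keeps returning empty bytes after the last file, because gzip may call read() again after the first empty result and a closed file would raise an error at that point. For S3, opener could be a small function that calls smart_open's open(path, 'rb', compression='disable') as in the question.

```python
import gzip


class LazyConcatReader:
    """Read several files as one stream, keeping at most one file open."""

    def __init__(self, filenames, opener=open):
        self._names = iter(filenames)
        self._opener = opener
        self._current = None  # nothing is opened until the first read()

    def read(self, size=-1):
        # Note: size=-1 returns the rest of the current file, not the whole stream.
        if size == 0:
            return b""  # a zero-length read must not be mistaken for EOF
        while True:
            if self._current is None:
                name = next(self._names, None)
                if name is None:
                    return b""  # no files left; safe to call repeatedly
                self._current = self._opener(name, "rb")
            data = self._current.read(size)
            if data:
                return data
            # Current file is exhausted: close it before opening the next one.
            self.close()

    def close(self):
        if self._current is not None:
            self._current.close()
            self._current = None


def iter_lines(filenames, opener=open):
    """Yield decompressed lines (as bytes) from the concatenated gzip chunks."""
    reader = LazyConcatReader(filenames, opener)
    try:
        with gzip.open(reader) as gf:
            yield from gf
    finally:
        reader.close()  # also covers the case where iteration stops early
```

With the test files from above, for line in iter_lines(["xaa", "xab", "xac", "xad"]): print(line) should behave like the eager version while holding only one file handle at a time.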