I have a file of around 2 GB in an S3 folder. It contains a header record and a trailer record, each of a different length than the data records, which themselves vary in length. I need to copy this file to another location in S3 programmatically after removing the header and trailer. Can anyone help me with this?
File format (say the file is named abc.txt):
001|20210930|abc.txt
12345|abcsd|prsdf|20210930|10.0|50
12346|sdfgsd|dfg|20210930|20.0|100
12347|dfgfrg|dfg|20210930|30.0|200
009|3
I tried loading the file from S3 with pandas, but it failed with a memory error, so pandas is not an option here.
I also tried the boto3 library and used
obj.get()['Body'].read()
but how do I remove the header and trailer from this data and then write it back to a file in S3?
Is there any other effective way?
CodePudding user response:
I'll assume you have functions is_header(line) and is_trailer(line) that can tell you whether a line is a header or a trailer, respectively (a sketch of what they might look like follows the code below). Here's how you could stream the file from S3 and save it back.
import boto3

s3 = boto3.client("s3")
bucket = "mybucket"
key = "path/to/abc.txt"
new_key = "path/to/def.txt"

r = s3.get_object(Bucket=bucket, Key=key)
sb = r["Body"]  # the StreamingBody is under the "Body" key
# iter_lines() strips newlines, so rejoin the kept lines with "\n"
content = [line for line in sb.iter_lines() if not is_header(line) and not is_trailer(line)]
content = b"\n".join(content)
s3.put_object(Bucket=bucket, Key=new_key, Body=content)
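For is_header and is_trailer, here is a minimal sketch based on the sample file in the question; the "001|" and "009|" record-type prefixes are an assumption drawn from that sample, so adjust them to your real format. Note that iter_lines yields bytes, hence the byte-string prefixes.
def is_header(line: bytes) -> bool:
    # Assumed: the header record starts with the "001|" record-type prefix,
    # as in the sample abc.txt shown in the question.
    return line.startswith(b"001|")
def is_trailer(line: bytes) -> bool:
    # Assumed: the trailer record starts with the "009|" record-type prefix.
    return line.startswith(b"009|")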
Stream Data to Avoid Out of Memory Errors
The above code assumes that the entire file can fit into memory, which it probably can since it's only 2 GB. If not, you'll need to stream it and use a multipart upload.
Here's one way to do that using boto3's managed transfer (upload_fileobj with a TransferConfig):
from typing import Optional
import boto3
from boto3.s3.transfer import TransferConfig
import botocore
MB = 1024*1024
class FileNoHeader:
    """Wrapper for a botocore StreamingBody that filters out headers/trailers"""

    def __init__(self, stream: botocore.response.StreamingBody):
        self.stream = stream
        self.first_line = True
        self.line_generator = self.stream.iter_lines()

    def read(self, size: Optional[int] = None) -> bytes:
        """Wrap StreamingBody.iter_lines to read line by line while looking like a fileobj

        Parameters
        ----------
        size: int, optional
            How much data to read. This is a minimum amount, because we use
            StreamingBody.iter_lines to read the file line by line and can only
            return whole lines. If `None`, the default, read the entire file.
            This parameter exists for compatibility with the read() method of a
            file-like object.
        """
        data = []
        amt = 0
        while size is None or amt < size:
            try:
                line = next(self.line_generator)
            except StopIteration:
                break
            if self.is_header(line) or self.is_trailer(line):
                continue
            # iter_lines strips newlines, so add one back to each kept line;
            # this also keeps lines separated across successive read() calls.
            line += b"\n"
            amt += len(line)
            data.append(line)
        return b"".join(data)

    def close(self):
        """Close the underlying StreamingBody"""
        self.stream.close()

    def is_header(self, line):
        # TODO: implement your logic
        # right now it just skips the first line
        if self.first_line:
            self.first_line = False
            return True
        return False

    def is_trailer(self, line):
        # TODO: implement your logic
        return False
## Usage
config = TransferConfig(multipart_chunksize=1*MB)
s3 = boto3.client("s3")
bucket = "mybucket"
key = "path/to/abc.txt"
new_key = "path/to/abc_no_header.txt"
r = s3.get_object(Bucket=bucket, Key=key)
streaming_body = r["Body"]
data_stream = FileNoHeader(streaming_body)
def tcback(bytes_transferred):
    print(f"{bytes_transferred} bytes transferred")

s3.upload_fileobj(
    data_stream,
    bucket,
    new_key,
    Config=config,
    Callback=tcback,
)
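Once the upload finishes, you may want to close the wrapped stream and sanity-check the result. A quick sketch (head_object only fetches the object's metadata): the new object should be slightly smaller than the original, since the header and trailer lines were dropped.
data_stream.close()
orig_size = s3.head_object(Bucket=bucket, Key=key)["ContentLength"]
new_size = s3.head_object(Bucket=bucket, Key=new_key)["ContentLength"]
print(f"original: {orig_size} bytes, without header/trailer: {new_size} bytes")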
Sidebar: AWS Lambda
If you are using AWS Lambda functions, you can configure up to 10 GB of memory. You can set the memory in the AWS Console or via the API; see the boto3 and AWS CLI v2 docs for details.
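For example, with boto3 you could raise the memory of an existing function to the 10 GB maximum like this (a sketch; "my-copy-function" is a placeholder name):
import boto3

lambda_client = boto3.client("lambda")
lambda_client.update_function_configuration(
    FunctionName="my-copy-function",  # placeholder function name
    MemorySize=10240,                 # value is in MB; 10240 MB = 10 GB
)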