I am using the Paramiko library to stream data from SFTP to an S3 bucket with a Python 3.8 Lambda in AWS. The script is standard: files smaller than 6MB are downloaded whole, and larger files are uploaded with a multipart upload using a chunk size of roughly 6MB. However, I've noticed the speed is very slow, about 200KB/sec for a ~47MB file, when based on what I've read online it should be at least 2MB/sec (which is still considered fairly slow). There will not be many files over 1GB, but at this rate the 15-minute Lambda timeout will be hit by roughly a 200MB file. The Lambda runs in a VPC, but I am not familiar with its configuration; another team handles that. Is there a reason this is so incredibly slow?
Script:
# Imports used by the methods below; `logger` is assumed to be configured
# elsewhere in the module.
import io
import math
import time

import boto3
import paramiko

def open_ftp_connection(self):
    """
    Opens ftp connection and returns connection object
    """
    client = paramiko.SSHClient()
    client.load_system_host_keys()
    try:
        transport = paramiko.Transport(self.host_name, 22)
        # self.trans = paramiko.Transport((hostname, port))
        transport.window_size = 134217727
        transport.use_compression()
        print("A")
    except Exception as e:
        return 'conn_error'
    try:
        transport.connect(username=self.ftp_username, password=self.ftp_password)
        print("B")
        # transport.exec_command("ls")
        print("C")
    except Exception as identifier:
        return 'auth_error'
    ftp_connection = paramiko.SFTPClient.from_transport(transport)
    print("D")
    return ftp_connection
def transfer_chunk_from_ftp_to_s3(self, ftp_file, s3_connection, multipart_upload, bucket_name,
                                  ftp_file_path, s3_file_path, part_number, chunk_size):
    start_time = time.time()
    chunk = ftp_file.read(int(chunk_size))
    part = s3_connection.upload_part(
        Bucket=bucket_name, Key=s3_file_path, PartNumber=part_number,
        UploadId=multipart_upload["UploadId"], Body=chunk)
    end_time = time.time()
    total_seconds = end_time - start_time
    print(
        "speed is {} kb/s total seconds taken {}".format(
            math.ceil((int(chunk_size) / 1024) / total_seconds), total_seconds
        )
    )
    part_output = {"PartNumber": part_number, "ETag": part["ETag"]}
    return part_output
def transfer_file_from_ftp_to_s3(self, bucket_name, ftp_file_path, s3_file_path,
                                 ftp_username, ftp_password, chunk_size, ftp_connection):
    # ftp_connection = self.open_ftp_connection(
    #     FTP_HOST, ftp_username, ftp_password
    # )
    ftp_file = ftp_connection.file(ftp_file_path, "r")
    s3_connection = boto3.client("s3")
    ftp_file_size = ftp_file._get_size()
    try:
        s3_file = s3_connection.head_object(Bucket=bucket_name, Key=s3_file_path)
        if s3_file["ContentLength"] == ftp_file_size:
            print("File Already Exists in S3 bucket")
            ftp_file.close()
            return
    except Exception as e:
        pass
    logger.info("file size: " + str(ftp_file_size))
    logger.info("chunk size: " + str(chunk_size))
    if ftp_file_size <= int(chunk_size):
        # upload file in one go
        print("Transferring complete File from FTP to S3...")
        ftp_file_data = ftp_file.read()
        ftp_file_data_bytes = io.BytesIO(ftp_file_data)
        s3_connection.upload_fileobj(ftp_file_data_bytes, bucket_name, s3_file_path)
        print("Successfully Transferred file from FTP to S3!")
        ftp_file.close()
    else:
        print("Transferring File from FTP to S3 in chunks...")
        # upload file in chunks
        chunk_count = int(math.ceil(ftp_file_size / float(chunk_size)))
        multipart_upload = s3_connection.create_multipart_upload(
            Bucket=bucket_name, Key=s3_file_path
        )
        logger.info("chunk count: " + str(chunk_count))
        parts = []
        for i in range(chunk_count):
            print("Transferring chunk {}...".format(i + 1))
            part = self.transfer_chunk_from_ftp_to_s3(
                ftp_file,
                s3_connection,
                multipart_upload,
                bucket_name,
                ftp_file_path,
                s3_file_path,
                i + 1,
                chunk_size,
            )
            parts.append(part)
            print("Chunk {} Transferred Successfully!".format(i + 1))
        part_info = {"Parts": parts}
        s3_connection.complete_multipart_upload(
            Bucket=bucket_name,
            Key=s3_file_path,
            UploadId=multipart_upload["UploadId"],
            MultipartUpload=part_info,
        )
        print("All chunks Transferred to S3 bucket! File Transfer successful!")
        ftp_file.close()
The code above is all called by this:
self.transfer_file_from_ftp_to_s3(self.s3_bucket, self.ftp_full_file_path, s3_file_path,
                                  self.ftp_username, self.ftp_password, CHUNK_SIZE, ftp_connection)
And here is the output:
speed is 222 kb/s total seconds taken 27.759796857833862
Chunk 1 Transferred Successfully!
Transferring chunk 2...
speed is 214 kb/s total seconds taken 28.721262216567993
Chunk 2 Transferred Successfully!
Transferring chunk 3...
speed is 193 kb/s total seconds taken 31.968283653259277
Chunk 3 Transferred Successfully!
Transferring chunk 4...
speed is 196 kb/s total seconds taken 31.360466480255127
Chunk 4 Transferred Successfully!
Transferring chunk 5...
speed is 216 kb/s total seconds taken 28.545111417770386
Chunk 5 Transferred Successfully!
Transferring chunk 6...
speed is 218 kb/s total seconds taken 28.293278217315674
Chunk 6 Transferred Successfully!
Transferring chunk 7...
speed is 217 kb/s total seconds taken 28.43106746673584
Chunk 7 Transferred Successfully!
Transferring chunk 8...
speed is 200 kb/s total seconds taken 30.775285482406616
Chunk 8 Transferred Successfully!
All chunks Transferred to S3 bucket! File Transfer successful!
Edit:
Adding ftp_file.prefetch() in the transfer_file_from_ftp_to_s3 function dramatically increased the speed for the ~47MB file, from about 202KB/sec to 2MB/sec. However, for a 1GB file with the same chunk size, it starts out at 2MB/sec, but by chunk 10 the speed drops back to about 202KB/sec.
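For reference, this is roughly where the prefetch() call went; this is a sketch of the change, not the exact diff. SFTPFile.prefetch() starts pipelined read-ahead of the file in a background thread, so later read() calls are served from a local buffer instead of one blocking SFTP round trip per 32KB request:

    ftp_file = ftp_connection.file(ftp_file_path, "r")
    ftp_file_size = ftp_file._get_size()
    # Start asynchronous read-ahead of the whole file; subsequent read()
    # calls are then served from the prefetch buffer.
    ftp_file.prefetch(ftp_file_size)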
CodePudding user response:
My solution to the problem was to use Paramiko's readv(), which reads a list of chunks in one go and saves time because it doesn't use seek(). I also added multithreading on top of this to download several chunks at once, then used the multipart upload. readv() alone sped the transfer up to 2-3MB/sec, with peaks around 10MB/sec, and the multiple threads gave the same per-chunk speeds but processed different parts of the file simultaneously. This allowed a 1GB file to be read in less than 6 minutes, whereas the original code would only have managed about 200MB within the 15-minute timeframe. I'll also add that prefetch and the other fixes mentioned in the comments were not used, since readv() does its own prefetching and prefetch doesn't help with large files.
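For anyone looking for a concrete starting point, here is a minimal sketch of the readv()-based part download described above; the helper name and block size are my own choices, not the exact code used. Each S3 part is expressed as a list of (offset, length) blocks, which SFTPFile.readv() fetches in a pipelined fashion rather than doing a seek() plus a blocking read() per block; the assembled bytes are then sent with upload_part:

    def upload_part_via_readv(sftp_file, s3_client, bucket, key, upload_id,
                              part_number, offset, length, block_size=32768):
        """Read one multipart part from SFTP using readv() and upload it to S3."""
        # Describe this part as a list of (offset, length) blocks; readv()
        # pipelines these requests using Paramiko's prefetch machinery.
        blocks = [(offset + pos, min(block_size, length - pos))
                  for pos in range(0, length, block_size)]
        body = b"".join(sftp_file.readv(blocks))
        part = s3_client.upload_part(Bucket=bucket, Key=key, PartNumber=part_number,
                                     UploadId=upload_id, Body=body)
        return {"PartNumber": part_number, "ETag": part["ETag"]}

In the threaded variant, each worker would open its own SFTP file handle and be given a distinct (part_number, offset, length) range, and the collected {"PartNumber", "ETag"} dicts are passed to complete_multipart_upload just as in the original script.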