Recently I faced the task of transferring large files (> 0.5 GB) to S3 storage via SFTP. As I understood from the boto3 manual, I should be using multipart upload. I came across a good tutorial and, with some minor changes, tried the code for myself. Long story short: it works, but the speed is ridiculous (~150 KB/s), which especially hurts in my case (large files). As I understand it, paramiko might be the bottleneck, but my experiments with prefetching and varying the buffer size didn't help much (the best I got was around 1500 KB/s for the first chunk, with prefetching on, followed by a drop to 250-300 KB/s for the remaining chunks of a ~1.5 GB file). Since I've run out of ideas, any tips on what to try next are very much appreciated. Thanks in advance! Here's my code:
AWS session wrapper
aws.py
import boto3
import os


def aws_session(region_name='us-east-1'):
    AWS_ACCESS_KEY_ID = os.environ.get('AWS_ACCESS_KEY_ID')
    AWS_ACCESS_KEY_SECRET = os.environ.get('AWS_ACCESS_KEY_SECRET')
    return boto3.session.Session(aws_access_key_id=AWS_ACCESS_KEY_ID,
                                 aws_secret_access_key=AWS_ACCESS_KEY_SECRET,
                                 region_name=region_name)
Main script
stream.py
import paramiko
import math
import time
from aws import aws_session
from dotenv import load_dotenv


def open_sftp_connection(sftp_host, sftp_port, sftp_username, sftp_password):
    client = paramiko.SSHClient()
    client.load_system_host_keys()
    try:
        transport = paramiko.Transport(sftp_host, sftp_port)
    except Exception as e:
        return 'conn_error'
    try:
        transport.connect(username=sftp_username, password=sftp_password)
    except Exception as identifier:
        return 'auth_error'
    ftp_connection = paramiko.SFTPClient.from_transport(transport)
    return ftp_connection


def transfer_chunk_from_sftp_to_s3(sftp_file, s3_connection, multipart_upload, bucket_name,
                                   s3_file_path, part_number, chunk_size):
    # Read one chunk from the SFTP file and upload it as one part of the multipart upload
    start_time = time.time()
    chunk = sftp_file.read(int(chunk_size))
    part = s3_connection.upload_part(
        Bucket=bucket_name,
        Key=s3_file_path,
        PartNumber=part_number,
        UploadId=multipart_upload["UploadId"],
        Body=chunk)
    end_time = time.time()
    total_seconds = end_time - start_time
    print(
        "speed is {} kb/s total seconds taken {}".format(
            math.ceil((int(chunk_size) / 1024) / total_seconds), total_seconds
        )
    )
    part_output = {"PartNumber": part_number, "ETag": part["ETag"]}
    return part_output


def transfer_file_from_sftp_to_s3(bucket_name, sftp_host, sftp_port, sftp_path,
                                  aws_s3_path, sftp_username, sftp_password, chunk_size=26214400):
    sftp_connection = open_sftp_connection(sftp_host, int(sftp_port), sftp_username, sftp_password)
    sftp_file = sftp_connection.file(sftp_path, "r", bufsize=-1)
    s3_connection = aws_session().client('s3')
    sftp_file_size = sftp_file._get_size()
    print('file size: ', sftp_file_size)
    chunk_count = int(math.ceil(sftp_file_size / float(chunk_size)))
    print('amount of chunks: ', chunk_count)
    multipart_upload = s3_connection.create_multipart_upload(Bucket=bucket_name, Key=aws_s3_path)
    parts = []
    # Transfer the file chunk by chunk, collecting the part info needed to complete the upload
    for i in range(chunk_count):
        print("Transferring chunk {}...".format(i + 1))
        part = transfer_chunk_from_sftp_to_s3(
            sftp_file,
            s3_connection,
            multipart_upload,
            bucket_name,
            aws_s3_path,
            i + 1,
            chunk_size,
        )
        parts.append(part)
        print("Chunk {} Transferred Successfully!".format(i + 1))
    part_info = {"Parts": parts}
    s3_connection.complete_multipart_upload(
        Bucket=bucket_name,
        Key=aws_s3_path,
        UploadId=multipart_upload["UploadId"],
        MultipartUpload=part_info,
    )
    sftp_file.close()


if __name__ == '__main__':
    load_dotenv()
    bucket_name = ''
    sftp_host = ''
    sftp_port = int('22')
    sftp_path = ''
    aws_s3_path = ''
    sftp_username = ''
    sftp_password = ''
    transfer_file_from_sftp_to_s3(
        bucket_name=bucket_name,
        sftp_host=sftp_host,
        sftp_port=22,
        sftp_path=sftp_path,
        aws_s3_path=aws_s3_path,
        sftp_username=sftp_username,
        sftp_password=sftp_password
    )
PS: The scenario I'm using this script in is the following: I need to accept large incoming files (from anywhere) and run them through a processing pipeline, but the pipeline is isolated and cannot be reached from the outside world (all communication involving data goes through S3 storage). The solution I came up with is a small machine in the middle that accepts SFTP credentials and "streams" (uploads chunk by chunk) the file to S3 storage, from where the pipeline picks it up and does the job.
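To give an idea of the consuming side (not part of the script above, just a rough sketch with placeholder bucket/prefix names), the pipeline could simply poll the bucket for new objects with boto3:

import time
import boto3


def wait_for_new_objects(bucket_name, prefix, poll_seconds=30):
    # Sketch of the pipeline side: poll the bucket for newly uploaded objects
    # under a prefix and hand each new key off for processing.
    s3 = boto3.client('s3')
    seen = set()
    while True:
        resp = s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix)
        for obj in resp.get('Contents', []):
            if obj['Key'] not in seen:
                seen.add(obj['Key'])
                yield obj['Key']
        time.sleep(poll_seconds)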
CodePudding user response:
There are three issues at play here:
1. You're counting the time to download and upload a chunk as a single transfer, so if it takes 100 seconds to download 1 MB and 10 seconds to upload it, you report about 9 KB/s (1 MB / 110 seconds) instead of about 10 KB/s for the download followed by about 100 KB/s for the upload. I mention this because it's hiding the second issue.
2. Paramiko has issues with sustained transfers for some people; paramiko issue 175 (also referenced in the code below) has more details. A minimal sketch of just this workaround follows the list.
3. On top of that, you're downloading one chunk, then turning around and uploading it. In most environments this is wasteful: even if everything else were working well, it means your download spends roughly half of its time waiting for the upload.
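Pulled out on its own, the paramiko-side workaround is just a larger transport window plus pipelined/prefetch reads. Roughly (host, credentials and path below are placeholders, and the chunk still has to be handed to an uploader):

import paramiko


class FastTransport(paramiko.Transport):
    # Larger window and rekey thresholds, per paramiko issue 175
    def __init__(self, sock):
        super(FastTransport, self).__init__(sock)
        self.window_size = 2147483647
        self.packetizer.REKEY_BYTES = pow(2, 40)
        self.packetizer.REKEY_PACKETS = pow(2, 40)


transport = FastTransport(('sftp.example.com', 22))    # placeholder host
transport.connect(username='user', password='secret')  # placeholder credentials
sftp = paramiko.SFTPClient.from_transport(transport)

remote = sftp.file('/path/to/big.file', 'rb')  # placeholder path
remote.set_pipelined()  # don't wait for a server ack after every request
remote.prefetch()       # queue up read-ahead requests in a background thread
while True:
    chunk = remote.read(8388608)
    if not chunk:
        break
    # ...hand the chunk to the uploader...
remote.close()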
You can fix all the issues:
import paramiko
import boto3
import multiprocessing
import time


# Simple helper to track an activity and report the duration
# and how fast data was transferred
class SpeedTracker:
    def __init__(self):
        self._start = time.time()
        self._end = None

    def start(self):
        self._start = time.time()

    def end(self, bytes, desc):
        self._end = time.time()
        secs = self._end - self._start
        if secs > 0:
            print(f"{desc} done, {(bytes / 1048576) / secs:0.3f} MB/s")


class FastTransport(paramiko.Transport):
    # Correct issues with window size, see paramiko issue 175
    def __init__(self, sock):
        super(FastTransport, self).__init__(sock)
        self.window_size = 2147483647
        self.packetizer.REKEY_BYTES = pow(2, 40)
        self.packetizer.REKEY_PACKETS = pow(2, 40)


def open_sftp_connection(sftp_host, sftp_port, sftp_username, sftp_password, sftp_key):
    client = paramiko.SSHClient()
    client.load_system_host_keys()
    transport = FastTransport((sftp_host, sftp_port))
    # Not necessary, but here for testing purposes, support either
    # password or private key auth
    if sftp_password is not None:
        transport.connect(username=sftp_username, password=sftp_password)
    else:
        pkey = paramiko.RSAKey.from_private_key_file(sftp_key)
        transport.connect(username=sftp_username, pkey=pkey)
    ftp_connection = paramiko.SFTPClient.from_transport(transport)
    return ftp_connection


def pull_from_sftp(sftp_host, sftp_port, sftp_username, sftp_password, sftp_key, sftp_path, queue):
    sftp_connection = open_sftp_connection(sftp_host, int(sftp_port), sftp_username, sftp_password, sftp_key)
    sftp_file = sftp_connection.file(sftp_path, "rb")
    # Enable pipelined mode, see paramiko issue 175
    sftp_file.set_pipelined()
    # Allow the transfer to fill up data in a background thread
    sftp_file.prefetch()
    chunk_size = 8388608
    tracker = SpeedTracker()
    num = 0
    while True:
        # Download one chunk
        tracker.start()
        chunk = sftp_file.read(chunk_size)
        if len(chunk) == 0:
            # All done, time to stop work
            queue.put(None)
            sftp_file.close()
            break
        # Send the chunk off to the reader process
        num += 1
        tracker.end(len(chunk), f"Downloaded chunk #{num}")
        queue.put(chunk)


def send_chunk_to_s3(s3_connection, multipart_upload, bucket_name,
                     s3_file_path, part_number, chunk):
    # Upload one chunk to S3
    tracker = SpeedTracker()
    part = s3_connection.upload_part(
        Bucket=bucket_name,
        Key=s3_file_path,
        PartNumber=part_number,
        UploadId=multipart_upload["UploadId"],
        Body=chunk)
    tracker.end(len(chunk), f"Uploaded chunk #{part_number}")
    part_output = {"PartNumber": part_number, "ETag": part["ETag"]}
    return part_output


def transfer_file_from_sftp_to_s3(bucket_name, sftp_host, sftp_port, sftp_path,
                                  aws_s3_path, sftp_username, sftp_password, sftp_key):
    # Start a worker process to get the data from SFTP
    queue = multiprocessing.Queue(10)
    proc = multiprocessing.Process(target=pull_from_sftp, args=(sftp_host, sftp_port, sftp_username, sftp_password, sftp_key, sftp_path, queue))
    proc.start()
    # And start reading from that worker to upload its results
    s3_connection = boto3.client('s3')
    multipart_upload = s3_connection.create_multipart_upload(Bucket=bucket_name, Key=aws_s3_path)
    parts = []
    while True:
        chunk = queue.get()
        if chunk is None:
            break
        part = send_chunk_to_s3(
            s3_connection,
            multipart_upload,
            bucket_name,
            aws_s3_path,
            len(parts) + 1,
            chunk,
        )
        parts.append(part)
    # All done, clean up and finalize the multipart upload
    proc.join()
    part_info = {"Parts": parts}
    resp = s3_connection.complete_multipart_upload(
        Bucket=bucket_name,
        Key=aws_s3_path,
        UploadId=multipart_upload["UploadId"],
        MultipartUpload=part_info,
    )
    print(resp)


if __name__ == '__main__':
    sftp_host = 'TODO'
    sftp_port = int('22')
    sftp_path = 'TODO'
    sftp_username = 'TODO'
    sftp_password = 'TODO'
    sftp_key = None
    bucket_name = 'TODO'
    aws_s3_path = 'TODO'
    transfer_file_from_sftp_to_s3(
        bucket_name=bucket_name,
        sftp_host=sftp_host,
        sftp_port=22,
        sftp_path=sftp_path,
        aws_s3_path=aws_s3_path,
        sftp_username=sftp_username,
        sftp_password=sftp_password,
        sftp_key=sftp_key
    )
Tested on my local machine, I saw download and upload speeds of around 40 MB/s, which is certainly better than without the bug fixes.
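One caveat worth adding as a note (not part of the tested code): if the script dies partway through, the incomplete multipart upload keeps its already-uploaded parts in the bucket, and they incur storage charges until the upload is aborted. A minimal guard, assuming the same s3_connection, bucket_name, aws_s3_path and multipart_upload names as in the script above, would be to wrap everything after create_multipart_upload and abort on failure:

multipart_upload = s3_connection.create_multipart_upload(Bucket=bucket_name, Key=aws_s3_path)
try:
    parts = []
    # ... the queue/upload loop and the complete_multipart_upload call go here ...
except Exception:
    # Discard the partial upload so abandoned parts don't accumulate in the bucket
    s3_connection.abort_multipart_upload(
        Bucket=bucket_name,
        Key=aws_s3_path,
        UploadId=multipart_upload["UploadId"],
    )
    raise

Alternatively, an S3 bucket lifecycle rule with AbortIncompleteMultipartUpload can clean up abandoned uploads automatically after a set number of days.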