Yet another sftp to s3 file uploading question


Recently I faced the task of transferring large files (> 0.5 GB) to S3 storage via SFTP. As I understood from the boto3 manual, I should be using multipart upload. I came across a good tutorial and, with some minor changes, tried the code for myself. Long story short: it works, but the speed is ridiculous (~150 KB/s), which in my case (large files) especially hurts. As far as I can tell, paramiko might be the bottleneck, but my experiments with prefetching and varying the buffer size didn't make much difference (the best result I got was around 1500 KB/s for the first chunk, with prefetching on, and then a drop to 250-300 KB/s for the remaining chunks of a ~1.5 GB file). Since I've run out of ideas, any tips and thoughts on what to try next are very appreciated, thanks in advance! Here's my code:

AWS session wrapper

aws.py

import boto3
import os

def aws_session(region_name='us-east-1'):
    AWS_ACCESS_KEY_ID = os.environ.get('AWS_ACCESS_KEY_ID')
    AWS_ACCESS_KEY_SECRET = os.environ.get('AWS_ACCESS_KEY_SECRET')
    return boto3.session.Session(aws_access_key_id=AWS_ACCESS_KEY_ID,
                                 aws_secret_access_key=AWS_ACCESS_KEY_SECRET,
                                 region_name=region_name)

Main script

stream.py

import paramiko
import math
import time
from aws import aws_session

from dotenv import load_dotenv

def open_sftp_connection(sftp_host, sftp_port, sftp_username, sftp_password):
    client = paramiko.SSHClient()
    client.load_system_host_keys()
    try:
        # pass (host, port) as a tuple; a second positional arg would be default_window_size
        transport = paramiko.Transport((sftp_host, sftp_port))
    except Exception as e:
        return 'conn_error'
    try:
        transport.connect(username=sftp_username, password=sftp_password)
    except Exception as identifier:
        return 'auth_error'
    ftp_connection = paramiko.SFTPClient.from_transport(transport)
    return ftp_connection


def transfer_chunk_from_sftp_to_s3(sftp_file, s3_connection, multipart_upload, bucket_name,
                                  s3_file_path, part_number, chunk_size):
    start_time = time.time()
    chunk = sftp_file.read(int(chunk_size))
    part = s3_connection.upload_part(
        Bucket=bucket_name,
        Key=s3_file_path,
        PartNumber=part_number,
        UploadId=multipart_upload["UploadId"],
        Body=chunk)
    end_time = time.time()
    total_seconds = end_time - start_time
    print(
        "speed is {} kb/s total seconds taken {}".format(
           math.ceil((int(chunk_size) / 1024) / total_seconds), total_seconds
        )
    )
    part_output = {"PartNumber": part_number, "ETag": part["ETag"]}
    return part_output


def transfer_file_from_sftp_to_s3(bucket_name, sftp_host, sftp_port, sftp_path, 
                                  aws_s3_path, sftp_username, sftp_password, chunk_size=26214400):
    
    sftp_connection = open_sftp_connection(sftp_host, int(sftp_port), sftp_username, sftp_password)
    sftp_file = sftp_connection.file(sftp_path, "r", bufsize=-1)    
    s3_connection = aws_session().client('s3')
    sftp_file_size = sftp_file._get_size()
    print('file size: ', sftp_file_size)
    chunk_count = int(math.ceil(sftp_file_size / float(chunk_size)))
    print('amount of chunks: ', chunk_count)
    multipart_upload = s3_connection.create_multipart_upload(Bucket=bucket_name, Key=aws_s3_path)
    parts = []
    for i in range(chunk_count):
        print("Transferring chunk {}...".format(i   1))
        part = transfer_chunk_from_sftp_to_s3(
                  sftp_file,
                  s3_connection,
                  multipart_upload,
                  bucket_name,
                  aws_s3_path,
                  i + 1,
                  chunk_size,
                )
        parts.append(part)
        print("Chunk {} Transferred Successfully!".format(i   1))

    part_info = {"Parts": parts}
    s3_connection.complete_multipart_upload(
        Bucket=bucket_name,
        Key=aws_s3_path,
        UploadId=multipart_upload["UploadId"],
        MultipartUpload=part_info,
    )
    sftp_file.close()

if __name__ == '__main__':

    load_dotenv()

    bucket_name=''
    sftp_host=''
    sftp_port=int('22')
    sftp_path=''
    aws_s3_path=''
    sftp_username=''
    sftp_password=''


    transfer_file_from_sftp_to_s3(
        bucket_name=bucket_name,
        sftp_host=sftp_host,
        sftp_port=sftp_port,
        sftp_path=sftp_path,
        aws_s3_path=aws_s3_path,
        sftp_username=sftp_username,
        sftp_password=sftp_password
    )

PS: The scenario I'm using this script for is the following: I need to accept incoming large files (from anywhere) and process them through a pipeline, but the processing pipeline is isolated and therefore cannot be reached from the outside world (all communication involving data is done via S3 storage). The solution I came up with is a small machine in the middle that accepts SFTP credentials and "streams" (uploads chunk by chunk) the file to S3 storage, from where the pipeline picks it up and does the job.

CodePudding user response:

There are three issues at play here:

You're counting the time to download a chunk and the time to upload it as one transfer, so if it takes 100 seconds to download and 10 seconds to upload 1 MB, you report ~9 KB/s (1 MB / 110 seconds) instead of ~10 KB/s for the download followed by ~100 KB/s for the upload. I mention this because it hides the second issue (see the short timing sketch after these three points).

Paramiko has some issues with sustained transfers for some people; paramiko issue 175 (referenced in the code below) has more details.

And, on top of these issues, you're downloading one chunk, then turning around and uploading it. In most environments this is wasteful: even if everything else were working well, it generally means your download spends half of its time waiting for the upload to finish.
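
Before the full rewrite below, the first point is easy to verify: time the read and the upload separately inside the original transfer_chunk_from_sftp_to_s3. A minimal instrumentation sketch (same signature as the question's function, not production code):

import time

def transfer_chunk_from_sftp_to_s3(sftp_file, s3_connection, multipart_upload, bucket_name,
                                   s3_file_path, part_number, chunk_size):
    t0 = time.time()
    chunk = sftp_file.read(int(chunk_size))      # SFTP download on its own
    t1 = time.time()
    part = s3_connection.upload_part(            # S3 upload on its own
        Bucket=bucket_name,
        Key=s3_file_path,
        PartNumber=part_number,
        UploadId=multipart_upload["UploadId"],
        Body=chunk)
    t2 = time.time()
    kb = len(chunk) / 1024
    print("download: {:.0f} kb/s, upload: {:.0f} kb/s".format(
        kb / max(t1 - t0, 1e-6), kb / max(t2 - t1, 1e-6)))
    return {"PartNumber": part_number, "ETag": part["ETag"]}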

You can fix all the issues:

import paramiko
import boto3
import multiprocessing
import time

# Simple helper to track an activity and report the duration
# and how fast data was transferred
class SpeedTracker:
    def __init__(self):
        self._start = time.time()
        self._end = None
    def start(self):
        self._start = time.time()
    def end(self, bytes, desc):
        self._end = time.time()
        secs = self._end - self._start
        if secs > 0:
            print(f"{desc} done, {(bytes / 1048576) / secs:0.3f} MB/s")

class FastTransport(paramiko.Transport):
    # Correct issues with window size, see paramiko issue 175
    def __init__(self, sock):
        super(FastTransport, self).__init__(sock)
        self.window_size = 2147483647
        self.packetizer.REKEY_BYTES = pow(2, 40)
        self.packetizer.REKEY_PACKETS = pow(2, 40)

def open_sftp_connection(sftp_host, sftp_port, sftp_username, sftp_password, sftp_key):
    client = paramiko.SSHClient()
    client.load_system_host_keys()
    transport = FastTransport((sftp_host, sftp_port))
    # Not necessary, but here for testing purposes, support either
    # password or private key auth
    if sftp_password is not None:
        transport.connect(username=sftp_username, password=sftp_password)
    else:
        pkey = paramiko.RSAKey.from_private_key_file(sftp_key)
        transport.connect(username=sftp_username, pkey=pkey)
    ftp_connection = paramiko.SFTPClient.from_transport(transport)
    return ftp_connection

def pull_from_sftp(sftp_host, sftp_port, sftp_username, sftp_password, sftp_key, sftp_path, queue):
    sftp_connection = open_sftp_connection(sftp_host, int(sftp_port), sftp_username, sftp_password, sftp_key)
    sftp_file = sftp_connection.file(sftp_path, "rb")
    # Enable pipelined mode, see paramiko issue 175
    sftp_file.set_pipelined()
    # Allow the transfer to fill up data in a background thread
    sftp_file.prefetch()
    chunk_size = 8388608
    tracker = SpeedTracker()
    num = 0
    while True:
        # Download one chunk
        tracker.start()
        chunk = sftp_file.read(chunk_size)
        if len(chunk) == 0:
            # All done, time to stop work
            queue.put(None)
            sftp_file.close()
            break
        # Send the chunk off to the reader process
        num += 1
        tracker.end(len(chunk), f"Downloaded chunk #{num}")
        queue.put(chunk)

def send_chunk_to_s3(s3_connection, multipart_upload, bucket_name,
                                  s3_file_path, part_number, chunk):
    # Upload one chunk to S3
    tracker = SpeedTracker()
    part = s3_connection.upload_part(
        Bucket=bucket_name,
        Key=s3_file_path,
        PartNumber=part_number,
        UploadId=multipart_upload["UploadId"],
        Body=chunk)
    tracker.end(len(chunk), f"Uploaded chunk #{part_number}")
    part_output = {"PartNumber": part_number, "ETag": part["ETag"]}
    return part_output

def transfer_file_from_sftp_to_s3(bucket_name, sftp_host, sftp_port, sftp_path, 
                                  aws_s3_path, sftp_username, sftp_password, sftp_key):
    # Start a worker process to get the data from SFTP
    queue = multiprocessing.Queue(10)
    proc = multiprocessing.Process(target=pull_from_sftp, args=(sftp_host, sftp_port, sftp_username, sftp_password, sftp_key, sftp_path, queue))
    proc.start()

    # And start reading from that worker to upload its results
    s3_connection = boto3.client('s3')
    multipart_upload = s3_connection.create_multipart_upload(Bucket=bucket_name, Key=aws_s3_path)
    parts = []
    while True:
        chunk = queue.get()
        if chunk is None:
            break
        part = send_chunk_to_s3(
            s3_connection,
            multipart_upload,
            bucket_name,
            aws_s3_path,
            len(parts) + 1,
            chunk,
        )
        parts.append(part)

    # All done, clean up and finalize the multipart upload
    proc.join()
    part_info = {"Parts": parts}
    resp = s3_connection.complete_multipart_upload(
        Bucket=bucket_name,
        Key=aws_s3_path,
        UploadId=multipart_upload["UploadId"],
        MultipartUpload=part_info,
    )
    print(resp)

if __name__ == '__main__':
    sftp_host='TODO'
    sftp_port=int('22')
    sftp_path='TODO'
    sftp_username='TODO'
    sftp_password='TODO'
    sftp_key=None
    bucket_name='TODO'
    aws_s3_path='TODO'

    transfer_file_from_sftp_to_s3(
        bucket_name=bucket_name,
        sftp_host=sftp_host,
        sftp_port=sftp_port,
        sftp_path=sftp_path,
        aws_s3_path=aws_s3_path,
        sftp_username=sftp_username,
        sftp_password=sftp_password,
        sftp_key=sftp_key
    )

Tested on my local machine, I saw download and upload speeds of around 40 MB/s, which is certainly better than without the bug fixes.
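
One more note, not covered above: if the transfer fails partway through, the incomplete multipart upload and its already-uploaded parts stay in the bucket (and keep incurring storage charges) until they are aborted. A minimal cleanup sketch, assuming the same s3_connection, bucket_name, aws_s3_path and multipart_upload names as in the code above:

try:
    # ... read chunks from SFTP, upload_part for each, then complete_multipart_upload ...
    pass
except Exception:
    # Tell S3 to discard the parts uploaded so far
    s3_connection.abort_multipart_upload(
        Bucket=bucket_name,
        Key=aws_s3_path,
        UploadId=multipart_upload["UploadId"],
    )
    raise

Alternatively, a bucket lifecycle rule with an AbortIncompleteMultipartUpload action will clean up abandoned uploads automatically.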
