Dask: TimeOut Error When Reading Parquet from S3

Time:02-21

I'm running into some frustrating issues using Dask-Yarn on an EMR cluster. I'm trying to read about 5M rows of data from partitioned parquet files stored in S3. I repartition the data across the 800 Dask workers and then persist it to memory. There is no problem at this point. But when I apply downstream functions to manipulate this data, I start hitting TimeOut errors about a quarter of the way through the process, which doesn't make sense because I thought I had already persisted the data to memory. Does anyone know how I can resolve these timeout issues? Any help would be greatly appreciated. Also, why does it read the parquet files again if I already persisted them to memory?

Error:

ConnectTimeoutError: Connect timeout on endpoint URL: "https://data-files.s3.amazonaws.com/users/rhun/data.parquet/batch=0.0/part-00114-811278f0-e6cc-43d4-a38c-2043509029ac.c000.snappy.parquet"

Code Example:

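from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import partial

import boto3
import dask.dataframe as dd
import fitz  # PyMuPDF
import pandas as pd
from dask.distributed import Client
from dask_yarn import YarnCluster

cluster = YarnCluster(environment='doctype.tar.gz',
                      worker_memory='12GiB',
                      worker_vcores=8
                     )
client = Client(cluster)
cluster.scale(800)
df = dd.read_parquet('s3://data-files/users/rhun/data_2022-02-18.parquet/batch=0.0/',
                     columns=['filehash',
                              'sentences',
                              'div2style'
                             ],
                     engine='pyarrow')
df = df.repartition(npartitions=5000).persist()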

def calc_pdf_features(df):
    
    files_to_download = df['filehash'].tolist()
    
    AWS_BUCKET = "my_data"

    session = boto3.Session()
    client = session.client("s3")
    func = partial(download_one_file, AWS_BUCKET, client)

    res = []
    successful_downloads = []

    # download pdf files concurrently
    with ThreadPoolExecutor(max_workers=32) as executor:
        futures = {
            executor.submit(func, file_to_download): file_to_download for file_to_download in files_to_download
        }
        for future in as_completed(futures):
            if future.exception():
                res.append({'filehash': futures[future],
                            'bullet_count': float(0),
                            'item_count': float(0),
                            'colon_count': float(0),
                            'element_tags': [],
                            'max_element_leng': float(0)})
            else:
                successful_downloads.append(futures[future])
        
    def traverse_pdf(fh):
        doc = fitz.open(fh + '.pdf')
        font_counts, styles = fonts(doc, granularity=False)
        size_tag = font_tags(font_counts, styles)
        elements = headers_para(doc, size_tag)
        res.append({'filehash': fh,
                    'bullet_count': float(bullet_counter_row(elements)),
                    'item_count': float(item_counter_row(elements)),
                    'colon_count': float(colon_counter_row(elements)),
                    'element_tags': header_tags(elements),
                    'max_element_leng': max_first3Elements(elements)
                   })

    # extract features from PDF files concurrently 
    with ThreadPoolExecutor(max_workers=32) as executor:
        futures = {
            executor.submit(traverse_pdf, fh): fh for fh in successful_downloads
        }
        for future in as_completed(futures):
            if future.exception():
                res.append({'filehash': futures[future],
                            'bullet_count': float(0),
                            'item_count': float(0),
                            'colon_count': float(0),
                            'element_tags': [],
                            'max_element_leng': float(0)})
                
    return pd.merge(df, pd.DataFrame(res), on=['filehash'], how='inner')

df = df.map_partitions(calc_pdf_features,
                       meta={'filehash': str,
                             'sentences': object,
                             'div2style': object,
                             'bullet_count': float,
                             'item_count': float,
                             'colon_count': float,
                             'element_tags': object,
                             'max_element_leng': object
                            }
                      )
df.repartition(npartitions=200).to_parquet(
    's3://my-data/DocType_v2/features/batch=0.0/',
    engine='pyarrow')

CodePudding user response:

If I understand the code correctly, at maximum load there are 800 workers, each potentially launching 32 download threads. This is speculation, but that number of requests might exceed the number of concurrent requests S3 allows, so some of the workers end up waiting too long for a connection.
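To make the scale concrete, the arithmetic behind that estimate can be sketched as follows. The worker count and pool size come from the question; the assumption that each worker may run up to worker_vcores tasks at once is mine and depends on how dask-yarn is configured.

```python
# Rough upper bound on simultaneous S3 download threads (illustrative only).
workers = 800          # cluster.scale(800) in the question
pool_size = 32         # ThreadPoolExecutor(max_workers=32) inside each task
tasks_per_worker = 8   # ASSUMPTION: one task per vcore (worker_vcores=8)

# One partition being processed per worker at a time:
one_task_each = workers * pool_size
# Every vcore on every worker busy with a partition:
all_vcores_busy = workers * tasks_per_worker * pool_size

print(one_task_each)     # 25600
print(all_vcores_busy)   # 204800
```

Either figure is far more simultaneous connections than a single client would normally open, which is consistent with the connect timeouts, though it does not prove they are the cause.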

One way out is to allow a longer wait time before timeout, see this answer. However, that's still not ideal as you will have workers sitting idle. Instead, the code could be refactored to have a single connection, to avoid nested parallelization, and to let dask handle all the downloads and processing.
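A minimal sketch of the longer-timeout option, assuming the PDF downloads go through boto3 and the parquet reads go through s3fs. The specific values (60 s, 120 s, 10 attempts) and the helper names s3_timeout_settings and make_s3_client are illustrative, not taken from the question.

```python
def s3_timeout_settings(connect_timeout=60, read_timeout=120, max_attempts=10):
    """Keyword arguments understood by botocore.config.Config."""
    return {
        "connect_timeout": connect_timeout,   # seconds to wait for a connection
        "read_timeout": read_timeout,         # seconds to wait for a response
        "retries": {"max_attempts": max_attempts, "mode": "adaptive"},
    }


def make_s3_client(**overrides):
    """Create a boto3 S3 client with the longer timeouts (hypothetical helper)."""
    # Imported here so the settings helper can be used without boto3 installed.
    import boto3
    from botocore.config import Config

    return boto3.client("s3", config=Config(**s3_timeout_settings(**overrides)))


# ASSUMPTION: s3fs forwards `config_kwargs` to botocore, so the same settings
# can be passed as dd.read_parquet(..., storage_options=storage_options).
storage_options = {"config_kwargs": s3_timeout_settings()}
```

Inside calc_pdf_features, make_s3_client() would replace the bare session.client("s3") call. Longer timeouts only hide the contention, so this is a stopgap rather than a replacement for the refactoring suggested above.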

CodePudding user response:

I have a few thoughts on what the problem might be and how to solve it.

  • I think it is a mistake to create your own thread pool inside the calc_pdf_features function. If you have already delegated parallel processing to Dask, you should not parallelize again yourself. I would make the processing of each partition single-threaded and then let Dask do the scheduling.
  • To debug, I would substitute something very simple for calc_pdf_features and check that everything works. That way you can distinguish problems caused by Dask, AWS, etc. from timeouts that occur because processing a partition takes too long.
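One way to follow both suggestions is to process each partition sequentially and make the per-file step pluggable, so a trivial stand-in can replace the real download-and-parse work while debugging. This is a sketch, not the asker's code: default_row, rows_for_partition, and calc_features_sequential are hypothetical names, and extract_one stands for whatever function downloads and parses a single PDF.

```python
def default_row(filehash):
    """Trivial stand-in: zeroed features, no network access or PDF parsing."""
    return {
        "filehash": filehash,
        "bullet_count": 0.0,
        "item_count": 0.0,
        "colon_count": 0.0,
        "element_tags": [],
        "max_element_leng": 0.0,
    }


def rows_for_partition(filehashes, extract_one=default_row):
    """Build one feature row per file, one file at a time (no thread pool)."""
    rows = []
    for fh in filehashes:
        try:
            rows.append(extract_one(fh))
        except Exception:
            # Same fallback as the original code: zeroed features on failure.
            rows.append(default_row(fh))
    return rows


def calc_features_sequential(df, extract_one=default_row):
    """Per-partition function intended for df.map_partitions(...)."""
    import pandas as pd  # imported lazily; this runs on the Dask worker

    rows = rows_for_partition(df["filehash"].tolist(), extract_one)
    features = pd.DataFrame(rows, columns=list(default_row("")))
    return pd.merge(df, features, on=["filehash"], how="inner")
```

Running df.map_partitions(calc_features_sequential, meta=...) with the default stand-in first shows whether the read, repartition, and write steps are healthy on their own. If they are, passing the real per-file function as extract_one reintroduces the S3 downloads with Dask as the only source of parallelism.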