Restarting DynamoDB batch write job where it left off

Time:03-14

I'm loading a large pandas DataFrame into a DynamoDB table with the boto3 batch_writer context. The hash key is symbol and the sort key is date.

with table.batch_writer() as batch:
    for row in df.itertuples(index=False, name="R"):
        # put_item expects a dict, so convert the namedtuple row
        batch.put_item(Item=row._asdict())

My network connection was lost and the job stopped. I want to start putting records where I left off.

The DynamoDB table has 1_400_899 items. My DataFrame has 5_998_099 rows.

When I inspect the DataFrame at and around index 1_400_899, those records do not exist in DynamoDB. That makes me think rows are not inserted sequentially.

How can I determine where I left off so I can slice the DataFrame appropriately and restart the job?

CodePudding user response:

DynamoDB's put_item doesn't guarantee that items are inserted sequentially, so you cannot rely on insertion order. Coming back to your question: "How can I determine where I left off so I can slice the DataFrame appropriately and restart the job?"

The only way to know for sure is to scan the entire table and retrieve the primary key values that have already been inserted, then drop those keys from the original DataFrame and start the batch write operation again.

Here is some code that will help you get the job done:

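def scan_table(table, keys, **kwargs):
    # Alias every key with '#': 'date' is a DynamoDB reserved word.
    names = {f'#k{i}': key for i, key in enumerate(keys)}
    kwargs.update(ProjectionExpression=', '.join(names), ExpressionAttributeNames=names)
    while True:  # loop over pages instead of recursing once per page
        resp = table.scan(**kwargs)
        yield from resp['Items']
        if 'LastEvaluatedKey' not in resp:
            break
        kwargs['ExclusiveStartKey'] = resp['LastEvaluatedKey']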


keys = ['symbol', 'date']
df_saved = pd.DataFrame(scan_table(table, keys))

i1 = df.set_index(keys).index
i2 = df_saved.set_index(keys).index

df_not_saved = df[~i1.isin(i2)]

Now you can restart the batch write operation on df_not_saved instead of df:

with table.batch_writer() as batch:
    for row in df_not_saved.itertuples(index=False, name="R"):
        # put_item expects a dict, so convert the namedtuple row
        batch.put_item(Item=row._asdict())
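
If building a second DataFrame of saved keys is inconvenient, the same filter can be sketched with a plain set of key tuples. This is only an illustration of the idea: unsaved_rows is a hypothetical helper, and the sample rows are made up to stand in for the DataFrame records and the scan output.

```python
def unsaved_rows(rows, saved_items, keys=("symbol", "date")):
    """Yield the rows whose primary key is not among the already-saved items."""
    # Build a set of (symbol, date) tuples for fast membership checks.
    saved = {tuple(item[k] for k in keys) for item in saved_items}
    for row in rows:
        if tuple(row[k] for k in keys) not in saved:
            yield row


# Made-up sample data standing in for the DataFrame rows and the scan result.
rows = [
    {"symbol": "AAA", "date": "2022-01-03", "close": "10.5"},
    {"symbol": "AAA", "date": "2022-01-04", "close": "10.7"},
    {"symbol": "BBB", "date": "2022-01-03", "close": "20.1"},
]
saved_items = [{"symbol": "AAA", "date": "2022-01-04"}]

remaining = list(unsaved_rows(rows, saved_items))
```

Whichever variant you use, the key values must have the same type on both sides. If date is stored as a string in DynamoDB but is a Timestamp in the DataFrame, nothing will match and every row will look unsaved.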

CodePudding user response:

The Python batch_writer() is a utility around DynamoDB's BatchWriteItem operation. It splits your work into small sets of items (BatchWriteItem is limited to 25 items per request) and writes each set using BatchWriteItem.

Normally, these writes are sequential in a sense: if your client managed to send a batch of writes to DynamoDB, they will all be done, even if you then lose your connection. However, there is a snag: BatchWriteItem is not guaranteed to write all the items. When it can't, often because you have used more than your provisioned capacity, it returns UnprocessedItems, a list of items that need to be resent. batch_writer() resends those items later, but if you interrupt it at that point, it is possible that only a random subset of the last 25 items was written. So back up at least 25 items from where you think the job stopped, to be sure you restart from a position where batch_writer() had written everything successfully.
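
To see why an interrupted job may not leave a clean prefix of the data behind, here is a simplified model of the chunk-and-resend behaviour described above. It is a sketch, not boto3's actual implementation: write_in_batches and flaky_send are hypothetical names, and flaky_send stands in for a BatchWriteItem request that returns some items as unprocessed.

```python
def write_in_batches(send_batch, items, batch_size=25):
    """Send items in chunks, re-queuing whatever is reported as unprocessed.

    send_batch is a hypothetical callable standing in for one BatchWriteItem
    request: it takes a list of items and returns the ones it did NOT write.
    """
    pending = list(items)
    requests = 0
    while pending:
        chunk, pending = pending[:batch_size], pending[batch_size:]
        unprocessed = send_batch(chunk)
        requests += 1
        # Rejected items are queued again and go out with a later request.
        pending.extend(unprocessed)
    return requests


written = []   # order in which the fake service accepted items
seen = set()   # items that have already been rejected once


def flaky_send(chunk):
    """Fake service: rejects each multiple of 10 the first time it is sent."""
    unprocessed = []
    for item in chunk:
        if item % 10 == 0 and item not in seen:
            seen.add(item)
            unprocessed.append(item)
        else:
            written.append(item)
    return unprocessed


requests = write_in_batches(flaky_send, range(60))
```

In this model every item is written eventually, but not in input order: item 0 is rejected in the first request and only lands in a later one. Stopping the loop early would therefore leave gaps behind the last position reached, which is the reason for backing up before restarting.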

Another question is where you got the information that the DynamoDB table has 1_400_899 items. DynamoDB does report such a number, but AWS says it is updated only about once every 6 hours. Did you wait 6 hours?
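
If you need a current count instead of the periodically refreshed one (exposed in boto3 as table.item_count), one option is to page through a Scan with Select='COUNT'. The following is a minimal sketch, assuming table is a boto3 Table resource; count_items is a hypothetical helper name. Note that it reads the whole table and consumes read capacity accordingly.

```python
def count_items(table):
    """Count items by paging through a Scan that returns counts only.

    table is assumed to be a boto3 DynamoDB Table resource.
    """
    total = 0
    kwargs = {"Select": "COUNT"}
    while True:
        resp = table.scan(**kwargs)
        total += resp["Count"]
        # A missing LastEvaluatedKey means the last page has been read.
        if "LastEvaluatedKey" not in resp:
            return total
        kwargs["ExclusiveStartKey"] = resp["LastEvaluatedKey"]
```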
