I have a Flask application deployed on Heroku. I use Celery Redis to process payments via Stripe. When I process Stripe events, I hit max memory issues immediately (R14). I have 1 worker dyno
- I checkout using Stripe
- My endpoint receives the events as expected
- The endpoint sends events to be processed asynchronously
celery_tasks.process_webhook_event.apply_async(args=[event, stripe_api_key])
- I can see the events coming in, but memory maxes out:
2021-11-12T00:49:07.801006 00:00 app[worker.1]: redis.exceptions.ConnectionError: max number of clients reached
2021-11-12T00:49:18.170270 00:00 heroku[worker.1]: Process running mem=549M(107.4%)
2021-11-12T00:49:18.224263 00:00 heroku[worker.1]: Error R14 (Memory quota exceeded)
I'm wondering if it's the case that my Celery task is too memory-intensive. If that's true, I'm not sure how to resolve it. Although the code can be optimized, I'm only testing it with 1 checkout- surely a Celery Redis setup could handle a small number (~10) of simultaneous checkouts? I could also limit the events being sent to the endpoint, but I'm not sure that's the best way to scale this up either.
Code for the endpoint:
@payments_bp.route('/wh', methods=['POST'])
def wh():
# This endpoint receives all webhooks
stripe_api_key = current_app.config['STRIPE_SECRET_KEY']
# Payloads should be small (>1MB)
if request.content_length > (1024**2):
abort(400)
payload = request.get_data()
sig_header = request.environ.get('HTTP_STRIPE_SIGNATURE')
endpoint_secret = current_app.config['STRIPE_ENDPOINT_SECRET']
event = None
try:
event = stripe.Webhook.construct_event(
payload, sig_header, endpoint_secret
)
celery_tasks.process_webhook_event.apply_async(args=[event, stripe_api_key])
except ValueError as e:
return {}, 400
except stripe.error.SignatureVerificationError as e:
return {}, 400
return 'Success', 200
Code for the celery task:
@celery_app.task
def process_webhook_event(event, stripe_api_key):
stripe.api_key = stripe_api_key
if 'customer' in event['type']:
customer_id = event['data']['object']['id']
else:
customer_id = event['data']['object']['customer']
all_events_dict = json.loads(str(stripe.Event.list(related_object=customer_id)))
# types=['customer.subscription.created','subscription.updated', 'charge.succeeded']
events_to_process = []
for event in all_events_dict['data']:
event_id = event['id']
# This ensures we don't process events multiple times
if event_id not in events_to_process and not StripeEvent.query.filter_by(stripe_event_id=event_id).first():
event_type = event['type']
event_created = event['created']
stripe_event = StripeEvent(stripe_event_id=event_id,
event_type=event_type,
event_created=event_created,
event_json=str(event))
try:
db.session.add(stripe_event)
db.session.commit()
events_to_process.append(event_id)
except:
pass
customer_created = next((item for item in all_events_dict['data'] if item['type'] == 'customer.created' and item['id'] in events_to_process), None)
subscription_created = next((item for item in all_events_dict['data'] if item['type'] == 'customer.subscription.created'
and item['id'] in events_to_process and not Subscription.query.filter_by(id=item['data']['object']['id']).first()), None)
subscription_updated = next((item for item in all_events_dict['data'] if item['type'] == 'customer.subscription.updated' and item['id'] in events_to_process), None)
invoice_created = next((item for item in all_events_dict['data'] if item['type'] == 'invoice.created'
and item['id'] in events_to_process and not Invoice.query.filter_by(id=item['data']['object']['id']).first()), None)
invoice_updated = next((item for item in all_events_dict['data'] if item['type'] == 'invoice.updated' and item['id'] in events_to_process), None)
charge_succeeded = next((item for item in all_events_dict['data'] if item['type'] == 'charge.succeeded'), None)
if customer_created:
customer_id = customer_created['data']['object']['id']
customer_email = customer_created['data']['object']['email']
user = User.query.filter_by(email=customer_email).first()
# TODO: add catch here
if not user:
pass
if not user.stripe_customer_id:
user.add_stripe_customer_id(customer_id)
db.session.commit()
if subscription_created:
subscription_id = subscription_created['data']['object']['id']
current_subscription = Subscription.query.filter_by(id=subscription_id).first()
if not current_subscription:
subscription = Subscription()
subscription.add_subscription(subscription_created)
db.session.add(subscription)
db.session.commit()
else:
current_subscription.update_subscription(subscription_created)
db.session.commit()
if subscription_updated:
subscription_id = subscription_updated['data']['object']['id']
event_created = subscription_updated['created']
current_subscription = Subscription.query.filter_by(id=subscription_id).first()
if current_subscription and event_created > current_subscription.last_updated:
current_subscription.update_subscription(subscription_updated)
db.session.commit()
if invoice_created:
invoice_id = invoice_created['data']['object']['id']
current_invoice = Invoice.query.filter_by(id=invoice_id).first()
if not current_invoice:
invoice = Invoice()
invoice.add_invoice(invoice_created)
db.session.add(invoice)
db.session.commit()
if invoice_updated:
invoice_id = invoice_updated['data']['object']['id']
current_invoice = Invoice.query.filter_by(id=invoice_id).first()
if current_invoice:
current_invoice.update_invoice(invoice_updated)
else:
invoice = Invoice()
invoice.add_invoice(invoice_updated)
db.session.commit()
if charge_succeeded:
payment_id = charge_succeeded['data']['object']['id']
invoice_id = charge_succeeded['data']['object']['invoice']
if not Payment.query.filter_by(id=payment_id).first():
payment = Payment()
payment.add_payment(charge_succeeded)
invoice = Invoice.query.filter_by(id=invoice_id).first()
try:
invoice.pay_invoice(charge_succeeded)
except AttributeError:
print('Invoice not found')
db.session.add(payment)
db.session.commit()
user = User.query.filter_by(stripe_customer_id=customer_id).first()
subscription = Subscription.query.filter_by(stripe_customer_id=customer_id).first()
if user and subscription:
if subscription.status in ['Active', 'Trial']:
user.membership_type = 'Premium'
else:
user.membership_type = 'Free'
db.session.commit()
return None
Any help is much appreciated, thank you
CodePudding user response:
Solved this issue. The design was all wrong:
There's no need to process a bunch of events. You actually just need a few basic ones (customer.subscription.created, customer.subscription.updated)
Other values should be accessed via API requests. Subscription objects don't come with customer email, which you need to associate your sites users with subscriptions. This can easily be accessed with:
customer_id = event['data']['object']['customer']
customer = stripe.Customer.retrieve(
customer_id,
api_key=stripe_api_key
)
#User is my DB model
user = User.query.filter_by(email=customer.email).first()
It will probably be easier to create and update other database info through API requests than trying to pass around events.
The new celery task looks like this:
@celery_app.task()
def activate_subscription(customer_id, stripe_api_key):
customer = stripe.Customer.retrieve(
customer_id,
api_key=stripe_api_key
)
user = User.query.filter_by(email=customer.email).first()
user.membership_type = 'Premium'
db.session.commit()
return None
The endpoint passes just the customer id instead of the full event:
try:
event = stripe.Webhook.construct_event(
payload, sig_header, endpoint_secret
)
event_type = event['type']
celery_tasks.activate_subscription.apply_async(args=[customer_id, stripe_api_key])