Heroku - Redis Exceeds Memory Quota with Stripe Events

Time:11-21

I have a Flask application deployed on Heroku. I use Celery with Redis to process payments via Stripe. When I process Stripe events, the worker exceeds its memory quota almost immediately (R14). I have 1 worker dyno. Here is what happens:

  1. I checkout using Stripe
  2. My endpoint receives the events as expected
  3. The endpoint sends each event to be processed asynchronously with celery_tasks.process_webhook_event.apply_async(args=[event, stripe_api_key])
  4. I can see the events coming in, but memory maxes out:
2021-11-12T00:49:07.801006+00:00 app[worker.1]: redis.exceptions.ConnectionError: max number of clients reached
2021-11-12T00:49:18.170270+00:00 heroku[worker.1]: Process running mem=549M(107.4%)
2021-11-12T00:49:18.224263+00:00 heroku[worker.1]: Error R14 (Memory quota exceeded)

I'm wondering whether my Celery task is too memory-intensive. If so, I'm not sure how to resolve it. The code could be optimized, but I'm only testing with 1 checkout; surely a Celery and Redis setup can handle a small number (~10) of simultaneous checkouts? I could also limit the events being sent to the endpoint, but I'm not sure that's the best way to scale this up either.

Code for the endpoint:

@payments_bp.route('/wh', methods=['POST'])
def wh():
    # This endpoint receives all webhooks
    stripe_api_key = current_app.config['STRIPE_SECRET_KEY']
    # Payloads should be small (<1MB); reject anything larger
    if request.content_length > (1024**2):
        abort(400)

    payload = request.get_data()
    sig_header = request.environ.get('HTTP_STRIPE_SIGNATURE')

    endpoint_secret = current_app.config['STRIPE_ENDPOINT_SECRET']
    event = None

    try:
        event = stripe.Webhook.construct_event(
            payload, sig_header, endpoint_secret
        )

        celery_tasks.process_webhook_event.apply_async(args=[event, stripe_api_key])
    except ValueError as e:
        return {}, 400

    except stripe.error.SignatureVerificationError as e:
        return {}, 400

    return 'Success', 200

Code for the celery task:

@celery_app.task
def process_webhook_event(event, stripe_api_key):
    stripe.api_key = stripe_api_key

    if 'customer' in event['type']:
        customer_id = event['data']['object']['id']
    else:
        customer_id = event['data']['object']['customer']

    all_events_dict = json.loads(str(stripe.Event.list(related_object=customer_id)))
    # types=['customer.subscription.created','subscription.updated', 'charge.succeeded']
    events_to_process = []

    for event in all_events_dict['data']:
        event_id = event['id']

        # This ensures we don't process events multiple times
        if event_id not in events_to_process and not StripeEvent.query.filter_by(stripe_event_id=event_id).first():
            event_type = event['type']
            event_created = event['created']

            stripe_event = StripeEvent(stripe_event_id=event_id,
                                        event_type=event_type,
                                        event_created=event_created,
                                        event_json=str(event))

            try:
                db.session.add(stripe_event)
                db.session.commit()

                events_to_process.append(event_id)
            except Exception:
                # Roll back so a failed commit doesn't leave the session unusable
                db.session.rollback()

    customer_created = next((item for item in all_events_dict['data'] if item['type'] == 'customer.created' and item['id'] in events_to_process), None)
    subscription_created = next((item for item in all_events_dict['data'] if item['type'] == 'customer.subscription.created'
                                and item['id'] in events_to_process and not Subscription.query.filter_by(id=item['data']['object']['id']).first()), None)
    subscription_updated = next((item for item in all_events_dict['data'] if item['type'] == 'customer.subscription.updated' and item['id'] in events_to_process), None)
    invoice_created = next((item for item in all_events_dict['data'] if item['type'] == 'invoice.created'
                                and item['id'] in events_to_process and not Invoice.query.filter_by(id=item['data']['object']['id']).first()), None)
    invoice_updated = next((item for item in all_events_dict['data'] if item['type'] == 'invoice.updated' and item['id'] in events_to_process), None)

    charge_succeeded = next((item for item in all_events_dict['data'] if item['type'] == 'charge.succeeded'), None)

    if customer_created:
        customer_id = customer_created['data']['object']['id']
        customer_email = customer_created['data']['object']['email']

        user = User.query.filter_by(email=customer_email).first()

        # TODO: handle the case where no user matches this email
        if user and not user.stripe_customer_id:
            user.add_stripe_customer_id(customer_id)

        db.session.commit()

    if subscription_created:
        subscription_id = subscription_created['data']['object']['id']
        current_subscription = Subscription.query.filter_by(id=subscription_id).first()

        if not current_subscription:
            subscription = Subscription()
            subscription.add_subscription(subscription_created)

            db.session.add(subscription)
            db.session.commit()

        else:
            current_subscription.update_subscription(subscription_created)

            db.session.commit()

    if subscription_updated:
        subscription_id = subscription_updated['data']['object']['id']
        event_created = subscription_updated['created']
        current_subscription = Subscription.query.filter_by(id=subscription_id).first()

        if current_subscription and event_created > current_subscription.last_updated:
            current_subscription.update_subscription(subscription_updated)

            db.session.commit()

    if invoice_created:
        invoice_id = invoice_created['data']['object']['id']

        current_invoice = Invoice.query.filter_by(id=invoice_id).first()

        if not current_invoice:
            invoice = Invoice()
            invoice.add_invoice(invoice_created)

            db.session.add(invoice)

            db.session.commit()

    if invoice_updated:
        invoice_id = invoice_updated['data']['object']['id']

        current_invoice = Invoice.query.filter_by(id=invoice_id).first()

        if current_invoice:
            current_invoice.update_invoice(invoice_updated)
        else:
            invoice = Invoice()
            invoice.add_invoice(invoice_updated)

        db.session.commit()

    if charge_succeeded:
        payment_id = charge_succeeded['data']['object']['id']
        invoice_id = charge_succeeded['data']['object']['invoice']

        if not Payment.query.filter_by(id=payment_id).first():
            payment = Payment()
            payment.add_payment(charge_succeeded)

            invoice = Invoice.query.filter_by(id=invoice_id).first()

            try:
                invoice.pay_invoice(charge_succeeded)
            except AttributeError:
                print('Invoice not found')

            db.session.add(payment)
            db.session.commit()

    user = User.query.filter_by(stripe_customer_id=customer_id).first()
    subscription = Subscription.query.filter_by(stripe_customer_id=customer_id).first()

    if user and subscription:
        if subscription.status in ['Active', 'Trial']:
            user.membership_type = 'Premium'
        else:
            user.membership_type = 'Free'
        db.session.commit()

    return None

Any help is much appreciated, thank you

CodePudding user response:

Solved this issue. The design was all wrong:

  1. There's no need to process every event. Only a few basic ones are needed (customer.subscription.created, customer.subscription.updated).

  2. Other values should be fetched via API requests. Subscription objects don't include the customer's email, which you need to associate your site's users with subscriptions. It can be retrieved with:

customer_id = event['data']['object']['customer']

customer = stripe.Customer.retrieve(
        customer_id,
        api_key=stripe_api_key
    )

# User is my DB model
user = User.query.filter_by(email=customer.email).first()

It will probably be easier to create and update other database info through API requests than trying to pass around events.
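
As a rough illustration of the first point, the check for "is this an event I care about" can be kept as a small pure function that returns the customer id or nothing. This is only a sketch: `HANDLED_EVENT_TYPES` and `customer_id_to_process` are hypothetical names, and the event is treated as a plain dict shaped like a Stripe event payload.

```python
# Event types the answer above says are worth handling; all others are ignored.
HANDLED_EVENT_TYPES = {
    "customer.subscription.created",
    "customer.subscription.updated",
}


def customer_id_to_process(event):
    """Return the Stripe customer id for a handled event, otherwise None.

    `event` is assumed to be a dict shaped like a Stripe event payload.
    """
    if event.get("type") not in HANDLED_EVENT_TYPES:
        return None
    # For subscription events, the customer id lives on the subscription object.
    return event.get("data", {}).get("object", {}).get("customer")


# Hypothetical sample payloads, trimmed to the fields used here.
sample = {
    "type": "customer.subscription.created",
    "data": {"object": {"id": "sub_123", "customer": "cus_123"}},
}
ignored = {"type": "invoice.created", "data": {"object": {"id": "in_123"}}}

print(customer_id_to_process(sample))
print(customer_id_to_process(ignored))
```

Keeping the decision in one function makes it easy to add or remove event types later without touching the endpoint or the task.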

The new celery task looks like this:

@celery_app.task()
def activate_subscription(customer_id, stripe_api_key):

    customer = stripe.Customer.retrieve(
        customer_id,
        api_key=stripe_api_key
    )
    user = User.query.filter_by(email=customer.email).first()

    user.membership_type = 'Premium'

    db.session.commit()

    return None
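
Note that the task above assumes a matching user exists and always sets 'Premium', even when the triggering event is an update to a subscription that is no longer active. One possible refinement, sketched below under assumptions, is to guard against a missing user and derive the membership from the subscription status. `PREMIUM_STATUSES`, `membership_for_status` and `apply_membership` are hypothetical helpers, and the `SimpleNamespace` object merely stands in for the `User` model; Stripe reports subscription status as lowercase strings such as `active`, `trialing` and `canceled`.

```python
from types import SimpleNamespace

# Assumption: these Stripe subscription statuses should grant Premium access.
PREMIUM_STATUSES = {"active", "trialing"}


def membership_for_status(status):
    """Map a Stripe subscription status to the site's membership type."""
    return "Premium" if status in PREMIUM_STATUSES else "Free"


def apply_membership(user, status):
    """Update the user's membership; return False when no user was found."""
    if user is None:
        return False
    user.membership_type = membership_for_status(status)
    return True


# Stand-in for a row returned by User.query.filter_by(...).first()
demo_user = SimpleNamespace(email="a@example.com", membership_type="Free")
apply_membership(demo_user, "active")
print(demo_user.membership_type)
```

In the real task, the status would come from the subscription object (for example via an API request), and `db.session.commit()` would follow only when `apply_membership` returns True.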

The endpoint passes just the customer id instead of the full event:

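    try:
        event = stripe.Webhook.construct_event(
            payload, sig_header, endpoint_secret
        )
        event_type = event['type']

        # Only subscription events are handled; everything else is ignored
        if event_type in ('customer.subscription.created', 'customer.subscription.updated'):
            customer_id = event['data']['object']['customer']
            celery_tasks.activate_subscription.apply_async(args=[customer_id, stripe_api_key])
    # The except clauses are unchanged from the original endpoint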
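
To get a feel for what passing only the customer id changes, the sketch below compares the JSON-serialized task arguments for the two designs (recent Celery versions serialize arguments as JSON by default). The event here is a hypothetical, heavily trimmed stand-in, so the real difference would likely be larger; this only compares message size and is not a measurement of the worker's memory use.

```python
import json

# Hypothetical, heavily trimmed stand-in for a Stripe event payload.
full_event = {
    "id": "evt_123",
    "type": "customer.subscription.created",
    "data": {
        "object": {
            "id": "sub_123",
            "customer": "cus_123",
            "status": "active",
            "items": {"data": [{"id": "si_123", "price": {"id": "price_123"}}]},
        }
    },
}
api_key = "sk_test_placeholder"  # placeholder, not a real key

# Old design: the whole event travels through the broker.
old_args = json.dumps([full_event, api_key])
# New design: only the customer id travels through the broker.
new_args = json.dumps([full_event["data"]["object"]["customer"], api_key])

print(len(old_args), len(new_args))
```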