Home > Net >  Django: how to test for concurrency issues when using cron jobs
Django: how to test for concurrency issues when using cron jobs

Time:11-10

I have a Django application hosted on a remote server which runs some cron jobs at relatively short intervals. One of these cron jobs executes a command that fetches a queryset from the database, calls an external API and changes the models based on the response from the API. If I don't take care, the cron job will execute multiple times before the API responds, thus leading to concurrency issues and multiple instances of the same models being updated at the same time.

I have different strategies to avoid this issue, but I would like to write tests that I can run locally, mocking the API call and ensuring, that two cron tasks don't both try to work on an object at the same time. How do I do that?

My code looks something like this (illustrative purpose to show the problem):

def task():
    qs = MyModel.objects.filter(task_ran=False)
    for model in qs:
        resp = api_call(model.foo)
        model.bar = resp
        model.task_ran = True
        model.save()

So, how can I write a test, that checks that if task() is called a second time before the first call has finished, then it won't update the model again and the API won't get called again? Below is a sketch of a test, and I have tried to put the calls to task() in separate threads, but that causes the test to freeze and - after a KeyboardInterrupt - fail with

django.db.utils.OperationalError: database "test_db" is being accessed by other users

DETAIL: There is 1 other session using the database.```

@patch("api_call")
def test_task(self, mock_api_call):
    def side_effect(number):
        time.sleep(2)
        return number   1

    mock_api_call.side_effect = side_effect

    # how to call these simultaneously? threading causes Django to get mad
    task()
    task()

    mock_api_call.assert_called_once()

CodePudding user response:

Okay, so I found an answer based on this answer. Basically, testing can be done in Django via threading, but it requires a couple of things:

  • First of all, the test class itself must be a subclass of TransactionTestCase (at least if it involves any database shenanigans or the use of .select_for_update which my code does).
  • Secondly, the database connections opened in each thread should be closed again as the thread terminates. This can be done by using a ThreadPoolExecutor to create Futures and then adding a callback function for when the thread is done via .add_done_callback.

So with this, the test can be written as follows:

import concurrent.futures
from django.db import connections
from django.test import TransactionTestCase

class CronTestCase(TransactionTestCase):
    def on_done(self, future):
        connections.close_all()

    @patch("api_call")
    def test_task(self, mock_api_call):
        # setup the test
        num_threads = 5
        with concurrent.futures.ThreadPoolExecutor() as executor:
            for _ in range(num_threads):
                future = executor.submit(task)
                future.add_done_callback(self.on_done)

        mock_api_call.assert_called_once()
  • Related