Home > Mobile >  Mocking Postgres Response in Airflow
Mocking Postgres Response in Airflow

Time:06-02

I'm trying to mock a postgres response in airflow, but I wasn't able to do. Tried all solutions found on net and no help. And on airflow manual it offers only solution to use a docker image db.

Here is the code:

from airflow.models import TaskInstance
from airflow.providers.postgres.hooks.postgres import PostgresHook

def process(pg_conn_id: str, ti: TaskInstance, **kwargs) -> str:

    xcom_in = ti.xcom_pull(task_ids="ids")

    pg_conn = PostgresHook(pg_conn_id).get_conn()
    cursor = pg_conn.cursor()
    cursor.execute("SELECT * FROM stub")
    result = cursor.fetchone()[0]
    print(result)

    return result

And this is one of the tried tests:

from unittest import mock

import pytest
from airflow.models import TaskInstance
from airflow.providers.postgres.hooks.postgres import PostgresHook

def test_process(mocker):
    input = "The input data"
    output="The value in DB"
    ti = TaskInstance
    ti.xcom_pull = MagicMock(return_value=input)


    mocker.patch.object(
        PostgresHook,
        "get_conn",
        return_value = MagicMock(name="pg")
    )

    assert output == process("mock_db", ti))

The problem is that the result, insteadd of the "input" value I receive

<MagicMock name='pg.cursor().fetchone().__getitem__()' id='2259571508176'>

CodePudding user response:

That's how mocks work - they are just recording the actual calls made to them. Then there is the concatenation/chaining of calls, if you are calling a method on a returned mock it is concatenated to the original mock (more on that in the next paragraph).

You successfully made pg_conn in pg_conn = PostgresHook(pg_conn_id).get_conn() to be a Magic mock. Then you had this line cursor = pg_conn.cursor() so cursor is equal to pg.cursor(). Then you have result equal to cursor.fetchone()[0] which we concatenate to the previous mock to get pg.cursor().fetchone().__getitem__() - since [0] is calling __getitem__ behind the scenes.

So in your case, you can have this code:

pg_mock = MagicMock(name="pg")
pg_mock.cursor().fetchone().__getitem__.return_value = "input"

mocker.patch.object(
        PostgresHook,
        "get_conn",
        return_value = pg_mock
    )

A final note about mock syntax: you can often replace .return_value with (), so pg_mock.cursor().fetchone().__getitem__.return_value = "input" is the same as pg_mock.cursor.return_value.fetchone.return_value.__getitem__.return_value = "input".

  • Related