Airflow: Return BashOperator output as a string for an ODBC connection

Time:11-09

I'm quite new to Airflow and Python. What I'm trying to do is get the result of a Bash command and use that return value to compose the connection string:

    import pyodbc as odbc
    import pandas as pd
    import datetime as dt
    from airflow.operators.bash import BashOperator
    from airflow.decorators import dag, task

    @dag(schedule_interval='0 15 10 * *', start_date=dt.datetime(2021, 10, 1), catchup=False)
    def my_dag2():
        wsl_ip = BashOperator(
            task_id="wsl_ip",
            bash_command="grep -m 1 nameserver /etc/resolv.conf | awk '{print $2}'",
            do_xcom_push=True,
        )

        @task()
        def run():

            def busca_informes_cvm(ano, mes):
                url = 'http://dados.cvm.gov.br/dados/FI/DOC/INF_DIARIO/DADOS/inf_diario_fi_{:4d}{:02d}.csv'.format(ano, mes)
                return pd.read_csv(url, sep=';')

            today = dt.date.today()
            ano = today.year
            mes = today.month - 1
            file_name_Comp = '{:4d}-{:02d}'.format(ano, mes)
            file_name = '{:4d}{:02d}.csv'.format(ano, mes)
            path_name = r'C:\Airflow\{:4d}{:02d}.csv'.format(ano, mes)

            # wsl_ip is the BashOperator object here, not the IP string it outputs,
            # so this concatenation raises a TypeError when the task runs
            conn = odbc.connect('Driver={ODBC Driver 17 for SQL Server};Server= ' + wsl_ip + ';Database=CVM;uid=Airflow;pwd=ubuntu')
            df = pd.read_sql_query('select max(DT_COMPTC) from Historico;', conn)
            left = df[''].str[:7]

            if file_name_Comp <= left[0]:
                print('Sair')
            else:
                informes_diarios = busca_informes_cvm(ano, mes)
                informes_diarios.to_csv(file_name, sep=';', index=False)
                db_view_nm = '[dbo].[Bulk]'
                qry = "BULK INSERT " + db_view_nm + " FROM '" + path_name + "' WITH (FIELDTERMINATOR = ';', ROWTERMINATOR = '0x0a', FIRSTROW = 2, ROWS_PER_BATCH = 100000)"
                cursor = conn.cursor()
                success = cursor.execute(qry)
                conn.commit()
                cursor.close()
                print('Concluído')

        execute = run()

    etl_dag = my_dag2()

I need a way to turn wsl_ip into a string. Any help would be appreciated.
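For reference, the bash_command above prints the second whitespace-separated field of the first line in /etc/resolv.conf that contains "nameserver" (the address the question uses as the SQL Server host). The sketch below mimics that grep/awk pipeline in plain Python so the parsing can be checked without a running Airflow instance; first_nameserver is a hypothetical helper, and it assumes the usual "nameserver <ip>" line format.

```python
def first_nameserver(resolv_conf_text: str) -> str:
    """Mimic `grep -m 1 nameserver | awk '{print $2}'` on the given text.

    Hypothetical helper for illustration; assumes lines like "nameserver <ip>".
    """
    for line in resolv_conf_text.splitlines():
        if "nameserver" in line:  # grep -m 1: stop at the first matching line
            fields = line.split()  # awk's default whitespace field splitting
            # awk prints an empty field when $2 does not exist
            return fields[1] if len(fields) > 1 else ""
    return ""  # no matching line: grep outputs nothing, so awk prints nothing
```

For example, first_nameserver(open('/etc/resolv.conf').read()) should return the same value the BashOperator pushes to XCom, assuming the file is readable by the worker.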

Answer:

To get the output of the "wsl_ip" task, you can use the .output property that every operator in Airflow exposes. It returns an XComArg, an abstraction over the classic xcom_pull() method (see the Airflow documentation on XComArgs).
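The key idea is that, while the DAG file is being parsed, wsl_ip.output is only a reference to a value that does not exist yet; the real string is filled in when the downstream task runs. The toy model below illustrates that deferred resolution in plain Python. Placeholder, xcom_store and run_task are hypothetical stand-ins, not Airflow's actual classes or API, and the IP is an example value.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict

# Hypothetical stand-in for the XCom backend: task_id -> value the task pushed.
xcom_store: Dict[str, Any] = {}


@dataclass(frozen=True)
class Placeholder:
    """Toy analogue of an XComArg: names an upstream task, holds no value yet."""
    task_id: str


def run_task(task_id: str, fn: Callable[..., Any], **kwargs: Any) -> Any:
    """Swap Placeholder arguments for stored values, call fn, store its result."""
    resolved = {
        name: xcom_store[arg.task_id] if isinstance(arg, Placeholder) else arg
        for name, arg in kwargs.items()
    }
    result = fn(**resolved)
    xcom_store[task_id] = result
    return result


# "Parse time": wiring the tasks together only creates a reference.
ip_ref = Placeholder("wsl_ip")

# Concatenating the reference itself fails, much like concatenating the operator.
try:
    _ = "Server=" + ip_ref
    concat_failed = False
except TypeError:
    concat_failed = True

# "Run time": the upstream task runs first and stores its output (example IP)...
run_task("wsl_ip", lambda: "172.20.0.1")
# ...then the downstream task receives the resolved string, not the reference.
server = run_task("run", lambda ip: "Server=" + ip, ip=ip_ref)
```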

You could try something like this:

    wsl_ip = BashOperator(
        task_id="wsl_ip",
        bash_command="grep -m 1 nameserver /etc/resolv.conf | awk '{print $2}'",
        do_xcom_push=True,
    )

    @task()
    def run(ip):

        ...

        conn = odbc.connect('Driver={ODBC Driver 17 for SQL Server};Server= ' + ip + ';Database=CVM;uid=Airflow;pwd=ubuntu')

        ...

    execute = run(ip=wsl_ip.output)

The run TaskFlow function now takes as input the XCom value pushed by your BashOperator task. When run executes, Airflow resolves wsl_ip.output to that value, which is the last line the Bash command wrote to stdout, as a string. Using .output this way also automatically creates a task dependency between the "wsl_ip" and "run" tasks.
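Since ip is an ordinary Python string inside run, the connection string can also be built with an f-string instead of concatenation. A minimal sketch, assuming the same driver, database and credentials as in the question; build_conn_str is a hypothetical helper name, and the strip() call is a defensive assumption in case the value carries stray whitespace.

```python
def build_conn_str(ip: str) -> str:
    """Build the SQL Server ODBC connection string for the given host IP.

    Hypothetical helper; driver, database and credentials copied from the question.
    """
    host = ip.strip()  # defensive: drop stray spaces or newlines around the IP
    # The first literal is a plain string, so {ODBC Driver 17 ...} stays literal;
    # only the middle piece is an f-string.
    return (
        "Driver={ODBC Driver 17 for SQL Server};"
        f"Server={host};"
        "Database=CVM;uid=Airflow;pwd=ubuntu"
    )
```

Inside run, the connect call would then become conn = odbc.connect(build_conn_str(ip)).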
