I'm quite new to Airflow and Python. I'm trying to get the result of a Bash command and build a connection string from the value it returns:
import pyodbc as odbc
import pandas as pd
import datetime as dt
from airflow.operators.bash import BashOperator
from airflow.decorators import dag, task
@dag(schedule_interval='0 15 10 * *', start_date=dt.datetime(2021, 10, 1), catchup=False)
def my_dag2():
    wsl_ip = BashOperator(
        task_id="wsl_ip",
        bash_command="grep -m 1 nameserver /etc/resolv.conf | awk '{print $2}'",
        do_xcom_push=True
    )

    @task()
    def run():
        def busca_informes_cvm(ano, mes):
            url = 'http://dados.cvm.gov.br/dados/FI/DOC/INF_DIARIO/DADOS/inf_diario_fi_{:4d}{:02d}.csv'.format(ano, mes)
            return pd.read_csv(url, sep=';')

        today = dt.date.today()
        ano = today.year
        mes = today.month - 1
        file_name_Comp = '{:4d}-{:02d}'.format(ano, mes)
        file_name = '{:4d}{:02d}.csv'.format(ano, mes)
        path_name = r'C:\Airflow\{:4d}{:02d}.csv'.format(ano, mes)
        # This is the line that fails: wsl_ip here is the BashOperator object, not its output
        conn = odbc.connect('Driver={ODBC Driver 17 for SQL Server};Server= ' + wsl_ip + ';Database=CVM;uid=Airflow;pwd=ubuntu')
        df = pd.read_sql_query('select max(DT_COMPTC) from Historico;', conn)
        left = df[''].str[:7]
        if file_name_Comp <= left[0]:
            print('Sair')
        else:
            informes_diarios = busca_informes_cvm(ano, mes)
            informes_diarios.to_csv(file_name, sep=';', index=False)
            db_view_nm = '[dbo].[Bulk]'
            qry = "BULK INSERT " + db_view_nm + " FROM '" + path_name + "' WITH (FIELDTERMINATOR = ';', ROWTERMINATOR = '0x0a', FIRSTROW = 2, ROWS_PER_BATCH = 100000)"
            cursor = conn.cursor()
            success = cursor.execute(qry)
            conn.commit()
            cursor.close()
            print('Concluído')

    execute = run()

etl_dag = my_dag2()
I need a way to convert wsl_ip into a string. Any help would be appreciated.
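For reference, the Bash pipeline in wsl_ip only extracts the second whitespace-separated field of the first line containing "nameserver". A minimal sketch of the same logic in plain Python follows; the function name first_nameserver is hypothetical, and the sample text in the test is made up. Running this inside the task itself would be one possible way to get a string without going through XCom at all, though that is a design choice, not something the answer below requires.

```python
def first_nameserver(resolv_conf_text):
    """Return the second field of the first line containing 'nameserver'.

    Hypothetical helper mirroring:
        grep -m 1 nameserver /etc/resolv.conf | awk '{print $2}'
    Returns None when no line matches (grep prints nothing).
    """
    for line in resolv_conf_text.splitlines():
        if "nameserver" in line:      # grep matches the substring anywhere in the line
            fields = line.split()     # awk's default split: runs of whitespace
            # awk prints an empty string when field $2 does not exist
            return fields[1] if len(fields) > 1 else ""
    return None
```

Inside a task, it could be called as first_nameserver(open('/etc/resolv.conf').read()), assuming the worker can read that file.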
Answer:
To get the output of the "wsl_ip" task, you can use the .output property that every operator in Airflow exposes. This property returns an XComArg, an abstraction over the classic xcom_pull() method (see the Airflow docs on XComArg).
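To illustrate the idea (not Airflow's actual implementation), here is a toy model in plain Python: a lazy reference that names an upstream task, is swapped for that task's stored return value when the consuming task runs, and implies a dependency just by being passed as an argument. ToyXComArg, ToyTask and the dict-based store are all hypothetical stand-ins, and the IP address is made up.

```python
# Toy model only: NOT Airflow's code. It mimics how an XComArg behaves:
# a placeholder that is resolved to the upstream task's pushed value at run
# time, and that registers an upstream dependency when passed to a task.

class ToyXComArg:
    """Lazy placeholder for another task's return value (hypothetical)."""

    def __init__(self, task_id, key="return_value"):
        self.task_id = task_id
        self.key = key

    def resolve(self, xcom_store):
        # Roughly what xcom_pull(task_ids=task_id, key=key) would hand back
        return xcom_store[(self.task_id, self.key)]


class ToyTask:
    """Hypothetical task wrapper that resolves ToyXComArg arguments."""

    def __init__(self, task_id, func, **kwargs):
        self.task_id = task_id
        self.func = func
        self.kwargs = kwargs
        # Passing a ToyXComArg is enough to record an upstream dependency
        self.upstream = {
            v.task_id for v in kwargs.values() if isinstance(v, ToyXComArg)
        }

    def execute(self, xcom_store):
        # Replace every placeholder with the concrete stored value
        resolved = {
            k: v.resolve(xcom_store) if isinstance(v, ToyXComArg) else v
            for k, v in self.kwargs.items()
        }
        result = self.func(**resolved)
        # Push this task's own return value, as a TaskFlow task would
        xcom_store[(self.task_id, "return_value")] = result
        return result
```

For example, with store = {("wsl_ip", "return_value"): "172.20.16.1"}, the task ToyTask("run", lambda ip: "Server=" + ip, ip=ToyXComArg("wsl_ip")) reports "wsl_ip" as upstream and receives a plain string when executed.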
You could try something like this:
wsl_ip = BashOperator(
    task_id="wsl_ip",
    bash_command="grep -m 1 nameserver /etc/resolv.conf | awk '{print $2}'",
    do_xcom_push=True
)

@task()
def run(ip):
    ...
    # ip arrives as the string pushed to XCom by the wsl_ip task
    conn = odbc.connect('Driver={ODBC Driver 17 for SQL Server};Server= ' + ip + ';Database=CVM;uid=Airflow;pwd=ubuntu')
    ...

execute = run(ip=wsl_ip.output)
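The run TaskFlow function now takes as input the XCom value pushed by your BashOperator task. When run executes, Airflow resolves wsl_ip.output to that value, which for a BashOperator is the last line the command wrote to stdout, as a string, so it can be concatenated directly into the connection string. Using .output this way also automatically creates a task dependency between the "wsl_ip" and "run" tasks.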
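Once ip is a string, composing the connection string is ordinary string handling. A minimal sketch, assuming you want to guard against stray whitespace or an unexpected command output before connecting; build_conn_str is a hypothetical helper, and its defaults just copy the values from the question.

```python
import ipaddress


def build_conn_str(ip, database="CVM", uid="Airflow", pwd="ubuntu"):
    """Compose the pyodbc connection string from the IP pulled via XCom.

    Hypothetical helper; defaults copy the values used in the question.
    """
    ip = str(ip).strip()        # drop any surrounding whitespace/newline
    ipaddress.ip_address(ip)    # raises ValueError if it is not a valid IP
    return (
        "Driver={ODBC Driver 17 for SQL Server};"   # plain literal: braces kept as-is
        f"Server={ip};Database={database};uid={uid};pwd={pwd}"
    )
```

Inside the task this would be used as conn = odbc.connect(build_conn_str(ip)). Failing early with a ValueError makes a bad value from the Bash command show up in the task log, rather than as a less obvious ODBC connection error.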