I have airflow dag written to work with Python operator.I need to use PostgreSQL operator for same dag without changing functionality of dag. Here is code with Python operators. How Should I replace Python operator with PostgreSQL operator? Or can we use two different operators in a single dag?
from airflow import DAG
from airflow.models import dag
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator
import os
script_dir_path = os.path.dirname(os.path.realpath(__file__))
import time
from time import sleep
from xlwt import Workbook
import pandas as pd
from csv import writer
from csv import DictReader
from datetime import datetime
from selenium import webdriver
import psycopg2
opt = webdriver.FirefoxOptions()
wb=Workbook()
sheet1=wb.add_sheet('Sheet 1',cell_overwrite_ok=False)
i=0
j=0
default_args = {
'owner': 'airflow',
'retries': 1
}
dag = DAG( 'Yahoo_Finance',
default_args=default_args,
description='fetching ticker symbol',
catchup=False,
start_date= datetime.now(),
schedule_interval= '* 7 * * *'
)
def extract_tickers():
conn = psycopg2.connect(dbname='postgres', user='airflow', password='airflow', host='postgres')
cur = conn.cursor()
with open(r'./fromlocal/EQUITY_L.csv') as read_obj:
csv_dict_reader = DictReader(read_obj)
url = "https://finance.yahoo.com"
driver = webdriver.Remote("http://selenium:4444/wd/hub", options=opt)
driver.get(url)
for row in csv_dict_reader:
time.sleep(4)
# action = ActionChains(driver)
time.sleep(4)
searchBox = driver.find_element_by_id('yfin-usr-qry')
time.sleep(4)
searchBox.send_keys(row['SYMBOL'])
time.sleep(4)
# clicking on search
driver.find_element_by_xpath('//*[@id="header-desktop-search-button"]').click()
time.sleep(15)
companyname = driver.find_elements_by_xpath('//*[@id="quote-header-info"]/div[2]/div[1]/div[1]/h1')
ticker = companyname = str(companyname[0].text)
print("comapny name: " companyname)
ticker = ticker[::-1]
ticker = ticker[1:ticker.find("(")]
ticker = ticker[::-1]
print("extracted ticker: " ticker)
companyname = companyname[:companyname.find(" (")]
companyname = companyname.replace("'","''")
cur.execute("INSERT INTO tickers1 (keyword,companyName) values ('" ticker "','" companyname "')")
conn.commit()
cur.close()
conn.close()
print(script_dir_path)
Yahoo_Finance = PythonOperator(task_id = 'extract_tickers',
python_callable = extract_tickers,
provide_context = True,
dag= dag )
Yahoo_Finance
CodePudding user response:
PostgesOperator
runs SQL. Your code is querying API, generate a CSV and loading it to the DB. You can not do that with PostgesOperator
.
What you can do is to replace the usage of psycopg2
with PostgresHook
.
The hook is a wrapper around psycopg2
that expose you functions that you can interact with. This means that, for example, you don't need to handle how to connect to Postgres on your own. Simply define the connection in Admin -> Connections
and reference the connection name in the hook:
from airflow.providers.postgres.hooks.postgres import PostgresHook
def extract_tickers():
with PostgresHook(postgres_conn_id="postgres_default").get_conn() as conn:
with conn.cursor() as cur:
cur.execute("Your SQL CODE")
To see other methods available in the hook check the hook source code.