I have an Airflow DAG that runs a Spark script (it reads two parquet files, performs transformations on them, and writes the data to a single JSON file). Now the data from this JSON file needs to be pushed into a Postgres table. At first I had trouble reading the JSON, but then I found a way to read it as one list of multiple dictionaries. However, I don't know how to load this data into the Postgres table. Here is my DAG snippet:
import os, json
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.operators.python_operator import PythonOperator
def read_json_file(filename):  # function that I found online to read JSON
    with open(filename, "r") as r:
        response = r.read()
        response = response.replace('\n', '')
        response = response.replace('}{', '},{')
        response = "[" + response + "]"
        return json.loads(response)
def load_data(ds, **kwargs):
    path_to_json = '/path/to/json/staging/day=20220815/'
    json_files = [pos_json for pos_json in os.listdir(path_to_json) if pos_json.endswith('.json')]
    filename = path_to_json + str(json_files[0])
    doc = read_json_file(filename)
    date_id = [doc[i]['day'] for i in range(len(doc))]
    interact_id = [doc[i]['interact_id'] for i in range(len(doc))]
    case_id = [doc[i]['case_id'] for i in range(len(doc))] # 1
    topic_id = [doc[i]['topic_id'] for i in range(len(doc))]
    create_date = [doc[i]['create_date'] for i in range(len(doc))]
    end_date = [doc[i]['end_date'] for i in range(len(doc))]
    topic_start_time = [doc[i]['topic_start_time'] for i in range(len(doc))]
    title = [doc[i]['title'] for i in range(len(doc))]
    direction = [doc[i]['direction'] for i in range(len(doc))]
    notes = [doc[i]['notes'] for i in range(len(doc))]
    _type_ = [doc[i]['_type_'] for i in range(len(doc))]
    reason = [doc[i]['reason'] for i in range(len(doc))]
    result = [doc[i]['result'] for i in range(len(doc))] # 2
    msisdn = [doc[i]['msisdn'] for i in range(len(doc))]
    price_plan = [doc[i]['x_price_plan'] for i in range(len(doc))]
    cust_type = [doc[i]['cust_type'] for i in range(len(doc))] # 3
    credit_limit = [doc[i]['credit_limit'] for i in range(len(doc))] # 4
    unit = [doc[i]['unit'] for i in range(len(doc))]
    supervisor = [doc[i]['supervisor'] for i in range(len(doc))]
    sdc = [doc[i]['sdc'] for i in range(len(doc))] # 5
    dealer_id = [doc[i]['dealer_id'] for i in range(len(doc))] # 6
    year = [doc[i]['year'] for i in range(len(doc))]
    month = [doc[i]['month'] for i in range(len(doc))]
    subs_no = [doc[i]['subs_no'] for i in range(len(doc))] # 7
    cust_bill_cycle = [doc[i]['cust_bill_cycle'] for i in range(len(doc))] # 8
    row = (date_id,interact_id,case_id,topic_id,create_date,end_date,topic_start_time,title,\
           direction,notes,_type_,reason,result,msisdn,price_plan,cust_type,credit_limit,\
           unit,supervisor,sdc,dealer_id,year,month,subs_no,cust_bill_cycle)
    insert_cmd = """
        INSERT INTO table_name
        (date_id,interact_id,case_id,topic_id,create_date,end_date,topic_start_time,title,
        direction,notes,_type_,reason,result,msisdn,price_plan,cust_type,credit_limit,
        unit,supervisor,sdc,dealer_id,year,month,subs_no,cust_bill_cycle)
        VALUES
        (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);
    """
    pg_hook = PostgresHook(postgres_conn_id='postgres_default', sql=insert_cmd)
    for d in entry_data:
        pg_hook.run(insert_cmd, parameters=row)
default_args = {
    'retries': 3,
}
with DAG(
    dag_id='final_DAG',
    schedule_interval='0 0 * * *',
    start_date=datetime(2022, 11, 30),
    catchup=False,
    default_args=default_args
) as dag:
    execute_spark = BashOperator(
        task_id='execute_spark',
        bash_command="""
        cd
        python3 path/to/spark_notebook.py
        """
    )
    load_data_task = PythonOperator(
        task_id='load_data_task',
        provide_context=True,
        python_callable=load_data,
        dag=dag)
    execute_spark >> load_data_task
When the load_data_task is triggered, I get this error listed in my logs:
psycopg2.errors.DatatypeMismatch: column "date_id" is of type date but expression is of type text[]
I understand what the error is saying, but I don't know how to deal with it. How can I get this done?
CodePudding user response:
The problem statement has multiple gaps. It would benefit from:

- an example of what the JSON file or `doc` variable looks like
- the table definition for the `table_name` table
- the definition of `entry_data`, which is missing from the code

The following solution makes assumptions to cover the missing information and uses a limited example.
The error message is saying that the `date_id` column in the `table_name` Postgres table is of type `DATE`, whereas the Python variable named `date_id` is a list of strings (in Postgres terms, the data type `text[]`). In fact, every Python variable packed into the `row` tuple is a list, which is not a valid shape for the parameters of the SQL insert statement.
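To illustrate with made-up values, the original code effectively hands psycopg2 one list per column instead of one value per row:

date_id = ['2022-11-30', '2022-11-29']  # the whole column as a Python list
interact_id = ['8675309', '8675307']
row = (date_id, interact_id)  # a tuple of lists, not a tuple of scalars
# psycopg2 adapts each list to a Postgres array, so the server sees
# text[] where the date_id column expects a single DATE per row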
Part 0. Assumptions
Assumption 1 - `doc` looks like this:

[
  { "day": "2022-11-30", "interact_id": "8675309", "case_id": "12345", "topic_id": "09876", "create_date": "2022-01-01", "end_date": "2022-12-05" },
  { "day": "2022-11-29", "interact_id": "8675307", "case_id": "12344", "topic_id": "08888", "create_date": "2022-02-02", "end_date": "2023-01-05" }
]
Assumption 2 - the `table_name` column data types are the following:

| table_name | column_name | data_type |
|---|---|---|
| table_name | date_id | DATE |
| table_name | interact_id | TEXT |
| table_name | case_id | TEXT |
| table_name | topic_id | TEXT |
| table_name | create_date | TEXT |
| table_name | end_date | TEXT |

You can look this up for your own table with the following query:
SELECT
    table_name,
    column_name,
    data_type
FROM
    information_schema.columns
WHERE
    table_name = 'table_name';
Part 1. Get rid of the Python lists for each variable.
This solution loops over the items in the JSON document and runs one insert into the SQL table per item.
# establish postgres connection
pg_hook = PostgresHook(postgres_conn_id='postgres_default')
insert_cmd = """
    INSERT INTO table_name (date_id,interact_id,case_id,topic_id,create_date,end_date)
    VALUES (%s,%s,%s,%s,%s,%s);
"""
# load file
doc = read_json_file(filename)
# loop through the items in doc and insert one row per item
for item in doc:
    date_id = item['day']
    interact_id = item['interact_id']
    case_id = item['case_id']
    topic_id = item['topic_id']
    create_date = item['create_date']
    end_date = item['end_date']
    row = (date_id, interact_id, case_id, topic_id, create_date, end_date)
    # insert item into the table
    pg_hook.run(insert_cmd, parameters=row)
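Running one INSERT per item works; as an aside, `PostgresHook` also inherits `insert_rows` from Airflow's `DbApiHook`, which batches the inserts for you. A minimal sketch under the same six-column assumption:

# build one tuple per row, in the same column order as the table
rows = [
    (item['day'], item['interact_id'], item['case_id'],
     item['topic_id'], item['create_date'], item['end_date'])
    for item in doc
]
pg_hook.insert_rows(
    table='table_name',  # assumed table name from the question
    rows=rows,
    target_fields=['date_id', 'interact_id', 'case_id',
                   'topic_id', 'create_date', 'end_date'],
)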
Part 2. Ensure each variable matches the data type that Postgres expects.
The Postgres `DATE` type accepts many different input formats: https://www.postgresql.org/docs/current/datatype-datetime.html#DATATYPE-DATETIME-DATE-TABLE

`yyyy-mm-dd` is the recommended `DATE` format, so this solution continues with the assumption that this is the format used by the `table_name` table.

To fix the error, the Python `date_id` variable needs to be converted from a string to a datetime object using the Python datetime library. The format string `'%Y-%m-%d'` describes the `yyyy-mm-dd` layout.
Instead of this:

date_id = item['day']

use this to convert the string to a datetime object:

date_id = datetime.strptime(item['day'], '%Y-%m-%d')
More about the `datetime.strptime` function here: https://docs.python.org/3/library/datetime.html#strftime-strptime-behavior
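For example, with a value shaped like the ones in Assumption 1:

from datetime import datetime

date_id = datetime.strptime('2022-11-30', '%Y-%m-%d')
print(date_id)         # 2022-11-30 00:00:00
print(date_id.date())  # 2022-11-30

Either the datetime or its .date() can be passed as a query parameter; psycopg2 adapts both to values that Postgres can store in a DATE column.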