Load JSON data into postgres table using airflow


I have an Airflow DAG that runs a Spark script (it reads two parquet files, performs transformations on them, and writes the result to a single JSON file). Now the data from this JSON file needs to be pushed into a Postgres table. At first I had trouble reading the JSON, but then I found a way to read the whole file as a list of dictionaries. However, I don't know how to load this data into the Postgres table. Here is my DAG snippet:

import os, json

from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.operators.python_operator import PythonOperator


def read_json_file(filename): # function that I found online to read JSON
    with open(filename, "r") as r:
        response = r.read()
        response = response.replace('\n', '')
        response = response.replace('}{', '},{')
        response = "["   response   "]"
        return json.loads(response)

def load_data(ds, **kwargs):
    path_to_json = '/path/to/json/staging/day=20220815/'
    json_files = [pos_json for pos_json in os.listdir(path_to_json) if pos_json.endswith('.json')]
    filename = path_to_json + str(json_files[0])

    doc = read_json_file(filename)

    date_id = [doc[i]['day'] for i in range(len(doc))]
    interact_id = [doc[i]['interact_id'] for i in range(len(doc))]
    case_id = [doc[i]['case_id'] for i in range(len(doc))] # 1 
    topic_id = [doc[i]['topic_id'] for i in range(len(doc))]
    create_date = [doc[i]['create_date'] for i in range(len(doc))]
    end_date = [doc[i]['end_date'] for i in range(len(doc))]
    topic_start_time = [doc[i]['topic_start_time'] for i in range(len(doc))]
    title = [doc[i]['title'] for i in range(len(doc))]
    direction = [doc[i]['direction'] for i in range(len(doc))]
    notes = [doc[i]['notes'] for i in range(len(doc))]
    _type_ = [doc[i]['_type_'] for i in range(len(doc))]
    reason = [doc[i]['reason'] for i in range(len(doc))]
    result = [doc[i]['result'] for i in range(len(doc))] # 2
    msisdn = [doc[i]['msisdn'] for i in range(len(doc))]
    price_plan = [doc[i]['x_price_plan'] for i in range(len(doc))]
    cust_type = [doc[i]['cust_type'] for i in range(len(doc))] # 3
    credit_limit = [doc[i]['credit_limit'] for i in range(len(doc))] # 4
    unit = [doc[i]['unit'] for i in range(len(doc))]
    supervisor = [doc[i]['supervisor'] for i in range(len(doc))]
    sdc = [doc[i]['sdc'] for i in range(len(doc))] # 5
    dealer_id = [doc[i]['dealer_id'] for i in range(len(doc))] # 6
    year = [doc[i]['year'] for i in range(len(doc))]
    month = [doc[i]['month'] for i in range(len(doc))]
    subs_no = [doc[i]['subs_no'] for i in range(len(doc))] # 7
    cust_bill_cycle = [doc[i]['cust_bill_cycle'] for i in range(len(doc))] # 8

    row = (date_id,interact_id,case_id,topic_id,create_date,end_date,topic_start_time,title,\
            direction,notes,_type_,reason,result,msisdn,price_plan,cust_type,credit_limit,\
            unit,supervisor,sdc,dealer_id,year,month,subs_no,cust_bill_cycle)

    insert_cmd = """
                INSERT INTO table_name
                (date_id,interact_id,case_id,topic_id,create_date,end_date,topic_start_time,title,
                direction,notes,_type_,reason,result,msisdn,price_plan,cust_type,credit_limit,
                unit,supervisor,sdc,dealer_id,year,month,subs_no,cust_bill_cycle)
                VALUES
                (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);
            """

    pg_hook  = PostgresHook(postgres_conn_id='postgres_default', sql=insert_cmd)
    for d in entry_data:    
        pg_hook.run(insert_cmd, parameters=row)


default_args = {
    'retries': 3,
}

with DAG (
    dag_id='final_DAG',
    schedule_interval='0 0 * * *',
    start_date= datetime(2022, 11, 30),
    catchup=False,
    default_args=default_args
) as dag:

    execute_spark = BashOperator(
        task_id='execute_spark',
        bash_command="""
        cd
        python3 path/to/spark_notebook.py
        """
    )


    load_data_task =  PythonOperator(
            task_id='load_data_task',
            provide_context=True,
            python_callable=load_data,
            dag=dag)

 
execute_spark >> load_data_task

When the load_data_task is triggered, I get this error listed in my logs:

psycopg2.errors.DatatypeMismatch: column "date_id" is of type date but expression is of type text[]

I understand what the error is saying, but don't know how to deal with it. How can I get this thing done?

CodePudding user response:

The problem statement as posted is missing some information. It would benefit from:

  • an example of what the JSON file or the doc variable looks like
  • the table definition for the table_name table
  • a definition of entry_data, which the code references but never defines

The following solution makes assumptions to fill in the missing information and uses a limited example.


The error message is saying that the date_id column in the table_name Postgres table is of type DATE, whereas the Python variable named date_id is a list of strings (in Postgres terms, type text[]).

In fact, every Python variable packed into the row tuple is a list. That is not a valid format for this SQL INSERT statement: each %s placeholder expects a single scalar value, not an entire column of values.
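
You can see why Postgres reports text[] by asking psycopg2 (the driver underneath PostgresHook) how it adapts a Python list. A minimal sketch, independent of the DAG code:

from psycopg2.extensions import adapt

# psycopg2 renders a Python list as a Postgres ARRAY literal, so a list
# bound to a %s placeholder arrives as an array (e.g. text[]), not a scalar.
print(adapt(['2022-11-30', '2022-11-29']).getquoted())
# prints something like: b"ARRAY['2022-11-30','2022-11-29']"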


Part 0. Assumptions

Assumption 1 - doc looks like this

[{ "day":"2022-11-30", "interact_id":"8675309", "case_id":"12345", "topic_id":"09876", "create_date":"2022-01-01", "end_date":"2022-12-05" }, { "day":"2022-11-29", "interact_id":"8675307", "case_id":"12344", "topic_id":"08888", "create_date":"2022-02-02", "end_date":"2023-01-05" }]

Assumption 2 - table_name column data types are the following

table_name   column_name   data_type
----------   -----------   ---------
table_name   date_id       DATE
table_name   interact_id   TEXT
table_name   case_id       TEXT
table_name   topic_id      TEXT
table_name   create_date   TEXT
table_name   end_date      TEXT

Look this up for your table using the following query:

SELECT 
   table_name, 
   column_name, 
   data_type 
FROM 
   information_schema.columns
WHERE 
   table_name = 'table_name';

Part 1. Get rid of the Python lists for each variable.

This solution loops through the items in the JSON document and inserts a row into the SQL table for each one.

# establish postgres connection
pg_hook = PostgresHook(postgres_conn_id='postgres_default')
insert_cmd = """
            INSERT INTO table_name (date_id,interact_id,case_id,topic_id,create_date,end_date)
            VALUES (%s,%s,%s,%s,%s,%s);
            """
# load file
doc = read_json_file(filename)

# loop through the items (dictionaries) in doc
for item in doc:

    date_id = item['day']
    interact_id = item['interact_id']
    case_id = item['case_id']
    topic_id = item['topic_id']
    create_date = item['create_date']
    end_date = item['end_date']
    row = (date_id, interact_id, case_id, topic_id, create_date, end_date)

    # insert item into table
    pg_hook.run(insert_cmd, parameters=row)
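
As a side note, running one INSERT per pg_hook.run call opens a new connection each time, which is slow for large files. DbApiHook (which PostgresHook inherits from) provides insert_rows, which batches all rows over a single connection. A sketch under the same assumptions, with the table and column names taken from Assumption 2:

# build all rows up front, then insert them in batches over one connection
rows = [
    (item['day'], item['interact_id'], item['case_id'],
     item['topic_id'], item['create_date'], item['end_date'])
    for item in doc
]
pg_hook.insert_rows(
    table='table_name',
    rows=rows,
    target_fields=['date_id', 'interact_id', 'case_id',
                   'topic_id', 'create_date', 'end_date'],
    commit_every=1000,  # commit every 1000 rows
)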

Part 2. Ensure each variable matches the data type that Postgres expects

The Postgres DATE type accepts many different input formats: https://www.postgresql.org/docs/current/datatype-datetime.html#DATATYPE-DATETIME-DATE-TABLE. yyyy-mm-dd is the recommended format, and it matches the sample data in Assumption 1, so this solution assumes that is the format used by the table_name table.

To fix the error, the Python date_id variable needs to be converted from a string to a datetime object using the Python datetime library. The format definition '%Y-%m-%d' corresponds to the yyyy-mm-dd format.

instead of this

date_id = item['day']

use this to convert the string to a datetime type

date_id = datetime.strptime(item['day'], '%Y-%m-%d')

More about the datetime.strptime function here: https://docs.python.org/3/library/datetime.html#strftime-strptime-behavior
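
A quick check of the conversion, assuming the yyyy-mm-dd strings from Assumption 1. Calling .date() on the result gives a datetime.date, which psycopg2 adapts directly to a Postgres DATE:

from datetime import datetime

# parse the assumed yyyy-mm-dd string, then keep only the date part
date_id = datetime.strptime('2022-11-30', '%Y-%m-%d').date()
print(date_id)        # 2022-11-30
print(type(date_id))  # <class 'datetime.date'>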
