For loop does not update Python Pandas data-frame when importing data using SQL Alchemy


I have a for loop that should refresh a pandas DataFrame from a Postgres table that another thread updates every 5 seconds.

If I run the code without the for loop, I get what I want, which is just the latest update time. However, if I run the code with the for loop, the results do not update and stay stuck on the first result.

Why is this happening and how can I fix the problem?

import time
from datetime import datetime

import pandas as pd
from sqlalchemy import MetaData, Table, and_, select

# `engine` is assumed to have been created earlier with create_engine(...)
metadata = MetaData(bind=None)
table = Table(
    'datastore', 
    metadata, 
    autoload=True, 
    autoload_with=engine
)

stmt = select([
    table.columns.date,
    table.columns.open,
    table.columns.high,
    table.columns.low,
    table.columns.close
]).where(and_(
    # and_() takes each condition as a separate argument;
    # joining them with Python's `and` breaks the SQL expression
    table.columns.date == datetime.today().strftime('%Y-%m-%d'),
    table.columns.close != 0
))
# ]).where(and_(table.columns.date == '2023-01-12', table.columns.close != 0))
    
connection = engine.connect()

for x in range(1000000):
    # re-run the query each iteration to pick up newly inserted rows
    data_from_db = pd.DataFrame(connection.execute(stmt).fetchall())
    data_from_db = data_from_db[data_from_db['close'] != 0]
    print(data_from_db.date.iloc[-1])
    time.sleep(5)

I also tried the psycopg2 library, and the problem is still there:

import time
import psycopg2

for x in range(1000000):
    conn = psycopg2.connect(
        host='localhost',
        database='ABC',
        user='postgres',
        password='*******')
    cur = conn.cursor()
    cur.execute("select max(date) from public.datastore")
    y = cur.fetchall()
    print(y)
    # close the cursor and connection so the loop does not leak connections
    cur.close()
    conn.close()
    time.sleep(5)

CodePudding user response:

The issue can be caused by one of the following:

  1. the transaction isolation level (the other thread that updates your table may hold a transaction that is not yet committed, so your script keeps reading stale data)
  2. statement caching applied to identical statements

For the 1st factor, try setting isolation_level="READ UNCOMMITTED" to get "dirty" (fresh) reads:

engine = create_engine(
    "your_dsn path",
    isolation_level="READ UNCOMMITTED"
)

with engine.connect() as conn:
    for x in range(1000000):
        data_from_db = pd.DataFrame(conn.execute(stmt).fetchall()) 
        print(data_from_db.date.iloc[-1])
        time.sleep(5)

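If the first factor is the culprit, committing promptly on the writer side also works, without lowering the reader's isolation level. A minimal sketch of the writing thread, assuming the same engine and table as above; fetch_from_api is a hypothetical placeholder for whatever produces the new rows:

def writer_loop():
    while True:
        new_rows = fetch_from_api()  # hypothetical: list of row dicts
        # engine.begin() opens a transaction and commits it on exit,
        # so each batch becomes visible to other sessions right away
        with engine.begin() as write_conn:
            write_conn.execute(table.insert(), new_rows)
        time.sleep(5)
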
For the 2nd factor, you can disable the compiled-statement cache for the connection:

# disable statement caching for this connection
with engine.connect().execution_options(compiled_cache=None) as conn:
    for x in range(1000000):
        data_from_db = pd.DataFrame(conn.execute(stmt).fetchall())
        print(data_from_db.date.iloc[-1])
        time.sleep(5)

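The two options can also be combined on a single connection. A sketch, reusing the engine and stmt from above; note that PostgreSQL treats READ UNCOMMITTED as READ COMMITTED, which is still enough to see other sessions' committed changes:

fresh_conn = engine.connect().execution_options(
    isolation_level="READ UNCOMMITTED",  # fresh reads
    compiled_cache=None,                 # no compiled-statement cache
)
with fresh_conn as conn:
    data_from_db = pd.DataFrame(conn.execute(stmt).fetchall())
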
CodePudding user response:

I was using Jupyter notebooks.

One cell was inserting data from an API, and another cell was reading from the same database using a different thread.

It turns out that if I perform these two operations using two different notebooks, everything works.

I simply had to use threads.

import psycopg2
import time
from threading import Thread

def function_test():
    while True:
        conn_ = psycopg2.connect(
            host='#####',
            database='####',
            user='####',
            password='#####')
        cur = conn_.cursor()
        cur.execute("SELECT date FROM public.datastore ORDER BY date DESC LIMIT 1")
        query_results = cur.fetchone()
        print(query_results)
        # close the cursor and connection before sleeping
        cur.close()
        conn_.close()
        time.sleep(5)

thread1 = Thread(target=function_test, args=())
thread1.start()
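
For completeness, the inserting cell can run the same way on its own thread. A minimal sketch where fetch_from_api is a hypothetical placeholder and the column list is taken from the question's select:

def writer_test():
    while True:
        row = fetch_from_api()  # hypothetical: one (date, open, high, low, close) tuple
        conn_ = psycopg2.connect(
            host='#####',
            database='####',
            user='####',
            password='#####')
        cur = conn_.cursor()
        cur.execute(
            "INSERT INTO public.datastore (date, open, high, low, close) "
            "VALUES (%s, %s, %s, %s, %s)", row)
        conn_.commit()  # commit so the reader thread sees the new row
        cur.close()
        conn_.close()
        time.sleep(5)

thread2 = Thread(target=writer_test, args=())
thread2.start()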