I have a for loop that should update a pandas DataFrame from a Postgres table that another thread updates every 5 seconds.
If I run the code without the for loop I get what I want, which is just the latest update time. However, if I run the code with the for loop, the results do not update and remain stuck on the first result.
Why is this happening and how can I fix the problem?
metadata = MetaData(bind=None)
table = Table(
    'datastore',
    metadata,
    autoload=True,
    autoload_with=engine
)
stmt = select([
    table.columns.date,
    table.columns.open,
    table.columns.high,
    table.columns.low,
    table.columns.close
# note: the conditions are passed to and_() as separate arguments; chaining them with Python's "and" does not build both SQL conditions
]).where(and_(table.columns.date_ == datetime.today().strftime('%Y-%m-%d'), table.columns.close != 0))
#]).where(and_(table.columns.date_ == '2023-01-12', table.columns.close != 0))
connection = engine.connect()
for x in range(1000000):
    data_from_db = pd.DataFrame(connection.execute(stmt).fetchall())
    data_from_db = data_from_db[data_from_db['close'] != 0]
    print(data_from_db.date.iloc[-1])
    time.sleep(5)
I also tried the psycopg2 library and the problem is still there:
for x in range(1000000):
    conn = psycopg2.connect(
        host='localhost',
        database='ABC',
        user='postgres',
        password='*******')
    cur = conn.cursor()
    cur.execute("select max(date) from public.datastore")
    y = cur.fetchall()
    print(y)
    time.sleep(5)
CodePudding user response:
The issue can be caused by one of the following:
- transaction isolation level (the other thread that updates your table may have a session that is not yet closed, so your current script keeps reading old data)
- caching applied to repeated statements
For the 1st factor, try setting isolation_level="READ UNCOMMITTED" to get "dirty"/fresh reads:
engine = create_engine(
    "your_dsn path",
    isolation_level="READ UNCOMMITTED"
)

with engine.connect() as conn:
    for x in range(1000000):
        data_from_db = pd.DataFrame(conn.execute(stmt).fetchall())
        print(data_from_db.date.iloc[-1])
        time.sleep(5)
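If you prefer not to change the engine-wide default, the same isolation level can also be applied to a single connection through its execution options (a minimal sketch, reusing the engine and stmt objects from the question):

# set the isolation level for this connection only
with engine.connect().execution_options(isolation_level="READ UNCOMMITTED") as conn:
    for x in range(1000000):
        data_from_db = pd.DataFrame(conn.execute(stmt).fetchall())
        print(data_from_db.date.iloc[-1])
        time.sleep(5)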
For the 2nd factor you can try:
# disable caching for this connection
with engine.connect().execution_options(compiled_cache=None) as conn:
    # your loop here
    data_from_db = pd.DataFrame(conn.execute(stmt).fetchall())
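If you want to rule caching out completely and are on SQLAlchemy 1.4+, the statement cache can also be disabled for the whole engine at creation time (a sketch; "your_dsn path" is the same placeholder as above, and query_cache_size=0 turns the cache off):

# disable the compiled-statement cache for every connection made by this engine
engine = create_engine(
    "your_dsn path",
    query_cache_size=0
)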
CodePudding user response:
I was using Jupyter notebooks.
One cell was inserting data from an API and another cell was reading from the same db using a different thread.
It turns out that if I perform these two operations in two different notebooks, everything works.
I simply had to use threads:
import psycopg2
import time
from threading import Thread

def function_test():
    while True:
        while True:
            conn_ = psycopg2.connect(
                host='#####',
                database='####',
                user='####',
                password='#####')
            cur = conn_.cursor()
            cur.execute("SELECT date FROM public.datastore order by date desc limit 1")
            query_results = cur.fetchone()
            print(query_results)
            del(query_results)
            del(cur)
            break
        conn_.commit()
        conn_.close()
        time.sleep(5)

thread1 = Thread(target=function_test, args=())
thread1.start()
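The same idea applies to the SQLAlchemy version from the question: if the connection is opened inside the loop, every read starts in a fresh transaction and sees whatever the other notebook has committed in the meantime (a minimal sketch, reusing the engine and stmt objects defined above):

for x in range(1000000):
    # open a new connection (and therefore a new transaction) for every read;
    # the context manager closes it again before the sleep
    with engine.connect() as conn:
        data_from_db = pd.DataFrame(conn.execute(stmt).fetchall())
    print(data_from_db.date.iloc[-1])
    time.sleep(5)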