Home > database >  wrong values from python queues while using two threads
wrong values from python queues while using two threads

Time:03-15

Here I am using two threads. One thread create a dataframe and pass it to another thread through queue. Another thread collect the dataframe from the queue and append it to a csv file. Here when I run the code ,sometimes I got wrongvalues in entire dataframe. shoudl i have to do any corrections in the code ?

Here is my code:

import threading 
from queue import Queue
import pandas as pd
import numpy as np
import time
q=Queue()
df = pd.DataFrame(columns =['1Hz','2Hz', '3Hz', '4Hz', '5Hz', '6Hz', '7Hz'])
def getpoints(q): 
    print (threading.current_thread().getName()   " starts  id - {} ".format( threading.get_ident()))
    global df
    origin=0  
    end=1
    while Active:
        def get_values_for_frequency(freq):    
            omega = 2*np.pi*freq
            t_vec = np.linspace(origin,end,num=1000)
            y = np.sin(omega*t_vec) 
            return y
        df['1Hz']=pd.Series(get_values_for_frequency(1))
        df['2Hz']=pd.Series(get_values_for_frequency(2))
        df['3Hz']=pd.Series(get_values_for_frequency(3))
        df['4Hz']=pd.Series(get_values_for_frequency(4))
        df['5Hz']=pd.Series(get_values_for_frequency(5))
        df['6Hz']=pd.Series(get_values_for_frequency(6))
        df['7Hz']=pd.Series(get_values_for_frequency(7))
        origin=end
        end=end 1
        df = df.round(decimals = 7)  
        df=(2**15)*df  
        q.put(df)   // queue sends the dataframe
        print('yes')  // print yes to acknowledge the no of times of dataframe sent
        print(df)
        time.sleep(1)
    print("thread1 ended")
def ot(q):
    global Active
    print (threading.current_thread().getName()  " starts  id - {}".format(threading.get_ident()))
    while (Active or not q.empty()):
        t2receive=q.get()    // queue receives values from thread 1 queue
        print(t2receive)
        if  Active==False and q.empty():
            break   
        time.sleep(2)  
    print("Thread2 ended")   
if "__main__"==__name__:
    print("MAIN THREAD id -", threading.get_ident())
    global Active
    Active=True
    t1=threading.Thread(target=getpoints,args=(q,))
    t2=threading.Thread(target=ot,args=(q,))
    t1.start()
    t2.start()
    print("\ncurrently active threads -", threading.enumerate(),'\n')
    input("press Enter to stop\n\n")
    Active=False
    print("Entered!\n")
    t1.join()
    t2.join()
    print("\ncurrently active threads -", threading.enumerate(),'\n')
    print("AGAIN MAIN THREAD  id - ",threading.get_ident())
    print("T1 Alive : ",t1.is_alive())
    print("T2 Alive : ",t2.is_alive())

yes
            1Hz          2Hz          3Hz          4Hz          5Hz          6Hz          7Hz
0     -0.000000    -0.000000    -0.000000    -0.000000    -0.000000    -0.000000     0.000000
1    206.091059   412.175565   618.243686   824.285594  1030.298010  1236.267827  1442.188493
2    412.175565   824.285594  1236.267827  1648.053453  2059.576934  2470.776013  2881.581875
3    618.243686  1236.267827  1853.849600  2470.776013  3086.817690  3701.764915  4315.391590
4    824.285594  1648.053453  2470.776013  3291.932262  4111.007744  4927.481446  5740.835635
..          ...          ...          ...          ...          ...          ...          ...
995 -824.285594 -1648.053453 -2470.776013 -3291.932262 -4111.007744 -4927.481446 -5740.835635
996 -618.243686 -1236.267827 -1853.849600 -2470.776013 -3086.817690 -3701.764915 -4315.391590
997 -412.175565  -824.285594 -1236.267827 -1648.053453 -2059.576934 -2470.776013 -2881.581875
998 -206.091059  -412.175565  -618.243686  -824.285594 -1030.298010 -1236.267827 -1442.188493
999   -0.000000    -0.000000    -0.000000    -0.000000    -0.000000    -0.000000    -0.000000

[1000 rows x 7 columns]

Entered!

              1Hz           2Hz           3Hz           4Hz           5Hz           6Hz           7Hz
0   -7.347881e-16 -1.469576e-15 -2.204364e-15 -2.939152e-15 -1.077937e-14 -4.408728e-15  1.961911e-15
1    6.289433e-03  1.257862e-02  1.886730e-02  2.515525e-02  3.144219e-02  3.772789e-02  4.401210e-02
2    1.257862e-02  2.515525e-02  3.772789e-02  5.029457e-02  6.285329e-02  7.540206e-02  8.793891e-02
3    1.886730e-02  3.772789e-02  5.657505e-02  7.540206e-02  9.420224e-02  1.129689e-01  1.316953e-01
4    2.515525e-02  5.029457e-02  7.540206e-02  1.004618e-01  1.254580e-01  1.503748e-01  1.751964e-01
..            ...           ...           ...           ...           ...           ...           ...
995 -2.515525e-02 -5.029457e-02 -7.540206e-02 -1.004618e-01 -1.254580e-01 -1.503748e-01 -1.751964e-01
996 -1.886730e-02 -3.772789e-02 -5.657505e-02 -7.540206e-02 -9.420224e-02 -1.129689e-01 -1.316953e-01
997 -1.257862e-02 -2.515525e-02 -3.772789e-02 -5.029457e-02 -6.285329e-02 -7.540206e-02 -8.793891e-02
998 -6.289433e-03 -1.257862e-02 -1.886730e-02 -2.515525e-02 -3.144219e-02 -3.772789e-02 -4.401210e-02
999 -9.797174e-16 -1.959435e-15 -2.939152e-15 -3.918870e-15 -4.898587e-15 -5.878305e-15 -6.858022e-15

[1000 rows x 7 columns]
thread1 ended
            1Hz          2Hz          3Hz          4Hz          5Hz          6Hz          7Hz
0     -0.000000    -0.000000    -0.000000    -0.000000    -0.000000    -0.000000     0.000000
1    206.091059   412.175565   618.243686   824.285594  1030.298010  1236.267827  1442.188493
2    412.175565   824.285594  1236.267827  1648.053453  2059.576934  2470.776013  2881.581875
3    618.243686  1236.267827  1853.849600  2470.776013  3086.817690  3701.764915  4315.391590
4    824.285594  1648.053453  2470.776013  3291.932262  4111.007744  4927.481446  5740.835635

above is the sample output where I get wrong values in the middle dataframe. Top and bottom dataframes are correct . The middle dataframe should also be like the 1 st dataframe( FUI the datas in all the dataframes are same).

Could anyone explains what i have to change the code ? Thanks in advance

OUTPUT:

def getpoints(q,l): 
    print (threading.current_thread().getName()   " starts  id - {} ".format( threading.get_ident()))
    global df
    origin=0  
    end=1
    while Active:
            def get_values_for_frequency(freq):    
                omega = 2*np.pi*freq
                t_vec = np.linspace(origin,end,num=1000)
                y = np.sin(omega*t_vec) 
                return y
            df['1Hz']=pd.Series(get_values_for_frequency(1))
            df['2Hz']=pd.Series(get_values_for_frequency(2))
            df['3Hz']=pd.Series(get_values_for_frequency(3))
            df['4Hz']=pd.Series(get_values_for_frequency(4))
            df['5Hz']=pd.Series(get_values_for_frequency(5))
            df['6Hz']=pd.Series(get_values_for_frequency(6))
            df['7Hz']=pd.Series(get_values_for_frequency(7))
            origin=end
            end=end 1
            df = df.round(decimals = 7)  
            df=(2**15)*df 
            q.put(df)
            print('yes')
            time.sleep(1)
    print("thread1 ended")
def ot(q,l):
    global Active
    print (threading.current_thread().getName()  " starts  id - {}".format(threading.get_ident()))
    while (Active or not q.empty()):
        try:
            t2receive=q.get()
            t2receive.to_csv("mycsv.csv",mode='a',index=False,header=False)
        except Queue.empty():
            break
        except Exception as err:
            pass
        time.sleep(1)
    
    print("Thread2 ended")

CodePudding user response:

After having played around with the code a bit more, I think the issue is not the increasing queue size, but the global df, which gets reused by getpoints all the time.

Try replacing q.put(df) by q.put(df.copy()), which stores a copy of the dataframe in the queue, instead of the "global" one, which might get modified when it is received.

  • Related