I'd like to know how I can properly implement threading into my application and hold at bay any race conditions while not negatively impacting the scripts performance times.
I have omitted and simplified the code for easy reading, but would be similar in structure to what is seen below
run.py
from lib.DropSender import DropSender
drop_sender = DropSender( options )
drop_sender.start()
DropSender.py
from lib.Connect import Connection
import threading
import json
class DropSender:
def __init__( self , options = {} ):
self.system_online = True
# This is a Web Socket connection delivering messages
def on_message(self, message):
js = json.loads( message )
symbol = js[6]
connections = Connection( self, symbol )
connections.start()
Connect.py
import threading
import requests
import mysql.connector
from threading import Thread, Lock
class Connection( threading.Thread ):
def __init__( self , drop_sender, symbol ):
threading.Thread.__init__( self )
self.symbol = symbol
self.lock = Lock()
def run( self ):
self.users_list = self.getUsers( self.symbol )
if self.users_list["count"] > 0:
for u in self.users_list["data"]:
self.user_id = u["user_id"] # Example 1122
self.amount = u["amount"] # 923.40
t = Thread(target=self.makePurchase, args=(self.symbol, self.user_id, self.amount, ))
t.start()
# t.join()
I know that join() removes race conditions, but that again removes the performance speed of the script, with the threads waiting on each other
def getUsers():
# MYSQL Call to get list of users who like this 'symbol'
my_users_arr = { "data" : data, "count" : count }
return my_users_arr
def makePurchase( self, symbol, user_id, amount ):
# Lock it up
self.lock.acquire()
-All sorts of race conditions happening here, even with the locks acquired-
# User ID = 1122
# Amount 884.00 (1122's User ID mixing up with another users amount, race condition)
# Release Lock
self.lock.release()
CodePudding user response:
Your use of connection.user_id
and connection.amount
looks suspicious. There is only a single connection object, and these two fields are used and then immediately overwritten by setting up the next thread.
You do not show your code for makePurchase()
, but it should not expect these two fields of self
to be correct.
As an aside, I highly recommend the use of a thread pool.
from multiprocessing.pool import ThreadPool
with ThreadPool() as pool:
for ....:
pool.apply_async(func, args)
This limits the number of threads to the number of CPUs on your machine, and makes sure that all the threads are cleaned up when done.