I have a multithreading script that operates on a data set. Each thread gets a chunk of the data set, iterates over its data frame, and calls an API (MS Graph user create) for each row. What I have seen is that my script tends to get stuck when it is almost finished. I am running this on an Ubuntu Linux server with 8 vCPUs, but the hang only happens when the total dataset size is in the millions (around 9-10 hrs for 2 million records).
This is the first time I am writing a long-running script, and I would like an opinion on whether I am doing things correctly:
- Is my code the reason the script hangs?
- Have I done the multithreading correctly? Have I created and waited for the threads to end correctly?
import pandas as pd
import sys
import os
import logging
import string
import secrets
import random
import threading
import time
import requests
##### ----- Logging Setup -------
logging.basicConfig(filename="pylogs.log", format='%(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')
# Creating an object
logger = logging.getLogger()
# Setting the threshold of logger to DEBUG
logger.setLevel(logging.ERROR)
#####------ Function Definitions -------
# generates random password
def generateRandomPassword(lengthOfPassword):
    # logic for random password gen
    ...

# the most important function
def createAccounts(splitData, threadID):
batchProgress = 0
batch_size = splitData.shape[0]
for row in splitData.itertuples():
try:
            headers = {"Content-Type": "application/json", "Authorization": "Bearer " + access_token}
randomLength = [8,9,12,13,16]
passwordLength = random.choice(randomLength)
password = generateRandomPassword(passwordLength) # will be generated randomly - for debugging purpose
            batchProgress += 1
post_request_body = {
"accountEnabled": True,
"displayName": row[5],
"givenName": row[3],
"surname": row[4],
"mobilePhone": row[1],
"mail": row[2],
"passwordProfile" : {
"password": password,
"forceChangePasswordNextSignIn": False
},
"state":"",
"identities": [
{
"signInType": "emailAddress",
"issuer": tenantName,
"issuerAssignedId": row[2]
}
]
}
            # only add the phone-number identity if a phone number is present - issuerAssignedId must be 1-64 characters, so it cannot be left empty
if(len(row[4])):
post_request_body["identities"].append({"signInType": "phoneNumber","issuer": tenantName,"issuerAssignedId": row[1]})
responseFromApi = requests.post(graph_api_create, headers=headers, json=post_request_body)
status = responseFromApi.status_code
if(status == 201): #success
id = responseFromApi.json().get("id")
print(f" {status} | {batchProgress} / {batch_size} | Success {id}")
errorDict = f'{row[1]}^{row[2]}^{row[3]}^{row[4]}^{row[5]}^{row[6]}^{row[7]}^{row[8]}^{row[9]}^{row[10]}{row[11]}{row[12]}{row[13]}{row[11]}{row[12]}{row[13]}^Success'
elif(status == 429): #throttling issues
print(f" Thread {threadID} | Throttled by server ! Sleeping for 150 seconds")
errorDict = f'{row[1]}^{row[2]}^{row[3]}^{row[4]}^{row[5]}^{row[6]}^{row[7]}^{row[8]}^{row[9]}^{row[10]}{row[11]}{row[12]}{row[13]}^Throttled'
time.sleep(150)
elif(status == 401): #token expiry
print(f" Thread {threadID} | Token Expired. Getting it back !")
errorDict = f'{row[1]}^{row[2]}^{row[3]}^{row[4]}^{row[5]}^{row[6]}^{row[7]}^{row[8]}^{row[9]}^{row[10]}{row[11]}{row[12]}{row[13]}^Token Expired'
getRefreshToken()
else: #any other error
msg = ""
try:
msg = responseFromApi.json().get("error").get("message")
except Exception as e:
msg = f"Error {e}"
errorDict = f'{row[1]}^{row[2]}^{row[3]}^{row[4]}^{row[5]}^{row[6]}^{row[7]}^{row[8]}^{row[9]}^{row[10]}{row[11]}{row[12]}{row[13]}^{msg}'
print(f" {status} | {batchProgress} / {batch_size} | {msg} {row[2]}")
logger.error(errorDict)
except Exception as e:
# check for refresh token errors
errorDict = f'{row[1]}^{row[2]}^{row[3]}^{row[4]}^{row[5]}^{row[6]}^{row[7]}^{row[8]}^{row[9]}^{row[10]}{row[11]}{row[12]}{row[13]}^Exception_{e}'
logger.error(errorDict)
msg = " Error "
print(f" {status} | {batchProgress} / {batch_size} | {msg} {row[2]}")
print(f"Thread {threadID} completed ! {batchProgress} / {batch_size}")
batchProgress = 0
###### ------ Main Script ------
if __name__ == "__main__":
# get file name and appid from command line arguments
storageFileName = sys.argv[1]
appId = sys.argv[2]
# setup credentials
bigFilePath = f"./{storageFileName}"
CreatUserUrl = "https://graph.microsoft.com/v1.0/users"
B2C_Tenant_Name = "tenantName"
    tenantName = B2C_Tenant_Name + ".onmicrosoft.com"
applicationID = appId
accessSecret = "" # will be taken from command line in future revisions
token_api_body = {
"grant_type": "client_credentials",
"scope": "https://graph.microsoft.com/.default",
"client_Id" : applicationID,
"client_secret": accessSecret
}
# Get initial access token from MS
print("Connecting to MS Graph API")
    token_api = "https://login.microsoftonline.com/" + tenantName + "/oauth2/v2.0/token"
response = {}
try:
responseFromApi = requests.post(token_api, data=token_api_body)
responseJson = responseFromApi.json()
print(f"Token API Success ! Expires in {responseJson.get('expires_in')} seconds")
except Exception as e:
print("ERROR | Token auth failed ")
# if we get the token proceed else abort
if(responseFromApi.status_code == 200):
migrationData = pd.read_csv(bigFilePath)
print(" We got the data from Storage !", migrationData.shape[0])
global access_token
access_token = responseJson.get('access_token')
graph_api_create = "https://graph.microsoft.com/v1.0/users"
dataSetSize = migrationData.shape[0]
partitions = 50 # No of partitions # will be taken from command line in future revisions
size = int(dataSetSize/partitions) # No of rows per file
remainder = dataSetSize%partitions
print(f"Data Set Size : {dataSetSize} | Per file size = {size} | Total Files = {partitions} | Remainder: {remainder} | Start...... \n")
        ##### ------- Dataset partitioning
datasets = []
        range_val = partitions + 1 if remainder != 0 else partitions
for partition in range(range_val):
if(partition == partitions):
df = migrationData[size*partition:dataSetSize]
else:
                df = migrationData[size*partition:size*(partition + 1)]
datasets.append(df)
number_of_threads = len(datasets)
start_time = time.time()
spawned_threads = []
######## ---- Threads are spawned ! here --------
for i in range(number_of_threads): # spawn threads
t = threading.Thread(target=createAccounts, args=(datasets[i], i))
t.start()
spawned_threads.append(t)
number_spawned = len(spawned_threads)
print(f"Started {number_spawned} threads !")
        ###### - Threads are joined (waited for) here ---------
for thread in spawned_threads: # let the script wait for thread execution
thread.join()
print(f"Done! It took {time.time() - start_time}s to execute") # time check
#### ------ Retry Mechanism -----
print("RETRYING....... !")
os.system(f'python3 retry.py pylogs.log {appId}')
else:
print(f"Token Missing ! API response {responseJson}")```
CodePudding user response:
Unfair use of MS Graph
Due to possible throttling by the server, usage of the MS Graph resource may be unfair between threads; I mean "fair" in the resource-starvation sense.
elif(status == 429): #throttling issues
print(f" Thread {threadID} | Throttled by server ! Sleeping for 150 seconds")
errorDict = f'{row[1]}^{row[2]}^{row[3]}^{row[4]}^{row[5]}^{row[6]}^{row[7]}^{row[8]}^{row[9]}^{row[10]}{row[11]}{row[12]}{row[13]}^Throttled'
time.sleep(150)
One thread can get a disproportionate share of 429 responses, each followed by a 150-second penalty. This sleep doesn't stop the other threads from making calls and achieving forward progress, though.
The result is one thread lagging far behind the others, which gives the appearance of the script being stuck.
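A gentler pattern is to honour the Retry-After header that Graph attaches to 429 responses and back off per request, instead of a flat 150-second sleep for the whole batch. A minimal sketch only, with post_with_backoff as a hypothetical helper name, not code from the question:
import time
import requests

def post_with_backoff(url, headers, payload, max_retries=5):
    # Retry a Graph POST on 429, preferring the server's Retry-After hint;
    # otherwise fall back to a growing (exponential) delay.
    delay = 2
    response = None
    for _ in range(max_retries):
        response = requests.post(url, headers=headers, json=payload)
        if response.status_code != 429:
            return response
        retry_after = response.headers.get("Retry-After")
        wait = int(retry_after) if retry_after and retry_after.isdigit() else delay
        time.sleep(wait)
        delay *= 2
    return response

Spreading the waits this way keeps one unlucky thread from stacking up 150-second penalties while the rest of the pool races ahead.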
CodePudding user response:
Here's a refactoring of your code to use the standard library multiprocessing.pool.ThreadPool, for simplicity.
Naturally I couldn't test it since I don't have your data, but the basic idea should work. I removed the logging and the external retry script, since I couldn't see why you'd need them (feel free to add them back); instead, this version retries each row in place when the problem appears transient.
import random
import sys
import time
from multiprocessing.pool import ThreadPool
import pandas as pd
import requests
sess = requests.Session()
# globals filled in by `main`
tenantName = None
access_token = None
def submit_user_create(row):
    headers = {"Content-Type": "application/json", "Authorization": "Bearer " + access_token}
randomLength = [8, 9, 12, 13, 16]
passwordLength = random.choice(randomLength)
password = generateRandomPassword(passwordLength) # will be generated randomly - for debugging purpose
post_request_body = {
"accountEnabled": True,
"displayName": row[5],
"givenName": row[3],
"surname": row[4],
"mobilePhone": row[1],
"mail": row[2],
"passwordProfile": {"password": password, "forceChangePasswordNextSignIn": False},
"state": "",
"identities": [{"signInType": "emailAddress", "issuer": tenantName, "issuerAssignedId": row[2]}],
}
    # only add the phone-number identity if a phone number is present - issuerAssignedId must be 1-64 characters, so it cannot be left empty
if len(row[4]):
post_request_body["identities"].append({"signInType": "phoneNumber", "issuer": tenantName, "issuerAssignedId": row[1]})
return sess.post("https://graph.microsoft.com/v1.0/users", headers=headers, json=post_request_body)
def get_access_token(tenantName, applicationID, accessSecret):
token_api_body = {
"grant_type": "client_credentials",
"scope": "https://graph.microsoft.com/.default",
"client_Id": applicationID,
"client_secret": accessSecret,
}
token_api = f"https://login.microsoftonline.com/{tenantName}/oauth2/v2.0/token"
resp = sess.post(token_api, data=token_api_body)
if resp.status_code != 200:
raise RuntimeError(f"Token Missing ! API response {resp.content}")
json = resp.json()
print(f"Token API Success ! Expires in {json.get('expires_in')} seconds")
return json["access_token"]
def process_row(row):
while True:
response = submit_user_create(row)
status = response.status_code
if status == 201: # success
id = response.json().get("id")
print(f"Success {id}")
return True
if status == 429: # throttling issues
print(f"Throttled by server ! Sleeping for 150 seconds")
time.sleep(150)
continue
if status == 401: # token expiry?
print(f"Token Expired. Getting it back !")
getRefreshToken() # TODO
continue
try:
msg = response.json().get("error").get("message")
except Exception as e:
msg = f"Error {e}"
print(f" {status} | {msg} {row[2]}")
return False
def main():
global tenantName, access_token
# get file name and appid from command line arguments
bigFilePath = sys.argv[1]
appId = sys.argv[2]
# setup credentials
B2C_Tenant_Name = "tenantName"
tenantName = f"{B2C_Tenant_Name}.onmicrosoft.com"
accessSecret = "" # will be taken from command line in future revisions
access_token = get_access_token(tenantName, appId, accessSecret)
migrationData = pd.read_csv(bigFilePath)
start_time = time.time()
with ThreadPool(10) as pool:
for i, result in enumerate(pool.imap_unordered(process_row, migrationData.itertuples()), 1):
progress = i / len(migrationData) * 100
print(f"{i} / {len(migrationData)} | {progress:.2f}% | {time.time() - start_time:.2f} seconds")
print(f"Done! It took {time.time() - start_time}s to execute")
if __name__ == "__main__":
main()
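Two helpers are still assumed rather than shown: generateRandomPassword and getRefreshToken. For the former, a rough sketch only (adapt it to your tenant's password policy), built on the string and secrets modules your original script already imports:
import secrets
import string

def generateRandomPassword(lengthOfPassword):
    # Sketch only: draw characters with the cryptographically secure secrets
    # module; tighten the alphabet if your password policy requires it.
    alphabet = string.ascii_letters + string.digits + string.punctuation
    return "".join(secrets.choice(alphabet) for _ in range(lengthOfPassword))

getRefreshToken remains a TODO; it would essentially re-run get_access_token with the stored credentials and reassign the global access_token.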