Home > OS >  Python | Multithreading script does not complete
Python | Multithreading script does not complete

Time:08-29

I have this multithreading script, which operates on a data set. Each thread gets a chunk of the data set and then each thread iterates over the data frame and calls and api (MS Graph Create). What I have seen is that, my script tends to get stuck at almost finish time. I am running this on a linux Ubuntu server. 8vCpus. But this happens only when the total dataset size is in millions. (takes around 9-10 hrs for 2 million records)

I am writing a script (long running) for the first time. Would like to get an opinion if I am doing things correctly.

Please :

  1. I would like to know if my code is the reason why my script hangs.
  2. Have I done multithreading correctly ? Have I created and waited for threads to end correctly ?
import pandas as pd
import sys
import os
import logging
import string
import secrets
import random
##### ----- Logging Setup -------
logging.basicConfig(filename="pylogs.log", format='%(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')
# Creating an object
logger = logging.getLogger()
# Setting the threshold of logger to DEBUG
logger.setLevel(logging.ERROR)

#####------ Function Definitions -------
# generates random password
def generateRandomPassword(lengthOfPassword):
    # logic for random password gen

# the most important funtion
#

def createAccounts(splitData, threadID):
            batchProgress = 0
            batch_size = splitData.shape[0]
            for  row in splitData.itertuples():
                try:
                    headers = {"Content-Type": "application/json", "Authorization":"Bearer " access_token}  
                    randomLength = [8,9,12,13,16] 
                    passwordLength = random.choice(randomLength) 
                    password = generateRandomPassword(passwordLength) # will be generated randomly - for debugging purpose
                    batchProgress =1
                    post_request_body = {
                                "accountEnabled": True,
                                "displayName": row[5],
                                "givenName": row[3],
                                "surname": row[4],
                                "mobilePhone": row[1],
                                "mail": row[2],
                                "passwordProfile" : {
                                    "password": password,
                                    "forceChangePasswordNextSignIn": False
                                },
                                "state":"",
                                "identities": [
                                    {
                                        "signInType": "emailAddress",
                                        "issuer": tenantName,
                                        "issuerAssignedId": row[2]
                                    }
                                ]
                            }
                    # if phone number exists then only add - since phone number needs to have length between 1 and 64, cannot leave empty       
                    if(len(row[4])):
                        post_request_body["identities"].append({"signInType": "phoneNumber","issuer": tenantName,"issuerAssignedId": row[1]}) 
                    responseFromApi = requests.post(graph_api_create, headers=headers, json=post_request_body)
                    status = responseFromApi.status_code
                    if(status == 201): #success
                        id =  responseFromApi.json().get("id")
                        print(f" {status} | {batchProgress} / {batch_size} | Success {id}")
                        errorDict =  f'{row[1]}^{row[2]}^{row[3]}^{row[4]}^{row[5]}^{row[6]}^{row[7]}^{row[8]}^{row[9]}^{row[10]}{row[11]}{row[12]}{row[13]}{row[11]}{row[12]}{row[13]}^Success'
                    elif(status == 429): #throttling issues
                        print(f"  Thread {threadID} | Throttled by server ! Sleeping for 150 seconds")
                        errorDict = f'{row[1]}^{row[2]}^{row[3]}^{row[4]}^{row[5]}^{row[6]}^{row[7]}^{row[8]}^{row[9]}^{row[10]}{row[11]}{row[12]}{row[13]}^Throttled'
                        time.sleep(150)  
                    elif(status == 401): #token expiry
                        print(f"  Thread {threadID} | Token Expired. Getting it back !")
                        errorDict = f'{row[1]}^{row[2]}^{row[3]}^{row[4]}^{row[5]}^{row[6]}^{row[7]}^{row[8]}^{row[9]}^{row[10]}{row[11]}{row[12]}{row[13]}^Token Expired'
                        getRefreshToken()      
                    else:   #any other error
                        msg = ""
                        try:
                            msg = responseFromApi.json().get("error").get("message")
                        except Exception as e:
                            msg = f"Error {e}"
                        errorDict = f'{row[1]}^{row[2]}^{row[3]}^{row[4]}^{row[5]}^{row[6]}^{row[7]}^{row[8]}^{row[9]}^{row[10]}{row[11]}{row[12]}{row[13]}^{msg}'
                        print(f" {status} | {batchProgress} / {batch_size} | {msg}  {row[2]}")
                    logger.error(errorDict)    
                except Exception as e:
                    # check for refresh token errors
                    errorDict = f'{row[1]}^{row[2]}^{row[3]}^{row[4]}^{row[5]}^{row[6]}^{row[7]}^{row[8]}^{row[9]}^{row[10]}{row[11]}{row[12]}{row[13]}^Exception_{e}'
                    logger.error(errorDict)
                    msg = " Error "
                    print(f" {status} | {batchProgress} / {batch_size} | {msg}  {row[2]}") 
            print(f"Thread {threadID} completed ! {batchProgress} / {batch_size}")         
            batchProgress = 0
###### ------ Main Script ------
if __name__ == "__main__":
    # get file name and appid from command line arguments
    storageFileName = sys.argv[1]
    appId = sys.argv[2]
    # setup credentials
    bigFilePath = f"./{storageFileName}"
    CreatUserUrl = "https://graph.microsoft.com/v1.0/users"
    B2C_Tenant_Name = "tenantName"
    tenantName =  B2C_Tenant_Name   ".onmicrosoft.com"
    applicationID = appId
    accessSecret = "" # will be taken from command line in future revisions
    token_api_body = {    
        "grant_type": "client_credentials",
        "scope": "https://graph.microsoft.com/.default",
        "client_Id" : applicationID,
        "client_secret": accessSecret
    }
    # Get initial access token from MS
    print("Connecting to MS Graph API")
    token_api = "https://login.microsoftonline.com/" tenantName "/oauth2/v2.0/token"
    response = {}
    try:
        responseFromApi = requests.post(token_api, data=token_api_body)
        responseJson = responseFromApi.json()
        print(f"Token API Success ! Expires in {responseJson.get('expires_in')} seconds")
    except Exception as e:
        print("ERROR | Token auth failed ")
    # if we get the token proceed else abort
    if(responseFromApi.status_code == 200):
        migrationData = pd.read_csv(bigFilePath)
        print(" We got the data from Storage !", migrationData.shape[0])

        global access_token
        access_token = responseJson.get('access_token')
        graph_api_create = "https://graph.microsoft.com/v1.0/users"
        dataSetSize = migrationData.shape[0]

        partitions = 50 # No of partitions # will be taken from command line in future revisions
        size = int(dataSetSize/partitions) # No of rows per file
        remainder = dataSetSize%partitions
        print(f"Data Set Size : {dataSetSize} | Per file size = {size} | Total Files = {partitions} |  Remainder: {remainder} | Start...... \n")
        ##### ------- Dataset partioning. 
        datasets = []
        range_val = partitions   1 if remainder !=0 else partitions
        for partition in range(range_val):
            if(partition == partitions):
                df = migrationData[size*partition:dataSetSize]
            else:
                df = migrationData[size*partition:size*(partition 1)]
            datasets.append(df)
        number_of_threads = len(datasets)
        start_time = time.time()    
        spawned_threads = []
######## ---- Threads are spawned ! here --------
        for i in range(number_of_threads): # spawn threads
            t = threading.Thread(target=createAccounts, args=(datasets[i], i))
            t.start()
            spawned_threads.append(t)
        number_spawned = len(spawned_threads)  
        print(f"Started {number_spawned} threads !")   
       ###### - Threads are killed here ! ---------  
        for thread in spawned_threads: # let the script wait for thread execution
            thread.join()
        print(f"Done! It took {time.time() - start_time}s to execute")  # time check
    #### ------ Retry Mechanism -----
        print("RETRYING....... !")
        os.system(f'python3 retry.py pylogs.log {appId}')
    else:
        print(f"Token Missing ! API response {responseJson}")```

CodePudding user response:

un-fair use of MS Graph

Due to possible throttling by the server, the usage of the MS Graph resource might be un-fair between threads. I use fair in the resource starvation sense.

elif(status == 429): #throttling issues
    print(f"  Thread {threadID} | Throttled by server ! Sleeping for 150 seconds")
    errorDict = f'{row[1]}^{row[2]}^{row[3]}^{row[4]}^{row[5]}^{row[6]}^{row[7]}^{row[8]}^{row[9]}^{row[10]}{row[11]}{row[12]}{row[13]}^Throttled'
    time.sleep(150)

One thread making a million calls can get a disproportionate amount of 429 responses each followed by a penalty of 150 seconds. This sleep doesn't stop the other threads from making calls though and achieving forward progress.

This would result in one thread lagging far behind the others and giving the appearance of being stuck.

CodePudding user response:

Here's a refactoring of your code to use the standard library multiprocessing.ThreadPool for simplicity.

Naturally I couldn't have tested it since I don't have your data, but the basic idea should work. I removed the logging and retry stuff, since I really couldn't understand why you'd need it (but feel free to add it back); this will attempt to retry each row if the problem appears to be transient.

import random
import sys
import time
from multiprocessing.pool import ThreadPool

import pandas as pd
import requests

sess = requests.Session()

# globals filled in by `main`
tenantName = None
access_token = None



def submit_user_create(row):
    headers = {"Content-Type": "application/json", "Authorization": "Bearer "   access_token}
    randomLength = [8, 9, 12, 13, 16]
    passwordLength = random.choice(randomLength)
    password = generateRandomPassword(passwordLength)  # will be generated randomly - for debugging purpose
    post_request_body = {
        "accountEnabled": True,
        "displayName": row[5],
        "givenName": row[3],
        "surname": row[4],
        "mobilePhone": row[1],
        "mail": row[2],
        "passwordProfile": {"password": password, "forceChangePasswordNextSignIn": False},
        "state": "",
        "identities": [{"signInType": "emailAddress", "issuer": tenantName, "issuerAssignedId": row[2]}],
    }
    # if phone number exists then only add - since phone number needs to have length between 1 and 64, cannot leave empty
    if len(row[4]):
        post_request_body["identities"].append({"signInType": "phoneNumber", "issuer": tenantName, "issuerAssignedId": row[1]})
    return sess.post("https://graph.microsoft.com/v1.0/users", headers=headers, json=post_request_body)


def get_access_token(tenantName, applicationID, accessSecret):
    token_api_body = {
        "grant_type": "client_credentials",
        "scope": "https://graph.microsoft.com/.default",
        "client_Id": applicationID,
        "client_secret": accessSecret,
    }
    token_api = f"https://login.microsoftonline.com/{tenantName}/oauth2/v2.0/token"
    resp = sess.post(token_api, data=token_api_body)
    if resp.status_code != 200:
        raise RuntimeError(f"Token Missing ! API response {resp.content}")
    json = resp.json()
    print(f"Token API Success ! Expires in {json.get('expires_in')} seconds")
    return json["access_token"]


def process_row(row):
    while True:
        response = submit_user_create(row)
        status = response.status_code

        if status == 201:  # success
            id = response.json().get("id")
            print(f"Success {id}")
            return True

        if status == 429:  # throttling issues
            print(f"Throttled by server ! Sleeping for 150 seconds")
            time.sleep(150)
            continue

        if status == 401:  # token expiry?
            print(f"Token Expired. Getting it back !")
            getRefreshToken()  # TODO
            continue

        try:
            msg = response.json().get("error").get("message")
        except Exception as e:
            msg = f"Error {e}"
        print(f" {status} | {msg}  {row[2]}")
        return False


def main():
    global tenantName, access_token
    # get file name and appid from command line arguments
    bigFilePath = sys.argv[1]
    appId = sys.argv[2]
    # setup credentials
    B2C_Tenant_Name = "tenantName"
    tenantName = f"{B2C_Tenant_Name}.onmicrosoft.com"
    accessSecret = ""  # will be taken from command line in future revisions
    access_token = get_access_token(tenantName, appId, accessSecret)
    migrationData = pd.read_csv(bigFilePath)
    start_time = time.time()
    with ThreadPool(10) as pool:
        for i, result in enumerate(pool.imap_unordered(process_row, migrationData.itertuples()), 1):
            progress = i / len(migrationData) * 100
            print(f"{i} / {len(migrationData)} | {progress:.2f}% | {time.time() - start_time:.2f} seconds")

    print(f"Done! It took {time.time() - start_time}s to execute")


if __name__ == "__main__":
    main()
  • Related