Python: How to implement concurrent futures to a function

Time:11-17

I was wondering what would be a good way to use concurrent.futures to iterate through a large list of stocks in my new program.

In my previous program I tried using concurrent.futures, but the printed data was not consistent. For example, when running a large list of stocks, it gives different information each time (as you can see in Output 1 and Output 2 for the previous program). I am including my previous program so you can see what I did wrong when implementing concurrent futures.

Thanks!

New Program

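import time

import pandas as pd
import yfinance as yf

tickers = ["A","AA","AAC","AACG","AACIU","AADI","AAIC","AAIN","AAL","AAMC","AAME","AAN","AAOI","AAON","AAP","AAPL"]
start = time.time()  # start the timer used by the final print

def create_df(tickers):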
    all_info = []
    for ticker in tickers:
        all_info.append(yf.Ticker(ticker).info)
        
    df = pd.DataFrame.from_records(all_info)
    df = df[['symbol','ebitda', 'enterpriseValue', 'trailingPE', 'sector']]
    df.dropna(inplace=True)
    # This is where you can add calculations and other columns not in Yfinance Library
    df['EV/Ratio'] = df['enterpriseValue'] / df['ebitda']
    return df

df = create_df(tickers)
print(df)
print('It took', time.time()-start, 'seconds.')

Output

   symbol        ebitda  enterpriseValue  trailingPE              sector   EV/Ratio
0       A  1.762000e+09     5.311271e+10   60.754720          Healthcare  30.143422
9    AAMC -2.015600e+07     1.971329e+08    1.013164  Financial Services  -9.780359
10   AAME  2.305600e+07     1.175756e+08    7.652329  Financial Services   5.099566
11    AAN  8.132960e+08     1.228469e+09    9.329710   Consumer Cyclical   1.510483
13   AAON  1.178790e+08     3.501286e+09   55.615944         Industrials  29.702376
14    AAP  1.239876e+09     1.609877e+10   25.986680   Consumer Cyclical  12.984181
15   AAPL  1.109350e+11     2.489513e+12   33.715443          Technology  22.441190
It took 101.81006002426147 seconds.

Previous Program For Reference

import concurrent.futures
import time

import pandas as pd
import yfinance as yf

tickers = ["A","AA","AAC","AACG","AACIU","AADI","AAIC","AAIN","AAL","AAMC","AAME","AAN","AAOI","AAON","AAP","AAPL"]
start = time.time()

col_a = []  
col_b = []  
col_c = []  
col_d = []  

print('Loading Data... Please wait for results')


def do_something(tickers):
    print('---', tickers, '---')
    all_info = yf.Ticker(tickers).info
    try:
        a = all_info.get('ebitda')
        b = all_info.get('enterpriseValue')
        c = all_info.get('trailingPE')
        d = all_info.get('sector')
    except:
        None
    col_a.append(a)  
    col_b.append(b)  
    col_c.append(c)  
    col_d.append(d)     
    return
with concurrent.futures.ThreadPoolExecutor() as executer:
    executer.map(do_something, tickers)
        

# Dataframe Set Up
pd.set_option("display.max_rows", None)
   
df = pd.DataFrame({
    'Ticker': tickers,
    'Ebitda': col_a,  
    'EnterpriseValue' :col_b,  
    'PE Ratio': col_c,  
    'Sector': col_d,
})
print(df.dropna())
print(len('Total Companies with Information'))
print('It took', time.time()-start, 'seconds.')

Output 1 for Previous Program

   Ticker        Ebitda  EnterpriseValue   PE Ratio              Sector
1      AA  1.651000e+09     5.031802e+10  49.183292          Healthcare
3    AACG  2.216000e+09     1.168140e+10  11.711775     Basic Materials
5    AADI  1.928800e+07     1.108360e+08   6.954397  Financial Services
7    AAIN  1.128370e+08     3.960835e+09  57.706764         Industrials
8     AAL  8.303301e+08     1.103969e+09   9.111819   Consumer Cyclical
10   AAME  1.202330e+11     2.534678e+12  26.737967          Technology
12   AAOI -1.848400e+07     1.277540e+08   0.355233  Financial Services
14    AAP  1.224954e+09     1.770882e+10  26.059464   Consumer Cyclical
32
It took 4.2548089027404785 seconds.

Output 2 for Previous Program

   Ticker        Ebitda  EnterpriseValue   PE Ratio              Sector
0       A -1.848400e+07     1.277540e+08   0.355233  Financial Services
4   AACIU  1.202330e+11     2.534678e+12  26.737967          Technology
5    AADI  1.651000e+09     5.031802e+10  49.183292          Healthcare
7    AAIN  1.128370e+08     3.960835e+09  57.706764         Industrials
9    AAMC  8.303301e+08     1.103969e+09   9.111819   Consumer Cyclical
10   AAME  2.216000e+09     1.168140e+10  11.711775     Basic Materials
13   AAON  1.224954e+09     1.770882e+10  26.059464   Consumer Cyclical
14    AAP  1.928800e+07     1.108360e+08   6.954397  Financial Services
32
It took 4.003742933273315 seconds.

CodePudding user response:

You have a multithreaded program. ThreadPoolExecutor.map() schedules one call to do_something() per ticker on a pool of worker threads that run concurrently, and you have no control over the order in which those calls execute or finish. The problem occurs because you append the results (a, b, c, d) to the individual lists col_a, col_b, etc. inside do_something(). Those lists are global, so the data gets appended to them in whatever order the calls happen to finish. It is even possible for a thread switch to occur right in the middle of the four calls to append(). So the order of the data will be effectively random, and the values within a single row can end up coming from different tickers.

The list of ticker symbols is added to the dataframe in the main thread. So the list of symbols and the data itself are not synchronized. That's exactly what you observe.
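The effect can be reproduced without any network calls. The sketch below is only an illustration: fake_fetch and its delays are hypothetical stand-ins that imitate requests finishing at different speeds, and they are not part of yfinance.

```python
import time
from concurrent.futures import ThreadPoolExecutor

tickers = ["A", "AA", "AAC"]
# Hypothetical delays: the first ticker is the slowest to "download".
delays = {"A": 0.3, "AA": 0.2, "AAC": 0.1}

shared = []  # global list mutated from worker threads, as in the question


def fake_fetch(ticker):
    time.sleep(delays[ticker])  # simulate network latency
    shared.append(ticker)       # appended in completion order, not input order


with ThreadPoolExecutor(max_workers=3) as executor:
    # Consuming the iterator makes any exception raised in a worker surface here.
    list(executor.map(fake_fetch, tickers))

print("input order:     ", tickers)
print("completion order:", shared)
```

With these delays the completion order will usually be the reverse of the input order, which is the same kind of mismatch between the Ticker column and the data columns seen in the two outputs.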

The easiest solution is to set up all your data structures in the main thread. This is easy to do because the function map() returns an iterator, and the order of iteration is guaranteed to be preserved. The iterator steps over the values returned by do_something(). So instead of trying to update the lists col_a, col_b, etc. in that function, just return the values a, b, c, d as a tuple. Back in your main thread, you take these values and append them to the columns.

The order of execution of the different threads is not controlled, but map() sorts it out for you: it submits all the calls up front and then yields their results in the same order as the input, waiting for each result in turn if it is not ready yet.
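A small sketch of that ordering guarantee, again using a hypothetical fake_fetch with made-up delays instead of real yfinance calls. The worker returns its data rather than touching any shared list, and the main thread collects the results from map().

```python
import time
from concurrent.futures import ThreadPoolExecutor

tickers = ["A", "AA", "AAC"]
# Hypothetical delays: later tickers finish earlier.
delays = {"A": 0.3, "AA": 0.2, "AAC": 0.1}


def fake_fetch(ticker):
    time.sleep(delays[ticker])  # simulate network latency
    # Return the data instead of appending to a global list.
    return {"symbol": ticker, "delay": delays[ticker]}


with ThreadPoolExecutor(max_workers=3) as executor:
    # map() yields results in input order, whatever order the calls finish in.
    rows = list(executor.map(fake_fetch, tickers))

print([row["symbol"] for row in rows])
```

Even though "AAC" finishes first here, its result is still the last element of rows, so each row stays lined up with its ticker.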

Change this part of your program - everything else can stay the same.

def do_something(ticker):
    print('---', ticker, '---')
    try:
        # The download is the call that can fail, so it belongs inside the try.
        all_info = yf.Ticker(ticker).info
    except Exception:
        return None, None, None, None  # must return a 4-tuple
    a = all_info.get('ebitda')
    b = all_info.get('enterpriseValue')
    c = all_info.get('trailingPE')
    d = all_info.get('sector')
    return a, b, c, d

with concurrent.futures.ThreadPoolExecutor() as executer:
    for a, b, c, d in executer.map(do_something, tickers):
        col_a.append(a)  
        col_b.append(b)  
        col_c.append(c)  
        col_d.append(d)     

CodePudding user response:

Here is how to add multithreading to the new program, using the answer provided by @iudeen:

import pandas as pd
import yfinance as yf
from concurrent.futures import ThreadPoolExecutor
import time
from stocks import tickers
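start = time.time()

print('Loading Data... Please wait for results')
all_info = []

def create_df(ticker):
    # Records are appended in completion order; the 'symbol' field in each
    # record identifies the row, so the order does not need to match tickers.
    all_info.append(yf.Ticker(ticker).info)

# Leaving the with block waits for all submitted calls to finish.
with ThreadPoolExecutor(max_workers=10) as executor:
    futures = [executor.submit(create_df, x) for x in tickers]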

df = pd.DataFrame.from_records(all_info)
df = df[['symbol','ebitda', 'enterpriseValue', 'trailingPE', 'sector']]
df.dropna(inplace=True)
df['EV/Ratio'] = df['enterpriseValue'] / df['ebitda']
print(df)
print('It took', time.time()-start, 'seconds.')
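One thing to be aware of with submit(): an exception raised inside a worker is stored on its future and is only re-raised when result() is called, so a failed download can go unnoticed. The following is a minimal sketch of one way to surface such failures, not a drop-in replacement. fetch_info is a hypothetical stand-in for yf.Ticker(ticker).info, and "BAD" is a made-up ticker that simulates a failed request.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def fetch_info(ticker):
    # Hypothetical stand-in for yf.Ticker(ticker).info, with made-up values.
    if ticker == "BAD":
        raise ValueError(f"no data for {ticker}")
    return {"symbol": ticker, "ebitda": 1.0, "enterpriseValue": 10.0}


def fetch_all(tickers, max_workers=10):
    records, failed = {}, []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Remember which ticker each future belongs to.
        future_to_ticker = {executor.submit(fetch_info, t): t for t in tickers}
        for future in as_completed(future_to_ticker):
            ticker = future_to_ticker[future]
            try:
                records[ticker] = future.result()  # re-raises worker exceptions
            except Exception:
                failed.append(ticker)
    # Rebuild the list in input order, skipping tickers that failed.
    ordered = [records[t] for t in tickers if t in records]
    return ordered, failed


records, failed = fetch_all(["A", "BAD", "AAPL"])
print([r["symbol"] for r in records])  # ['A', 'AAPL']
print(failed)                          # ['BAD']
```

The list of records returned this way could then be passed to pd.DataFrame.from_records() exactly as in the code above.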