How to share a dataframe from a subprocess with the main process?

Time:04-27

I want to get a dataframe built from data downloaded from a SQL server. I tried using a queue, but it seems to take too long to "put" and "get" the data through the queue. The code is below.

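    import multiprocessing
    
    import cx_Oracle
    import pandas as pd
    
    def data(q):
        conn = cx_Oracle.connect("*", "*","*", encoding="UTF-8")
        cursor = conn.cursor()
        cursor.execute("select * from * I where I.DATE between sysdate - 4 and sysdate - 3")
        table = cursor.fetchall()
        # The first item of each cursor.description entry is the column name
        col = [x[0] for x in cursor.description]
        df_I = pd.DataFrame(table, columns=col)
        # Put the dataframe built above on the queue
        q.put(df_I)
    
    if __name__ == '__main__':
    
        q = multiprocessing.Queue()
        one_process = multiprocessing.Process(target=data, args=(q,))
    
        one_process.start()
        # Get before join: joining first can deadlock while the child
        # is still flushing a large object to the queue
        df_I = q.get()
        one_process.join()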

The data(q) function puts the dataframe into the queue, and the parent process tries to get it from q, but this fails: it takes too long. Do you have any good ideas? Thank you in advance.

CodePudding user response:

Your example, which I presume (or at least hope) is just a simplification of what you really want to accomplish, cannot achieve any performance improvement as presented, because you are not effectively doing any multiprocessing. Although your main process creates a subprocess, it then immediately blocks until that subprocess finishes and returns its result, so no parallel computing occurs. All you have accomplished is to add overhead by creating a new process and having to transmit data from one address space to another.
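To get a rough feel for the transfer overhead mentioned above: objects put on a `multiprocessing.Queue` are pickled in the sender and unpickled in the receiver. The sketch below times only that serialization round trip, not the pipe transfer or process startup, so it understates the real cost. The helper name `pickle_round_trip` and the row layout are made up for illustration and are not from the question.

```python
import pickle
import time


def pickle_round_trip(obj):
    """Serialize and deserialize obj, roughly what a queue transfer has to do.

    Returns the reconstructed copy, the payload size in bytes, and the
    elapsed time in seconds.
    """
    start = time.perf_counter()
    payload = pickle.dumps(obj)
    copy = pickle.loads(payload)
    return copy, len(payload), time.perf_counter() - start


if __name__ == "__main__":
    # Hypothetical stand-in for the rows returned by cursor.fetchall()
    rows = [(n, f"name-{n}", n * 0.5) for n in range(200_000)]
    copy, size, seconds = pickle_round_trip(rows)
    print(f"{size / 1e6:.1f} MB pickled, round trip took {seconds:.3f} s")
```

Running this with data of about the same size as the real query result gives a lower bound on what the queue costs per transfer.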

A real use case for multiprocessing or multithreading would be, for example, running multiple queries where each one has to produce a dataframe from its results. Each task then consists of the query itself followed by the creation of the dataframe. Given the aforementioned overhead of creating new processes and returning data across process address spaces, my approach would be to use multithreading for the query. That should work nicely: contention between threads for the Global Interpreter Lock should not be an issue, because the query task will mostly be in a network wait state. You could then return the query results to the main thread (no inter-process transfer is required now) and have the main thread do the CPU work of creating the dataframes. Or, more simply, have the same worker function build the dataframe from the results and return the completed dataframe. In the latter case, depending on whether pandas releases the Global Interpreter Lock, you may not achieve any parallelism in building the dataframes, but you will be doing no worse than having the main thread build them.
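The first alternative, where the worker threads only fetch and the main thread does the building, might look roughly like the sketch below. It is only an illustration under several assumptions: `sqlite3` stands in for `cx_Oracle` (both follow the Python DB-API, so `cursor.description` and `fetchall()` behave alike), the table schema is invented, and plain dicts stand in for the `pd.DataFrame(rows, columns=cols)` call so the example needs nothing outside the standard library.

```python
import os
import sqlite3
import tempfile
from concurrent.futures import ThreadPoolExecutor


def make_demo_db(path):
    """Create a tiny stand-in table (hypothetical schema, not the asker's)."""
    conn = sqlite3.connect(path)
    conn.execute("CREATE TABLE I (id INTEGER, day INTEGER)")
    conn.executemany("INSERT INTO I VALUES (?, ?)", [(n, n % 3) for n in range(9)])
    conn.commit()
    conn.close()


def fetch_rows(db_path, query):
    """Worker: run one query on its own connection, return column names and rows."""
    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.execute(query)
        cols = [d[0] for d in cursor.description]
        return cols, cursor.fetchall()
    finally:
        conn.close()


def run_queries(db_path, queries):
    """Fetch in worker threads, then do the build step in the calling thread."""
    with ThreadPoolExecutor(max_workers=len(queries)) as pool:
        raw = list(pool.map(lambda q: fetch_rows(db_path, q), queries))
    # With pandas, this is where pd.DataFrame(rows, columns=cols) would go.
    return [[dict(zip(cols, row)) for row in rows] for cols, rows in raw]


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        db_path = os.path.join(tmp, "demo.db")
        make_demo_db(db_path)
        results = run_queries(db_path, [
            "SELECT * FROM I WHERE day = 0 ORDER BY id",
            "SELECT * FROM I WHERE day = 1 ORDER BY id",
        ])
        print(results)
```

Each worker opens its own connection, since sharing one connection across threads is not safe by default in `sqlite3`; whether the same applies to a given Oracle setup would need checking.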

In the worst-case scenario where pandas does not release the Global Interpreter Lock, it is true that you will not be doing the more CPU-intensive work, i.e. the creation of the dataframes, in parallel for which multiprocessing would suggest itself as the solution. But the overhead of multiprocessing only becomes worthwhile when the amount of CPU processing for each task is great enough that what is gained by parallel computing offsets what is lost by the additional overhead. I don't believe that would be the case in this particular situation.

So the following is example code using a multithreading pool for the case where you have two queries that create two dataframes:

    from multiprocessing.pool import ThreadPool
    
    import cx_Oracle
    import pandas as pd
    
    def do_query(query):
        # Each worker thread opens its own connection
        conn = cx_Oracle.connect("*", "*","*", encoding="UTF-8")
        cursor = conn.cursor()
        cursor.execute(query)
        table = cursor.fetchall()
        col = [x[0] for x in cursor.description]
        return pd.DataFrame(table, columns=col)
    
    if __name__ == '__main__':
        queries = [
            "select * from I where I.DATE between sysdate - 4 and sysdate - 3",
            "select * from I where I.DATE between sysdate - 5 and sysdate - 4"
        ]
        # One thread per query; map returns the dataframes in query order
        pool = ThreadPool(len(queries))
        dataframes = pool.map(do_query, queries)
        pool.close()
        pool.join()

It's still not clear whether the above would increase performance over serial code running two successive queries; it would depend on the complexity (running time) of the queries themselves.
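One way to check this is to time both variants. The sketch below does that with `time.sleep` standing in for the query wait (sleeping releases the Global Interpreter Lock, much like waiting on the network does). The function names and durations are made up for illustration; with real queries you would swap `fake_query` for the actual query function and compare the two timings.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_query(seconds):
    """Stand-in for a query: just waits, as a thread blocked on the network would."""
    time.sleep(seconds)
    return seconds


def time_serial(durations):
    """Run the stand-in queries one after another; return results and elapsed seconds."""
    start = time.perf_counter()
    results = [fake_query(d) for d in durations]
    return results, time.perf_counter() - start


def time_threaded(durations):
    """Run the stand-in queries in one thread each; return results and elapsed seconds."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=len(durations)) as pool:
        results = list(pool.map(fake_query, durations))
    return results, time.perf_counter() - start


if __name__ == "__main__":
    durations = [0.5, 0.5]
    _, serial = time_serial(durations)
    _, threaded = time_threaded(durations)
    print(f"serial: {serial:.2f} s, threaded: {threaded:.2f} s")
```

With waits this long, the threaded run should take roughly as long as the slowest single query, while the serial run takes the sum. With very short queries the difference may vanish.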
