Unable to spawn Process using Anaconda multiprocess package

Time:08-16

Hi, I am running a Python Snowpark use case that involves running many data loads in parallel. I have close to 42 tables loaded from the SNOWFLAKE.ACCOUNT_USAGE views. The data in these tables are independent of each other, so I tried to use the Snowpark Python library together with the multiprocess package to do the parallel data load. The code ran successfully from my laptop, but the same code failed on the Snowflake machine; it looks like Snowflake has designed its warehouses not to allow forking/spawning processes for parallel processing. Do you have any thoughts on how to do the parallel data loading without using the multiprocess package?

This is an architectural-level question; I request that you not close this query without input from architects.

The error message I am getting is:

File "/usr/lib/python_udf/1439992e4e54a095348cc1d96f9448a9579f940638c334772ebfeb71ef5b03e0/lib/python3.8/site-packages/multiprocess/popen_fork.py", line 70, in _launch
    self.pid = os.fork()
PermissionError: [Errno 1] Operation not permitted in function TEST_SNOWPARK with handler run

from multiprocess import Process
from snowflake.snowpark import Session

# There are multiple INSERT statements like this; for brevity only one is shown here.
TransactionDataLoadSqlList = [
    """INSERT OVERWRITE INTO DB.SCHEMA.T_STAGES SELECT * FROM SNOWFLAKE.ACCOUNT_USAGE.STAGES"""
]

def RunAtSnowflake(session, query):
    session.sql(query).collect()
    return "SUCCESS"

def main(session):
    processes = []
    try:
        print("Multi-processing started")
        for TSqls in TransactionDataLoadSqlList:
            p = Process(target=RunAtSnowflake, args=[session, TSqls])
            p.start()
            processes.append(p)
        for process in processes:
            process.join()
        print("Multi-processing finished")
    except BaseException as err:
        print(f"Unexpected {err=}, {type(err)=}")
        raise

if __name__ == '__main__':
    # A live Session instance (e.g. from Session.builder...create()) is expected here.
    main(Session)
        

CodePudding user response:

This is about running async queries.

Outside Snowpark, with the Snowflake Python connector you can do this:

import snowflake.connector

conn = snowflake.connector.connect( ... )
cur = conn.cursor()
# Submit an asynchronous query for execution; the call returns immediately.
cur.execute_async('select count(*) from table(generator(timeLimit => 25))')
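Building on that, a minimal sketch of firing off all 42 INSERT statements asynchronously and then polling until they finish. The `run_all_async` helper is my own name, not a connector API; `execute_async`, `sfqid`, `get_query_status` and `is_still_running` are real methods of the Snowflake Python connector, and `conn` is assumed to behave like a connector connection:

```python
import time

def run_all_async(conn, statements, poll_seconds=1.0):
    """Submit every statement asynchronously, then poll until all finish."""
    query_ids = []
    for stmt in statements:
        cur = conn.cursor()
        cur.execute_async(stmt)      # returns immediately; query runs server-side
        query_ids.append(cur.sfqid)  # remember the query id for polling
    pending = set(query_ids)
    while pending:
        for qid in list(pending):
            if not conn.is_still_running(conn.get_query_status(qid)):
                pending.discard(qid)
        if pending:
            time.sleep(poll_seconds)
    return query_ids
```

Error handling is omitted for brevity; in practice you would also check each final status (e.g. via `get_query_status_throw_if_error`) rather than just waiting for the queries to leave the running state.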

Inside Snowpark, as a stored procedure: this is currently not supported, but stay tuned - it's a feature the engineering team would like to implement.

As an alternative, I tried Scala with Snowpark:

// https://docs.snowflake.com/en/sql-reference/stored-procedures-scala.html

import com.snowflake.snowpark._

object Procedure {
  def main(session: Session): String = {
    var rows = session.sql("select 'Hello, Scala!'").async.collect();
    return rows.getResult()(0).getString(0);
  }
}

That one throws the exception "Async execution not supported in current context" - which is another way of saying we'll need to wait for async queries inside a stored procedure.
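In the meantime, since the PermissionError above comes specifically from os.fork(), a thread-based approach may be worth trying instead of processes; whether it works inside a stored procedure depends on the sandbox and on the session's thread-safety, so treat this as a sketch. The `run_in_parallel` helper and its `run_query` parameter are my own names; inside a stored procedure `run_query` could be something like `lambda q: session.sql(q).collect()`:

```python
from concurrent.futures import ThreadPoolExecutor

def run_in_parallel(statements, run_query, max_workers=8):
    """Run each statement on a worker thread via the supplied callable."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(run_query, stmt) for stmt in statements]
        # result() blocks until done and re-raises any worker exception
        return [f.result() for f in futures]
```

Because the INSERTs are independent, ordering does not matter here; the results come back in submission order regardless of which thread finishes first.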
