Hi, I am running a Python Snowpark use case that involves many data loads in parallel. I have close to 42 tables that are loaded from the SNOWFLAKE.ACCOUNT_USAGE views. The data in these tables is independent of each other, so I tried to use the Snowpark Python library to load them in parallel with the multiprocess package. When I ran this code from my laptop it ran successfully, but the same code failed when run inside Snowflake; it looks like Snowflake has designed its warehouses not to allow forking/spawning processes for parallel processing. Do you have any thoughts on how to do the parallel data loading without using the multiprocess package?
This is an architecture-level question, so I request that you not close it without input from architects.
The error message I am getting is:
"/usr/lib/python_udf/1439992e4e54a095348cc1d96f9448a9579f940638c334772ebfeb71ef5b03e0/lib/python3.8/site-packages/multiprocess/popen_fork.py", line 70, in _launch self.pid = os.fork() PermissionError: [Errno 1] Operation not permitted in function TEST_SNOWPARK with handler run
from multiprocess import Process
from snowflake.snowpark import Session

# There are multiple INSERT statements like this; only one is shown here as an example.
TransactionDataLoadSqlList = [
    """INSERT OVERWRITE INTO DB.SCHEMA.T_STAGES SELECT * FROM SNOWFLAKE.ACCOUNT_USAGE.STAGES"""
]

def RunAtSnowflake(session, query):
    # Run a single load statement and block until it finishes.
    session.sql(query).collect()
    return "SUCCESS"

def main(session):
    processes = []
    try:
        print(" Multi-processing started")
        for TSqls in TransactionDataLoadSqlList:
            # print(TSqls)
            # One child process per statement; p.start() is where os.fork() is refused inside Snowflake.
            p = Process(target=RunAtSnowflake, args=[session, TSqls])
            p.start()
            processes.append(p)
        for process in processes:
            process.join()
        print(" Multi-processing finished")
    except BaseException as err:
        print(f"Unexpected {err=}, {type(err)=}")
        raise

if __name__ == '__main__':
    # `session` must be an already-created snowflake.snowpark.Session (its creation is not shown in this post).
    main(session)
CodePudding user response:
This is about running async queries.
Outside Snowpark, with the Snowflake Python connector, you can do this:
import snowflake.connector
conn = snowflake.connector.connect( ... )
cur = conn.cursor()
# Submit an asynchronous query for execution; the call returns without waiting for the result.
cur.execute_async('select count(*) from table(generator(timeLimit => 25))')
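To apply this to a list of independent load statements, a minimal sketch could submit each one with execute_async and then poll until none is still running. The helper names submit_all and wait_for_all and the poll_seconds parameter are my own assumptions, not part of the connector; the connector calls used are cursor.execute_async, cursor.sfqid, connection.get_query_status_throw_if_error and connection.is_still_running. This is meant to run from a client machine, not from inside a stored procedure.

```python
import time


def submit_all(conn, statements):
    """Submit every statement asynchronously and return the query IDs.

    `conn` is assumed to be an open snowflake.connector connection.
    """
    query_ids = []
    for sql in statements:
        cur = conn.cursor()
        cur.execute_async(sql)        # returns immediately, query keeps running server-side
        query_ids.append(cur.sfqid)   # query ID assigned by Snowflake
    return query_ids


def wait_for_all(conn, query_ids, poll_seconds=1.0):
    """Poll until none of the submitted queries is still running.

    get_query_status_throw_if_error raises if a query ended in an error state.
    """
    pending = list(query_ids)
    while pending:
        pending = [
            qid for qid in pending
            if conn.is_still_running(conn.get_query_status_throw_if_error(qid))
        ]
        if pending:
            time.sleep(poll_seconds)
```

With the list from the question this would be called as ids = submit_all(conn, TransactionDataLoadSqlList) followed by wait_for_all(conn, ids). The statements then run concurrently on the warehouse, so no fork or extra client process is needed.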
Inside Snowpark, as a stored procedure: this is currently not supported, but stay tuned, as it's a feature that the engineering team would like to implement.
As an alternative, I tried Scala with Snowpark:
// https://docs.snowflake.com/en/sql-reference/stored-procedures-scala.html
import com.snowflake.snowpark._
object Procedure {
  def main(session: Session): String = {
    // Start the query asynchronously, then block on its result.
    val rows = session.sql("select 'Hello, Scala!'").async.collect()
    rows.getResult()(0).getString(0)
  }
}
That one throws the exception "Async execution not supported in current context", which is another way of saying that we'll need to wait to get async queries inside a stored procedure.
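For comparison, the Python Snowpark client has a non-blocking counterpart to collect(), namely DataFrame.collect_nowait(), which returns an AsyncJob whose result() blocks until the query finishes. Here is a rough sketch of using it for the loads in the question, assuming it runs from a client session (for example the laptop) and not inside a stored procedure, where the limitation described above applies. The helper name run_loads_async is my own.

```python
def run_loads_async(session, statements):
    """Start every statement without blocking, then wait for all of them.

    `session` is assumed to be an existing snowflake.snowpark.Session
    created on the client side.
    """
    # collect_nowait() submits the query and returns an AsyncJob right away.
    jobs = [session.sql(sql).collect_nowait() for sql in statements]
    # result() blocks until that job is done; by now all jobs are already running.
    return [job.result() for job in jobs]
```

Called as run_loads_async(session, TransactionDataLoadSqlList), this submits all statements first and only then waits, so they overlap on the warehouse instead of running one after another.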