I am trying to read from a table in Snowflake, manipulate the data, and write it back. I can connect to Snowflake and read the data as a DataFrame, but I cannot write back to the table.
Code to connect to Snowflake:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from py4j.java_gateway import java_import
## @params: [JOB_NAME, URL, WAREHOUSE, DB, SCHEMA, USERNAME, PASSWORD, ROLE]
SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"
# 'ROLE' must be requested here because sfOptions below reads args['ROLE']
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'URL', 'WAREHOUSE', 'DB', 'SCHEMA', 'USERNAME', 'PASSWORD', 'ROLE'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
try:
    job.init(args['JOB_NAME'], args)
except Exception as e:
    pass
java_import(spark._jvm, SNOWFLAKE_SOURCE_NAME)
## uj = sc._jvm.net.snowflake.spark.snowflake
spark._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.enablePushdownSession(spark._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())
sfOptions = {
    "sfURL": args['URL'],
    "sfUser": args['USERNAME'],
    "sfPassword": args['PASSWORD'],
    "sfDatabase": args['DB'],
    "sfSchema": args['SCHEMA'],
    "sfWarehouse": args['WAREHOUSE'],
    "sfRole": args['ROLE']
}
df = spark.read.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).option("dbtable", "<>").load().select('<>')
df.printSchema()
df.show()
df.write.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).option("dbtable", "GLUE_DEMO").mode("append").save()
But when executing, I get the error below:
File "/home/glue_user/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o81.save.
: java.sql.SQLException: Status of query associated with resultSet is FAILED_WITH_ERROR. Results not generated.
at net.snowflake.client.jdbc.SFAsyncResultSet.getRealResults(SFAsyncResultSet.java:127)
at net.snowflake.client.jdbc.SFAsyncResultSet.getMetaData(SFAsyncResultSet.java:262)
If I look at the query history in Snowflake, it shows that no warehouse was selected:
No active warehouse selected in the current session. Select an active warehouse with the 'use warehouse' command
CodePudding user response:
The easiest way is to assign the default warehouse to the user:
ALTER USER <name> SET DEFAULT_WAREHOUSE = <string>
Reference: ALTER USER
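If you prefer to set this from Python rather than a worksheet, a minimal sketch using the snowflake-connector-python package could look like the following. The account identifier, user names, and warehouse name are placeholders, and the statement has to be run by a role allowed to alter that user:
import snowflake.connector

# Connect as a user with privileges to alter other users; all values below are hypothetical.
con = snowflake.connector.connect(
    account="xy12345",       # your account identifier
    user="ADMIN_USER",
    password="***",
)
# Give the Glue job's user a default warehouse so new sessions start with one selected.
con.cursor().execute("ALTER USER ETL_USER SET DEFAULT_WAREHOUSE = COMPUTE_WH")
con.close()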
CodePudding user response:
The read may have worked because the result was already cached, in which case Snowflake can serve it without an active warehouse; the write, however, does need one.
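If changing the user's default is not an option, you can also force the warehouse from the Spark side. A sketch, assuming the sfOptions from the question, a hypothetical warehouse name GLUE_WH, and that the role has USAGE on it; preactions is an option the Snowflake Spark connector documents for running SQL before the data transfer:
# Make sure the session option actually names a warehouse the role can use;
# if it is empty or unusable, the write fails with "No active warehouse selected".
sfOptions["sfWarehouse"] = "GLUE_WH"

# Explicitly activate the warehouse in the write session before data is transferred.
df.write.format(SNOWFLAKE_SOURCE_NAME) \
    .options(**sfOptions) \
    .option("dbtable", "GLUE_DEMO") \
    .option("preactions", "USE WAREHOUSE GLUE_WH") \
    .mode("append") \
    .save()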