I'm launching AWS EMR jobs via Airflow that need to save their output to a PostgreSQL database. Unfortunately, as far as I can tell, the PostgreSQL JDBC driver is not available by default on EMR, hence the error:
Traceback (most recent call last):
File "my_emr_script.py", line 725, in <module>
main()
File "my_emr_script.py", line 718, in main
.mode("overwrite") \
File "/mnt1/yarn/usercache/hadoop/appcache/application_1634133413183_0001/container_1634133413183_0001_01_000001/pyspark.zip/pyspark/sql/readwriter.py", line 1107, in save
File "/mnt1/yarn/usercache/hadoop/appcache/application_1634133413183_0001/container_1634133413183_0001_01_000001/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
File "/mnt1/yarn/usercache/hadoop/appcache/application_1634133413183_0001/container_1634133413183_0001_01_000001/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
File "/mnt1/yarn/usercache/hadoop/appcache/application_1634133413183_0001/container_1634133413183_0001_01_000001/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o1493.save.
: java.lang.ClassNotFoundException: org.postgresql.Driver
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:46)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1(JDBCOptions.scala:102)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1$adapted(JDBCOptions.scala:102)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:102)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:217)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:221)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:45)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:194)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:190)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
How can I ensure that the PostgreSQL JDBC driver is available when the EMR cluster starts? I searched for ways to do it via a bootstrap action but failed to find an answer; the official documentation only covers the Presto connector.
Edit:
I followed @Emerson's suggestion, uploaded the JAR to an S3 folder, and passed it via the Configurations block of the Airflow JOB_FLOW_OVERRIDES:
"Configurations": [
{
"Classification": "spark-defaults",
"Properties":
{
"spark.jar": "s3://{{ var.value.s3_folder }}/scripts/postgresql-42.2.5.jar",
},
}
],
In Airflow:
instance_type: str = 'm5.xlarge'
SPARK_STEPS = [
{
'Name': 'emr_test',
'ActionOnFailure': 'CANCEL_AND_WAIT',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
"Args": [
'spark-submit',
'--deploy-mode',
'cluster',
'--master',
'yarn',
"s3://{{ var.value.s3_folder }}/scripts/el_emr.py",
'--execution_date',
'{{ ds }}'
],
},
}
]
JOB_FLOW_OVERRIDES = {
'Name': 'EMR Test',
"ReleaseLabel": "emr-6.4.0",
"Applications": [{"Name": "Hadoop"}, {"Name": "Spark"}],
'Instances': {
'InstanceGroups': [
{
'Name': 'Master node',
'Market': 'SPOT',
'InstanceRole': 'MASTER',
'InstanceType': instance_type,
'InstanceCount': 1,
},
{
"Name": "Core",
"Market": "SPOT",
"InstanceRole": "CORE",
"InstanceType": instance_type,
"InstanceCount": 1,
},
],
'KeepJobFlowAliveWhenNoSteps': False,
'TerminationProtected': False,
},
'Steps': SPARK_STEPS,
'JobFlowRole': 'EMR_EC2_DefaultRole',
'ServiceRole': 'EMR_DefaultRole',
'BootstrapActions': [
{
'Name': 'string',
'ScriptBootstrapAction': {
'Path': 's3://{{ var.value.s3_folder }}/scripts/emr_bootstrap.sh',
}
},
],
'LogUri': 's3://{{ var.value.s3_folder }}/logs',
"Configurations": [
{
"Classification": "spark-defaults",
"Properties":
{
"spark.jar": "s3://{{ var.value.s3_path }}/scripts/postgresql-42.2.5.jar"
},
}
]
}
emr_creator = EmrCreateJobFlowOperator(
task_id='create_emr',
job_flow_overrides=JOB_FLOW_OVERRIDES,
aws_conn_id='aws_conn',
emr_conn_id='emr_conn',
region_name='us-west-2',
)
Unfortunately, the problem remains.
In addition, I tried modifying the bootstrap script to download the JAR:
cd $HOME && wget https://jdbc.postgresql.org/download/postgresql-42.2.5.jar
and passing its path via the configuration:
"Configurations": [
{
"Classification": "spark-defaults",
"Properties":
{
"spark.executor.extraClassPath": "org.postgresql:postgresql:42.2.5",
"spark.driver.extraClassPath": "$HOME/postgresql-42.2.5.jar",
},
}
],
In Airflow:
instance_type: str = 'm5.xlarge'
SPARK_STEPS = [
{
'Name': 'emr_test',
'ActionOnFailure': 'CANCEL_AND_WAIT',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
"Args": [
'spark-submit',
'--deploy-mode',
'cluster',
'--master',
'yarn',
"s3://{{ var.value.s3_folder }}/scripts/emr.py",
'--execution_date',
'{{ ds }}'
],
},
}
]
JOB_FLOW_OVERRIDES = {
'Name': 'EMR Test',
"ReleaseLabel": "emr-6.4.0",
"Applications": [{"Name": "Hadoop"}, {"Name": "Spark"}],
'Instances': {
'InstanceGroups': [
{
'Name': 'Master node',
'Market': 'SPOT',
'InstanceRole': 'MASTER',
'InstanceType': instance_type,
'InstanceCount': 1,
},
{
"Name": "Core",
"Market": "SPOT",
"InstanceRole": "CORE",
"InstanceType": instance_type,
"InstanceCount": 1,
},
],
'KeepJobFlowAliveWhenNoSteps': False,
'TerminationProtected': False,
},
'Steps': SPARK_STEPS,
'JobFlowRole': 'EMR_EC2_DefaultRole',
'ServiceRole': 'EMR_DefaultRole',
'BootstrapActions': [
{
'Name': 'string',
'ScriptBootstrapAction': {
'Path': 's3://{{ var.value.s3_folder }}/scripts/emr_bootstrap.sh',
}
},
],
'LogUri': 's3://{{ var.value.s3_folder }}/logs',
"Configurations": [
{
"Classification": "spark-defaults",
"Properties":
{
"spark.executor.extraClassPath": "org.postgresql:postgresql:42.2.5",
"spark.driver.extraClassPath": "$HOME/postgresql-42.2.5.jar",
},
}
]
}
emr_creator = EmrCreateJobFlowOperator(
task_id='create_emr',
job_flow_overrides=JOB_FLOW_OVERRIDES,
aws_conn_id='aws_conn',
emr_conn_id='emr_conn',
region_name='us-west-2',
)
This, in turn, results in a new error: Spark is no longer able to read the JSON files and treats them as corrupt:
root
|-- _corrupt_record: string (nullable = true)
Lastly, the common emr_bootstrap.sh:
#!/bin/bash -xe
sudo pip3 install -U \
boto3 \
typing
cd $HOME && wget https://jdbc.postgresql.org/download/postgresql-42.2.5.jar
Answer:
I'm not sure how the EMR cluster is being provisioned, but below is how you would do it.
First, upload the PostgreSQL JDBC JAR to an S3 location, then refer to it when you provision the cluster.
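For instance, the upload could be a one-off boto3 call like the sketch below; the bucket and key names are placeholders, not taken from the question:
import boto3

# Placeholder bucket/key -- substitute your own S3 location for the driver.
s3 = boto3.client("s3")
s3.upload_file(
    Filename="postgresql-42.2.5.jar",   # local copy of the JDBC driver
    Bucket="my-emr-artifacts",
    Key="jars/postgresql-42.2.5.jar",
)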
If you are provisioning via CloudFormation, then below is what you would need to do:
EMR:
  Type: AWS::EMR::Cluster
  Properties:
    Applications:
      - Name: Spark
    Configurations:
      - Classification: spark-defaults
        ConfigurationProperties:
          spark.jars: s3://path_to_jar/postgresql-42.2.11.jar
If it's done via CLI commands, it would be something like below:
aws emr create-cluster ...... --configurations config.json
where config.json may look like below
[
{
"Classification": "spark-defaults",
"Properties": {
"spark.jars": "s3://path_to_jar/postgresql-42.2.11.jar"
}
}
]
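If the cluster is created programmatically with boto3 instead, the same entry is just a Python structure passed as the Configurations argument of run_job_flow. A minimal sketch, reusing the placeholder JAR path from above; note the property is spark.jars (plural), not spark.jar:
import boto3

# The same spark-defaults entry as a Python structure. The key must be
# "spark.jars" (plural); the JAR path below is a placeholder.
SPARK_JAR_CONFIG = [
    {
        "Classification": "spark-defaults",
        "Properties": {
            "spark.jars": "s3://path_to_jar/postgresql-42.2.11.jar"
        },
    }
]

emr = boto3.client("emr", region_name="us-west-2")
# Pass it as the Configurations argument of run_job_flow, alongside the
# usual Name, ReleaseLabel, Instances, Steps, and role arguments:
# emr.run_job_flow(..., Configurations=SPARK_JAR_CONFIG, ...)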
EDIT:
After seeing your edited question, I can see your spark-submit args (the SPARK_STEPS variable). In that section, just add two more items like below:
'--jars',
's3://pathtodriver/postgresdriver.jar'
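Applied to the SPARK_STEPS from the question, that would look roughly like the sketch below; the JAR path reuses the S3 location from the question's first attempt, and the --jars pair must come before the application script:
SPARK_STEPS = [
    {
        'Name': 'emr_test',
        'ActionOnFailure': 'CANCEL_AND_WAIT',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': [
                'spark-submit',
                '--deploy-mode', 'cluster',
                '--master', 'yarn',
                # Ship the JDBC driver JAR to the driver and executors.
                '--jars', 's3://{{ var.value.s3_folder }}/scripts/postgresql-42.2.5.jar',
                's3://{{ var.value.s3_folder }}/scripts/el_emr.py',
                '--execution_date', '{{ ds }}',
            ],
        },
    }
]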