AWS EMR Airflow: Postgresql Connector

Time:10-15

I'm launching AWS EMR jobs via Airflow which rely on saving the data onto a PostgreSQL database. Unfortunately, as far as I can tell, the connector is not available by default in EMR hence the error:

Traceback (most recent call last):
  File "my_emr_script.py", line 725, in <module>
    main()
  File "my_emr_script.py", line 718, in main
    .mode("overwrite") \
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1634133413183_0001/container_1634133413183_0001_01_000001/pyspark.zip/pyspark/sql/readwriter.py", line 1107, in save
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1634133413183_0001/container_1634133413183_0001_01_000001/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1634133413183_0001/container_1634133413183_0001_01_000001/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1634133413183_0001/container_1634133413183_0001_01_000001/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o1493.save.
: java.lang.ClassNotFoundException: org.postgresql.Driver
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:46)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1(JDBCOptions.scala:102)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1$adapted(JDBCOptions.scala:102)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:102)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:217)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:221)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:45)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:194)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:190)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

How can I ensure that the EMR cluster has the PostgreSQL connector available when it starts? I searched for ways to do it via bootstrapping but failed to find an answer; all the official documents refer only to the Presto version.

Edit:

I followed @Emerson's suggestion and downloaded the .JAR onto an S3 folder and passed it via configs directly in Airflow JOB_FLOW_OVERRIDES:

"Configurations": [
        {
            "Classification": "spark-defaults",
            "Properties":
                {
                    "spark.jar": "s3://{{ var.value.s3_folder }}/scripts/postgresql-42.2.5.jar",
                },
        }
    ],

In Airflow:

instance_type: str = 'm5.xlarge'


SPARK_STEPS = [
    {
        'Name': 'emr_test',
        'ActionOnFailure': 'CANCEL_AND_WAIT',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            "Args": [
                'spark-submit',
                '--deploy-mode',
                'cluster',
                '--master',
                'yarn',
                "s3://{{ var.value.s3_folder }}/scripts/el_emr.py",
                '--execution_date',
                '{{ ds }}'
            ],
        },
    }
]


JOB_FLOW_OVERRIDES = {
    'Name': 'EMR Test',
    "ReleaseLabel": "emr-6.4.0",
    "Applications": [{"Name": "Hadoop"}, {"Name": "Spark"}],
    'Instances': {
        'InstanceGroups': [
            {
                'Name': 'Master node',
                'Market': 'SPOT',
                'InstanceRole': 'MASTER',
                'InstanceType': instance_type,
                'InstanceCount': 1,
            },
            {
                "Name": "Core",
                "Market": "SPOT",
                "InstanceRole": "CORE",
                "InstanceType": instance_type,
                "InstanceCount": 1,
            },
        ],
        'KeepJobFlowAliveWhenNoSteps': False,
        'TerminationProtected': False,
    },
    'Steps': SPARK_STEPS,
    'JobFlowRole': 'EMR_EC2_DefaultRole',
    'ServiceRole': 'EMR_DefaultRole',
    'BootstrapActions': [
        {
            'Name': 'string',
            'ScriptBootstrapAction': {
                'Path': 's3://{{ var.value.s3_folder  }}/scripts/emr_bootstrap.sh',
            }
        },
    ],
    'LogUri': 's3://{{ var.value.s3_folder }}/logs',
     "Configurations": [
        {
            "Classification": "spark-defaults",
            "Properties":
                {
                    "spark.jar": "s3://{{ var.value.s3_path }}/scripts/postgresql-42.2.5.jar"
                },
        }
    ]
}


emr_creator = EmrCreateJobFlowOperator(
        task_id='create_emr',
        job_flow_overrides=JOB_FLOW_OVERRIDES,
        aws_conn_id='aws_conn',
        emr_conn_id='emr_conn',
        region_name='us-west-2',
    )

Unfortunately, the problem remains.

In addition, I tried modifying the bootstrap to download the .JAR:

cd $HOME && wget https://jdbc.postgresql.org/download/postgresql-42.2.5.jar

And pass that onto the configuration:

"Configurations": [
        {
            "Classification": "spark-defaults",
            "Properties":
                {
                    "spark.executor.extraClassPath": "org.postgresql:postgresql:42.2.5",
                    "spark.driver.extraClassPath": "$HOME/postgresql-42.2.5.jar",
                },
        }
    ],

In Airflow:

instance_type: str = 'm5.xlarge'


SPARK_STEPS = [
    {
        'Name': 'emr_test',
        'ActionOnFailure': 'CANCEL_AND_WAIT',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            "Args": [
                'spark-submit',
                '--deploy-mode',
                'cluster',
                '--master',
                'yarn',
                "s3://{{ var.value.s3_folder }}/scripts/emr.py",
                '--execution_date',
                '{{ ds }}'
            ],
        },
    }
]


JOB_FLOW_OVERRIDES = {
    'Name': 'EMR Test',
    "ReleaseLabel": "emr-6.4.0",
    "Applications": [{"Name": "Hadoop"}, {"Name": "Spark"}],
    'Instances': {
        'InstanceGroups': [
            {
                'Name': 'Master node',
                'Market': 'SPOT',
                'InstanceRole': 'MASTER',
                'InstanceType': instance_type,
                'InstanceCount': 1,
            },
            {
                "Name": "Core",
                "Market": "SPOT",
                "InstanceRole": "CORE",
                "InstanceType": instance_type,
                "InstanceCount": 1,
            },
        ],
        'KeepJobFlowAliveWhenNoSteps': False,
        'TerminationProtected': False,
    },
    'Steps': SPARK_STEPS,
    'JobFlowRole': 'EMR_EC2_DefaultRole',
    'ServiceRole': 'EMR_DefaultRole',
    'BootstrapActions': [
        {
            'Name': 'string',
            'ScriptBootstrapAction': {
                'Path': 's3://{{ var.value.s3_folder  }}/scripts/emr_bootstrap.sh',
            }
        },
    ],
    'LogUri': 's3://{{ var.value.s3_folder }}/logs',
    "Configurations": [
        {
            "Classification": "spark-defaults",
            "Properties":
                {
                    "spark.executor.extraClassPath": "org.postgresql:postgresql:42.2.5",
                    "spark.driver.extraClassPath": "$HOME/postgresql-42.2.5.jar",
                },
        }
    ]
}


emr_creator = EmrCreateJobFlowOperator(
    task_id='create_emr',
    job_flow_overrides=JOB_FLOW_OVERRIDES,
    aws_conn_id='aws_conn',
    emr_conn_id='emr_conn',
    region_name='us-west-2',
)

This, in turn, results in a new error: Spark can no longer read the JSON files and treats them as corrupt records.

root
 |-- _corrupt_record: string (nullable = true)

Lastly, the common emr_bootstrap.sh:

#!/bin/bash -xe

sudo pip3 install -U \
    boto3 \
    typing


cd $HOME && wget https://jdbc.postgresql.org/download/postgresql-42.2.5.jar

CodePudding user response:

I'm not sure how the EMR cluster is being provisioned, but below is how you would do it.

First, upload the PostgreSQL JDBC jar to an S3 location. Then refer to that location when you provision the cluster.

If you are provisioning via CloudFormation, this is what you will need:

  EMR:
    Type: AWS::EMR::Cluster
    Properties:
      Applications:
        - Name: Spark
      Configurations:
        - Classification: spark-defaults
          ConfigurationProperties:
            spark.jars: s3://path_to_jar/postgresql-42.2.11.jar

If you are using CLI commands, it would be something like this:

aws emr create-cluster ...... --configurations file://config.json

where config.json might look like this:

[
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars": "s3://path_to_jar/postgresql-42.2.11.jar"
     }
  }
]
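For the Airflow setup in the question, the same classification would go under the "Configurations" key of JOB_FLOW_OVERRIDES. Below is a minimal sketch, assuming the jar sits at the S3 path used in the question; the names SPARK_DEFAULTS and JOB_FLOW_OVERRIDES_PATCH are made up for illustration. Note that the property name here is spark.jars (plural), whereas the question's config uses spark.jar.

```python
# Sketch only: the S3 path is copied from the question and is an assumption
# about where the driver jar was uploaded.
SPARK_DEFAULTS = {
    "Classification": "spark-defaults",
    "Properties": {
        # Plural "spark.jars": comma-separated list of jars to make available
        # to the driver and the executors.
        "spark.jars": "s3://{{ var.value.s3_folder }}/scripts/postgresql-42.2.5.jar",
    },
}

# Hypothetical fragment to merge into the existing JOB_FLOW_OVERRIDES dict.
JOB_FLOW_OVERRIDES_PATCH = {"Configurations": [SPARK_DEFAULTS]}
```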

EDIT:

After seeing your edited question, I can see your spark-submit args (the SPARK_STEPS variable). In that section, just add two more items like below:

'--jars',
's3://pathtodriver/postgresdriver.jar',
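Put together with the step definition from the question, this might look roughly like the sketch below. The jar location is an assumption that reuses the S3 layout from the question, and POSTGRES_JAR is a name made up for illustration. The two new items go before the script path, because spark-submit treats everything after the application file as arguments to the script itself.

```python
# Hypothetical location of the driver jar; reuses the S3 layout from the question.
POSTGRES_JAR = "s3://{{ var.value.s3_folder }}/scripts/postgresql-42.2.5.jar"
SCRIPT = "s3://{{ var.value.s3_folder }}/scripts/el_emr.py"

SPARK_STEPS = [
    {
        "Name": "emr_test",
        "ActionOnFailure": "CANCEL_AND_WAIT",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit",
                "--deploy-mode", "cluster",
                "--master", "yarn",
                # spark-submit options must come before the application script
                "--jars", POSTGRES_JAR,
                SCRIPT,
                # everything after the script is passed to the script itself
                "--execution_date", "{{ ds }}",
            ],
        },
    }
]
```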