Home > Net >  Airflow emrAddStepsOperator unable to execute spark shaded jar
Airflow emrAddStepsOperator unable to execute spark shaded jar

Time:06-13

what should be in step type for spark app .. I am facing issue that master type not set or unable to recognize yarn .. seems it is considering the application as simple jar rather than spark submit mode when using emrAddStepsOperator .. please find attached airflow dag , error and emr screenshot

amazon emr cloud console manually adding spark job as a step

After adding spark jar type step rather than custom jar step .. gives option to give spark submit args and main method args separately

step type can be streaming or spark app or custom jar

My Error Message:

> Exception in thread "main" org.apache.spark.SparkException: A master URL must be set in your configuration
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:385)
    at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2574)
    at org.apache.spark.sql.SparkSession$Builder.$anonfun$getOrCreate$2(SparkSession.scala:934)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:928)
    at com.mcf.ExtractcustomerCategoryWiseSummarizedViews$.main(ExtractcustomerCategoryWiseSummarizedViews.scala:13)
    at com.mcf.ExtractcustomerCategoryWiseSummarizedViews.main(ExtractcustomerCategoryWiseSummarizedViews.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.util.RunJar.run(RunJar.java:323)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:236)

This is an example dag for a AWS EMR Pipeline.

Starting by creating a cluster, adding steps/operations, checking steps and finally when finished terminating the cluster.

import time    
from airflow.operators.python import PythonOperator     
from datetime import timedelta    

from airflow import DAG    
from airflow.providers.amazon.aws.operators.emr_add_steps import EmrAddStepsOperator    
from airflow.providers.amazon.aws.operators.emr_create_job_flow import EmrCreateJobFlowOperator    
from airflow.providers.amazon.aws.operators.emr_terminate_job_flow import EmrTerminateJobFlowOperator     
from airflow.providers.amazon.aws.sensors.emr_step import EmrStepSensor    
from airflow.utils.dates import days_ago    

SPARK_STEPS = [    
    {    
        'Name': 'PerformETL',    
        'ActionOnFailure': 'CONTINUE',    
        'HadoopJarStep': {
            'Jar': 's3://mydata/jars/open-0.0.1-SNAPSHOT.jar',    
            #'MainClass': 'com.sadim.main',    
            'Args': ['spark-submit',    
                    '--deploy-mode',    
                    'cluster',    
                    '--master',    
                    'yarn',    
                    '--class',    
                    'com.sadim.ExtractcustomerCategoryWiseSummarizedViews',     
                    '--mode',    
                'DeltaLoadByDays',    
                '--noOfDaysBehindTodayForDeltaLoad',    
                '1',    
                '--s3InputPath',    
                's3://data-lake/documents/accountscore/categoriseddata/',    
                '--s3OutputPathcustomerCategoryWiseSummarizedViews',    
                's3://test-data/spark_output//customerCategoryWiseSummarizedViews//test'],    
        },
    }
]
SPARK_STEPS2 = [
    {
        'Name': 'sadim_test3',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 's3://test-data/jars/scalatestnadeem-0.0.1-SNAPSHOT_v2.jar',
            'MainClass': 'com.sadim.scalatestnadeem.Test',
            'Args': ['spark-submit',    
                '--deploy-mode',    
                'client',    
                '--master',    
                'yarn',    
                '--conf',    
                'spark.yarn.submit.waitAppCompletion=true'],    
        },    
    }    
]    
SPARK_STEPS3 = [    
    {    
        'Name': 'sadim_test3',    
        'ActionOnFailure': 'CONTINUE',    
        'HadoopJarStep': {    
            'Jar': 's3://mydata/jars/open-0.0.1-SNAPSHOT_masteryarnwithoutdependencyandtest.jar',    
            'MainClass': 'com.sadim.TestSadim',    
            'Args': ['spark-submit',     
                '--deploy-mode',    
                'client',     
                '--master',     
                'yarn',    
                '--conf',    
                'spark.yarn.submit.waitAppCompletion=true'],    
        },    
    }    
]    
SPARK_STEPS4 = [    
    {    
        'Name': 'PerformETL',    
        'ActionOnFailure': 'CONTINUE',    
        'HadoopJarStep': {    
            'Jar': 's3://mydata/jars/open-0.0.1-SNAPSHOT.jar',    
            #'MainClass': 'com.sadim.ExtractcustomerCategoryWiseSummarizedViews',    
            'Args': ['com.sadim.ExtractcustomerCategoryWiseSummarizedViews',    
                'spark-submit',    
                '--deploy-mode',    
                'client',    
                 '--master',    
                 'yarn',                    
                    '--mode',    
                'DeltaLoadByDays',    
                '--noOfDaysBehindTodayForDeltaLoad',    
                '1',    
                '--s3InputPath',    
                's3://data-lake/documents/accountscore/categoriseddata/',    
                '--s3OutputPathcustomerCategoryWiseSummarizedViews',    
                's3://test-data/spark_output//customerCategoryWiseSummarizedViews//test'],    
        },    
    }    
]    
SPARK_STEPS5 = [    
    {    
        'Name': 'PerformETL',    
        'ActionOnFailure': 'CONTINUE',    
        'HadoopJarStep': {    
            'Jar': 's3://mydata/jars/open-0.0.1-SNAPSHOT.jar',    
            #'MainClass': 'com.sadim.main',    
            'Args': ['com.sadim.ExtractcustomerCategoryWiseSummarizedViews',    
                    '--mode',    
                'DeltaLoadByDays',    
                '--noOfDaysBehindTodayForDeltaLoad',    
                '1',    
                '--s3InputPath',    
                's3://data-lake/documents/accountscore/categoriseddata/',    
                '--s3OutputPathcustomerCategoryWiseSummarizedViews',    
                's3://test-data/spark_output//customerCategoryWiseSummarizedViews//test'],    
        },    
    }    
]    
JOB_FLOW_OVERRIDES = {    
    'Name': 'ob_emr_airflow_automation',    
    'ReleaseLabel': 'emr-6.6.0',    
    'LogUri': 's3://test-data/emr_logs/',    
    'Instances': {    
        'InstanceGroups': [    
            {    
                'Name': 'Master node',    
                'Market': 'ON_DEMAND',    
                'InstanceRole': 'MASTER',    
                'InstanceType': 'm5.xlarge',    
                'InstanceCount': 1    
            },    
            {    
                    'Name': "Slave nodes",    
                    'Market': 'ON_DEMAND',
                    'InstanceRole': 'CORE',    
                    'InstanceType': 'm5.xlarge',    
                    'InstanceCount': 1    
            }    
        ],    
        'Ec2SubnetId': 'subnet-03129248888a14196',    
        'Ec2KeyName': 'datalake-emr-nodes',    
        'KeepJobFlowAliveWhenNoSteps': True,    
        'TerminationProtected': False    
    },    
    'BootstrapActions': [    
                {    
                        'Name': 'Java11InstallBootstrap',    
                        'ScriptBootstrapAction': {    
                            'Path': 's3://test-data/jars/bootstrap.sh',    
                            'Args': [    
                            ]    
                        }    
                }    
    ],    
    'Configurations': [    
        {    
            "Classification":"spark-defaults",    
                    "Properties":{    
                    "spark.driver.defaultJavaOptions":"-XX:OnOutOfMemoryError='kill -9 %p' -    XX:MaxHeapFreeRatio=70",    
                    "spark.executor.defaultJavaOptions":"-verbose:gc -Xlog:gc*::time -    XX: PrintGCDetails -XX: PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p' -XX:MaxHeapFreeRatio=70 -XX: IgnoreUnrecognizedVMOptions"
                                }    
        }    
    ],    
    'JobFlowRole': 'DL_EMR_EC2_DefaultRole',    
    'ServiceRole': 'EMR_DefaultRole',    
}    

with DAG(    
    dag_id='emr_job_flow_manual_steps_dag_v6',    
    default_args={    
        'owner': 'airflow',    
        'depends_on_past': False,    
        'email': ['[email protected]'],    
        'email_on_failure': False,    
        'email_on_retry': False,    
    },    
    dagrun_timeout=timedelta(hours=1),    
    start_date=days_ago(1),
    schedule_interval='0 3 * * *',
    tags=['example'],
) as dag:

    # [START howto_operator_emr_manual_steps_tasks]
    cluster_creator = EmrCreateJobFlowOperator(
        task_id='create_job_flow',
        job_flow_overrides=JOB_FLOW_OVERRIDES,
        aws_conn_id='aws_default',
        emr_conn_id='emr_default',
    )

    delay_python_task: PythonOperator = PythonOperator(task_id="delay_python_task",
                                                   dag=dag,
                                                   python_callable=lambda: time.sleep(400))

    step_adder = EmrAddStepsOperator(
        task_id='add_steps',
        job_flow_id=cluster_creator.output,
        aws_conn_id='aws_default',
        steps=SPARK_STEPS5,
    )

    step_checker = EmrStepSensor(
        task_id='watch_step',
        job_flow_id=cluster_creator.output,
        step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
       aws_conn_id='aws_default',
    )

    cluster_remover = EmrTerminateJobFlowOperator(
        task_id='remove_cluster',
        job_flow_id=cluster_creator.output,
        aws_conn_id='aws_default',
    )

    cluster_creator >> step_adder >> step_checker >> cluster_remover

    # [END howto_operator_emr_manual_steps_tasks]

    # Task dependencies created via `XComArgs`:
    #   cluster_creator >> step_checker
    #   cluster_creator >> cluster_remover
    

CodePudding user response:

The issue got fixed. We need to use the command-runner jar as jar option in emrAddStepsOperator and pass your specific ETL job jar inside args as

SPARK_STEPS = [
    {
        'Name': 'DetectAndLoadOrphanTransactions',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            #'MainClass': 'com.sadim.main',
            'Args': [ 
            'spark-submit',
            '--deploy-mode',
                'cluster',
                 '--master',
                 'yarn',
                 '--class',
                    'com.sadim.DetectAndLoadOrphanTransactions',
                 's3://nadeem-test-data/jars/open-0.0.1-SNAPSHOT_yarn_yarndep_maininpom.jar',
                '--Txnspath',
                's3://nadeem-test-data/spark_output//CategorizedTxnsDataset//2022_load_v2',
                '--demographyPath',
                's3://nadeem-test-data/spark_output//customerDemographics//2022_load_v2'
                '--outPath',
                's3://nadeem-test-data/spark_output//orphan_ihub_ids//2022_load_v2'],
        },
    }
]

  • Related