What should the step type be for a Spark application? I am facing an issue where the Spark master is not set and YARN is not recognized; EMR seems to treat the application as a simple JAR rather than running it in spark-submit mode when I use `EmrAddStepsOperator`. Please find my Airflow DAG, the error, and the EMR screenshot below.
When a Spark job is added manually as a step in the Amazon EMR console, the step type can be Streaming, Spark application, or Custom JAR.
My Error Message:
Exception in thread "main" org.apache.spark.SparkException: A master URL must be set in your configuration
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:385)
    at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2574)
    at org.apache.spark.sql.SparkSession$Builder.$anonfun$getOrCreate$2(SparkSession.scala:934)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:928)
    at com.mcf.ExtractcustomerCategoryWiseSummarizedViews$.main(ExtractcustomerCategoryWiseSummarizedViews.scala:13)
    at com.mcf.ExtractcustomerCategoryWiseSummarizedViews.main(ExtractcustomerCategoryWiseSummarizedViews.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.util.RunJar.run(RunJar.java:323)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:236)
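Note the `org.apache.hadoop.util.RunJar` frames at the bottom of the trace: the step was launched with `hadoop jar` rather than `spark-submit`, so nothing ever supplied a master URL. Any Spark application that (correctly) leaves the master out of its code fails this way when run without a launcher. A minimal PySpark sketch of that pattern, assuming only that PySpark is installed locally:

```python
# Mirrors the session construction at ExtractcustomerCategoryWiseSummarizedViews.scala:13.
# Run directly (`python app.py`), the JVM-side SparkContext raises
# "A master URL must be set in your configuration"; run via
# `spark-submit --master yarn app.py`, the launcher injects the master
# and the identical code works.
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("ExtractcustomerCategoryWiseSummarizedViews") \
    .getOrCreate()  # deliberately no .master(...): the launcher must set it
```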
This is an example DAG for an AWS EMR pipeline: it creates a cluster, adds steps/operations, monitors the steps, and finally terminates the cluster when they finish.
import time
from airflow.operators.python import PythonOperator
from datetime import timedelta
from airflow import DAG
from airflow.providers.amazon.aws.operators.emr_add_steps import EmrAddStepsOperator
from airflow.providers.amazon.aws.operators.emr_create_job_flow import EmrCreateJobFlowOperator
from airflow.providers.amazon.aws.operators.emr_terminate_job_flow import EmrTerminateJobFlowOperator
from airflow.providers.amazon.aws.sensors.emr_step import EmrStepSensor
from airflow.utils.dates import days_ago
# Attempt 1: the spark-submit command and its flags passed as Args of the
# application JAR itself (EMR runs the JAR with `hadoop jar`, so these all
# arrive as plain main() arguments).
SPARK_STEPS = [
    {
        'Name': 'PerformETL',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 's3://mydata/jars/open-0.0.1-SNAPSHOT.jar',
            # 'MainClass': 'com.sadim.main',
            'Args': ['spark-submit',
                     '--deploy-mode',
                     'cluster',
                     '--master',
                     'yarn',
                     '--class',
                     'com.sadim.ExtractcustomerCategoryWiseSummarizedViews',
                     '--mode',
                     'DeltaLoadByDays',
                     '--noOfDaysBehindTodayForDeltaLoad',
                     '1',
                     '--s3InputPath',
                     's3://data-lake/documents/accountscore/categoriseddata/',
                     '--s3OutputPathcustomerCategoryWiseSummarizedViews',
                     's3://test-data/spark_output//customerCategoryWiseSummarizedViews//test'],
        },
    }
]

# Attempt 2: explicit MainClass and client deploy mode.
SPARK_STEPS2 = [
    {
        'Name': 'sadim_test3',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 's3://test-data/jars/scalatestnadeem-0.0.1-SNAPSHOT_v2.jar',
            'MainClass': 'com.sadim.scalatestnadeem.Test',
            'Args': ['spark-submit',
                     '--deploy-mode',
                     'client',
                     '--master',
                     'yarn',
                     '--conf',
                     'spark.yarn.submit.waitAppCompletion=true'],
        },
    }
]

# Attempt 3: the same flags against a rebuilt JAR (judging by its name, with
# master=yarn set in code and dependencies left out).
SPARK_STEPS3 = [
    {
        'Name': 'sadim_test3',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 's3://mydata/jars/open-0.0.1-SNAPSHOT_masteryarnwithoutdependencyandtest.jar',
            'MainClass': 'com.sadim.TestSadim',
            'Args': ['spark-submit',
                     '--deploy-mode',
                     'client',
                     '--master',
                     'yarn',
                     '--conf',
                     'spark.yarn.submit.waitAppCompletion=true'],
        },
    }
]

# Attempt 4: class name first in Args, followed by the spark-submit flags.
SPARK_STEPS4 = [
    {
        'Name': 'PerformETL',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 's3://mydata/jars/open-0.0.1-SNAPSHOT.jar',
            # 'MainClass': 'com.sadim.ExtractcustomerCategoryWiseSummarizedViews',
            'Args': ['com.sadim.ExtractcustomerCategoryWiseSummarizedViews',
                     'spark-submit',
                     '--deploy-mode',
                     'client',
                     '--master',
                     'yarn',
                     '--mode',
                     'DeltaLoadByDays',
                     '--noOfDaysBehindTodayForDeltaLoad',
                     '1',
                     '--s3InputPath',
                     's3://data-lake/documents/accountscore/categoriseddata/',
                     '--s3OutputPathcustomerCategoryWiseSummarizedViews',
                     's3://test-data/spark_output//customerCategoryWiseSummarizedViews//test'],
        },
    }
]

# Attempt 5: class name and application arguments only, no spark-submit flags.
SPARK_STEPS5 = [
    {
        'Name': 'PerformETL',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 's3://mydata/jars/open-0.0.1-SNAPSHOT.jar',
            # 'MainClass': 'com.sadim.main',
            'Args': ['com.sadim.ExtractcustomerCategoryWiseSummarizedViews',
                     '--mode',
                     'DeltaLoadByDays',
                     '--noOfDaysBehindTodayForDeltaLoad',
                     '1',
                     '--s3InputPath',
                     's3://data-lake/documents/accountscore/categoriseddata/',
                     '--s3OutputPathcustomerCategoryWiseSummarizedViews',
                     's3://test-data/spark_output//customerCategoryWiseSummarizedViews//test'],
        },
    }
]
JOB_FLOW_OVERRIDES = {
    'Name': 'ob_emr_airflow_automation',
    'ReleaseLabel': 'emr-6.6.0',
    'LogUri': 's3://test-data/emr_logs/',
    'Instances': {
        'InstanceGroups': [
            {
                'Name': 'Master node',
                'Market': 'ON_DEMAND',
                'InstanceRole': 'MASTER',
                'InstanceType': 'm5.xlarge',
                'InstanceCount': 1
            },
            {
                'Name': 'Slave nodes',
                'Market': 'ON_DEMAND',
                'InstanceRole': 'CORE',
                'InstanceType': 'm5.xlarge',
                'InstanceCount': 1
            }
        ],
        'Ec2SubnetId': 'subnet-03129248888a14196',
        'Ec2KeyName': 'datalake-emr-nodes',
        'KeepJobFlowAliveWhenNoSteps': True,
        'TerminationProtected': False
    },
    'BootstrapActions': [
        {
            'Name': 'Java11InstallBootstrap',
            'ScriptBootstrapAction': {
                'Path': 's3://test-data/jars/bootstrap.sh',
                'Args': []
            }
        }
    ],
    'Configurations': [
        {
            'Classification': 'spark-defaults',
            'Properties': {
                'spark.driver.defaultJavaOptions': "-XX:OnOutOfMemoryError='kill -9 %p' -XX:MaxHeapFreeRatio=70",
                'spark.executor.defaultJavaOptions': "-verbose:gc -Xlog:gc*::time -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p' -XX:MaxHeapFreeRatio=70 -XX:+IgnoreUnrecognizedVMOptions"
            }
        }
    ],
    'JobFlowRole': 'DL_EMR_EC2_DefaultRole',
    'ServiceRole': 'EMR_DefaultRole',
}
with DAG(
    dag_id='emr_job_flow_manual_steps_dag_v6',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'email': ['[email protected]'],
        'email_on_failure': False,
        'email_on_retry': False,
    },
    dagrun_timeout=timedelta(hours=1),
    start_date=days_ago(1),
    schedule_interval='0 3 * * *',
    tags=['example'],
) as dag:
    # [START howto_operator_emr_manual_steps_tasks]
    cluster_creator = EmrCreateJobFlowOperator(
        task_id='create_job_flow',
        job_flow_overrides=JOB_FLOW_OVERRIDES,
        aws_conn_id='aws_default',
        emr_conn_id='emr_default',
    )
    # Buffer while the cluster spins up; not wired into the chain below.
    delay_python_task = PythonOperator(
        task_id='delay_python_task',
        python_callable=lambda: time.sleep(400),
    )
    step_adder = EmrAddStepsOperator(
        task_id='add_steps',
        job_flow_id=cluster_creator.output,
        aws_conn_id='aws_default',
        steps=SPARK_STEPS5,
    )
    step_checker = EmrStepSensor(
        task_id='watch_step',
        job_flow_id=cluster_creator.output,
        step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
        aws_conn_id='aws_default',
    )
    cluster_remover = EmrTerminateJobFlowOperator(
        task_id='remove_cluster',
        job_flow_id=cluster_creator.output,
        aws_conn_id='aws_default',
    )
    cluster_creator >> step_adder >> step_checker >> cluster_remover
    # [END howto_operator_emr_manual_steps_tasks]

    # Task dependencies created via `XComArgs`:
    # cluster_creator >> step_checker
    # cluster_creator >> cluster_remover
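As an aside, the cluster definition itself can be sanity-checked outside Airflow, since `EmrCreateJobFlowOperator` forwards `job_flow_overrides` to the EMR `RunJobFlow` API. A minimal sketch, assuming boto3 and locally configured AWS credentials (the region is an assumption):

```python
# Sketch: create the same cluster directly via boto3 to validate
# JOB_FLOW_OVERRIDES independently of Airflow connections.
import boto3

emr = boto3.client('emr', region_name='us-east-1')  # region is an assumption

# run_job_flow accepts the same top-level keys the operator forwards
# (Name, ReleaseLabel, Instances, BootstrapActions, Configurations, ...).
response = emr.run_job_flow(**JOB_FLOW_OVERRIDES)
print(response['JobFlowId'])  # a j-... cluster ID
```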
CodePudding user response:
The issue got fixed. The root cause: when `HadoopJarStep.Jar` points at your own application JAR, EMR launches it with `hadoop jar` (hence the `org.apache.hadoop.util.RunJar` frames in the stack trace), so `'spark-submit'`, `'--master'`, and `'yarn'` in `Args` are passed as plain arguments to your main class and no Spark master is ever configured. Instead, use `command-runner.jar` as the `Jar` in `EmrAddStepsOperator` and put the full `spark-submit` invocation, including your ETL job JAR, inside `Args`:
SPARK_STEPS = [
    {
        'Name': 'DetectAndLoadOrphanTransactions',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            # 'MainClass': 'com.sadim.main',
            'Args': [
                'spark-submit',
                '--deploy-mode',
                'cluster',
                '--master',
                'yarn',
                '--class',
                'com.sadim.DetectAndLoadOrphanTransactions',
                's3://nadeem-test-data/jars/open-0.0.1-SNAPSHOT_yarn_yarndep_maininpom.jar',
                '--Txnspath',
                's3://nadeem-test-data/spark_output//CategorizedTxnsDataset//2022_load_v2',
                '--demographyPath',
                's3://nadeem-test-data/spark_output//customerDemographics//2022_load_v2',
                '--outPath',
                's3://nadeem-test-data/spark_output//orphan_ihub_ids//2022_load_v2'],
        },
    }
]
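With `command-runner.jar`, EMR executes the `Args` list as a command on the master node, so the step is a genuine `spark-submit --master yarn ...` invocation instead of a `hadoop jar` launch. The fixed step can also be verified outside Airflow by submitting and polling it with boto3; a minimal sketch, assuming configured AWS credentials and a running cluster (`j-XXXXXXXXXXXXX` is a placeholder):

```python
# Sketch: submit the corrected step directly and poll it to a terminal state.
import time
import boto3

emr = boto3.client('emr')
cluster_id = 'j-XXXXXXXXXXXXX'  # placeholder: your running cluster's ID

step_id = emr.add_job_flow_steps(JobFlowId=cluster_id, Steps=SPARK_STEPS)['StepIds'][0]

while True:
    state = emr.describe_step(ClusterId=cluster_id, StepId=step_id)['Step']['Status']['State']
    print(state)
    if state in ('COMPLETED', 'FAILED', 'CANCELLED'):
        break
    time.sleep(30)
```

This is the same `DescribeStep` polling that the `EmrStepSensor` in the DAG above performs.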