Airflow 2 conn URI, extra field JSON for AWS in secret manager

Time:11-23

I have set up AWS Secrets Manager as my secrets backend in Airflow 2. I have defined an aws_default connection in Secrets Manager (as plain text) like so:
aws:///extra?region_name=us-east-1&session_kwargs={"profile_name": "my_profile"}

When I call a hook with it (AwsGlueCrawlerHook(aws_conn_id='aws_default')), I get the following error:

Traceback (most recent call last):
  File "/home/airflow/venv/lib64/python3.7/site-packages/airflow/task/task_runner/standard_task_runner.py", line 85, in _start_by_fork
    args.func(args, dag=self.dag)
  File "/home/airflow/venv/lib64/python3.7/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/home/airflow/venv/lib64/python3.7/site-packages/airflow/utils/cli.py", line 92, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/venv/lib64/python3.7/site-packages/airflow/cli/commands/task_command.py", line 292, in task_run
    _run_task_by_selected_method(args, dag, ti)
  File "/home/airflow/venv/lib64/python3.7/site-packages/airflow/cli/commands/task_command.py", line 107, in _run_task_by_selected_method
    _run_raw_task(args, ti)
  File "/home/airflow/venv/lib64/python3.7/site-packages/airflow/cli/commands/task_command.py", line 184, in _run_raw_task
    error_file=args.error_file,
  File "/home/airflow/venv/lib64/python3.7/site-packages/airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/venv/lib64/python3.7/site-packages/airflow/models/taskinstance.py", line 1332, in _run_raw_task
    self._execute_task_with_callbacks(context)
  File "/home/airflow/venv/lib64/python3.7/site-packages/airflow/models/taskinstance.py", line 1458, in _execute_task_with_callbacks
    result = self._execute_task(context, self.task)
  File "/home/airflow/venv/lib64/python3.7/site-packages/airflow/models/taskinstance.py", line 1514, in _execute_task
    result = execute_callable(context=context)
  File "/home/airflow/airflow/dags/reboots/operators/start_glue_crawler_operator.py", line 29, in execute
    AwsGlueCrawlerHook(aws_conn_id=self.aws_conn_id).start_crawler(crawler_name=self.crawler_name)
  File "/home/airflow/venv/lib64/python3.7/site-packages/airflow/providers/amazon/aws/hooks/glue_crawler.py", line 120, in start_crawler
    crawler = self.glue_client.start_crawler(Name=crawler_name)
  File "/home/airflow/venv/lib64/python3.7/site-packages/cached_property.py", line 36, in __get__
    value = obj.__dict__[self.func.__name__] = self.func(obj)
  File "/home/airflow/venv/lib64/python3.7/site-packages/airflow/providers/amazon/aws/hooks/glue_crawler.py", line 48, in glue_client
    return self.get_conn()
  File "/home/airflow/venv/lib64/python3.7/site-packages/airflow/providers/amazon/aws/hooks/base_aws.py", line 494, in get_conn
    return self.conn
  File "/home/airflow/venv/lib64/python3.7/site-packages/cached_property.py", line 36, in __get__
    value = obj.__dict__[self.func.__name__] = self.func(obj)
  File "/home/airflow/venv/lib64/python3.7/site-packages/airflow/providers/amazon/aws/hooks/base_aws.py", line 476, in conn
    return self.get_client_type(self.client_type, region_name=self.region_name)
  File "/home/airflow/venv/lib64/python3.7/site-packages/airflow/providers/amazon/aws/hooks/base_aws.py", line 442, in get_client_type
    session, endpoint_url = self._get_credentials(region_name)
  File "/home/airflow/venv/lib64/python3.7/site-packages/airflow/providers/amazon/aws/hooks/base_aws.py", line 418, in _get_credentials
    conn=connection_object, region_name=region_name, config=self.config
  File "/home/airflow/venv/lib64/python3.7/site-packages/airflow/providers/amazon/aws/hooks/base_aws.py", line 74, in create_session
    self.basic_session = self._create_basic_session(session_kwargs=session_kwargs)
  File "/home/airflow/venv/lib64/python3.7/site-packages/airflow/providers/amazon/aws/hooks/base_aws.py", line 100, in _create_basic_session
    **session_kwargs,
TypeError: type object argument after ** must be a mapping, not str

I don't know how to form my Airflow conn URI so that session_kwargs is loaded correctly (i.e. as a dictionary, not a string). I have tried escaping the quotes, among other things, but I just can't figure it out. It doesn't error out if I just use: aws:///extra?region_name=us-east-1

So I know the problem is with the way I wrote the session_kwargs parameter. I know I could change full_url_mode to false in the backend_kwargs, but at this point I am really curious as to how to write the conn URI.
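
The failure can be reproduced without Airflow. The following is only a sketch using the standard library: it assumes the URI query string is parsed into a flat dictionary of strings, which is consistent with the traceback above but is not taken from Airflow's source. The create_session function is a hypothetical stand-in for the session constructor that receives **session_kwargs.

```python
from urllib.parse import parse_qsl, urlsplit

uri = 'aws:///extra?region_name=us-east-1&session_kwargs={"profile_name": "my_profile"}'

# Query-string parsing yields plain strings for every value,
# so the JSON text is never turned into a dict at this stage.
extra = dict(parse_qsl(urlsplit(uri).query, keep_blank_values=True))
session_kwargs = extra["session_kwargs"]
print(type(session_kwargs).__name__)


def create_session(**kwargs):
    """Hypothetical stand-in for the call that unpacks session_kwargs."""
    return kwargs


error_message = ""
try:
    # Unpacking a str with ** fails, as in the traceback.
    create_session(**session_kwargs)
except TypeError as exc:
    error_message = str(exc)
    print(error_message)
```

The value survives parsing unchanged, but it is still JSON text rather than a mapping, which is why quoting or escaping it differently does not help on its own.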

CodePudding user response:

Well, found the answer here: https://airflow.apache.org/docs/apache-airflow/stable/howto/connection.html

import json

from airflow.models.connection import Connection

extra = {
    "config_kwargs": {
        "proxies": {
            "http": "http://user:[email protected]:3128",
            "https": "http://user:[email protected]:3128"
        }
    }
}

c = Connection(
    conn_id="some_conn",
    conn_type="aws",
    description="",
    host="",
    login="",
    password="",
    extra=json.dumps(extra),
)
print(c.get_uri())

The URI needs to be URL-encoded, but only in part. Either way, if you have JSON to encode, Airflow gives you the tool to do it: build a Connection with the JSON in extra and print the result of get_uri().
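
To see what URL-encoding a JSON value looks like on its own, here is a small standard-library sketch. It uses the session_kwargs value from the question and only illustrates the encoding step. Whether Airflow turns the decoded text back into a dictionary depends on the Airflow and provider versions, which this sketch does not verify, so the output of get_uri() on your own installation remains the authoritative form.

```python
import json
from urllib.parse import quote, unquote

session_kwargs = {"profile_name": "my_profile"}

# Percent-encode the JSON text so braces, quotes, colons and spaces
# are safe inside a query-string value.
encoded = quote(json.dumps(session_kwargs), safe="")
uri = f"aws:///extra?region_name=us-east-1&session_kwargs={encoded}"
print(uri)

# Decoding and parsing the value recovers the original dictionary.
decoded = json.loads(unquote(encoded))
print(decoded)
```

Only the JSON value is percent-encoded here; the scheme, the separators and the plain region_name value are left as they are.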
