I am trying to run a containerized application of Airflow and Spark using the following repository,
From what I can see, this is generated through the following command,
: "${AIRFLOW__CORE__FERNET_KEY:=${FERNET_KEY:=$(python -c "from cryptography.fernet import Fernet; FERNET_KEY = Fernet.generate_key().decode(); print(FERNET_KEY)")}}"
All of the variables are then exported,
export \
AIRFLOW__CELERY__BROKER_URL \
AIRFLOW__CELERY__RESULT_BACKEND \
AIRFLOW__CORE__EXECUTOR \
AIRFLOW__CORE__FERNET_KEY \
AIRFLOW__CORE__LOAD_EXAMPLES \
AIRFLOW__CORE__SQL_ALCHEMY_CONN \
This seems to be in line with what is available in the documentation. What is the problem here?
CodePudding user response:
You need to generate a new Fernet key and add it to your Airflow config. https://airflow.apache.org/docs/apache-airflow/stable/security/secrets/fernet.html
As the link explains, Airflow uses the Fernet key to encrypt the passwords stored in the connection information. In the case above, the Fernet key was never set, hence the "Invalid token" error.
Fernet is an implementation of symmetric encryption, but the details are well outside the scope of this issue.
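To make the link to the entrypoint command in the question concrete, here is a minimal sketch of how that shell expansion resolves the key. It uses only the standard library; the function name resolve_fernet_key is hypothetical and is not part of Airflow. It assumes the generated key has the same format as Fernet.generate_key(), that is, 32 random bytes in URL-safe base64. If no key is pinned, each start produces a different key, and a password encrypted under an earlier key can no longer be decrypted, which is presumably what surfaces as "Invalid token" here.

```python
import base64
import os


def resolve_fernet_key(env):
    """Approximate ${AIRFLOW__CORE__FERNET_KEY:=${FERNET_KEY:=<generated>}}.

    Hypothetical helper for illustration only; `env` is any dict-like mapping.
    """
    # ':=' treats unset and empty values alike, hence the truthiness checks.
    key = env.get("AIRFLOW__CORE__FERNET_KEY") or env.get("FERNET_KEY")
    if not key:
        # Assumed key format: 32 random bytes, URL-safe base64 encoded.
        key = base64.urlsafe_b64encode(os.urandom(32)).decode()
    return key


# Nothing set: every call (think: every container start) yields a new key.
first = resolve_fernet_key({})
second = resolve_fernet_key({})
print(first != second)

# Key pinned in the environment: the value stays stable across starts.
pinned = {"FERNET_KEY": first}
print(resolve_fernet_key(pinned) == first)
```

Pinning FERNET_KEY (or AIRFLOW__CORE__FERNET_KEY) in the container environment, for example through the compose file, is one way to keep the key stable.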
CodePudding user response:
The quick fix for this is to just delete the existing connection in the UI and then create a new one with the required parameters.
A better way would be to use airflow.models.Connection along with airflow.settings to manage these connections programmatically.
This is explained in detail in the question "Is there a way to create/modify connections through Airflow API".
The code snippet below is derived from an answer to that question. It checks whether a connection already exists, deletes it if so, and then creates a new connection from the given details:
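import logging

from airflow import settings
from airflow.models import Connection


def create_conn(conn_id, conn_type, host, login, password, port):
    """Create the connection, replacing any existing one with the same conn_id."""
    conn = Connection(
        conn_id=conn_id,
        conn_type=conn_type,
        host=host,
        login=login,
        password=password,
        port=port,
    )
    session = settings.Session()
    # Look up an existing connection with the same ID.
    existing = (
        session.query(Connection)
        .filter(Connection.conn_id == conn.conn_id)
        .first()
    )
    if existing is not None:
        logging.info(f"Connection {conn_id} already exists and will be replaced")
        session.delete(existing)
        # Commit the delete first so the insert below cannot collide with the old row.
        session.commit()

    session.add(conn)
    session.commit()
    logging.info(Connection.log_info(conn))
    logging.info(f"Connection {conn_id} is created")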
To create a new connection to Spark using the parameters we want,
create_conn(conn_id='spark_default', conn_type='', host='spark://spark', login='', password='', port='7077')
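As a rough illustration of why the scheme goes into host while the port is passed separately, the sketch below joins the two into the kind of value spark-submit accepts for --master. The helper build_master_url is hypothetical, and the joining rule (append the port only when one is configured) is my assumption about how the Spark hook combines them, so check it against the provider version you run.

```python
def build_master_url(host, port=None):
    """Hypothetical helper: combine host and port into a Spark master URL."""
    # Assumption: the port is appended only when one is configured.
    return f"{host}:{port}" if port else host


# Same host and port as in the create_conn call above.
print(build_master_url("spark://spark", 7077))
```

With the values above this prints spark://spark:7077.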