可以通过在 DAG 文件中定义连接来解决此错误。以下是一个示例,其中连接命名为 my_db_connection:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.models import Connection
def create_connection():
conn = Connection(
conn_id='my_db_connection',
conn_type='mysql',
host='localhost',
login='my_user',
password='my_password',
port=3306
)
conn.insert()
dag = DAG('my_dag', schedule_interval=None, start_date=datetime(2020, 8, 1))
task1 = PythonOperator(task_id='task1', python_callable=create_connection, dag=dag)
这将在 DAG 中创建名为 my_db_connection 的连接。如果在 DAG 中使用此连接,Airflow 将不再报告找不到连接 ID 的错误。