Airflow UI 中的 DAG 配置可以通过使用 Jinja 模板引擎来动态生成任务映射标识符。例如,使用 {{ ds }}
来表示日期字符串,可用于每个 DAG 实例的唯一标识符。示例代码如下:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2022, 1, 1),
'retries': 1,
'retry_delay': timedelta(minutes=1),
}
dag = DAG(
'my_dynamic_dag',
default_args=default_args,
schedule_interval=timedelta(days=1),
)
for i in range(5):
task = BashOperator(
task_id=f'task_{i}_{{{{ ds }}}}',
bash_command='echo "Hello from task {{ ti.task_id }}"',
dag=dag,
)
在上述示例中,我们使用循环创建 5 个 Bash 操作任务,并使用 {{ ds }}
将日期字符串添加到每个任务的唯一标识符中。这将使每个任务的任务标识符在 DAG 实例之间动态变化,从而避免重复的任务标识符。在使用 Airflow UI 查看 DAG 实例时,我们可以看到每个任务的实际任务标识符类似于 task_0_20220101
、task_1_20220101
等。