如果Airflow的定时DAG没有按预期工作,通常是因为调度器没有正确地分配任务。这可以通过检查DAG和任务的状态来解决,以确保它们设置正确并且在调度器中注册。以下是一个示例代码,演示如何创建一个任务并确保它被调度执行:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2021, 1, 1),
'email': ['example@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'my_dag',
default_args=default_args,
description='My example dag',
schedule_interval=timedelta(days=1),
)
run_task = BashOperator(
task_id='run_task',
bash_command='echo "Hello World"',
dag=dag,
)
在此示例中,我们创建一个名为my_dag
的DAG,它每天运行一次。DAG的唯一任务是run_task
,它只是打印一条消息。我们可以使用以下Airflow CLI命令来检查DAG和任务的状态:
$ airflow list_dags
$ airflow list_tasks my_dag
$ airflow list_dag_runs my_dag
$ airflow list_tasks my_dag --dates 2021-01-01T00:00:00
$ airflow test my_dag run_task 2021-01-01T00:00:00
如果DAG和任务的状态正确,并且它们按计划执行,那么问题应该已经解决了。