- 确认DAG是否处于active状态。
dag_bag = DagBag(dag_folder=DAGS_FOLDER)
dag = dag_bag.get_dag(dag_id)
if not dag.is_active:
print("DAG is not active")
- 确认DAG schedule_interval是否设置正确。
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime.today() - timedelta(days=1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'dag_id',
default_args=default_args,
description='My First DAG',
schedule_interval=timedelta(days=1),
)
- 确认任务定时时间是否已过。
@dag.catchup(False)
def my_task(ds, **kwargs):
"""
ds: execution date as 'YYYY-MM-DD'
"""
logging.debug(f"Date of execution is {ds}")
# my task logic
- 确认任务状态是否正确。
./airflow tasks list dag_id
./airflow tasks state dag_id task_id date
- 确认监听的队列是否可用。
queue = Queue('test_queue', connection=Redis.from_url('redis://localhost'))
task = TaskInstance(task, execution_date=execution_date)
queued = task.state == State.QUEUED
in_queue = task.key in queue