这通常是由于任务实例与本地任务作业之间的时间戳不匹配所致。可以通过将以下代码添加到Airflow DAG文件中的每个任务之前来解决该问题:
from datetime import datetime, timedelta
import os
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.bash_operator import BashOperator
dag = DAG(
'example_dag',
schedule_interval=timedelta(days=1),
start_date=days_ago(2),
)
def my_bash_operator():
# Your bash command here
command = "echo 'Hello, Airflow!'"
return BashOperator(
task_id='my_bash_operator',
bash_command=command,
dag=dag,
)
dag_start_time = datetime.now() - timedelta(days=2)
dag_start_time_str = dag_start_time.strftime('%Y-%m-%dT%H:%M:%S')
os.environ['AIRFLOW_CTX_DAG_RUN_ID'] = 'my_dag_run_id'
os.environ['AIRFLOW_CTX_EXECUTION_DATE'] = dag_start_time_str
my_bash_operator()
通过将每个任务的时间戳强制设置为DAG的起始时间,我们可以确保本地任务作业正确地捕获任务实例的状态,并正确记录其返回代码。