该错误可能是由于在 Airflow 存储后端中已经存在相同的 run id 导致的。可以尝试使用不同的 run id 或删除旧的 run id,然后重新运行 DAG。
以下是使用 Python 脚本删除旧的 run id 的示例代码:
from airflow.models import DagRun
from datetime import datetime
# specify the dag id and run id to delete
dag_id = 'my_dag'
run_id = 'my_run_id'
# delete existing dag run
try:
dag_run = DagRun.find(dag_id=dag_id, run_id=run_id)
if dag_run:
# mark the dag run as failed
dag_run.state = 'failed'
dag_run.end_date = datetime.now()
dag_run.log('Manually set DagRun state to failed')
dag_run.update()
# delete the dag run
DagRun.delete(dag_run)
print('Dag run {} deleted.'.format(run_id))
else:
print('Dag run {} not found.'.format(run_id))
except Exception as e:
print('Failed to delete dag run {}: {}'.format(run_id, str(e)))
请注意,使用此方法删除正在运行的 DAG 可能会导致数据丢失或其他问题。因此,在删除任何 DAG 运行之前,请先了解运行期间的情况。
上一篇:ApacheAirflowcredentialsinOpenmetadata改写为中文并给出技术性的具体解法和代码示例
下一篇:ApacheAirflowDAG导入错误-FileNotFoundError:[Errno2]Nosuchfileordirectory(文件或目录不存在)