要停止DAG的运行,可以使用Airflow的API来实现。以下是一个使用Python代码示例的解决方法:
import requests
# 设置Airflow的API连接信息
AIRFLOW_API_URL = 'http://localhost:8080/api/experimental'
HEADERS = {'Content-Type': 'application/json'}
def stop_dag_execution(dag_id):
# 构建停止DAG运行的API请求URL
url = f"{AIRFLOW_API_URL}/dags/{dag_id}/paused/true"
# 发送PUT请求来停止DAG运行
response = requests.put(url, headers=HEADERS)
# 检查请求的响应状态码
if response.status_code == 200:
print(f"DAG {dag_id} execution has been stopped successfully.")
else:
print(f"Failed to stop DAG {dag_id} execution.")
# 调用函数停止DAG运行
stop_dag_execution('your_dag_id')
在上述代码中,首先定义了Airflow的API连接信息,包括API的URL和请求头信息。然后,定义了一个stop_dag_execution
函数,该函数接受一个dag_id
参数,用于停止指定的DAG运行。
在函数内部,使用requests
库发送了一个PUT请求到Airflow的API URL,将paused
参数设置为true
来停止DAG运行。然后,检查响应的状态码,如果状态码为200,则表示成功停止了DAG运行。
最后,调用stop_dag_execution
函数并传入要停止的DAG的dag_id
。根据请求的响应,打印出相应的成功或失败消息。