Airflow集群的工作节点是指运行Airflow任务的节点,而DAGs(Directed Acyclic Graphs)/ Workflows是定义任务和任务依赖关系的代码。根据需求,是否需要在所有的工作节点中部署DAGs / Workflows可以有不同的解决方法。
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
def my_task():
# 任务逻辑代码
pass
with DAG('my_dag', start_date=datetime(2022, 1, 1)) as dag:
task1 = PythonOperator(
task_id='task1',
python_callable=my_task
)
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
def my_task():
# 任务逻辑代码
pass
with DAG('my_dag', start_date=datetime(2022, 1, 1)) as dag:
task1 = PythonOperator(
task_id='task1',
python_callable=my_task,
node='my_single_worker_node' # 指定任务执行的节点
)
通过以上两种方法,可以根据实际需求选择是否在所有的工作节点中部署DAGs / Workflows。